Visão Geral
O decorador @human_feedback permite fluxos de trabalho human-in-the-loop (HITL) diretamente nos CrewAI Flows. Ele permite pausar a execução do flow, apresentar a saída para um humano revisar, coletar seu feedback e, opcionalmente, rotear para diferentes listeners com base no resultado do feedback.
Isso é particularmente valioso para:
- Garantia de qualidade: Revisar conteúdo gerado por IA antes de ser usado downstream
- Portões de decisão: Deixar humanos tomarem decisões críticas em fluxos automatizados
- Fluxos de aprovação: Implementar padrões de aprovar/rejeitar/revisar
- Refinamento interativo: Coletar feedback para melhorar saídas iterativamente
Início Rápido
Aqui está a maneira mais simples de adicionar feedback humano a um flow:
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback
class SimpleReviewFlow(Flow):
@start()
@human_feedback(message="Por favor, revise este conteúdo:")
def generate_content(self):
return "Este é um conteúdo gerado por IA que precisa de revisão."
@listen(generate_content)
def process_feedback(self, result):
print(f"Conteúdo: {result.output}")
print(f"Humano disse: {result.feedback}")
flow = SimpleReviewFlow()
flow.kickoff()
Quando este flow é executado, ele irá:
- Executar
generate_content e retornar a string
- Exibir a saída para o usuário com a mensagem de solicitação
- Aguardar o usuário digitar o feedback (ou pressionar Enter para pular)
- Passar um objeto
HumanFeedbackResult para process_feedback
O Decorador @human_feedback
Parâmetros
| Parâmetro | Tipo | Obrigatório | Descrição |
|---|
message | str | Sim | A mensagem mostrada ao humano junto com a saída do método |
emit | Sequence[str] | Não | Lista de possíveis outcomes. O feedback é mapeado para um destes, que dispara decoradores @listen |
llm | str | BaseLLM | Quando emit especificado | LLM usado para interpretar o feedback e mapear para um outcome |
default_outcome | str | Não | Outcome a usar se nenhum feedback for fornecido. Deve estar em emit |
metadata | dict | Não | Dados adicionais para integrações enterprise |
provider | HumanFeedbackProvider | Não | Provider customizado para feedback assíncrono/não-bloqueante. Veja Feedback Humano Assíncrono |
Uso Básico (Sem Roteamento)
Quando você não especifica emit, o decorador simplesmente coleta o feedback e passa um HumanFeedbackResult para o próximo listener:
@start()
@human_feedback(message="O que você acha desta análise?")
def analyze_data(self):
return "Resultados da análise: Receita aumentou 15%, custos diminuíram 8%"
@listen(analyze_data)
def handle_feedback(self, result):
# result é um HumanFeedbackResult
print(f"Análise: {result.output}")
print(f"Feedback: {result.feedback}")
Quando você especifica emit, o decorador se torna um roteador. O feedback livre do humano é interpretado por um LLM e mapeado para um dos outcomes especificados:
@start()
@human_feedback(
message="Você aprova este conteúdo para publicação?",
emit=["approved", "rejected", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision",
)
def review_content(self):
return "Rascunho do post do blog aqui..."
@listen("approved")
def publish(self, result):
print(f"Publicando! Usuário disse: {result.feedback}")
@listen("rejected")
def discard(self, result):
print(f"Descartando. Motivo: {result.feedback}")
@listen("needs_revision")
def revise(self, result):
print(f"Revisando baseado em: {result.feedback}")
O LLM usa saídas estruturadas (function calling) quando disponível para garantir que a resposta seja um dos seus outcomes especificados. Isso torna o roteamento confiável e previsível.
HumanFeedbackResult
O dataclass HumanFeedbackResult contém todas as informações sobre uma interação de feedback humano:
from crewai.flow.human_feedback import HumanFeedbackResult
@dataclass
class HumanFeedbackResult:
output: Any # A saída original do método mostrada ao humano
feedback: str # O texto bruto do feedback do humano
outcome: str | None # O outcome mapeado (se emit foi especificado)
timestamp: datetime # Quando o feedback foi recebido
method_name: str # Nome do método decorado
metadata: dict # Qualquer metadata passado ao decorador
Acessando em Listeners
Quando um listener é disparado por um método @human_feedback com emit, ele recebe o HumanFeedbackResult:
@listen("approved")
def on_approval(self, result: HumanFeedbackResult):
print(f"Saída original: {result.output}")
print(f"Feedback do usuário: {result.feedback}")
print(f"Outcome: {result.outcome}") # "approved"
print(f"Recebido em: {result.timestamp}")
Acessando o Histórico de Feedback
A classe Flow fornece dois atributos para acessar o feedback humano:
last_human_feedback
Retorna o HumanFeedbackResult mais recente:
@listen(some_method)
def check_feedback(self):
if self.last_human_feedback:
print(f"Último feedback: {self.last_human_feedback.feedback}")
human_feedback_history
Uma lista de todos os objetos HumanFeedbackResult coletados durante o flow:
@listen(final_step)
def summarize(self):
print(f"Total de feedbacks coletados: {len(self.human_feedback_history)}")
for i, fb in enumerate(self.human_feedback_history):
print(f"{i+1}. {fb.method_name}: {fb.outcome or 'sem roteamento'}")
Cada HumanFeedbackResult é adicionado a human_feedback_history, então múltiplos passos de feedback não sobrescrevem uns aos outros. Use esta lista para acessar todo o feedback coletado durante o flow.
Exemplo Completo: Fluxo de Aprovação de Conteúdo
Aqui está um exemplo completo implementando um fluxo de revisão e aprovação de conteúdo:
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
from pydantic import BaseModel
class ContentState(BaseModel):
topic: str = ""
draft: str = ""
final_content: str = ""
revision_count: int = 0
class ContentApprovalFlow(Flow[ContentState]):
"""Um flow que gera conteúdo e obtém aprovação humana."""
@start()
def get_topic(self):
self.state.topic = input("Sobre qual tópico devo escrever? ")
return self.state.topic
@listen(get_topic)
def generate_draft(self, topic):
# Em uso real, isso chamaria um LLM
self.state.draft = f"# {topic}\n\nEste é um rascunho sobre {topic}..."
return self.state.draft
@listen(generate_draft)
@human_feedback(
message="Por favor, revise este rascunho. Responda 'approved', 'rejected', ou forneça feedback de revisão:",
emit=["approved", "rejected", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision",
)
def review_draft(self, draft):
return draft
@listen("approved")
def publish_content(self, result: HumanFeedbackResult):
self.state.final_content = result.output
print("\n✅ Conteúdo aprovado e publicado!")
print(f"Comentário do revisor: {result.feedback}")
return "published"
@listen("rejected")
def handle_rejection(self, result: HumanFeedbackResult):
print("\n❌ Conteúdo rejeitado")
print(f"Motivo: {result.feedback}")
return "rejected"
@listen("needs_revision")
def revise_content(self, result: HumanFeedbackResult):
self.state.revision_count += 1
print(f"\n📝 Revisão #{self.state.revision_count} solicitada")
print(f"Feedback: {result.feedback}")
# Em um flow real, você pode voltar para generate_draft
# Para este exemplo, apenas reconhecemos
return "revision_requested"
# Executar o flow
flow = ContentApprovalFlow()
result = flow.kickoff()
print(f"\nFlow concluído. Revisões solicitadas: {flow.state.revision_count}")
O decorador @human_feedback funciona com outros decoradores de flow. Coloque-o como o decorador mais interno (mais próximo da função):
# Correto: @human_feedback é o mais interno (mais próximo da função)
@start()
@human_feedback(message="Revise isto:")
def my_start_method(self):
return "content"
@listen(other_method)
@human_feedback(message="Revise isto também:")
def my_listener(self, data):
return f"processed: {data}"
Coloque @human_feedback como o decorador mais interno (último/mais próximo da função) para que ele envolva o método diretamente e possa capturar o valor de retorno antes de passar para o sistema de flow.
Melhores Práticas
1. Escreva Mensagens de Solicitação Claras
O parâmetro message é o que o humano vê. Torne-o acionável:
# ✅ Bom - claro e acionável
@human_feedback(message="Este resumo captura com precisão os pontos-chave? Responda 'sim' ou explique o que está faltando:")
# ❌ Ruim - vago
@human_feedback(message="Revise isto:")
2. Escolha Outcomes Significativos
Ao usar emit, escolha outcomes que mapeiem naturalmente para respostas humanas:
# ✅ Bom - outcomes em linguagem natural
emit=["approved", "rejected", "needs_more_detail"]
# ❌ Ruim - técnico ou pouco claro
emit=["state_1", "state_2", "state_3"]
3. Sempre Forneça um Outcome Padrão
Use default_outcome para lidar com casos onde usuários pressionam Enter sem digitar:
@human_feedback(
message="Aprovar? (pressione Enter para solicitar revisão)",
emit=["approved", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision", # Padrão seguro
)
4. Use o Histórico de Feedback para Trilhas de Auditoria
Acesse human_feedback_history para criar logs de auditoria:
@listen(final_step)
def create_audit_log(self):
log = []
for fb in self.human_feedback_history:
log.append({
"step": fb.method_name,
"outcome": fb.outcome,
"feedback": fb.feedback,
"timestamp": fb.timestamp.isoformat(),
})
return log
5. Trate Feedback Roteado e Não Roteado
Ao projetar flows, considere se você precisa de roteamento:
| Cenário | Use |
|---|
| Revisão simples, só precisa do texto do feedback | Sem emit |
| Precisa ramificar para caminhos diferentes baseado na resposta | Use emit |
| Portões de aprovação com aprovar/rejeitar/revisar | Use emit |
| Coletando comentários apenas para logging | Sem emit |
Feedback Humano Assíncrono (Não-Bloqueante - Human in the loop)
Por padrão, @human_feedback bloqueia a execução aguardando entrada no console. Para aplicações de produção, você pode precisar de feedback assíncrono/não-bloqueante que se integre com sistemas externos como Slack, email, webhooks ou APIs.
A Abstração de Provider
Use o parâmetro provider para especificar uma estratégia customizada de coleta de feedback:
from crewai.flow import Flow, start, human_feedback, HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
class WebhookProvider(HumanFeedbackProvider):
"""Provider que pausa o flow e aguarda callback de webhook."""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
# Notifica sistema externo (ex: envia mensagem Slack, cria ticket)
self.send_notification(context)
# Pausa execução - framework cuida da persistência automaticamente
raise HumanFeedbackPending(
context=context,
callback_info={"webhook_url": f"{self.webhook_url}/{context.flow_id}"}
)
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Revise este conteúdo:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
provider=WebhookProvider("https://myapp.com/api"),
)
def generate_content(self):
return "Conteúdo gerado por IA..."
@listen("approved")
def publish(self, result):
return "Publicado!"
O framework de flow persiste automaticamente o estado quando HumanFeedbackPending é lançado. Seu provider só precisa notificar o sistema externo e lançar a exceção—não são necessárias chamadas manuais de persistência.
Tratando Flows Pausados
Ao usar um provider assíncrono, kickoff() retorna um objeto HumanFeedbackPending em vez de lançar uma exceção:
flow = ReviewFlow()
result = flow.kickoff()
if isinstance(result, HumanFeedbackPending):
# Flow está pausado, estado é automaticamente persistido
print(f"Aguardando feedback em: {result.callback_info['webhook_url']}")
print(f"Flow ID: {result.context.flow_id}")
else:
# Conclusão normal
print(f"Flow concluído: {result}")
Retomando um Flow Pausado
Quando o feedback chega (ex: via webhook), retome o flow:
# Handler síncrono:
def handle_feedback_webhook(flow_id: str, feedback: str):
flow = ReviewFlow.from_pending(flow_id)
result = flow.resume(feedback)
return result
# Handler assíncrono (FastAPI, aiohttp, etc.):
async def handle_feedback_webhook(flow_id: str, feedback: str):
flow = ReviewFlow.from_pending(flow_id)
result = await flow.resume_async(feedback)
return result
Tipos Principais
| Tipo | Descrição |
|---|
HumanFeedbackProvider | Protocolo para providers de feedback customizados |
PendingFeedbackContext | Contém todas as informações necessárias para retomar um flow pausado |
HumanFeedbackPending | Retornado por kickoff() quando o flow está pausado para feedback |
ConsoleProvider | Provider padrão de entrada bloqueante no console |
PendingFeedbackContext
O contexto contém tudo necessário para retomar:
@dataclass
class PendingFeedbackContext:
flow_id: str # Identificador único desta execução de flow
flow_class: str # Nome qualificado completo da classe
method_name: str # Método que disparou o feedback
method_output: Any # Saída mostrada ao humano
message: str # A mensagem de solicitação
emit: list[str] | None # Outcomes possíveis para roteamento
default_outcome: str | None
metadata: dict # Metadata customizado
llm: str | None # LLM para mapeamento de outcome
requested_at: datetime
Exemplo Completo de Flow Assíncrono
from crewai.flow import (
Flow, start, listen, human_feedback,
HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
)
class SlackNotificationProvider(HumanFeedbackProvider):
"""Provider que envia notificações Slack e pausa para feedback assíncrono."""
def __init__(self, channel: str):
self.channel = channel
def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
# Envia notificação Slack (implemente você mesmo)
slack_thread_id = self.post_to_slack(
channel=self.channel,
message=f"Revisão necessária:\n\n{context.method_output}\n\n{context.message}",
)
# Pausa execução - framework cuida da persistência automaticamente
raise HumanFeedbackPending(
context=context,
callback_info={
"slack_channel": self.channel,
"thread_id": slack_thread_id,
}
)
class ContentPipeline(Flow):
@start()
@human_feedback(
message="Aprova este conteúdo para publicação?",
emit=["approved", "rejected", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision",
provider=SlackNotificationProvider("#content-reviews"),
)
def generate_content(self):
return "Conteúdo de blog post gerado por IA..."
@listen("approved")
def publish(self, result):
print(f"Publicando! Revisor disse: {result.feedback}")
return {"status": "published"}
@listen("rejected")
def archive(self, result):
print(f"Arquivado. Motivo: {result.feedback}")
return {"status": "archived"}
@listen("needs_revision")
def queue_revision(self, result):
print(f"Na fila para revisão: {result.feedback}")
return {"status": "revision_needed"}
# Iniciando o flow (vai pausar e aguardar resposta do Slack)
def start_content_pipeline():
flow = ContentPipeline()
result = flow.kickoff()
if isinstance(result, HumanFeedbackPending):
return {"status": "pending", "flow_id": result.context.flow_id}
return result
# Retomando quando webhook do Slack dispara (handler síncrono)
def on_slack_feedback(flow_id: str, slack_message: str):
flow = ContentPipeline.from_pending(flow_id)
result = flow.resume(slack_message)
return result
# Se seu handler é assíncrono (FastAPI, aiohttp, Slack Bolt async, etc.)
async def on_slack_feedback_async(flow_id: str, slack_message: str):
flow = ContentPipeline.from_pending(flow_id)
result = await flow.resume_async(slack_message)
return result
Se você está usando um framework web assíncrono (FastAPI, aiohttp, Slack Bolt modo async), use await flow.resume_async() em vez de flow.resume(). Chamar resume() de dentro de um event loop em execução vai lançar um RuntimeError.
Melhores Práticas para Feedback Assíncrono
- Verifique o tipo de retorno:
kickoff() retorna HumanFeedbackPending quando pausado—não precisa de try/except
- Use o método resume correto: Use
resume() em código síncrono, await resume_async() em código assíncrono
- Armazene informações de callback: Use
callback_info para armazenar URLs de webhook, IDs de tickets, etc.
- Implemente idempotência: Seu handler de resume deve ser idempotente por segurança
- Persistência automática: O estado é automaticamente salvo quando
HumanFeedbackPending é lançado e usa SQLiteFlowPersistence por padrão
- Persistência customizada: Passe uma instância de persistência customizada para
from_pending() se necessário
Documentação Relacionada