Pular para o conteúdo principal

Visão Geral

O decorador @human_feedback permite fluxos de trabalho human-in-the-loop (HITL) diretamente nos CrewAI Flows. Ele permite pausar a execução do flow, apresentar a saída para um humano revisar, coletar seu feedback e, opcionalmente, rotear para diferentes listeners com base no resultado do feedback. Isso é particularmente valioso para:
  • Garantia de qualidade: Revisar conteúdo gerado por IA antes de ser usado downstream
  • Portões de decisão: Deixar humanos tomarem decisões críticas em fluxos automatizados
  • Fluxos de aprovação: Implementar padrões de aprovar/rejeitar/revisar
  • Refinamento interativo: Coletar feedback para melhorar saídas iterativamente

Início Rápido

Aqui está a maneira mais simples de adicionar feedback humano a um flow:
Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback

class SimpleReviewFlow(Flow):
    @start()
    @human_feedback(message="Por favor, revise este conteúdo:")
    def generate_content(self):
        return "Este é um conteúdo gerado por IA que precisa de revisão."

    @listen(generate_content)
    def process_feedback(self, result):
        print(f"Conteúdo: {result.output}")
        print(f"Humano disse: {result.feedback}")

flow = SimpleReviewFlow()
flow.kickoff()
Quando este flow é executado, ele irá:
  1. Executar generate_content e retornar a string
  2. Exibir a saída para o usuário com a mensagem de solicitação
  3. Aguardar o usuário digitar o feedback (ou pressionar Enter para pular)
  4. Passar um objeto HumanFeedbackResult para process_feedback

O Decorador @human_feedback

Parâmetros

ParâmetroTipoObrigatórioDescrição
messagestrSimA mensagem mostrada ao humano junto com a saída do método
emitSequence[str]NãoLista de possíveis outcomes. O feedback é mapeado para um destes, que dispara decoradores @listen
llmstr | BaseLLMQuando emit especificadoLLM usado para interpretar o feedback e mapear para um outcome
default_outcomestrNãoOutcome a usar se nenhum feedback for fornecido. Deve estar em emit
metadatadictNãoDados adicionais para integrações enterprise
providerHumanFeedbackProviderNãoProvider customizado para feedback assíncrono/não-bloqueante. Veja Feedback Humano Assíncrono

Uso Básico (Sem Roteamento)

Quando você não especifica emit, o decorador simplesmente coleta o feedback e passa um HumanFeedbackResult para o próximo listener:
Code
@start()
@human_feedback(message="O que você acha desta análise?")
def analyze_data(self):
    return "Resultados da análise: Receita aumentou 15%, custos diminuíram 8%"

@listen(analyze_data)
def handle_feedback(self, result):
    # result é um HumanFeedbackResult
    print(f"Análise: {result.output}")
    print(f"Feedback: {result.feedback}")

Roteamento com emit

Quando você especifica emit, o decorador se torna um roteador. O feedback livre do humano é interpretado por um LLM e mapeado para um dos outcomes especificados:
Code
@start()
@human_feedback(
    message="Você aprova este conteúdo para publicação?",
    emit=["approved", "rejected", "needs_revision"],
    llm="gpt-4o-mini",
    default_outcome="needs_revision",
)
def review_content(self):
    return "Rascunho do post do blog aqui..."

@listen("approved")
def publish(self, result):
    print(f"Publicando! Usuário disse: {result.feedback}")

@listen("rejected")
def discard(self, result):
    print(f"Descartando. Motivo: {result.feedback}")

@listen("needs_revision")
def revise(self, result):
    print(f"Revisando baseado em: {result.feedback}")
O LLM usa saídas estruturadas (function calling) quando disponível para garantir que a resposta seja um dos seus outcomes especificados. Isso torna o roteamento confiável e previsível.

HumanFeedbackResult

O dataclass HumanFeedbackResult contém todas as informações sobre uma interação de feedback humano:
Code
from crewai.flow.human_feedback import HumanFeedbackResult

@dataclass
class HumanFeedbackResult:
    output: Any              # A saída original do método mostrada ao humano
    feedback: str            # O texto bruto do feedback do humano
    outcome: str | None      # O outcome mapeado (se emit foi especificado)
    timestamp: datetime      # Quando o feedback foi recebido
    method_name: str         # Nome do método decorado
    metadata: dict           # Qualquer metadata passado ao decorador

Acessando em Listeners

Quando um listener é disparado por um método @human_feedback com emit, ele recebe o HumanFeedbackResult:
Code
@listen("approved")
def on_approval(self, result: HumanFeedbackResult):
    print(f"Saída original: {result.output}")
    print(f"Feedback do usuário: {result.feedback}")
    print(f"Outcome: {result.outcome}")  # "approved"
    print(f"Recebido em: {result.timestamp}")

Acessando o Histórico de Feedback

A classe Flow fornece dois atributos para acessar o feedback humano:

last_human_feedback

Retorna o HumanFeedbackResult mais recente:
Code
@listen(some_method)
def check_feedback(self):
    if self.last_human_feedback:
        print(f"Último feedback: {self.last_human_feedback.feedback}")

human_feedback_history

Uma lista de todos os objetos HumanFeedbackResult coletados durante o flow:
Code
@listen(final_step)
def summarize(self):
    print(f"Total de feedbacks coletados: {len(self.human_feedback_history)}")
    for i, fb in enumerate(self.human_feedback_history):
        print(f"{i+1}. {fb.method_name}: {fb.outcome or 'sem roteamento'}")
Cada HumanFeedbackResult é adicionado a human_feedback_history, então múltiplos passos de feedback não sobrescrevem uns aos outros. Use esta lista para acessar todo o feedback coletado durante o flow.

Exemplo Completo: Fluxo de Aprovação de Conteúdo

Aqui está um exemplo completo implementando um fluxo de revisão e aprovação de conteúdo:
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
from pydantic import BaseModel


class ContentState(BaseModel):
    topic: str = ""
    draft: str = ""
    final_content: str = ""
    revision_count: int = 0


class ContentApprovalFlow(Flow[ContentState]):
    """Um flow que gera conteúdo e obtém aprovação humana."""

    @start()
    def get_topic(self):
        self.state.topic = input("Sobre qual tópico devo escrever? ")
        return self.state.topic

    @listen(get_topic)
    def generate_draft(self, topic):
        # Em uso real, isso chamaria um LLM
        self.state.draft = f"# {topic}\n\nEste é um rascunho sobre {topic}..."
        return self.state.draft

    @listen(generate_draft)
    @human_feedback(
        message="Por favor, revise este rascunho. Responda 'approved', 'rejected', ou forneça feedback de revisão:",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
    )
    def review_draft(self, draft):
        return draft

    @listen("approved")
    def publish_content(self, result: HumanFeedbackResult):
        self.state.final_content = result.output
        print("\n✅ Conteúdo aprovado e publicado!")
        print(f"Comentário do revisor: {result.feedback}")
        return "published"

    @listen("rejected")
    def handle_rejection(self, result: HumanFeedbackResult):
        print("\n❌ Conteúdo rejeitado")
        print(f"Motivo: {result.feedback}")
        return "rejected"

    @listen("needs_revision")
    def revise_content(self, result: HumanFeedbackResult):
        self.state.revision_count += 1
        print(f"\n📝 Revisão #{self.state.revision_count} solicitada")
        print(f"Feedback: {result.feedback}")

        # Em um flow real, você pode voltar para generate_draft
        # Para este exemplo, apenas reconhecemos
        return "revision_requested"


# Executar o flow
flow = ContentApprovalFlow()
result = flow.kickoff()
print(f"\nFlow concluído. Revisões solicitadas: {flow.state.revision_count}")

Combinando com Outros Decoradores

O decorador @human_feedback funciona com outros decoradores de flow. Coloque-o como o decorador mais interno (mais próximo da função):
Code
# Correto: @human_feedback é o mais interno (mais próximo da função)
@start()
@human_feedback(message="Revise isto:")
def my_start_method(self):
    return "content"

@listen(other_method)
@human_feedback(message="Revise isto também:")
def my_listener(self, data):
    return f"processed: {data}"
Coloque @human_feedback como o decorador mais interno (último/mais próximo da função) para que ele envolva o método diretamente e possa capturar o valor de retorno antes de passar para o sistema de flow.

Melhores Práticas

1. Escreva Mensagens de Solicitação Claras

O parâmetro message é o que o humano vê. Torne-o acionável:
Code
# ✅ Bom - claro e acionável
@human_feedback(message="Este resumo captura com precisão os pontos-chave? Responda 'sim' ou explique o que está faltando:")

# ❌ Ruim - vago
@human_feedback(message="Revise isto:")

2. Escolha Outcomes Significativos

Ao usar emit, escolha outcomes que mapeiem naturalmente para respostas humanas:
Code
# ✅ Bom - outcomes em linguagem natural
emit=["approved", "rejected", "needs_more_detail"]

# ❌ Ruim - técnico ou pouco claro
emit=["state_1", "state_2", "state_3"]

3. Sempre Forneça um Outcome Padrão

Use default_outcome para lidar com casos onde usuários pressionam Enter sem digitar:
Code
@human_feedback(
    message="Aprovar? (pressione Enter para solicitar revisão)",
    emit=["approved", "needs_revision"],
    llm="gpt-4o-mini",
    default_outcome="needs_revision",  # Padrão seguro
)

4. Use o Histórico de Feedback para Trilhas de Auditoria

Acesse human_feedback_history para criar logs de auditoria:
Code
@listen(final_step)
def create_audit_log(self):
    log = []
    for fb in self.human_feedback_history:
        log.append({
            "step": fb.method_name,
            "outcome": fb.outcome,
            "feedback": fb.feedback,
            "timestamp": fb.timestamp.isoformat(),
        })
    return log

5. Trate Feedback Roteado e Não Roteado

Ao projetar flows, considere se você precisa de roteamento:
CenárioUse
Revisão simples, só precisa do texto do feedbackSem emit
Precisa ramificar para caminhos diferentes baseado na respostaUse emit
Portões de aprovação com aprovar/rejeitar/revisarUse emit
Coletando comentários apenas para loggingSem emit

Feedback Humano Assíncrono (Não-Bloqueante - Human in the loop)

Por padrão, @human_feedback bloqueia a execução aguardando entrada no console. Para aplicações de produção, você pode precisar de feedback assíncrono/não-bloqueante que se integre com sistemas externos como Slack, email, webhooks ou APIs.

A Abstração de Provider

Use o parâmetro provider para especificar uma estratégia customizada de coleta de feedback:
Code
from crewai.flow import Flow, start, human_feedback, HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext

class WebhookProvider(HumanFeedbackProvider):
    """Provider que pausa o flow e aguarda callback de webhook."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
        # Notifica sistema externo (ex: envia mensagem Slack, cria ticket)
        self.send_notification(context)

        # Pausa execução - framework cuida da persistência automaticamente
        raise HumanFeedbackPending(
            context=context,
            callback_info={"webhook_url": f"{self.webhook_url}/{context.flow_id}"}
        )

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Revise este conteúdo:",
        emit=["approved", "rejected"],
        llm="gpt-4o-mini",
        provider=WebhookProvider("https://myapp.com/api"),
    )
    def generate_content(self):
        return "Conteúdo gerado por IA..."

    @listen("approved")
    def publish(self, result):
        return "Publicado!"
O framework de flow persiste automaticamente o estado quando HumanFeedbackPending é lançado. Seu provider só precisa notificar o sistema externo e lançar a exceção—não são necessárias chamadas manuais de persistência.

Tratando Flows Pausados

Ao usar um provider assíncrono, kickoff() retorna um objeto HumanFeedbackPending em vez de lançar uma exceção:
Code
flow = ReviewFlow()
result = flow.kickoff()

if isinstance(result, HumanFeedbackPending):
    # Flow está pausado, estado é automaticamente persistido
    print(f"Aguardando feedback em: {result.callback_info['webhook_url']}")
    print(f"Flow ID: {result.context.flow_id}")
else:
    # Conclusão normal
    print(f"Flow concluído: {result}")

Retomando um Flow Pausado

Quando o feedback chega (ex: via webhook), retome o flow:
Code
# Handler síncrono:
def handle_feedback_webhook(flow_id: str, feedback: str):
    flow = ReviewFlow.from_pending(flow_id)
    result = flow.resume(feedback)
    return result

# Handler assíncrono (FastAPI, aiohttp, etc.):
async def handle_feedback_webhook(flow_id: str, feedback: str):
    flow = ReviewFlow.from_pending(flow_id)
    result = await flow.resume_async(feedback)
    return result

Tipos Principais

TipoDescrição
HumanFeedbackProviderProtocolo para providers de feedback customizados
PendingFeedbackContextContém todas as informações necessárias para retomar um flow pausado
HumanFeedbackPendingRetornado por kickoff() quando o flow está pausado para feedback
ConsoleProviderProvider padrão de entrada bloqueante no console

PendingFeedbackContext

O contexto contém tudo necessário para retomar:
Code
@dataclass
class PendingFeedbackContext:
    flow_id: str           # Identificador único desta execução de flow
    flow_class: str        # Nome qualificado completo da classe
    method_name: str       # Método que disparou o feedback
    method_output: Any     # Saída mostrada ao humano
    message: str           # A mensagem de solicitação
    emit: list[str] | None # Outcomes possíveis para roteamento
    default_outcome: str | None
    metadata: dict         # Metadata customizado
    llm: str | None        # LLM para mapeamento de outcome
    requested_at: datetime

Exemplo Completo de Flow Assíncrono

Code
from crewai.flow import (
    Flow, start, listen, human_feedback,
    HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
)

class SlackNotificationProvider(HumanFeedbackProvider):
    """Provider que envia notificações Slack e pausa para feedback assíncrono."""

    def __init__(self, channel: str):
        self.channel = channel

    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
        # Envia notificação Slack (implemente você mesmo)
        slack_thread_id = self.post_to_slack(
            channel=self.channel,
            message=f"Revisão necessária:\n\n{context.method_output}\n\n{context.message}",
        )

        # Pausa execução - framework cuida da persistência automaticamente
        raise HumanFeedbackPending(
            context=context,
            callback_info={
                "slack_channel": self.channel,
                "thread_id": slack_thread_id,
            }
        )

class ContentPipeline(Flow):
    @start()
    @human_feedback(
        message="Aprova este conteúdo para publicação?",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
        provider=SlackNotificationProvider("#content-reviews"),
    )
    def generate_content(self):
        return "Conteúdo de blog post gerado por IA..."

    @listen("approved")
    def publish(self, result):
        print(f"Publicando! Revisor disse: {result.feedback}")
        return {"status": "published"}

    @listen("rejected")
    def archive(self, result):
        print(f"Arquivado. Motivo: {result.feedback}")
        return {"status": "archived"}

    @listen("needs_revision")
    def queue_revision(self, result):
        print(f"Na fila para revisão: {result.feedback}")
        return {"status": "revision_needed"}


# Iniciando o flow (vai pausar e aguardar resposta do Slack)
def start_content_pipeline():
    flow = ContentPipeline()
    result = flow.kickoff()

    if isinstance(result, HumanFeedbackPending):
        return {"status": "pending", "flow_id": result.context.flow_id}

    return result


# Retomando quando webhook do Slack dispara (handler síncrono)
def on_slack_feedback(flow_id: str, slack_message: str):
    flow = ContentPipeline.from_pending(flow_id)
    result = flow.resume(slack_message)
    return result


# Se seu handler é assíncrono (FastAPI, aiohttp, Slack Bolt async, etc.)
async def on_slack_feedback_async(flow_id: str, slack_message: str):
    flow = ContentPipeline.from_pending(flow_id)
    result = await flow.resume_async(slack_message)
    return result
Se você está usando um framework web assíncrono (FastAPI, aiohttp, Slack Bolt modo async), use await flow.resume_async() em vez de flow.resume(). Chamar resume() de dentro de um event loop em execução vai lançar um RuntimeError.

Melhores Práticas para Feedback Assíncrono

  1. Verifique o tipo de retorno: kickoff() retorna HumanFeedbackPending quando pausado—não precisa de try/except
  2. Use o método resume correto: Use resume() em código síncrono, await resume_async() em código assíncrono
  3. Armazene informações de callback: Use callback_info para armazenar URLs de webhook, IDs de tickets, etc.
  4. Implemente idempotência: Seu handler de resume deve ser idempotente por segurança
  5. Persistência automática: O estado é automaticamente salvo quando HumanFeedbackPending é lançado e usa SQLiteFlowPersistence por padrão
  6. Persistência customizada: Passe uma instância de persistência customizada para from_pending() se necessário

Documentação Relacionada