메인 콘텐츠로 건너뛰기

개요

@human_feedback 데코레이터는 CrewAI Flow 내에서 직접 human-in-the-loop(HITL) 워크플로우를 가능하게 합니다. Flow 실행을 일시 중지하고, 인간에게 검토를 위해 출력을 제시하고, 피드백을 수집하고, 선택적으로 피드백 결과에 따라 다른 리스너로 라우팅할 수 있습니다. 이는 특히 다음과 같은 경우에 유용합니다:
  • 품질 보증: AI가 생성한 콘텐츠를 다운스트림에서 사용하기 전에 검토
  • 결정 게이트: 자동화된 워크플로우에서 인간이 중요한 결정을 내리도록 허용
  • 승인 워크플로우: 승인/거부/수정 패턴 구현
  • 대화형 개선: 출력을 반복적으로 개선하기 위해 피드백 수집

빠른 시작

Flow에 인간 피드백을 추가하는 가장 간단한 방법은 다음과 같습니다:
Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback

class SimpleReviewFlow(Flow):
    @start()
    @human_feedback(message="이 콘텐츠를 검토해 주세요:")
    def generate_content(self):
        return "검토가 필요한 AI 생성 콘텐츠입니다."

    @listen(generate_content)
    def process_feedback(self, result):
        print(f"콘텐츠: {result.output}")
        print(f"인간의 의견: {result.feedback}")

flow = SimpleReviewFlow()
flow.kickoff()
이 Flow를 실행하면:
  1. generate_content를 실행하고 문자열을 반환합니다
  2. 요청 메시지와 함께 사용자에게 출력을 표시합니다
  3. 사용자가 피드백을 입력할 때까지 대기합니다 (또는 Enter를 눌러 건너뜁니다)
  4. HumanFeedbackResult 객체를 process_feedback에 전달합니다

@human_feedback 데코레이터

매개변수

매개변수타입필수설명
messagestr메서드 출력과 함께 인간에게 표시되는 메시지
emitSequence[str]아니오가능한 outcome 목록. 피드백이 이 중 하나로 매핑되어 @listen 데코레이터를 트리거합니다
llmstr | BaseLLMemit 지정 시피드백을 해석하고 outcome에 매핑하는 데 사용되는 LLM
default_outcomestr아니오피드백이 제공되지 않을 때 사용할 outcome. emit에 있어야 합니다
metadatadict아니오엔터프라이즈 통합을 위한 추가 데이터
providerHumanFeedbackProvider아니오비동기/논블로킹 피드백을 위한 커스텀 프로바이더. 비동기 인간 피드백 참조

기본 사용법 (라우팅 없음)

emit을 지정하지 않으면, 데코레이터는 단순히 피드백을 수집하고 다음 리스너에 HumanFeedbackResult를 전달합니다:
Code
@start()
@human_feedback(message="이 분석에 대해 어떻게 생각하시나요?")
def analyze_data(self):
    return "분석 결과: 매출 15% 증가, 비용 8% 감소"

@listen(analyze_data)
def handle_feedback(self, result):
    # result는 HumanFeedbackResult입니다
    print(f"분석: {result.output}")
    print(f"피드백: {result.feedback}")

emit을 사용한 라우팅

emit을 지정하면, 데코레이터는 라우터가 됩니다. 인간의 자유 형식 피드백이 LLM에 의해 해석되어 지정된 outcome 중 하나로 매핑됩니다:
Code
@start()
@human_feedback(
    message="이 콘텐츠의 출판을 승인하시겠습니까?",
    emit=["approved", "rejected", "needs_revision"],
    llm="gpt-4o-mini",
    default_outcome="needs_revision",
)
def review_content(self):
    return "블로그 게시물 초안 내용..."

@listen("approved")
def publish(self, result):
    print(f"출판 중! 사용자 의견: {result.feedback}")

@listen("rejected")
def discard(self, result):
    print(f"폐기됨. 이유: {result.feedback}")

@listen("needs_revision")
def revise(self, result):
    print(f"다음을 기반으로 수정 중: {result.feedback}")
LLM은 가능한 경우 구조화된 출력(function calling)을 사용하여 응답이 지정된 outcome 중 하나임을 보장합니다. 이로 인해 라우팅이 신뢰할 수 있고 예측 가능해집니다.

HumanFeedbackResult

HumanFeedbackResult 데이터클래스는 인간 피드백 상호작용에 대한 모든 정보를 포함합니다:
Code
from crewai.flow.human_feedback import HumanFeedbackResult

@dataclass
class HumanFeedbackResult:
    output: Any              # 인간에게 표시된 원래 메서드 출력
    feedback: str            # 인간의 원시 피드백 텍스트
    outcome: str | None      # 매핑된 outcome (emit이 지정된 경우)
    timestamp: datetime      # 피드백이 수신된 시간
    method_name: str         # 데코레이터된 메서드의 이름
    metadata: dict           # 데코레이터에 전달된 모든 메타데이터

리스너에서 접근하기

emit이 있는 @human_feedback 메서드에 의해 리스너가 트리거되면, HumanFeedbackResult를 받습니다:
Code
@listen("approved")
def on_approval(self, result: HumanFeedbackResult):
    print(f"원래 출력: {result.output}")
    print(f"사용자 피드백: {result.feedback}")
    print(f"Outcome: {result.outcome}")  # "approved"
    print(f"수신 시간: {result.timestamp}")

피드백 히스토리 접근하기

Flow 클래스는 인간 피드백에 접근하기 위한 두 가지 속성을 제공합니다:

last_human_feedback

가장 최근의 HumanFeedbackResult를 반환합니다:
Code
@listen(some_method)
def check_feedback(self):
    if self.last_human_feedback:
        print(f"마지막 피드백: {self.last_human_feedback.feedback}")

human_feedback_history

Flow 동안 수집된 모든 HumanFeedbackResult 객체의 리스트입니다:
Code
@listen(final_step)
def summarize(self):
    print(f"수집된 총 피드백: {len(self.human_feedback_history)}")
    for i, fb in enumerate(self.human_feedback_history):
        print(f"{i+1}. {fb.method_name}: {fb.outcome or '라우팅 없음'}")
HumanFeedbackResulthuman_feedback_history에 추가되므로, 여러 피드백 단계가 서로 덮어쓰지 않습니다. 이 리스트를 사용하여 Flow 동안 수집된 모든 피드백에 접근하세요.

완전한 예제: 콘텐츠 승인 워크플로우

콘텐츠 검토 및 승인 워크플로우를 구현하는 전체 예제입니다:
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
from pydantic import BaseModel


class ContentState(BaseModel):
    topic: str = ""
    draft: str = ""
    final_content: str = ""
    revision_count: int = 0


class ContentApprovalFlow(Flow[ContentState]):
    """콘텐츠를 생성하고 인간의 승인을 받는 Flow입니다."""

    @start()
    def get_topic(self):
        self.state.topic = input("어떤 주제에 대해 글을 쓸까요? ")
        return self.state.topic

    @listen(get_topic)
    def generate_draft(self, topic):
        # 실제 사용에서는 LLM을 호출합니다
        self.state.draft = f"# {topic}\n\n{topic}에 대한 초안입니다..."
        return self.state.draft

    @listen(generate_draft)
    @human_feedback(
        message="이 초안을 검토해 주세요. 'approved', 'rejected'로 답하거나 수정 피드백을 제공해 주세요:",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
    )
    def review_draft(self, draft):
        return draft

    @listen("approved")
    def publish_content(self, result: HumanFeedbackResult):
        self.state.final_content = result.output
        print("\n✅ 콘텐츠가 승인되어 출판되었습니다!")
        print(f"검토자 코멘트: {result.feedback}")
        return "published"

    @listen("rejected")
    def handle_rejection(self, result: HumanFeedbackResult):
        print("\n❌ 콘텐츠가 거부되었습니다")
        print(f"이유: {result.feedback}")
        return "rejected"

    @listen("needs_revision")
    def revise_content(self, result: HumanFeedbackResult):
        self.state.revision_count += 1
        print(f"\n📝 수정 #{self.state.revision_count} 요청됨")
        print(f"피드백: {result.feedback}")

        # 실제 Flow에서는 generate_draft로 돌아갈 수 있습니다
        # 이 예제에서는 단순히 확인합니다
        return "revision_requested"


# Flow 실행
flow = ContentApprovalFlow()
result = flow.kickoff()
print(f"\nFlow 완료. 요청된 수정: {flow.state.revision_count}")

다른 데코레이터와 결합하기

@human_feedback 데코레이터는 다른 Flow 데코레이터와 함께 작동합니다. 가장 안쪽 데코레이터(함수에 가장 가까운)로 배치하세요:
Code
# 올바름: @human_feedback이 가장 안쪽(함수에 가장 가까움)
@start()
@human_feedback(message="이것을 검토해 주세요:")
def my_start_method(self):
    return "content"

@listen(other_method)
@human_feedback(message="이것도 검토해 주세요:")
def my_listener(self, data):
    return f"processed: {data}"
@human_feedback를 가장 안쪽 데코레이터(마지막/함수에 가장 가까움)로 배치하여 메서드를 직접 래핑하고 Flow 시스템에 전달하기 전에 반환 값을 캡처할 수 있도록 하세요.

모범 사례

1. 명확한 요청 메시지 작성

message 매개변수는 인간이 보는 것입니다. 실행 가능하게 만드세요:
Code
# ✅ 좋음 - 명확하고 실행 가능
@human_feedback(message="이 요약이 핵심 포인트를 정확하게 캡처했나요? '예'로 답하거나 무엇이 빠졌는지 설명해 주세요:")

# ❌ 나쁨 - 모호함
@human_feedback(message="이것을 검토해 주세요:")

2. 의미 있는 Outcome 선택

emit을 사용할 때, 인간의 응답에 자연스럽게 매핑되는 outcome을 선택하세요:
Code
# ✅ 좋음 - 자연어 outcome
emit=["approved", "rejected", "needs_more_detail"]

# ❌ 나쁨 - 기술적이거나 불명확
emit=["state_1", "state_2", "state_3"]

3. 항상 기본 Outcome 제공

사용자가 입력 없이 Enter를 누르는 경우를 처리하기 위해 default_outcome을 사용하세요:
Code
@human_feedback(
    message="승인하시겠습니까? (수정 요청하려면 Enter 누르세요)",
    emit=["approved", "needs_revision"],
    llm="gpt-4o-mini",
    default_outcome="needs_revision",  # 안전한 기본값
)

4. 감사 추적을 위한 피드백 히스토리 사용

감사 로그를 생성하기 위해 human_feedback_history에 접근하세요:
Code
@listen(final_step)
def create_audit_log(self):
    log = []
    for fb in self.human_feedback_history:
        log.append({
            "step": fb.method_name,
            "outcome": fb.outcome,
            "feedback": fb.feedback,
            "timestamp": fb.timestamp.isoformat(),
        })
    return log

5. 라우팅된 피드백과 라우팅되지 않은 피드백 모두 처리

Flow를 설계할 때, 라우팅이 필요한지 고려하세요:
시나리오사용
간단한 검토, 피드백 텍스트만 필요emit 없음
응답에 따라 다른 경로로 분기 필요emit 사용
승인/거부/수정이 있는 승인 게이트emit 사용
로깅만을 위한 코멘트 수집emit 없음

비동기 인간 피드백 (논블로킹)

기본적으로 @human_feedback은 콘솔 입력을 기다리며 실행을 차단합니다. 프로덕션 애플리케이션에서는 Slack, 이메일, 웹훅 또는 API와 같은 외부 시스템과 통합되는 비동기/논블로킹 피드백이 필요할 수 있습니다.

Provider 추상화

커스텀 피드백 수집 전략을 지정하려면 provider 매개변수를 사용하세요:
Code
from crewai.flow import Flow, start, human_feedback, HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext

class WebhookProvider(HumanFeedbackProvider):
    """웹훅 콜백을 기다리며 Flow를 일시 중지하는 Provider."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
        # 외부 시스템에 알림 (예: Slack 메시지 전송, 티켓 생성)
        self.send_notification(context)

        # 실행 일시 중지 - 프레임워크가 자동으로 영속성 처리
        raise HumanFeedbackPending(
            context=context,
            callback_info={"webhook_url": f"{self.webhook_url}/{context.flow_id}"}
        )

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="이 콘텐츠를 검토해 주세요:",
        emit=["approved", "rejected"],
        llm="gpt-4o-mini",
        provider=WebhookProvider("https://myapp.com/api"),
    )
    def generate_content(self):
        return "AI가 생성한 콘텐츠..."

    @listen("approved")
    def publish(self, result):
        return "출판됨!"
Flow 프레임워크는 HumanFeedbackPending이 발생하면 자동으로 상태를 영속화합니다. Provider는 외부 시스템에 알리고 예외를 발생시키기만 하면 됩니다—수동 영속성 호출이 필요하지 않습니다.

일시 중지된 Flow 처리

비동기 provider를 사용하면 kickoff()는 예외를 발생시키는 대신 HumanFeedbackPending 객체를 반환합니다:
Code
flow = ReviewFlow()
result = flow.kickoff()

if isinstance(result, HumanFeedbackPending):
    # Flow가 일시 중지됨, 상태가 자동으로 영속화됨
    print(f"피드백 대기 중: {result.callback_info['webhook_url']}")
    print(f"Flow ID: {result.context.flow_id}")
else:
    # 정상 완료
    print(f"Flow 완료: {result}")

일시 중지된 Flow 재개

피드백이 도착하면 (예: 웹훅을 통해) Flow를 재개합니다:
Code
# 동기 핸들러:
def handle_feedback_webhook(flow_id: str, feedback: str):
    flow = ReviewFlow.from_pending(flow_id)
    result = flow.resume(feedback)
    return result

# 비동기 핸들러 (FastAPI, aiohttp 등):
async def handle_feedback_webhook(flow_id: str, feedback: str):
    flow = ReviewFlow.from_pending(flow_id)
    result = await flow.resume_async(feedback)
    return result

주요 타입

타입설명
HumanFeedbackProvider커스텀 피드백 provider를 위한 프로토콜
PendingFeedbackContext일시 중지된 Flow를 재개하는 데 필요한 모든 정보 포함
HumanFeedbackPendingFlow가 피드백을 위해 일시 중지되면 kickoff()에서 반환됨
ConsoleProvider기본 블로킹 콘솔 입력 provider

PendingFeedbackContext

컨텍스트는 재개에 필요한 모든 것을 포함합니다:
Code
@dataclass
class PendingFeedbackContext:
    flow_id: str           # 이 Flow 실행의 고유 식별자
    flow_class: str        # 정규화된 클래스 이름
    method_name: str       # 피드백을 트리거한 메서드
    method_output: Any     # 인간에게 표시된 출력
    message: str           # 요청 메시지
    emit: list[str] | None # 라우팅을 위한 가능한 outcome
    default_outcome: str | None
    metadata: dict         # 커스텀 메타데이터
    llm: str | None        # outcome 매핑을 위한 LLM
    requested_at: datetime

완전한 비동기 Flow 예제

Code
from crewai.flow import (
    Flow, start, listen, human_feedback,
    HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
)

class SlackNotificationProvider(HumanFeedbackProvider):
    """Slack 알림을 보내고 비동기 피드백을 위해 일시 중지하는 Provider."""

    def __init__(self, channel: str):
        self.channel = channel

    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
        # Slack 알림 전송 (직접 구현)
        slack_thread_id = self.post_to_slack(
            channel=self.channel,
            message=f"검토 필요:\n\n{context.method_output}\n\n{context.message}",
        )

        # 실행 일시 중지 - 프레임워크가 자동으로 영속성 처리
        raise HumanFeedbackPending(
            context=context,
            callback_info={
                "slack_channel": self.channel,
                "thread_id": slack_thread_id,
            }
        )

class ContentPipeline(Flow):
    @start()
    @human_feedback(
        message="이 콘텐츠의 출판을 승인하시겠습니까?",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
        provider=SlackNotificationProvider("#content-reviews"),
    )
    def generate_content(self):
        return "AI가 생성한 블로그 게시물 콘텐츠..."

    @listen("approved")
    def publish(self, result):
        print(f"출판 중! 검토자 의견: {result.feedback}")
        return {"status": "published"}

    @listen("rejected")
    def archive(self, result):
        print(f"보관됨. 이유: {result.feedback}")
        return {"status": "archived"}

    @listen("needs_revision")
    def queue_revision(self, result):
        print(f"수정 대기열에 추가됨: {result.feedback}")
        return {"status": "revision_needed"}


# Flow 시작 (Slack 응답을 기다리며 일시 중지)
def start_content_pipeline():
    flow = ContentPipeline()
    result = flow.kickoff()

    if isinstance(result, HumanFeedbackPending):
        return {"status": "pending", "flow_id": result.context.flow_id}

    return result


# Slack 웹훅이 실행될 때 재개 (동기 핸들러)
def on_slack_feedback(flow_id: str, slack_message: str):
    flow = ContentPipeline.from_pending(flow_id)
    result = flow.resume(slack_message)
    return result


# 핸들러가 비동기인 경우 (FastAPI, aiohttp, Slack Bolt 비동기 등)
async def on_slack_feedback_async(flow_id: str, slack_message: str):
    flow = ContentPipeline.from_pending(flow_id)
    result = await flow.resume_async(slack_message)
    return result
비동기 웹 프레임워크(FastAPI, aiohttp, Slack Bolt 비동기 모드)를 사용하는 경우 flow.resume() 대신 await flow.resume_async()를 사용하세요. 실행 중인 이벤트 루프 내에서 resume()을 호출하면 RuntimeError가 발생합니다.

비동기 피드백 모범 사례

  1. 반환 타입 확인: kickoff()는 일시 중지되면 HumanFeedbackPending을 반환합니다—try/except가 필요하지 않습니다
  2. 올바른 resume 메서드 사용: 동기 코드에서는 resume(), 비동기 코드에서는 await resume_async() 사용
  3. 콜백 정보 저장: callback_info를 사용하여 웹훅 URL, 티켓 ID 등을 저장
  4. 멱등성 구현: 안전을 위해 resume 핸들러는 멱등해야 합니다
  5. 자동 영속성: HumanFeedbackPending이 발생하면 상태가 자동으로 저장되며 기본적으로 SQLiteFlowPersistence 사용
  6. 커스텀 영속성: 필요한 경우 from_pending()에 커스텀 영속성 인스턴스 전달

관련 문서