Skip to main content

소개

CrewAI는 crew 실행 중 실시간 출력을 스트리밍하는 기능을 제공하여, 전체 프로세스가 완료될 때까지 기다리지 않고 결과가 생성되는 대로 표시할 수 있습니다. 이 기능은 대화형 애플리케이션을 구축하거나, 사용자 피드백을 제공하거나, 장시간 실행되는 프로세스를 모니터링할 때 특히 유용합니다.

스트리밍 작동 방식

스트리밍이 활성화되면 CrewAI는 LLM 응답과 도구 호출을 실시간으로 캡처하여, 어떤 task와 agent가 실행 중인지에 대한 컨텍스트를 포함한 구조화된 청크로 패키징합니다. 이러한 청크를 실시간으로 반복 처리하고 실행이 완료되면 최종 결과에 접근할 수 있습니다.

스트리밍 활성화

스트리밍을 활성화하려면 crew를 생성할 때 stream 파라미터를 True로 설정하세요:
Code
from crewai import Agent, Crew, Task

# 에이전트와 태스크 생성
researcher = Agent(
    role="Research Analyst",
    goal="Gather comprehensive information on topics",
    backstory="You are an experienced researcher with excellent analytical skills.",
)

task = Task(
    description="Research the latest developments in AI",
    expected_output="A detailed report on recent AI advancements",
    agent=researcher,
)

# 스트리밍 활성화
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True  # 스트리밍 출력 활성화
)

동기 스트리밍

스트리밍이 활성화된 crew에서 kickoff()를 호출하면, 청크가 도착할 때마다 반복 처리할 수 있는 CrewStreamingOutput 객체가 반환됩니다:
Code
# 스트리밍 실행 시작
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})

# 청크가 도착할 때마다 반복
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# 스트리밍 완료 후 최종 결과 접근
result = streaming.result
print(f"\n\n최종 출력: {result.raw}")

스트림 청크 정보

각 청크는 실행에 대한 풍부한 컨텍스트를 제공합니다:
Code
streaming = crew.kickoff(inputs={"topic": "AI"})

for chunk in streaming:
    print(f"Task: {chunk.task_name} (인덱스 {chunk.task_index})")
    print(f"Agent: {chunk.agent_role}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT 또는 TOOL_CALL
    if chunk.tool_call:
        print(f"Tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")

스트리밍 결과 접근

CrewStreamingOutput 객체는 여러 유용한 속성을 제공합니다:
Code
streaming = crew.kickoff(inputs={"topic": "AI"})

# 청크 반복 및 수집
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# 반복 완료 후
print(f"\n완료됨: {streaming.is_completed}")
print(f"전체 텍스트: {streaming.get_full_text()}")
print(f"전체 청크 수: {len(streaming.chunks)}")
print(f"최종 결과: {streaming.result.raw}")

비동기 스트리밍

비동기 애플리케이션의 경우, 비동기 반복과 함께 akickoff()(네이티브 async) 또는 kickoff_async()(스레드 기반)를 사용할 수 있습니다:

akickoff()를 사용한 네이티브 Async

akickoff() 메서드는 전체 체인에서 진정한 네이티브 async 실행을 제공합니다:
Code
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    # 네이티브 async 스트리밍 시작
    streaming = await crew.akickoff(inputs={"topic": "AI"})

    # 청크에 대한 비동기 반복
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # 최종 결과 접근
    result = streaming.result
    print(f"\n\n최종 출력: {result.raw}")

asyncio.run(stream_crew())

kickoff_async()를 사용한 스레드 기반 Async

더 간단한 async 통합이나 하위 호환성을 위해:
Code
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    # 스레드 기반 async 스트리밍 시작
    streaming = await crew.kickoff_async(inputs={"topic": "AI"})

    # 청크에 대한 비동기 반복
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # 최종 결과 접근
    result = streaming.result
    print(f"\n\n최종 출력: {result.raw}")

asyncio.run(stream_crew())
고동시성 워크로드의 경우, 태스크 실행, 메모리 작업, 지식 검색에 네이티브 async를 사용하는 akickoff()가 권장됩니다. 자세한 내용은 Crew 비동기 시작 가이드를 참조하세요.

kickoff_for_each를 사용한 스트리밍

kickoff_for_each()로 여러 입력에 대해 crew를 실행할 때, 동기 또는 비동기 여부에 따라 스트리밍이 다르게 작동합니다:

동기 kickoff_for_each

동기 kickoff_for_each()를 사용하면, 각 입력에 대해 하나씩 CrewStreamingOutput 객체의 리스트가 반환됩니다:
Code
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True
)

inputs_list = [
    {"topic": "AI in healthcare"},
    {"topic": "AI in finance"}
]

# 스트리밍 출력 리스트 반환
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)

# 각 스트리밍 출력에 대해 반복
for i, streaming in enumerate(streaming_outputs):
    print(f"\n=== 입력 {i + 1} ===")
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\n\n결과 {i + 1}: {result.raw}")

비동기 kickoff_for_each_async

비동기 kickoff_for_each_async()를 사용하면, 모든 crew의 청크가 동시에 도착하는 대로 반환하는 단일 CrewStreamingOutput이 반환됩니다:
Code
import asyncio

async def stream_multiple_crews():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    inputs_list = [
        {"topic": "AI in healthcare"},
        {"topic": "AI in finance"}
    ]

    # 모든 crew에 대한 단일 스트리밍 출력 반환
    streaming = await crew.kickoff_for_each_async(inputs=inputs_list)

    # 모든 crew의 청크가 생성되는 대로 도착
    async for chunk in streaming:
        print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)

    # 모든 결과 접근
    results = streaming.results  # CrewOutput 객체 리스트
    for i, result in enumerate(results):
        print(f"\n\n결과 {i + 1}: {result.raw}")

asyncio.run(stream_multiple_crews())

스트림 청크 타입

청크는 chunk_type 필드로 표시되는 다양한 타입을 가질 수 있습니다:

TEXT 청크

LLM 응답의 표준 텍스트 콘텐츠:
Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)

TOOL_CALL 청크

수행 중인 도구 호출에 대한 정보:
Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL:
        print(f"\n도구 호출: {chunk.tool_call.tool_name}")
        print(f"인자: {chunk.tool_call.arguments}")

실용적인 예시: 스트리밍을 사용한 UI 구축

다음은 스트리밍을 사용한 대화형 애플리케이션을 구축하는 방법을 보여주는 완전한 예시입니다:
Code
import asyncio
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

async def interactive_research():
    # 스트리밍이 활성화된 crew 생성
    researcher = Agent(
        role="Research Analyst",
        goal="Provide detailed analysis on any topic",
        backstory="You are an expert researcher with broad knowledge.",
    )

    task = Task(
        description="Research and analyze: {topic}",
        expected_output="A comprehensive analysis with key insights",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
        verbose=False
    )

    # 사용자 입력 받기
    topic = input("연구할 주제를 입력하세요: ")

    print(f"\n{'='*60}")
    print(f"연구 중: {topic}")
    print(f"{'='*60}\n")

    # 스트리밍 실행 시작
    streaming = await crew.kickoff_async(inputs={"topic": topic})

    current_task = ""
    async for chunk in streaming:
        # 태스크 전환 표시
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            print(f"\n[{chunk.agent_role}] 작업 중: {chunk.task_name}")
            print("-" * 60)

        # 텍스트 청크 표시
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # 도구 호출 표시
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 도구 사용: {chunk.tool_call.tool_name}")

    # 최종 결과 표시
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("분석 완료!")
    print(f"{'='*60}")
    print(f"\n토큰 사용량: {result.token_usage}")

asyncio.run(interactive_research())

사용 사례

스트리밍은 다음과 같은 경우에 특히 유용합니다:
  • 대화형 애플리케이션: 에이전트가 작업하는 동안 사용자에게 실시간 피드백 제공
  • 장시간 실행 태스크: 연구, 분석 또는 콘텐츠 생성의 진행 상황 표시
  • 디버깅 및 모니터링: 에이전트 동작과 의사 결정을 실시간으로 관찰
  • 사용자 경험: 점진적인 결과를 표시하여 체감 지연 시간 감소
  • 라이브 대시보드: crew 실행 상태를 표시하는 모니터링 인터페이스 구축

중요 사항

  • 스트리밍은 crew의 모든 에이전트에 대해 자동으로 LLM 스트리밍을 활성화합니다
  • .result 속성에 접근하기 전에 모든 청크를 반복해야 합니다
  • 스트리밍을 사용하는 kickoff_for_each_async()의 경우, 모든 출력을 가져오려면 .results(복수형)를 사용하세요
  • 스트리밍은 최소한의 오버헤드를 추가하며 실제로 체감 성능을 향상시킬 수 있습니다
  • 각 청크는 풍부한 UI를 위한 전체 컨텍스트(태스크, 에이전트, 청크 타입)를 포함합니다

오류 처리

스트리밍 실행 중 오류 처리:
Code
streaming = crew.kickoff(inputs={"topic": "AI"})

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\n성공: {result.raw}")

except Exception as e:
    print(f"\n스트리밍 중 오류 발생: {e}")
    if streaming.is_completed:
        print("스트리밍은 완료되었지만 오류가 발생했습니다")
스트리밍을 활용하면 CrewAI로 더 반응성이 좋고 대화형인 애플리케이션을 구축하여 사용자에게 에이전트 실행과 결과에 대한 실시간 가시성을 제공할 수 있습니다.