Pular para o conteúdo principal

Visão geral

O CrewAI expõe um contrato de streaming baseado em frames para runtimes que precisam de mais do que chunks de texto simples. O contrato emite objetos StreamFrame ordenados para eventos de ciclo de vida de Flow, tokens de LLM diretos, atividade de ferramentas, mensagens de conversa e eventos personalizados. Use esta API ao criar uma UI, ponte de serviço, aplicativo de terminal ou runtime de implantação que precise de um fluxo estável de eventos estruturados enquanto um Flow, turno de chat ou chamada direta de LLM está em execução.

StreamFrame

Todo frame tem o mesmo envelope:
from crewai.types.streaming import StreamFrame

frame.id           # id único do frame
frame.seq          # ordem local da execução, quando disponível
frame.type         # tipo do evento de origem, como "flow_started"
frame.channel      # "llm", "flow", "tools", "messages", "lifecycle" ou "custom"
frame.namespace    # namespace de origem/runtime
frame.timestamp    # timestamp do evento
frame.parent_id    # id do evento pai, quando disponível
frame.previous_id  # id do evento anterior, quando disponível
frame.data         # payload do evento
frame.event        # alias para frame.data
frame.content      # texto imprimível para frames de token, caso contrário ""
O campo channel é a forma mais rápida de rotear frames em consumidores:
CanalContém
llmTokens e chunks de raciocínio de eventos de streaming de LLM
flowCiclo de vida do Flow, execução de métodos, roteamento e eventos de pausa/retomada
toolsEventos de uso de ferramentas
messagesEventos do transcript da conversa
lifecycleEventos de ciclo de vida do runtime que não pertencem a outro canal
customEventos que não mapeiam para um canal integrado
frame.type preserva o tipo do evento de origem, para que consumidores possam tratar eventos específicos dentro de um canal.

Transmitir um Flow

Defina stream=True em um Flow para fazer kickoff() retornar uma sessão de stream:
from crewai.flow import Flow, start


class ReportFlow(Flow):
    @start()
    def generate(self):
        return "done"


flow = ReportFlow(stream=True)
stream = flow.kickoff()

with stream:
    for chunk in stream:
        print(chunk.content, end="", flush=True)
        if chunk.type == "tool_usage_started":
            print(chunk.event["tool_name"])

result = stream.result
Você deve consumir o stream antes de ler stream.result. Acessar o resultado cedo demais gera um RuntimeError, para que consumidores não tratem uma execução parcial como concluída. Você também pode chamar flow.stream_events(...) diretamente quando quiser streaming para uma única invocação sem definir stream=True na instância do Flow.

Filtrar por canal

StreamSession expõe projeções por canal que preservam a ordem global dos frames dentro do canal selecionado:
stream = flow.stream_events()

with stream:
    for frame in stream.llm:
        print(frame.content, end="", flush=True)

result = stream.result
As projeções disponíveis são:
ProjeçãoFrames
stream.eventsTodos os frames
stream.llmFrames de LLM
stream.messagesFrames de mensagens de conversa
stream.flowFrames de Flow
stream.toolsFrames de ferramentas
stream.interleave([...])Um conjunto selecionado de canais
Use stream.interleave(["flow", "llm", "messages"]) quando um consumidor quiser apenas alguns canais, mas ainda precisar da ordem relativa entre eles.

Streaming assíncrono

Use astream() para consumidores assíncronos:
flow = ReportFlow()
stream = flow.astream()

async with stream:
    async for chunk in stream.events:
        print(chunk.channel, chunk.type, chunk.content)

result = stream.result
A sessão assíncrona tem as mesmas projeções da sessão síncrona.

Transmitir uma chamada direta de LLM

llm.call(...) ainda retorna o resultado final montado. Use llm.stream_events(...) quando quiser iterar pelos chunks conforme eles chegam, mantendo o payload estruturado do evento:
from crewai import LLM


llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events(
    messages=[
        {
            "role": "user",
            "content": "Explain CrewAI streaming in two short sentences.",
        }
    ]
)

with stream:
    for chunk in stream:
        print(chunk.content, end="", flush=True)

result = stream.result
llm.stream_events(...) ativa temporariamente o streaming para a chamada encapsulada e restaura a configuração anterior de stream do LLM depois. As integrações de provedores continuam emitindo os eventos de stream de LLM subjacentes; esse helper fornece uma API de iterador comum sobre esses eventos para todos os provedores de LLM.

Turnos conversacionais

Flows conversacionais podem transmitir um turno de usuário com stream_turn():
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState


@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
    conversational = True


flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")

with stream:
    for frame in stream.events:
        if frame.channel == "llm" and frame.type == "llm_stream_chunk":
            print(frame.content, end="", flush=True)

reply = stream.result
Durante stream_turn(), o caminho de resposta conversacional integrado ativa o streaming de tokens de LLM para esse turno e restaura a configuração anterior de stream do LLM depois. Handlers de rota personalizados que criam seus próprios agentes ou instâncias de LLM devem configurar esses LLMs para streaming se precisarem de saída em nível de token.

Limpeza

Use a sessão como gerenciador de contexto quando possível. Se um cliente se desconectar antes de o stream ser esgotado, feche a sessão explicitamente:
stream = flow.stream_events()

try:
    for frame in stream.events:
        print(frame.type)
finally:
    if not stream.is_exhausted:
        stream.close()
Para streams assíncronos, use await stream.aclose().

Streaming de chunks legado

O streaming de Crew com stream=True ainda retorna a API orientada a chunks CrewStreamingOutput descrita em Streaming da Execução de Crew. Chamadas diretas llm.call(...) ainda retornam o resultado final do LLM. O contrato de frames é destinado a runtimes que precisam de um envelope de evento estável em Flows, chamadas diretas de LLM, turnos conversacionais, ferramentas e mensagens.