Pular para o conteúdo principal

Introdução

A CrewAI oferece a capacidade de iniciar uma crew de forma assíncrona, permitindo que você comece a execução da crew de maneira não bloqueante. Esse recurso é especialmente útil quando você deseja executar múltiplas crews simultaneamente ou quando precisa realizar outras tarefas enquanto a crew está em execução. O CrewAI oferece duas abordagens para execução assíncrona:
MétodoTipoDescrição
akickoff()Async nativoAsync/await verdadeiro em toda a cadeia de execução
kickoff_async()Baseado em threadEnvolve execução síncrona em asyncio.to_thread
Para cargas de trabalho de alta concorrência, akickoff() é recomendado pois usa async nativo para execução de tasks, operações de memória e recuperação de conhecimento.

Execução Async Nativa com akickoff()

O método akickoff() fornece execução async nativa verdadeira, usando async/await em toda a cadeia de execução, incluindo execução de tasks, operações de memória e consultas de conhecimento.

Assinatura do Método

Code
async def akickoff(self, inputs: dict) -> CrewOutput:

Parâmetros

  • inputs (dict): Um dicionário contendo os dados de entrada necessários para as tarefas.

Retorno

  • CrewOutput: Um objeto que representa o resultado da execução da crew.

Exemplo: Execução Async Nativa de Crew

Code
import asyncio
from crewai import Crew, Agent, Task

# Criar um agente
coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

# Criar uma tarefa
data_analysis_task = Task(
    description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

# Criar uma crew
analysis_crew = Crew(
    agents=[coding_agent],
    tasks=[data_analysis_task]
)

# Execução async nativa
async def main():
    result = await analysis_crew.akickoff(inputs={"ages": [25, 30, 35, 40, 45]})
    print("Crew Result:", result)

asyncio.run(main())

Exemplo: Múltiplas Crews Async Nativas

Execute múltiplas crews concorrentemente usando asyncio.gather() com async nativo:
Code
import asyncio
from crewai import Crew, Agent, Task

coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

task_1 = Task(
    description="Analyze the first dataset and calculate the average age. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

task_2 = Task(
    description="Analyze the second dataset and calculate the average age. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])

async def main():
    results = await asyncio.gather(
        crew_1.akickoff(inputs={"ages": [25, 30, 35, 40, 45]}),
        crew_2.akickoff(inputs={"ages": [20, 22, 24, 28, 30]})
    )

    for i, result in enumerate(results, 1):
        print(f"Crew {i} Result:", result)

asyncio.run(main())

Exemplo: Async Nativo para Múltiplas Entradas

Use akickoff_for_each() para executar sua crew contra múltiplas entradas concorrentemente com async nativo:
Code
import asyncio
from crewai import Crew, Agent, Task

coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

data_analysis_task = Task(
    description="Analyze the dataset and calculate the average age. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

analysis_crew = Crew(
    agents=[coding_agent],
    tasks=[data_analysis_task]
)

async def main():
    datasets = [
        {"ages": [25, 30, 35, 40, 45]},
        {"ages": [20, 22, 24, 28, 30]},
        {"ages": [30, 35, 40, 45, 50]}
    ]

    results = await analysis_crew.akickoff_for_each(datasets)

    for i, result in enumerate(results, 1):
        print(f"Dataset {i} Result:", result)

asyncio.run(main())

Async Baseado em Thread com kickoff_async()

O método kickoff_async() fornece execução async envolvendo o kickoff() síncrono em uma thread. Isso é útil para integração async mais simples ou compatibilidade retroativa.

Assinatura do Método

Code
async def kickoff_async(self, inputs: dict) -> CrewOutput:

Parâmetros

  • inputs (dict): Um dicionário contendo os dados de entrada necessários para as tarefas.

Retorno

  • CrewOutput: Um objeto que representa o resultado da execução da crew.

Exemplo: Execução Async Baseada em Thread

Code
import asyncio
from crewai import Crew, Agent, Task

coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

data_analysis_task = Task(
    description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

analysis_crew = Crew(
    agents=[coding_agent],
    tasks=[data_analysis_task]
)

async def async_crew_execution():
    result = await analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
    print("Crew Result:", result)

asyncio.run(async_crew_execution())

Exemplo: Múltiplas Crews Async Baseadas em Thread

Code
import asyncio
from crewai import Crew, Agent, Task

coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

task_1 = Task(
    description="Analyze the first dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

task_2 = Task(
    description="Analyze the second dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])

async def async_multiple_crews():
    result_1 = crew_1.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
    result_2 = crew_2.kickoff_async(inputs={"ages": [20, 22, 24, 28, 30]})

    results = await asyncio.gather(result_1, result_2)

    for i, result in enumerate(results, 1):
        print(f"Crew {i} Result:", result)

asyncio.run(async_multiple_crews())

Streaming Assíncrono

Ambos os métodos async suportam streaming quando stream=True está definido na crew:
Code
import asyncio
from crewai import Crew, Agent, Task

agent = Agent(
    role="Researcher",
    goal="Research and summarize topics",
    backstory="You are an expert researcher."
)

task = Task(
    description="Research the topic: {topic}",
    agent=agent,
    expected_output="A comprehensive summary of the topic."
)

crew = Crew(
    agents=[agent],
    tasks=[task],
    stream=True  # Habilitar streaming
)

async def main():
    streaming_output = await crew.akickoff(inputs={"topic": "AI trends in 2024"})

    # Iteração async sobre chunks de streaming
    async for chunk in streaming_output:
        print(f"Chunk: {chunk.content}")

    # Acessar resultado final após streaming completar
    result = streaming_output.result
    print(f"Final result: {result.raw}")

asyncio.run(main())

Possíveis Casos de Uso

  • Geração Paralela de Conteúdo: Inicie múltiplas crews independentes de forma assíncrona, cada uma responsável por gerar conteúdo sobre temas diferentes. Por exemplo, uma crew pode pesquisar e redigir um artigo sobre tendências em IA, enquanto outra gera posts para redes sociais sobre o lançamento de um novo produto.
  • Tarefas Conjuntas de Pesquisa de Mercado: Lance múltiplas crews de forma assíncrona para realizar pesquisas de mercado em paralelo. Uma crew pode analisar tendências do setor, outra examinar estratégias de concorrentes e ainda outra avaliar o sentimento do consumidor.
  • Módulos Independentes de Planejamento de Viagem: Execute crews separadas para planejar diferentes aspectos de uma viagem de forma independente. Uma crew pode cuidar das opções de voo, outra das acomodações e uma terceira do planejamento das atividades.

Escolhendo entre akickoff() e kickoff_async()

Recursoakickoff()kickoff_async()
Modelo de execuçãoAsync/await nativoWrapper baseado em thread
Execução de tasksAsync com aexecute_sync()Síncrono em thread pool
Operações de memóriaAsyncSíncrono em thread pool
Recuperação de conhecimentoAsyncSíncrono em thread pool
Melhor paraAlta concorrência, cargas I/O-boundIntegração async simples
Suporte a streamingSimSim