Introduction
CrewAI provides the ability to kickoff a crew asynchronously, allowing you to start the crew execution in a non-blocking manner.
This feature is particularly useful when you want to run multiple crews concurrently or when you need to perform other tasks while the crew is executing.
CrewAI offers two approaches for async execution:
| Method | Type | Description |
|---|
akickoff() | Native async | True async/await throughout the entire execution chain |
kickoff_async() | Thread-based | Wraps synchronous execution in asyncio.to_thread |
For high-concurrency workloads, akickoff() is recommended as it uses native async for task execution, memory operations, and knowledge retrieval.
Native Async Execution with akickoff()
The akickoff() method provides true native async execution, using async/await throughout the entire execution chain including task execution, memory operations, and knowledge queries.
Method Signature
async def akickoff(self, inputs: dict) -> CrewOutput:
Parameters
inputs (dict): A dictionary containing the input data required for the tasks.
Returns
CrewOutput: An object representing the result of the crew execution.
Example: Native Async Crew Execution
import asyncio
from crewai import Crew, Agent, Task
# Create an agent
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
# Create a task
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
# Create a crew
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
# Native async execution
async def main():
result = await analysis_crew.akickoff(inputs={"ages": [25, 30, 35, 40, 45]})
print("Crew Result:", result)
asyncio.run(main())
Example: Multiple Native Async Crews
Run multiple crews concurrently using asyncio.gather() with native async:
import asyncio
from crewai import Crew, Agent, Task
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
task_1 = Task(
description="Analyze the first dataset and calculate the average age. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
task_2 = Task(
description="Analyze the second dataset and calculate the average age. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])
async def main():
results = await asyncio.gather(
crew_1.akickoff(inputs={"ages": [25, 30, 35, 40, 45]}),
crew_2.akickoff(inputs={"ages": [20, 22, 24, 28, 30]})
)
for i, result in enumerate(results, 1):
print(f"Crew {i} Result:", result)
asyncio.run(main())
Use akickoff_for_each() to execute your crew against multiple inputs concurrently with native async:
import asyncio
from crewai import Crew, Agent, Task
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
data_analysis_task = Task(
description="Analyze the dataset and calculate the average age. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
async def main():
datasets = [
{"ages": [25, 30, 35, 40, 45]},
{"ages": [20, 22, 24, 28, 30]},
{"ages": [30, 35, 40, 45, 50]}
]
results = await analysis_crew.akickoff_for_each(datasets)
for i, result in enumerate(results, 1):
print(f"Dataset {i} Result:", result)
asyncio.run(main())
Thread-Based Async with kickoff_async()
The kickoff_async() method provides async execution by wrapping the synchronous kickoff() in a thread. This is useful for simpler async integration or backward compatibility.
Method Signature
async def kickoff_async(self, inputs: dict) -> CrewOutput:
Parameters
inputs (dict): A dictionary containing the input data required for the tasks.
Returns
CrewOutput: An object representing the result of the crew execution.
Example: Thread-Based Async Execution
import asyncio
from crewai import Crew, Agent, Task
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
async def async_crew_execution():
result = await analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
print("Crew Result:", result)
asyncio.run(async_crew_execution())
Example: Multiple Thread-Based Async Crews
import asyncio
from crewai import Crew, Agent, Task
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
task_1 = Task(
description="Analyze the first dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
task_2 = Task(
description="Analyze the second dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])
async def async_multiple_crews():
result_1 = crew_1.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
result_2 = crew_2.kickoff_async(inputs={"ages": [20, 22, 24, 28, 30]})
results = await asyncio.gather(result_1, result_2)
for i, result in enumerate(results, 1):
print(f"Crew {i} Result:", result)
asyncio.run(async_multiple_crews())
Async Streaming
Both async methods support streaming when stream=True is set on the crew:
import asyncio
from crewai import Crew, Agent, Task
agent = Agent(
role="Researcher",
goal="Research and summarize topics",
backstory="You are an expert researcher."
)
task = Task(
description="Research the topic: {topic}",
agent=agent,
expected_output="A comprehensive summary of the topic."
)
crew = Crew(
agents=[agent],
tasks=[task],
stream=True # Enable streaming
)
async def main():
streaming_output = await crew.akickoff(inputs={"topic": "AI trends in 2024"})
# Async iteration over streaming chunks
async for chunk in streaming_output:
print(f"Chunk: {chunk.content}")
# Access final result after streaming completes
result = streaming_output.result
print(f"Final result: {result.raw}")
asyncio.run(main())
Potential Use Cases
-
Parallel Content Generation: Kickoff multiple independent crews asynchronously, each responsible for generating content on different topics. For example, one crew might research and draft an article on AI trends, while another crew generates social media posts about a new product launch.
-
Concurrent Market Research Tasks: Launch multiple crews asynchronously to conduct market research in parallel. One crew might analyze industry trends, while another examines competitor strategies, and yet another evaluates consumer sentiment.
-
Independent Travel Planning Modules: Execute separate crews to independently plan different aspects of a trip. One crew might handle flight options, another handles accommodation, and a third plans activities.
Choosing Between akickoff() and kickoff_async()
| Feature | akickoff() | kickoff_async() |
|---|
| Execution model | Native async/await | Thread-based wrapper |
| Task execution | Async with aexecute_sync() | Sync in thread pool |
| Memory operations | Async | Sync in thread pool |
| Knowledge retrieval | Async | Sync in thread pool |
| Best for | High-concurrency, I/O-bound workloads | Simple async integration |
| Streaming support | Yes | Yes |