메인 콘텐츠로 건너뛰기

소개

CrewAI는 crew를 비동기적으로 시작할 수 있는 기능을 제공합니다. 이를 통해 crew 실행을 블로킹(blocking) 없이 시작할 수 있습니다. 이 기능은 여러 개의 crew를 동시에 실행하거나 crew가 실행되는 동안 다른 작업을 수행해야 할 때 특히 유용합니다. CrewAI는 비동기 실행을 위해 두 가지 접근 방식을 제공합니다:
메서드타입설명
akickoff()네이티브 async전체 실행 체인에서 진정한 async/await 사용
kickoff_async()스레드 기반동기 실행을 asyncio.to_thread로 래핑
고동시성 워크로드의 경우 akickoff()가 권장됩니다. 이는 작업 실행, 메모리 작업, 지식 검색에 네이티브 async를 사용합니다.

akickoff()를 사용한 네이티브 비동기 실행

akickoff() 메서드는 작업 실행, 메모리 작업, 지식 쿼리를 포함한 전체 실행 체인에서 async/await를 사용하여 진정한 네이티브 비동기 실행을 제공합니다.

메서드 시그니처

Code
async def akickoff(self, inputs: dict) -> CrewOutput:

매개변수

  • inputs (dict): 작업에 필요한 입력 데이터를 포함하는 딕셔너리입니다.

반환

  • CrewOutput: crew 실행 결과를 나타내는 객체입니다.

예시: 네이티브 비동기 Crew 실행

Code
import asyncio
from crewai import Crew, Agent, Task

# 에이전트 생성
coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

# 작업 생성
data_analysis_task = Task(
    description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

# Crew 생성
analysis_crew = Crew(
    agents=[coding_agent],
    tasks=[data_analysis_task]
)

# 네이티브 비동기 실행
async def main():
    result = await analysis_crew.akickoff(inputs={"ages": [25, 30, 35, 40, 45]})
    print("Crew Result:", result)

asyncio.run(main())

예시: 여러 네이티브 비동기 Crew

asyncio.gather()를 사용하여 네이티브 async로 여러 crew를 동시에 실행:
Code
import asyncio
from crewai import Crew, Agent, Task

coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

task_1 = Task(
    description="Analyze the first dataset and calculate the average age. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

task_2 = Task(
    description="Analyze the second dataset and calculate the average age. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])

async def main():
    results = await asyncio.gather(
        crew_1.akickoff(inputs={"ages": [25, 30, 35, 40, 45]}),
        crew_2.akickoff(inputs={"ages": [20, 22, 24, 28, 30]})
    )

    for i, result in enumerate(results, 1):
        print(f"Crew {i} Result:", result)

asyncio.run(main())

예시: 여러 입력에 대한 네이티브 비동기

akickoff_for_each()를 사용하여 네이티브 async로 여러 입력에 대해 crew를 동시에 실행:
Code
import asyncio
from crewai import Crew, Agent, Task

coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

data_analysis_task = Task(
    description="Analyze the dataset and calculate the average age. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

analysis_crew = Crew(
    agents=[coding_agent],
    tasks=[data_analysis_task]
)

async def main():
    datasets = [
        {"ages": [25, 30, 35, 40, 45]},
        {"ages": [20, 22, 24, 28, 30]},
        {"ages": [30, 35, 40, 45, 50]}
    ]

    results = await analysis_crew.akickoff_for_each(datasets)

    for i, result in enumerate(results, 1):
        print(f"Dataset {i} Result:", result)

asyncio.run(main())

kickoff_async()를 사용한 스레드 기반 비동기

kickoff_async() 메서드는 동기 kickoff()를 스레드로 래핑하여 비동기 실행을 제공합니다. 이는 더 간단한 비동기 통합이나 하위 호환성에 유용합니다.

메서드 시그니처

Code
async def kickoff_async(self, inputs: dict) -> CrewOutput:

매개변수

  • inputs (dict): 작업에 필요한 입력 데이터를 포함하는 딕셔너리입니다.

반환

  • CrewOutput: crew 실행 결과를 나타내는 객체입니다.

예시: 스레드 기반 비동기 실행

Code
import asyncio
from crewai import Crew, Agent, Task

coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

data_analysis_task = Task(
    description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

analysis_crew = Crew(
    agents=[coding_agent],
    tasks=[data_analysis_task]
)

async def async_crew_execution():
    result = await analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
    print("Crew Result:", result)

asyncio.run(async_crew_execution())

예시: 여러 스레드 기반 비동기 Crew

Code
import asyncio
from crewai import Crew, Agent, Task

coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

task_1 = Task(
    description="Analyze the first dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

task_2 = Task(
    description="Analyze the second dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])

async def async_multiple_crews():
    result_1 = crew_1.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
    result_2 = crew_2.kickoff_async(inputs={"ages": [20, 22, 24, 28, 30]})

    results = await asyncio.gather(result_1, result_2)

    for i, result in enumerate(results, 1):
        print(f"Crew {i} Result:", result)

asyncio.run(async_multiple_crews())

비동기 스트리밍

두 비동기 메서드 모두 crew에 stream=True가 설정된 경우 스트리밍을 지원합니다:
Code
import asyncio
from crewai import Crew, Agent, Task

agent = Agent(
    role="Researcher",
    goal="Research and summarize topics",
    backstory="You are an expert researcher."
)

task = Task(
    description="Research the topic: {topic}",
    agent=agent,
    expected_output="A comprehensive summary of the topic."
)

crew = Crew(
    agents=[agent],
    tasks=[task],
    stream=True  # 스트리밍 활성화
)

async def main():
    streaming_output = await crew.akickoff(inputs={"topic": "AI trends in 2024"})

    # 스트리밍 청크에 대한 비동기 반복
    async for chunk in streaming_output:
        print(f"Chunk: {chunk.content}")

    # 스트리밍 완료 후 최종 결과 접근
    result = streaming_output.result
    print(f"Final result: {result.raw}")

asyncio.run(main())

잠재적 사용 사례

  • 병렬 콘텐츠 생성: 여러 개의 독립적인 crew를 비동기적으로 시작하여, 각 crew가 다른 주제에 대한 콘텐츠 생성을 담당합니다. 예를 들어, 한 crew는 AI 트렌드에 대한 기사 조사 및 초안을 작성하는 반면, 또 다른 crew는 신제품 출시와 관련된 소셜 미디어 게시물을 생성할 수 있습니다.
  • 동시 시장 조사 작업: 여러 crew를 비동기적으로 시작하여 시장 조사를 병렬로 수행합니다. 한 crew는 업계 동향을 분석하고, 또 다른 crew는 경쟁사 전략을 조사하며, 또 다른 crew는 소비자 감정을 평가할 수 있습니다.
  • 독립적인 여행 계획 모듈: 각각 독립적으로 여행의 다양한 측면을 계획하도록 crew를 따로 실행합니다. 한 crew는 항공편 옵션을, 다른 crew는 숙박을, 세 번째 crew는 활동 계획을 담당할 수 있습니다.

akickoff()kickoff_async() 선택하기

기능akickoff()kickoff_async()
실행 모델네이티브 async/await스레드 기반 래퍼
작업 실행aexecute_sync()로 비동기스레드 풀에서 동기
메모리 작업비동기스레드 풀에서 동기
지식 검색비동기스레드 풀에서 동기
적합한 용도고동시성, I/O 바운드 워크로드간단한 비동기 통합
스트리밍 지원