مقدمة
يوفر CrewAI القدرة على تشغيل طاقم بشكل غير متزامن، مما يتيح لك بدء تنفيذ الطاقم بطريقة غير حاجبة.
هذه الميزة مفيدة بشكل خاص عندما تريد تشغيل عدة أطقم بشكل متزامن أو عندما تحتاج إلى أداء مهام أخرى أثناء تنفيذ الطاقم.
يقدم CrewAI نهجين للتنفيذ غير المتزامن:
| الطريقة | النوع | الوصف |
|---|
akickoff() | غير متزامن أصلي | async/await أصلي عبر سلسلة التنفيذ بالكامل |
kickoff_async() | قائم على الخيوط | يغلف التنفيذ المتزامن في asyncio.to_thread |
لأحمال العمل عالية التزامن، يُوصى باستخدام akickoff() لأنه يستخدم async أصلي لتنفيذ المهام وعمليات الذاكرة واسترجاع المعرفة.
التنفيذ غير المتزامن الأصلي مع akickoff()
توفر طريقة akickoff() تنفيذاً غير متزامن أصلياً حقيقياً، باستخدام async/await عبر سلسلة التنفيذ بالكامل بما في ذلك تنفيذ المهام وعمليات الذاكرة واستعلامات المعرفة.
توقيع الطريقة
async def akickoff(self, inputs: dict) -> CrewOutput:
المعاملات
inputs (dict): قاموس يحتوي على بيانات الإدخال المطلوبة للمهام.
القيمة المُرجعة
CrewOutput: كائن يمثل نتيجة تنفيذ الطاقم.
مثال: تنفيذ طاقم غير متزامن أصلي
import asyncio
from crewai import Crew, Agent, Task
# Create an agent
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
# Create a task
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
# Create a crew
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
# Native async execution
async def main():
result = await analysis_crew.akickoff(inputs={"ages": [25, 30, 35, 40, 45]})
print("Crew Result:", result)
asyncio.run(main())
مثال: عدة أطقم غير متزامنة أصلية
تشغيل عدة أطقم بشكل متزامن باستخدام asyncio.gather() مع async أصلي:
import asyncio
from crewai import Crew, Agent, Task
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
task_1 = Task(
description="Analyze the first dataset and calculate the average age. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
task_2 = Task(
description="Analyze the second dataset and calculate the average age. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])
async def main():
results = await asyncio.gather(
crew_1.akickoff(inputs={"ages": [25, 30, 35, 40, 45]}),
crew_2.akickoff(inputs={"ages": [20, 22, 24, 28, 30]})
)
for i, result in enumerate(results, 1):
print(f"Crew {i} Result:", result)
asyncio.run(main())
مثال: async أصلي لمدخلات متعددة
استخدم akickoff_for_each() لتنفيذ طاقمك على مدخلات متعددة بشكل متزامن مع async أصلي:
import asyncio
from crewai import Crew, Agent, Task
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
data_analysis_task = Task(
description="Analyze the dataset and calculate the average age. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
async def main():
datasets = [
{"ages": [25, 30, 35, 40, 45]},
{"ages": [20, 22, 24, 28, 30]},
{"ages": [30, 35, 40, 45, 50]}
]
results = await analysis_crew.akickoff_for_each(datasets)
for i, result in enumerate(results, 1):
print(f"Dataset {i} Result:", result)
asyncio.run(main())
التنفيذ غير المتزامن القائم على الخيوط مع kickoff_async()
توفر طريقة kickoff_async() تنفيذاً غير متزامن عن طريق تغليف kickoff() المتزامن في خيط. هذا مفيد للتكامل البسيط مع async أو للتوافق مع الإصدارات السابقة.
توقيع الطريقة
async def kickoff_async(self, inputs: dict) -> CrewOutput:
المعاملات
inputs (dict): قاموس يحتوي على بيانات الإدخال المطلوبة للمهام.
القيمة المُرجعة
CrewOutput: كائن يمثل نتيجة تنفيذ الطاقم.
مثال: تنفيذ غير متزامن قائم على الخيوط
import asyncio
from crewai import Crew, Agent, Task
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
async def async_crew_execution():
result = await analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
print("Crew Result:", result)
asyncio.run(async_crew_execution())
مثال: عدة أطقم غير متزامنة قائمة على الخيوط
import asyncio
from crewai import Crew, Agent, Task
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
task_1 = Task(
description="Analyze the first dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
task_2 = Task(
description="Analyze the second dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
)
crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])
async def async_multiple_crews():
result_1 = crew_1.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
result_2 = crew_2.kickoff_async(inputs={"ages": [20, 22, 24, 28, 30]})
results = await asyncio.gather(result_1, result_2)
for i, result in enumerate(results, 1):
print(f"Crew {i} Result:", result)
asyncio.run(async_multiple_crews())
البث غير المتزامن
تدعم كلتا الطريقتين غير المتزامنتين البث عند تعيين stream=True على الطاقم:
import asyncio
from crewai import Crew, Agent, Task
agent = Agent(
role="Researcher",
goal="Research and summarize topics",
backstory="You are an expert researcher."
)
task = Task(
description="Research the topic: {topic}",
agent=agent,
expected_output="A comprehensive summary of the topic."
)
crew = Crew(
agents=[agent],
tasks=[task],
stream=True # Enable streaming
)
async def main():
streaming_output = await crew.akickoff(inputs={"topic": "AI trends in 2024"})
# Async iteration over streaming chunks
async for chunk in streaming_output:
print(f"Chunk: {chunk.content}")
# Access final result after streaming completes
result = streaming_output.result
print(f"Final result: {result.raw}")
asyncio.run(main())
حالات الاستخدام المحتملة
-
توليد المحتوى بالتوازي: تشغيل عدة أطقم مستقلة بشكل غير متزامن، كل منها مسؤول عن توليد محتوى حول مواضيع مختلفة. على سبيل المثال، قد يبحث طاقم ويصوغ مقالاً عن اتجاهات الذكاء الاصطناعي، بينما يولد طاقم آخر منشورات وسائل التواصل الاجتماعي حول إطلاق منتج جديد.
-
مهام أبحاث السوق المتزامنة: إطلاق عدة أطقم بشكل غير متزامن لإجراء أبحاث السوق بالتوازي. قد يحلل طاقم اتجاهات الصناعة، بينما يفحص آخر استراتيجيات المنافسين، ويقيّم ثالث مشاعر المستهلكين.
-
وحدات تخطيط السفر المستقلة: تنفيذ أطقم منفصلة للتخطيط المستقل لجوانب مختلفة من رحلة. قد يتعامل طاقم مع خيارات الرحلات الجوية، وآخر مع الإقامة، وثالث يخطط للأنشطة.
الاختيار بين akickoff() و kickoff_async()
| الميزة | akickoff() | kickoff_async() |
|---|
| نموذج التنفيذ | async/await أصلي | غلاف قائم على الخيوط |
| تنفيذ المهام | غير متزامن مع aexecute_sync() | متزامن في مجمع الخيوط |
| عمليات الذاكرة | غير متزامنة | متزامنة في مجمع الخيوط |
| استرجاع المعرفة | غير متزامن | متزامن في مجمع الخيوط |
| الأفضل لـ | أحمال العمل عالية التزامن والمرتبطة بالإدخال/الإخراج | التكامل البسيط مع async |
| دعم البث | نعم | نعم |