Skip to main content

مقدمة

تدعم تدفقات CrewAI بث المخرجات، مما يتيح لك استلام تحديثات فورية أثناء تنفيذ تدفقك. تمكّنك هذه الميزة من بناء تطبيقات متجاوبة تعرض النتائج تدريجياً وتوفر تحديثات تقدم حية وتخلق تجربة مستخدم أفضل لسير العمل طويلة التشغيل.

كيف يعمل بث التدفق

عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM داخل التدفق. يقدم البث أجزاء منظمة تحتوي على المحتوى وسياق المهمة ومعلومات الوكيل مع تقدم التنفيذ.

تفعيل البث

لتفعيل البث، عيّن خاصية stream إلى True في فئة التدفق الخاصة بك:
Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class ResearchFlow(Flow):
    stream = True  # Enable streaming for the entire flow

    @start()
    def initialize(self):
        return {"topic": "AI trends"}

    @listen(initialize)
    def research_topic(self, data):
        researcher = Agent(
            role="Research Analyst",
            goal="Research topics thoroughly",
            backstory="Expert researcher with analytical skills",
        )

        task = Task(
            description="Research {topic} and provide insights",
            expected_output="Detailed research findings",
            agent=researcher,
        )

        crew = Crew(
            agents=[researcher],
            tasks=[task],
        )

        return crew.kickoff(inputs=data)

البث المتزامن

عند استدعاء kickoff() على تدفق مع تفعيل البث، يُرجع كائن FlowStreamingOutput يمكنك التكرار عليه:
Code
flow = ResearchFlow()

# Start streaming execution
streaming = flow.kickoff()

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")

معلومات جزء البث

يوفر كل جزء سياقاً حول مصدره في التدفق:
Code
streaming = flow.kickoff()

for chunk in streaming:
    print(f"Agent: {chunk.agent_role}")
    print(f"Task: {chunk.task_name}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL

الوصول إلى خصائص البث

يوفر كائن FlowStreamingOutput خصائص وطرق مفيدة:
Code
streaming = flow.kickoff()

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"Total chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result}")

البث غير المتزامن

للتطبيقات غير المتزامنة، استخدم kickoff_async() مع التكرار غير المتزامن:
Code
import asyncio

async def stream_flow():
    flow = ResearchFlow()

    # Start async streaming
    streaming = await flow.kickoff_async()

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result}")

asyncio.run(stream_flow())

البث مع التدفقات متعددة الخطوات

يعمل البث بسلاسة عبر خطوات تدفق متعددة، بما في ذلك التدفقات التي تنفذ أطقم متعددة:
Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class MultiStepFlow(Flow):
    stream = True

    @start()
    def research_phase(self):
        """First crew: Research the topic."""
        researcher = Agent(
            role="Research Analyst",
            goal="Gather comprehensive information",
            backstory="Expert at finding relevant information",
        )

        task = Task(
            description="Research AI developments in healthcare",
            expected_output="Research findings on AI in healthcare",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state["research"] = result.raw
        return result.raw

    @listen(research_phase)
    def analysis_phase(self, research_data):
        """Second crew: Analyze the research."""
        analyst = Agent(
            role="Data Analyst",
            goal="Analyze information and extract insights",
            backstory="Expert at identifying patterns and trends",
        )

        task = Task(
            description="Analyze this research: {research}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"research": research_data})


# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()

current_step = ""
for chunk in streaming:
    # Track which flow step is executing
    if chunk.task_name != current_step:
        current_step = chunk.task_name
        print(f"\n\n=== {chunk.task_name} ===\n")

    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal analysis: {result}")

مثال عملي: لوحة معلومات التقدم

إليك مثالاً كاملاً يوضح كيفية بناء لوحة معلومات تقدم مع البث:
Code
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

class ResearchPipeline(Flow):
    stream = True

    @start()
    def gather_data(self):
        researcher = Agent(
            role="Data Gatherer",
            goal="Collect relevant information",
            backstory="Skilled at finding quality sources",
        )

        task = Task(
            description="Gather data on renewable energy trends",
            expected_output="Collection of relevant data points",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        self.state["data"] = result.raw
        return result.raw

    @listen(gather_data)
    def analyze_data(self, data):
        analyst = Agent(
            role="Data Analyst",
            goal="Extract meaningful insights",
            backstory="Expert at data analysis",
        )

        task = Task(
            description="Analyze: {data}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"data": data})


async def run_with_dashboard():
    flow = ResearchPipeline()

    print("="*60)
    print("RESEARCH PIPELINE DASHBOARD")
    print("="*60)

    streaming = await flow.kickoff_async()

    current_agent = ""
    current_task = ""
    chunk_count = 0

    async for chunk in streaming:
        chunk_count += 1

        # Display phase transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            current_agent = chunk.agent_role
            print(f"\n\n📋 Phase: {current_task}")
            print(f"👤 Agent: {current_agent}")
            print("-" * 60)

        # Display text output
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool usage
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")

    # Show completion summary
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("PIPELINE COMPLETE")
    print(f"{'='*60}")
    print(f"Total chunks: {chunk_count}")
    print(f"Final output length: {len(str(result))} characters")

asyncio.run(run_with_dashboard())

البث مع إدارة الحالة

يعمل البث بشكل طبيعي مع إدارة حالة التدفق:
Code
from pydantic import BaseModel

class AnalysisState(BaseModel):
    topic: str = ""
    research: str = ""
    insights: str = ""

class StatefulStreamingFlow(Flow[AnalysisState]):
    stream = True

    @start()
    def research(self):
        # State is available during streaming
        topic = self.state.topic
        print(f"Researching: {topic}")

        researcher = Agent(
            role="Researcher",
            goal="Research topics thoroughly",
            backstory="Expert researcher",
        )

        task = Task(
            description=f"Research {topic}",
            expected_output="Research findings",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state.research = result.raw
        return result.raw

    @listen(research)
    def analyze(self, research):
        # Access updated state
        print(f"Analyzing {len(self.state.research)} chars of research")

        analyst = Agent(
            role="Analyst",
            goal="Extract insights",
            backstory="Expert analyst",
        )

        task = Task(
            description="Analyze: {research}",
            expected_output="Key insights",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        result = crew.kickoff(inputs={"research": research})

        self.state.insights = result.raw
        return result.raw


# Run with streaming
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})

for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal state:")
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")

حالات الاستخدام

بث التدفق ذو قيمة خاصة لـ:
  • سير العمل متعددة المراحل: عرض التقدم عبر مراحل البحث والتحليل والتوليف
  • خطوط الأنابيب المعقدة: توفير رؤية لتدفقات معالجة البيانات طويلة التشغيل
  • التطبيقات التفاعلية: بناء واجهات مستخدم متجاوبة تعرض النتائج الوسيطة
  • المراقبة والتصحيح: مراقبة تنفيذ التدفق وتفاعلات الأطقم في الوقت الفعلي
  • تتبع التقدم: إظهار المرحلة الحالية من سير العمل للمستخدمين
  • لوحات المعلومات الحية: إنشاء واجهات مراقبة لتدفقات الإنتاج

أنواع أجزاء البث

مثل بث الطاقم، يمكن أن تكون أجزاء التدفق من أنواع مختلفة:

أجزاء TEXT

محتوى نصي قياسي من استجابات LLM:
Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)

أجزاء TOOL_CALL

معلومات حول استدعاءات الأدوات داخل التدفق:
Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
        print(f"\nTool: {chunk.tool_call.tool_name}")
        print(f"Args: {chunk.tool_call.arguments}")

معالجة الأخطاء

التعامل مع الأخطاء بأناقة أثناء البث:
Code
flow = ResearchFlow()
streaming = flow.kickoff()

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess! Result: {result}")

except Exception as e:
    print(f"\nError during flow execution: {e}")
    if streaming.is_completed:
        print("Streaming completed but flow encountered an error")

ملاحظات مهمة

  • يفعّل البث تلقائياً بث LLM لأي أطقم مستخدمة داخل التدفق
  • يجب التكرار عبر جميع الأجزاء قبل الوصول إلى خاصية .result
  • يعمل البث مع كل من حالة التدفق المنظمة وغير المنظمة
  • يلتقط بث التدفق المخرجات من جميع الأطقم واستدعاءات LLM في التدفق
  • يتضمن كل جزء سياقاً حول الوكيل والمهمة التي ولدته
  • يضيف البث حملاً ضئيلاً لتنفيذ التدفق

الدمج مع تصور التدفق

يمكنك دمج البث مع تصور التدفق لتوفير صورة كاملة:
Code
# Generate flow visualization
flow = ResearchFlow()
flow.plot("research_flow")  # Creates HTML visualization

# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")
من خلال الاستفادة من بث التدفق، يمكنك بناء تطبيقات متطورة ومتجاوبة توفر للمستخدمين رؤية فورية لسير العمل المعقدة متعددة المراحل، مما يجعل أتمتة الذكاء الاصطناعي الخاصة بك أكثر شفافية وجاذبية.