الانتقال إلى المحتوى الرئيسي

نظرة عامة

يوفر CrewAI عقد بث قائمًا على الإطارات للأنظمة التي تحتاج إلى أكثر من أجزاء نصية بسيطة. يصدر العقد كائنات StreamFrame مرتبة لأحداث دورة حياة Flow، وتوكنات LLM المباشرة، ونشاط الأدوات، ورسائل المحادثة، والأحداث المخصصة. استخدم هذه الواجهة عندما تبني واجهة مستخدم، أو جسر خدمة، أو تطبيق طرفية، أو وقت تشغيل نشر يحتاج إلى تدفق ثابت من الأحداث المهيكلة أثناء تشغيل Flow أو دورة محادثة أو استدعاء LLM مباشر.

StreamFrame

لكل إطار نفس الغلاف:
from crewai.types.streaming import StreamFrame

frame.id           # معرف إطار فريد
frame.seq          # ترتيب محلي للتنفيذ، عند توفره
frame.type         # نوع الحدث المصدر، مثل "flow_started"
frame.channel      # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
frame.namespace    # نطاق المصدر/وقت التشغيل
frame.timestamp    # طابع وقت الحدث
frame.parent_id    # معرف الحدث الأب، عند توفره
frame.previous_id  # معرف الحدث السابق، عند توفره
frame.data         # حمولة الحدث
frame.event        # اسم بديل لـ frame.data
frame.content      # نص قابل للطباعة لإطارات التوكن، وإلا ""
حقل channel هو أسرع طريقة لتوجيه الإطارات في المستهلكين:
القناةتحتوي على
llmتوكنات وأجزاء التفكير من أحداث بث LLM
flowدورة حياة Flow، وتنفيذ الدوال، والتوجيه، وأحداث الإيقاف/الاستئناف
toolsأحداث استخدام الأدوات
messagesأحداث سجل المحادثة
lifecycleأحداث دورة حياة وقت التشغيل التي لا تخص قناة أخرى
customأحداث لا تُطابق قناة مدمجة
يحافظ frame.type على نوع الحدث المصدر، حتى يتمكن المستهلكون من التعامل مع أحداث محددة داخل القناة.

بث Flow

عيّن stream=True على Flow لجعل kickoff() يعيد جلسة stream:
from crewai.flow import Flow, start


class ReportFlow(Flow):
    @start()
    def generate(self):
        return "done"


flow = ReportFlow(stream=True)
stream = flow.kickoff()

with stream:
    for chunk in stream:
        print(chunk.content, end="", flush=True)
        if chunk.type == "tool_usage_started":
            print(chunk.event["tool_name"])

result = stream.result
يجب استهلاك stream قبل قراءة stream.result. يؤدي الوصول إلى النتيجة مبكرًا إلى رفع RuntimeError حتى لا يتعامل المستهلكون بالخطأ مع تشغيل جزئي على أنه مكتمل. يمكنك أيضًا استدعاء flow.stream_events(...) مباشرة عندما تريد البث لاستدعاء واحد بدون تعيين stream=True على مثيل Flow.

التصفية حسب القناة

يوفر StreamSession إسقاطات حسب القناة تحافظ على ترتيب الإطارات العالمي داخل القناة المحددة:
stream = flow.stream_events()

with stream:
    for frame in stream.llm:
        print(frame.content, end="", flush=True)

result = stream.result
الإسقاطات المتاحة هي:
الإسقاطالإطارات
stream.eventsكل الإطارات
stream.llmإطارات LLM
stream.messagesإطارات رسائل المحادثة
stream.flowإطارات Flow
stream.toolsإطارات الأدوات
stream.interleave([...])مجموعة مختارة من القنوات
استخدم stream.interleave(["flow", "llm", "messages"]) عندما يريد المستهلك بعض القنوات فقط لكنه ما زال يحتاج إلى ترتيبها النسبي.

البث غير المتزامن

استخدم astream() للمستهلكين غير المتزامنين:
flow = ReportFlow()
stream = flow.astream()

async with stream:
    async for chunk in stream.events:
        print(chunk.channel, chunk.type, chunk.content)

result = stream.result
تملك الجلسة غير المتزامنة نفس إسقاطات الجلسة المتزامنة.

بث استدعاء LLM مباشر

ما زال llm.call(...) يعيد النتيجة النهائية المجمعة. استخدم llm.stream_events(...) عندما تريد التكرار على الأجزاء فور وصولها مع الحفاظ على حمولة الحدث المهيكلة:
from crewai import LLM


llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events(
    messages=[
        {
            "role": "user",
            "content": "Explain CrewAI streaming in two short sentences.",
        }
    ]
)

with stream:
    for chunk in stream:
        print(chunk.content, end="", flush=True)

result = stream.result
يفعل llm.stream_events(...) البث مؤقتًا للاستدعاء المغلف ثم يستعيد إعداد stream السابق في LLM بعد ذلك. تستمر تكاملات المزودين في إصدار أحداث بث LLM الأساسية؛ يوفر هذا المساعد واجهة مكرر مشتركة فوق تلك الأحداث لكل مزودي LLM.

دورات المحادثة

يمكن للتدفقات المحادثية بث دورة مستخدم واحدة باستخدام stream_turn():
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState


@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
    conversational = True


flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")

with stream:
    for frame in stream.events:
        if frame.channel == "llm" and frame.type == "llm_stream_chunk":
            print(frame.content, end="", flush=True)

reply = stream.result
أثناء stream_turn()، يفعّل مسار الإجابة المحادثية المدمج بث توكنات LLM لذلك الدور ثم يستعيد إعداد stream السابق في LLM بعد ذلك. يجب على معالجات المسارات المخصصة التي تنشئ Agents أو مثيلات LLM خاصة بها تهيئة تلك النماذج للبث إذا احتاجت إلى إخراج على مستوى التوكن.

التنظيف

استخدم الجلسة كمدير سياق عندما يكون ذلك ممكنًا. إذا انقطع اتصال العميل قبل استهلاك stream بالكامل، فأغلق الجلسة صراحة:
stream = flow.stream_events()

try:
    for frame in stream.events:
        print(frame.type)
finally:
    if not stream.is_exhausted:
        stream.close()
للتدفقات غير المتزامنة، استخدم await stream.aclose().

بث الأجزاء القديم

ما زال بث Crew مع stream=True يعيد واجهة CrewStreamingOutput المعتمدة على الأجزاء والموضحة في بث تنفيذ Crew. وما زالت استدعاءات llm.call(...) المباشرة تعيد نتيجة LLM النهائية. عقد الإطارات مخصص لأوقات التشغيل التي تحتاج إلى غلاف حدث ثابت عبر Flows، واستدعاءات LLM المباشرة، ودورات المحادثة، والأدوات، والرسائل.