الانتقال إلى المحتوى الرئيسي

نظرة عامة

تدفقات CrewAI هي ميزة قوية مصممة لتبسيط إنشاء وإدارة سير عمل الذكاء الاصطناعي. تتيح التدفقات للمطورين دمج وتنسيق مهام البرمجة وفرق Crew بكفاءة، مما يوفر إطار عمل متين لبناء أتمتة ذكاء اصطناعي متطورة. تتيح لك التدفقات إنشاء سير عمل منظم يعتمد على الأحداث. فهي توفر طريقة سلسة لربط مهام متعددة وإدارة الحالة والتحكم في تدفق التنفيذ في تطبيقات الذكاء الاصطناعي الخاصة بك. باستخدام التدفقات، يمكنك بسهولة تصميم وتنفيذ عمليات متعددة الخطوات تستفيد من الإمكانيات الكاملة لـ CrewAI.
  1. تبسيط إنشاء سير العمل: ربط فرق Crew والمهام المتعددة بسهولة لإنشاء سير عمل ذكاء اصطناعي معقد.
  2. إدارة الحالة: تجعل التدفقات إدارة ومشاركة الحالة بين المهام المختلفة في سير العمل أمرًا سهلًا للغاية.
  3. بنية تعتمد على الأحداث: مبنية على نموذج يعتمد على الأحداث، مما يتيح سير عمل ديناميكي وسريع الاستجابة.
  4. تحكم مرن في التدفق: تنفيذ المنطق الشرطي والحلقات والتفرع ضمن سير العمل.

البدء

لنقم بإنشاء تدفق بسيط حيث ستستخدم OpenAI لإنشاء مدينة عشوائية في مهمة واحدة ثم استخدام تلك المدينة لإنشاء حقيقة ممتعة في مهمة أخرى.
Code

from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion


class ExampleFlow(Flow):
    model = "gpt-4o-mini"

    @start()
    def generate_city(self):
        print("Starting flow")
        # Each flow state automatically gets a unique ID
        print(f"Flow State ID: {self.state['id']}")

        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": "Return the name of a random city in the world.",
                },
            ],
        )

        random_city = response["choices"][0]["message"]["content"]
        # Store the city in our state
        self.state["city"] = random_city
        print(f"Random City: {random_city}")

        return random_city

    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": f"Tell me a fun fact about {random_city}",
                },
            ],
        )

        fun_fact = response["choices"][0]["message"]["content"]
        # Store the fun fact in our state
        self.state["fun_fact"] = fun_fact
        return fun_fact



flow = ExampleFlow()
flow.plot()
result = flow.kickoff()

print(f"Generated fun fact: {result}")
Flow Visual image في المثال أعلاه، أنشأنا تدفقًا بسيطًا يولّد مدينة عشوائية باستخدام OpenAI ثم يولّد حقيقة ممتعة عن تلك المدينة. يتكون التدفق من مهمتين: generate_city و generate_fun_fact. مهمة generate_city هي نقطة البداية للتدفق، ومهمة generate_fun_fact تستمع لمخرجات مهمة generate_city. يتلقى كل مثيل من التدفق تلقائيًا معرّفًا فريدًا (UUID) في حالته، مما يساعد في تتبع وإدارة عمليات تنفيذ التدفق. يمكن للحالة أيضًا تخزين بيانات إضافية (مثل المدينة المولّدة والحقيقة الممتعة) التي تستمر طوال تنفيذ التدفق. عند تشغيل التدفق، سيقوم بما يلي:
  1. توليد معرّف فريد لحالة التدفق
  2. توليد مدينة عشوائية وتخزينها في الحالة
  3. توليد حقيقة ممتعة عن تلك المدينة وتخزينها في الحالة
  4. طباعة النتائج في وحدة التحكم
يمكن أن يكون المعرّف الفريد للحالة والبيانات المخزّنة مفيدًا لتتبع عمليات تنفيذ التدفق والحفاظ على السياق بين المهام. ملاحظة: تأكد من إعداد ملف .env لتخزين OPENAI_API_KEY الخاص بك. هذا المفتاح ضروري للمصادقة على طلبات OpenAI API.

@start()

يحدد المزخرف @start() نقاط الدخول للتدفق. يمكنك:
  • تعريف عدة نقاط بداية غير مشروطة: @start()
  • ربط البداية بدالة سابقة أو تسمية موجّه: @start("method_or_label")
  • توفير شرط قابل للاستدعاء للتحكم في وقت تنفيذ البداية
جميع دوال @start() المستوفية للشروط ستُنفَّذ (غالبًا بالتوازي) عند بدء أو استئناف التدفق.

@listen()

يُستخدم المزخرف @listen() لتحديد دالة كمستمع لمخرجات مهمة أخرى في التدفق. ستُنفَّذ الدالة المزخرفة بـ @listen() عندما تُصدر المهمة المحددة مخرجاتها. يمكن للدالة الوصول إلى مخرجات المهمة التي تستمع إليها كمعامل.

الاستخدام

يمكن استخدام المزخرف @listen() بعدة طرق:
  1. الاستماع لدالة بالاسم: يمكنك تمرير اسم الدالة التي تريد الاستماع إليها كسلسلة نصية. عند اكتمال تلك الدالة، سيتم تشغيل دالة المستمع.
    Code
    @listen("generate_city")
    def generate_fun_fact(self, random_city):
        # Implementation
    
  2. الاستماع لدالة مباشرة: يمكنك تمرير الدالة نفسها. عند اكتمال تلك الدالة، سيتم تشغيل دالة المستمع.
    Code
    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        # Implementation
    

مخرجات التدفق

الوصول إلى مخرجات التدفق والتعامل معها أمر أساسي لدمج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر. توفر تدفقات CrewAI آليات مباشرة لاسترداد المخرجات النهائية والوصول إلى النتائج الوسيطة وإدارة الحالة العامة للتدفق.

استرداد المخرجات النهائية

عند تشغيل تدفق، يتم تحديد المخرجات النهائية بواسطة آخر دالة تكتمل. تُعيد دالة kickoff() مخرجات هذه الدالة الأخيرة. إليك كيفية الوصول إلى المخرجات النهائية:
from crewai.flow.flow import Flow, listen, start

class OutputExampleFlow(Flow):
    @start()
    def first_method(self):
        return "Output from first_method"

    @listen(first_method)
    def second_method(self, first_output):
        return f"Second method received: {first_output}"


flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()

print("---- Final Output ----")
print(final_output)
Flow Visual image في هذا المثال، second_method هي آخر دالة تكتمل، لذا ستكون مخرجاتها هي المخرجات النهائية للتدفق. ستُعيد دالة kickoff() المخرجات النهائية، التي تُطبع بعد ذلك في وحدة التحكم. ستولّد دالة plot() ملف HTML الذي سيساعدك على فهم التدفق.

الوصول إلى الحالة وتحديثها

بالإضافة إلى استرداد المخرجات النهائية، يمكنك أيضًا الوصول إلى الحالة وتحديثها داخل التدفق. يمكن استخدام الحالة لتخزين ومشاركة البيانات بين الدوال المختلفة في التدفق. بعد تشغيل التدفق، يمكنك الوصول إلى الحالة لاسترداد أي معلومات تمت إضافتها أو تحديثها أثناء التنفيذ. إليك مثال على كيفية تحديث الحالة والوصول إليها:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StateExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from first_method"
        self.state.counter += 1

    @listen(first_method)
    def second_method(self):
        self.state.message += " - updated by second_method"
        self.state.counter += 1
        return self.state.message

flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
Flow Visual image في هذا المثال، يتم تحديث الحالة بواسطة كل من first_method و second_method. بعد تشغيل التدفق، يمكنك الوصول إلى الحالة النهائية لرؤية التحديثات التي أجرتها هذه الدوال. من خلال ضمان إعادة مخرجات الدالة الأخيرة وتوفير الوصول إلى الحالة، تجعل تدفقات CrewAI من السهل دمج نتائج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر، مع الحفاظ على الوصول إلى الحالة طوال تنفيذ التدفق.

إدارة حالة التدفق

إدارة الحالة بفعالية أمر بالغ الأهمية لبناء سير عمل ذكاء اصطناعي موثوق وقابل للصيانة. توفر تدفقات CrewAI آليات قوية لإدارة الحالة غير المهيكلة والمهيكلة، مما يتيح للمطورين اختيار النهج الأنسب لاحتياجات تطبيقاتهم.

إدارة الحالة غير المهيكلة

في إدارة الحالة غير المهيكلة، يتم تخزين جميع الحالات في خاصية state لفئة Flow. يوفر هذا النهج مرونة، مما يمكّن المطورين من إضافة أو تعديل خصائص الحالة أثناء التشغيل دون تحديد مخطط صارم. حتى مع الحالات غير المهيكلة، تولّد تدفقات CrewAI تلقائيًا معرّفًا فريدًا (UUID) لكل مثيل حالة وتحافظ عليه.
Code
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
        # The state automatically includes an 'id' field
        print(f"State ID: {self.state['id']}")
        self.state['counter'] = 0
        self.state['message'] = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated again"

        print(f"State after third_method: {self.state}")


flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image ملاحظة: يتم توليد حقل id تلقائيًا والحفاظ عليه طوال تنفيذ التدفق. لا تحتاج إلى إدارته أو تعيينه يدويًا، وسيتم الحفاظ عليه حتى عند تحديث الحالة ببيانات جديدة. النقاط الرئيسية:
  • المرونة: يمكنك إضافة خصائص ديناميكيًا إلى self.state دون قيود محددة مسبقًا.
  • البساطة: مثالي لسير العمل البسيط حيث يكون هيكل الحالة بسيطًا أو متغيرًا بشكل كبير.

إدارة الحالة المهيكلة

تستفيد إدارة الحالة المهيكلة من مخططات محددة مسبقًا لضمان الاتساق وسلامة الأنواع عبر سير العمل. باستخدام نماذج مثل BaseModel من Pydantic، يمكن للمطورين تحديد الشكل الدقيق للحالة، مما يتيح تحققًا أفضل وإكمالًا تلقائيًا في بيئات التطوير. تتلقى كل حالة في تدفقات CrewAI تلقائيًا معرّفًا فريدًا (UUID) للمساعدة في تتبع وإدارة مثيلات الحالة. يتم توليد هذا المعرّف وإدارته تلقائيًا بواسطة نظام التدفق.
Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    # Note: 'id' field is automatically added to all states
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        # Access the auto-generated ID if needed
        print(f"State ID: {self.state.id}")
        self.state.message = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()
Flow Visual image النقاط الرئيسية:
  • مخطط محدد: يحدد ExampleState هيكل الحالة بوضوح، مما يعزز قابلية قراءة الكود وصيانته.
  • سلامة الأنواع: يضمن استخدام Pydantic التزام خصائص الحالة بالأنواع المحددة، مما يقلل من أخطاء وقت التشغيل.
  • الإكمال التلقائي: يمكن لبيئات التطوير المتكاملة توفير إكمال تلقائي أفضل وفحص أخطاء بناءً على نموذج الحالة المحدد.

الاختيار بين إدارة الحالة غير المهيكلة والمهيكلة

  • استخدم إدارة الحالة غير المهيكلة عندما:
    • يكون حالة سير العمل بسيطة أو ديناميكية للغاية.
    • تكون المرونة أولوية على تعريفات الحالة الصارمة.
    • يكون النماذج الأولية السريعة مطلوبة دون عبء تحديد المخططات.
  • استخدم إدارة الحالة المهيكلة عندما:
    • يتطلب سير العمل هيكل حالة محدد جيدًا ومتسق.
    • تكون سلامة الأنواع والتحقق مهمتين لموثوقية تطبيقك.
    • تريد الاستفادة من ميزات بيئة التطوير المتكاملة مثل الإكمال التلقائي وفحص الأنواع لتجربة مطور أفضل.
من خلال توفير خيارات إدارة الحالة غير المهيكلة والمهيكلة، تمكّن تدفقات CrewAI المطورين من بناء سير عمل ذكاء اصطناعي مرن ومتين في آن واحد، ملبيةً مجموعة واسعة من متطلبات التطبيقات.

استمرارية التدفق

يتيح مزخرف @persist الاستمرارية التلقائية للحالة في تدفقات CrewAI، مما يسمح لك بالحفاظ على حالة التدفق عبر عمليات إعادة التشغيل أو تنفيذات سير العمل المختلفة. يمكن تطبيق هذا المزخرف على مستوى الفئة أو مستوى الدالة، مما يوفر مرونة في كيفية إدارة استمرارية الحالة.

الاستمرارية على مستوى الفئة

عند التطبيق على مستوى الفئة، يقوم مزخرف @persist باستمرارية حالات جميع دوال التدفق تلقائيًا:
@persist  # Using SQLiteFlowPersistence by default
class MyFlow(Flow[MyState]):
    @start()
    def initialize_flow(self):
        # This method will automatically have its state persisted
        self.state.counter = 1
        print("Initialized flow. State ID:", self.state.id)

    @listen(initialize_flow)
    def next_step(self):
        # The state (including self.state.id) is automatically reloaded
        self.state.counter += 1
        print("Flow state is persisted. Counter:", self.state.counter)

الاستمرارية على مستوى الدالة

للتحكم الأكثر دقة، يمكنك تطبيق @persist على دوال محددة:
class AnotherFlow(Flow[dict]):
    @persist  # Persists only this method's state
    @start()
    def begin(self):
        if "runs" not in self.state:
            self.state["runs"] = 0
        self.state["runs"] += 1
        print("Method-level persisted runs:", self.state["runs"])

كيف تعمل

  1. تعريف الحالة الفريد
    • تتلقى كل حالة تدفق UUID فريد تلقائيًا
    • يتم الحفاظ على المعرّف عبر تحديثات الحالة واستدعاءات الدوال
    • يدعم كلًا من الحالات المهيكلة (Pydantic BaseModel) وغير المهيكلة (القاموس)
  2. واجهة SQLite الافتراضية
    • SQLiteFlowPersistence هي واجهة التخزين الافتراضية
    • يتم حفظ الحالات تلقائيًا في قاعدة بيانات SQLite محلية
    • معالجة أخطاء متينة تضمن رسائل واضحة في حالة فشل عمليات قاعدة البيانات
  3. معالجة الأخطاء
    • رسائل خطأ شاملة لعمليات قاعدة البيانات
    • تحقق تلقائي من الحالة أثناء الحفظ والتحميل
    • ملاحظات واضحة عند مواجهة مشاكل في عمليات الاستمرارية

اعتبارات مهمة

  • أنواع الحالة: يتم دعم كل من الحالات المهيكلة (Pydantic BaseModel) وغير المهيكلة (القاموس)
  • المعرّف التلقائي: يتم إضافة حقل id تلقائيًا إذا لم يكن موجودًا
  • استعادة الحالة: يمكن للتدفقات الفاشلة أو المُعاد تشغيلها إعادة تحميل حالتها السابقة تلقائيًا
  • التنفيذ المخصص: يمكنك توفير تنفيذ FlowPersistence الخاص بك لاحتياجات التخزين المتخصصة

المزايا التقنية

  1. تحكم دقيق من خلال الوصول المنخفض المستوى
    • وصول مباشر لعمليات الاستمرارية لحالات الاستخدام المتقدمة
    • تحكم دقيق عبر مزخرفات الاستمرارية على مستوى الدوال
    • قدرات مدمجة لفحص الحالة وتصحيح الأخطاء
    • رؤية كاملة لتغييرات الحالة وعمليات الاستمرارية
  2. موثوقية معززة
    • استعادة تلقائية للحالة بعد أعطال النظام أو إعادة التشغيل
    • تحديثات حالة قائمة على المعاملات لسلامة البيانات
    • معالجة أخطاء شاملة مع رسائل خطأ واضحة
    • تحقق متين أثناء عمليات حفظ وتحميل الحالة
  3. بنية قابلة للتوسع
    • واجهة استمرارية قابلة للتخصيص من خلال واجهة FlowPersistence
    • دعم لحلول تخزين متخصصة تتجاوز SQLite
    • متوافقة مع كل من الحالات المهيكلة (Pydantic) وغير المهيكلة (dict)
    • تكامل سلس مع أنماط تدفق CrewAI الحالية
تركز بنية نظام الاستمرارية على الدقة التقنية وخيارات التخصيص، مما يتيح للمطورين الحفاظ على التحكم الكامل في إدارة الحالة مع الاستفادة من ميزات الموثوقية المدمجة.

التحكم في التدفق

المنطق الشرطي: or

تتيح لك دالة or_ في التدفقات الاستماع لعدة دوال وتشغيل دالة المستمع عندما تُصدر أي من الدوال المحددة مخرجاتها.
from crewai.flow.flow import Flow, listen, or_, start

class OrExampleFlow(Flow):

    @start()
    def start_method(self):
        return "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        return "Hello from the second method"

    @listen(or_(start_method, second_method))
    def logger(self, result):
        print(f"Logger: {result}")



flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image عند تشغيل هذا التدفق، سيتم تشغيل دالة logger بواسطة مخرجات إما start_method أو second_method. تُستخدم دالة or_ للاستماع لعدة دوال وتشغيل دالة المستمع عندما تُصدر أي من الدوال المحددة مخرجاتها.

المنطق الشرطي: and

تتيح لك دالة and_ في التدفقات الاستماع لعدة دوال وتشغيل دالة المستمع فقط عندما تُصدر جميع الدوال المحددة مخرجاتها.
from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):

    @start()
    def start_method(self):
        self.state["greeting"] = "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        self.state["joke"] = "What do computers eat? Microchips."

    @listen(and_(start_method, second_method))
    def logger(self):
        print("---- Logger ----")
        print(self.state)

flow = AndExampleFlow()
flow.plot()
flow.kickoff()
Flow Visual image عند تشغيل هذا التدفق، سيتم تشغيل دالة logger فقط عندما يُصدر كل من start_method و second_method مخرجاتهما. تُستخدم دالة and_ للاستماع لعدة دوال وتشغيل دالة المستمع فقط عندما تُصدر جميع الدوال المحددة مخرجاتها.

الموجّه

يتيح لك مزخرف @router() في التدفقات تحديد منطق توجيه شرطي بناءً على مخرجات دالة. يمكنك تحديد مسارات مختلفة بناءً على مخرجات الدالة، مما يتيح لك التحكم في تدفق التنفيذ ديناميكيًا.
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    success_flag: bool = False

class RouterFlow(Flow[ExampleState]):

    @start()
    def start_method(self):
        print("Starting the structured flow")
        random_boolean = random.choice([True, False])
        self.state.success_flag = random_boolean

    @router(start_method)
    def second_method(self):
        if self.state.success_flag:
            return "success"
        else:
            return "failed"

    @listen("success")
    def third_method(self):
        print("Third method running")

    @listen("failed")
    def fourth_method(self):
        print("Fourth method running")


flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image في المثال أعلاه، تولّد start_method قيمة منطقية عشوائية وتعيّنها في الحالة. تستخدم second_method مزخرف @router() لتحديد منطق توجيه شرطي بناءً على قيمة المنطقية. إذا كانت القيمة True، تُعيد الدالة "success"، وإذا كانت False، تُعيد "failed". تستمع third_method و fourth_method لمخرجات second_method وتُنفَّذ بناءً على القيمة المُعادة. عند تشغيل هذا التدفق، ستتغير المخرجات بناءً على القيمة المنطقية العشوائية المولّدة بواسطة start_method.

الإنسان في الحلقة (التغذية الراجعة البشرية)

يتطلب مزخرف @human_feedback CrewAI الإصدار 1.8.0 أو أعلى.
يتيح مزخرف @human_feedback سير عمل يتضمن تدخلًا بشريًا من خلال إيقاف تنفيذ التدفق مؤقتًا لجمع تغذية راجعة من إنسان. هذا مفيد لبوابات الموافقة ومراجعة الجودة ونقاط القرار التي تتطلب حكمًا بشريًا.
Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Do you approve this content?",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
    )
    def generate_content(self):
        return "Content to be reviewed..."

    @listen("approved")
    def on_approval(self, result: HumanFeedbackResult):
        print(f"Approved! Feedback: {result.feedback}")

    @listen("rejected")
    def on_rejection(self, result: HumanFeedbackResult):
        print(f"Rejected. Reason: {result.feedback}")
عند تحديد emit، يتم تفسير التغذية الراجعة الحرة للإنسان بواسطة LLM وتُختصر إلى إحدى النتائج المحددة، والتي تُشغل بعد ذلك مزخرف @listen المقابل. يمكنك أيضًا استخدام @human_feedback دون توجيه لجمع التغذية الراجعة ببساطة:
Code
@start()
@human_feedback(message="Any comments on this output?")
def my_method(self):
    return "Output for review"

@listen(my_method)
def next_step(self, result: HumanFeedbackResult):
    # Access feedback via result.feedback
    # Access original output via result.output
    pass
يمكنك الوصول إلى جميع التغذيات الراجعة المُجمّعة أثناء التدفق عبر self.last_human_feedback (الأحدث) أو self.human_feedback_history (جميع التغذيات الراجعة كقائمة). للحصول على دليل كامل حول التغذية الراجعة البشرية في التدفقات، بما في ذلك التغذية الراجعة غير المتزامنة/غير الحاجبة مع مزودين مخصصين (Slack، webhooks، إلخ)، انظر التغذية الراجعة البشرية في التدفقات.

إضافة Agents إلى التدفقات

يمكن دمج Agents بسلاسة في تدفقاتك، مما يوفر بديلًا خفيف الوزن لفرق Crew الكاملة عندما تحتاج إلى تنفيذ مهام أبسط وأكثر تركيزًا. إليك مثال على كيفية استخدام Agent ضمن تدفق لإجراء أبحاث السوق:
import asyncio
from typing import Any, Dict, List

from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field

from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start


# Define a structured output format
class MarketAnalysis(BaseModel):
    key_trends: List[str] = Field(description="List of identified market trends")
    market_size: str = Field(description="Estimated market size")
    competitors: List[str] = Field(description="Major competitors in the space")


# Define flow state
class MarketResearchState(BaseModel):
    product: str = ""
    analysis: MarketAnalysis | None = None


# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
    @start()
    def initialize_research(self) -> Dict[str, Any]:
        print(f"Starting market research for {self.state.product}")
        return {"product": self.state.product}

    @listen(initialize_research)
    async def analyze_market(self) -> Dict[str, Any]:
        # Create an Agent for market research
        analyst = Agent(
            role="Market Research Analyst",
            goal=f"Analyze the market for {self.state.product}",
            backstory="You are an experienced market analyst with expertise in "
            "identifying market trends and opportunities.",
            tools=[SerperDevTool()],
            verbose=True,
        )

        # Define the research query
        query = f"""
        Research the market for {self.state.product}. Include:
        1. Key market trends
        2. Market size
        3. Major competitors

        Format your response according to the specified structure.
        """

        # Execute the analysis with structured output format
        result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
        if result.pydantic:
            print("result", result.pydantic)
        else:
            print("result", result)

        # Return the analysis to update the state
        return {"analysis": result.pydantic}

    @listen(analyze_market)
    def present_results(self, analysis) -> None:
        print("\nMarket Analysis Results")
        print("=====================")

        if isinstance(analysis, dict):
            # If we got a dict with 'analysis' key, extract the actual analysis object
            market_analysis = analysis.get("analysis")
        else:
            market_analysis = analysis

        if market_analysis and isinstance(market_analysis, MarketAnalysis):
            print("\nKey Market Trends:")
            for trend in market_analysis.key_trends:
                print(f"- {trend}")

            print(f"\nMarket Size: {market_analysis.market_size}")

            print("\nMajor Competitors:")
            for competitor in market_analysis.competitors:
                print(f"- {competitor}")
        else:
            print("No structured analysis data available.")
            print("Raw analysis:", analysis)


# Usage example
async def run_flow():
    flow = MarketResearchFlow()
    flow.plot("MarketResearchFlowPlot")
    result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
    return result


# Run the flow
if __name__ == "__main__":
    asyncio.run(run_flow())
Flow Visual image يوضح هذا المثال عدة ميزات رئيسية لاستخدام Agents في التدفقات:
  1. المخرجات المهيكلة: استخدام نماذج Pydantic لتحديد تنسيق المخرجات المتوقع (MarketAnalysis) يضمن سلامة الأنواع والبيانات المهيكلة في جميع أنحاء التدفق.
  2. إدارة الحالة: تحافظ حالة التدفق (MarketResearchState) على السياق بين الخطوات وتخزّن كلًا من المدخلات والمخرجات.
  3. تكامل الأدوات: يمكن لـ Agents استخدام أدوات (مثل WebsiteSearchTool) لتعزيز قدراتهم.

إضافة فرق Crew إلى التدفقات

إنشاء تدفق مع فرق Crew متعددة في CrewAI أمر مباشر. يمكنك إنشاء مشروع CrewAI جديد يتضمن جميع الهيكلية اللازمة لإنشاء تدفق مع فرق Crew متعددة عن طريق تشغيل الأمر التالي:
crewai create flow name_of_flow
سيولّد هذا الأمر مشروع CrewAI جديد مع هيكل المجلدات اللازم. يتضمن المشروع المولّد فريق Crew مُعد مسبقًا يُسمى poem_crew ويعمل بالفعل. يمكنك استخدام هذا الفريق كقالب بنسخه ولصقه وتعديله لإنشاء فرق أخرى.

هيكل المجلدات

بعد تشغيل أمر crewai create flow name_of_flow، سترى هيكل مجلدات مشابه للتالي:
المجلد/الملفالوصف
name_of_flow/المجلد الجذر للتدفق.
├── crews/يحتوي على مجلدات لفرق Crew المحددة.
│ └── poem_crew/مجلد لـ “poem_crew” مع إعداداته وسكربتاته.
│ ├── config/مجلد ملفات الإعداد لـ “poem_crew”.
│ │ ├── agents.yamlملف YAML يحدد الـ Agents لـ “poem_crew”.
│ │ └── tasks.yamlملف YAML يحدد المهام لـ “poem_crew”.
│ ├── poem_crew.pyسكربت وظائف “poem_crew”.
├── tools/مجلد للأدوات الإضافية المُستخدمة في التدفق.
│ └── custom_tool.pyتنفيذ أداة مخصصة.
├── main.pyالسكربت الرئيسي لتشغيل التدفق.
├── README.mdوصف المشروع والتعليمات.
├── pyproject.tomlملف إعداد تبعيات المشروع والإعدادات.
└── .gitignoreيحدد الملفات والمجلدات المراد تجاهلها في التحكم بالإصدارات.

بناء فرق Crew الخاصة بك

في مجلد crews، يمكنك تحديد فرق Crew متعددة. سيكون لكل فريق مجلده الخاص الذي يحتوي على ملفات الإعداد وملف تعريف الفريق. على سبيل المثال، يحتوي مجلد poem_crew على:
  • config/agents.yaml: يحدد الـ Agents للفريق.
  • config/tasks.yaml: يحدد المهام للفريق.
  • poem_crew.py: يحتوي على تعريف الفريق، بما في ذلك الـ Agents والمهام والفريق نفسه.
يمكنك نسخ ولصق وتعديل poem_crew لإنشاء فرق أخرى.

ربط فرق Crew في main.py

ملف main.py هو حيث تنشئ التدفق وتربط فرق Crew معًا. يمكنك تحديد التدفق باستخدام فئة Flow والمزخرفات @start و @listen لتحديد تدفق التنفيذ. إليك مثال على كيفية ربط poem_crew في ملف main.py:
Code
#!/usr/bin/env python
from random import randint

from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew


class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""

class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})

        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)

def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot("PoemFlowPlot")

if __name__ == "__main__":
    kickoff()
    plot()
في هذا المثال، تحدد فئة PoemFlow تدفقًا يولّد عدد الجمل، ويستخدم PoemCrew لتوليد قصيدة، ثم يحفظ القصيدة في ملف. يتم بدء التدفق باستدعاء دالة kickoff(). سيتم توليد PoemFlowPlot بواسطة دالة plot(). Flow Visual image

تشغيل التدفق

(اختياري) قبل تشغيل التدفق، يمكنك تثبيت التبعيات بتشغيل:
crewai install
بمجرد تثبيت جميع التبعيات، تحتاج إلى تفعيل البيئة الافتراضية بتشغيل:
source .venv/bin/activate
بعد تفعيل البيئة الافتراضية، يمكنك تشغيل التدفق بتنفيذ أحد الأوامر التالية:
crewai flow kickoff
أو
uv run kickoff
سيُنفَّذ التدفق، ويجب أن ترى المخرجات في وحدة التحكم.

رسم التدفقات

يمكن أن يوفر تصوير سير عمل الذكاء الاصطناعي رؤى قيمة حول هيكل ومسارات تنفيذ تدفقاتك. تقدم CrewAI أداة تصوير قوية تتيح لك إنشاء رسوم بيانية تفاعلية لتدفقاتك، مما يسهّل فهم وتحسين سير عمل الذكاء الاصطناعي.

ما هي الرسوم البيانية؟

الرسوم البيانية في CrewAI هي تمثيلات بصرية لسير عمل الذكاء الاصطناعي. تعرض المهام المختلفة واتصالاتها وتدفق البيانات بينها. يساعد هذا التصوير في فهم تسلسل العمليات وتحديد الاختناقات وضمان توافق منطق سير العمل مع توقعاتك.

كيفية إنشاء رسم بياني

توفر CrewAI طريقتين مريحتين لإنشاء رسوم بيانية لتدفقاتك:

الخيار 1: استخدام دالة plot()

إذا كنت تعمل مباشرة مع مثيل تدفق، يمكنك إنشاء رسم بياني باستدعاء دالة plot() على كائن التدفق. ستُنشئ هذه الدالة ملف HTML يحتوي على الرسم البياني التفاعلي لتدفقك.
Code
# Assuming you have a flow instance
flow.plot("my_flow_plot")
سيُنشئ هذا ملفًا باسم my_flow_plot.html في مجلدك الحالي. يمكنك فتح هذا الملف في متصفح ويب لعرض الرسم البياني التفاعلي.

الخيار 2: استخدام سطر الأوامر

إذا كنت تعمل ضمن مشروع CrewAI منظم، يمكنك إنشاء رسم بياني باستخدام سطر الأوامر. هذا مفيد بشكل خاص للمشاريع الأكبر حيث تريد تصوير إعداد التدفق بالكامل.
crewai flow plot
سيُنشئ هذا الأمر ملف HTML مع الرسم البياني لتدفقك، مشابهًا لدالة plot(). سيتم حفظ الملف في مجلد مشروعك، ويمكنك فتحه في متصفح ويب لاستكشاف التدفق.

فهم الرسم البياني

سيعرض الرسم البياني المولّد عُقدًا تمثل المهام في تدفقك، مع حواف موجّهة تشير إلى تدفق التنفيذ. الرسم البياني تفاعلي، مما يتيح لك التكبير والتصغير والتمرير فوق العقد لرؤية تفاصيل إضافية. من خلال تصوير تدفقاتك، يمكنك الحصول على فهم أوضح لهيكل سير العمل، مما يسهّل تصحيح الأخطاء وتحسين عمليات الذكاء الاصطناعي والتواصل بشأنها مع الآخرين.

الخلاصة

رسم تدفقاتك هو ميزة قوية في CrewAI تعزز قدرتك على تصميم وإدارة سير عمل الذكاء الاصطناعي المعقدة. سواء اخترت استخدام دالة plot() أو سطر الأوامر، فإن إنشاء الرسوم البيانية سيوفر لك تمثيلًا بصريًا لسير عملك، مما يساعد في التطوير والعرض.

الخطوات التالية

إذا كنت مهتمًا باستكشاف أمثلة إضافية للتدفقات، لدينا مجموعة متنوعة من التوصيات في مستودع الأمثلة. إليك أربعة أمثلة تدفق محددة، كل منها يعرض حالات استخدام فريدة لمساعدتك في مطابقة نوع مشكلتك الحالية مع مثال محدد:
  1. تدفق الرد التلقائي على البريد الإلكتروني: يوضح هذا المثال حلقة لا نهائية حيث تعمل مهمة خلفية باستمرار لأتمتة ردود البريد الإلكتروني. إنها حالة استخدام رائعة للمهام التي تحتاج إلى التنفيذ بشكل متكرر دون تدخل يدوي. عرض المثال
  2. تدفق تقييم العملاء المحتملين: يعرض هذا التدفق إضافة تغذية راجعة بشرية والتعامل مع فروع شرطية مختلفة باستخدام الموجّه. إنه مثال ممتاز لكيفية دمج اتخاذ القرارات الديناميكية والرقابة البشرية في سير عملك. عرض المثال
  3. تدفق كتابة كتاب: يتفوق هذا المثال في ربط فرق Crew متعددة معًا، حيث تُستخدم مخرجات فريق واحد بواسطة فريق آخر. على وجه التحديد، يقوم فريق واحد بوضع مخطط لكتاب كامل، ويقوم فريق آخر بإنشاء فصول بناءً على المخطط. في النهاية، يتم ربط كل شيء لإنتاج كتاب كامل. هذا التدفق مثالي للعمليات المعقدة متعددة الخطوات التي تتطلب تنسيقًا بين مهام مختلفة. عرض المثال
  4. تدفق مساعد الاجتماعات: يوضح هذا التدفق كيفية بث حدث واحد لتشغيل إجراءات متابعة متعددة. على سبيل المثال، بعد اكتمال اجتماع، يمكن للتدفق تحديث لوحة Trello وإرسال رسالة Slack وحفظ النتائج. إنه مثال رائع للتعامل مع نتائج متعددة من حدث واحد، مما يجعله مثاليًا لإدارة المهام الشاملة وأنظمة الإشعارات. عرض المثال
من خلال استكشاف هذه الأمثلة، يمكنك الحصول على رؤى حول كيفية الاستفادة من تدفقات CrewAI لحالات استخدام متنوعة، من أتمتة المهام المتكررة إلى إدارة العمليات المعقدة متعددة الخطوات مع اتخاذ القرارات الديناميكية والتغذية الراجعة البشرية. أيضًا، شاهد فيديو YouTube الخاص بنا حول كيفية استخدام التدفقات في CrewAI أدناه!

تشغيل التدفقات

هناك طريقتان لتشغيل التدفق:

استخدام واجهة Flow API

يمكنك تشغيل تدفق برمجيًا عن طريق إنشاء مثيل من فئة التدفق واستدعاء دالة kickoff():
flow = ExampleFlow()
result = flow.kickoff()

بث تنفيذ التدفق

للحصول على رؤية فورية لتنفيذ التدفق، يمكنك تفعيل البث لتلقي المخرجات فور توليدها:
class StreamingFlow(Flow):
    stream = True  # Enable streaming

    @start()
    def research(self):
        # Your flow implementation
        pass

# Iterate over streaming output
flow = StreamingFlow()
streaming = flow.kickoff()
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access final result
result = streaming.result
تعرّف على المزيد حول البث في دليل بث تنفيذ التدفق.

الذاكرة في التدفقات

يتمتع كل تدفق تلقائيًا بإمكانية الوصول إلى نظام الذاكرة الموحد في CrewAI. يمكنك تخزين الذكريات واسترجاعها واستخراجها مباشرة داخل أي دالة تدفق باستخدام ثلاث دوال مساعدة مدمجة.

الدوال المدمجة

الدالةالوصف
self.remember(content, **kwargs)تخزين المحتوى في الذاكرة. تقبل scope و categories و metadata و importance اختياريًا.
self.recall(query, **kwargs)استرجاع الذكريات ذات الصلة. تقبل scope و categories و limit و depth اختياريًا.
self.extract_memories(content)تفكيك النص الخام إلى عبارات ذاكرة منفصلة ومستقلة.
يتم إنشاء مثيل Memory() افتراضي تلقائيًا عند تهيئة التدفق. يمكنك أيضًا تمرير مثيل مخصص:
from crewai.flow.flow import Flow
from crewai import Memory

custom_memory = Memory(
    recency_weight=0.5,
    recency_half_life_days=7,
    embedder={"provider": "ollama", "config": {"model_name": "mxbai-embed-large"}},
)

flow = MyFlow(memory=custom_memory)

مثال: تدفق البحث والتحليل

from crewai.flow.flow import Flow, listen, start


class ResearchAnalysisFlow(Flow):
    @start()
    def gather_data(self):
        # Simulate research findings
        findings = (
            "PostgreSQL handles 10k concurrent connections with connection pooling. "
            "MySQL caps at around 5k. MongoDB scales horizontally but adds complexity."
        )

        # Extract atomic facts and remember each one
        memories = self.extract_memories(findings)
        for mem in memories:
            self.remember(mem, scope="/research/databases")

        return findings

    @listen(gather_data)
    def analyze(self, raw_findings):
        # Recall relevant past research (from this run or previous runs)
        past = self.recall("database performance and scaling", limit=10, depth="shallow")

        context_lines = [f"- {m.record.content}" for m in past]
        context = "\n".join(context_lines) if context_lines else "No prior context."

        return {
            "new_findings": raw_findings,
            "prior_context": context,
            "total_memories": len(past),
        }


flow = ResearchAnalysisFlow()
result = flow.kickoff()
print(result)
نظرًا لأن الذاكرة تستمر عبر عمليات التشغيل (مدعومة بـ LanceDB على القرص)، فإن خطوة analyze ستستدعي النتائج من عمليات التنفيذ السابقة أيضًا — مما يتيح تدفقات تتعلم وتراكم المعرفة بمرور الوقت. انظر وثائق الذاكرة لمزيد من التفاصيل حول النطاقات والشرائح والتسجيل المركب وإعداد المُضمِّن والمزيد.

استخدام CLI

بدءًا من الإصدار 0.103.0، يمكنك تشغيل التدفقات باستخدام أمر crewai run:
crewai run
يكتشف هذا الأمر تلقائيًا ما إذا كان مشروعك تدفقًا (بناءً على إعداد type = "flow" في pyproject.toml الخاص بك) ويشغّله وفقًا لذلك. هذه هي الطريقة الموصى بها لتشغيل التدفقات من سطر الأوامر. للتوافق مع الإصدارات السابقة، يمكنك أيضًا استخدام:
crewai flow kickoff
ومع ذلك، فإن أمر crewai run هو الطريقة المفضلة الآن لأنه يعمل لكل من فرق Crew والتدفقات.