개요

CrewAI Flows는 AI 워크플로우의 생성 및 관리를 간소화하기 위해 설계된 강력한 기능입니다. Flows를 사용하면 개발자는 다양한 코딩 작업과 각 Crew를 효율적으로 결합하고 조정할 수 있어, 정교한 AI 자동화를 구축할 수 있는 견고한 프레임워크를 제공합니다. Flows는 구조화된 이벤트 기반 워크플로우를 생성할 수 있게 해줍니다. 이를 통해 여러 작업을 원활하게 연결하고, 상태를 관리하며, AI 애플리케이션에서 실행 흐름을 제어할 수 있습니다. Flows를 사용하면 CrewAI의 전체 역량을 활용하는 다단계 프로세스를 손쉽게 설계하고 구현할 수 있습니다.
  1. 간편한 워크플로우 생성: 여러 Crew와 작업을 손쉽게 연결하여 복잡한 AI 워크플로우를 만듭니다.
  2. 상태 관리: Flows를 통해 워크플로우 내의 다양한 작업 간에 상태를 쉽고 효율적으로 관리 및 공유할 수 있습니다.
  3. 이벤트 기반 아키텍처: 이벤트 기반 모델을 기반으로 하여, 역동적이고 반응성 높은 워크플로우를 구현할 수 있습니다.
  4. 유연한 제어 흐름: 워크플로우 내에서 조건문, 반복문, 분기 등을 구현할 수 있습니다.

시작하기

OpenAI를 사용하여 한 작업에서 무작위 도시를 생성하고, 그 도시를 사용해 다른 작업에서 재미있는 사실을 생성하는 간단한 Flow를 만들어보겠습니다.
Code

from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion


class ExampleFlow(Flow):
    model = "gpt-4o-mini"

    @start()
    def generate_city(self):
        print("Starting flow")
        # Each flow state automatically gets a unique ID
        print(f"Flow State ID: {self.state['id']}")

        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": "Return the name of a random city in the world.",
                },
            ],
        )

        random_city = response["choices"][0]["message"]["content"]
        # Store the city in our state
        self.state["city"] = random_city
        print(f"Random City: {random_city}")

        return random_city

    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": f"Tell me a fun fact about {random_city}",
                },
            ],
        )

        fun_fact = response["choices"][0]["message"]["content"]
        # Store the fun fact in our state
        self.state["fun_fact"] = fun_fact
        return fun_fact



flow = ExampleFlow()
flow.plot()
result = flow.kickoff()

print(f"Generated fun fact: {result}")
Flow Visual image 위 예제에서는 OpenAI를 사용하여 무작위 도시를 생성하고, 해당 도시에 대한 재미있는 사실을 생성하는 간단한 Flow를 만들었습니다. 이 Flow는 generate_citygenerate_fun_fact라는 두 가지 작업으로 구성되어 있습니다. generate_city 작업이 Flow의 시작점이며, generate_fun_fact 작업이 generate_city 작업의 출력을 감지합니다. 각 Flow 인스턴스는 상태(state)에 자동으로 고유 식별자(UUID)를 부여 받아, 흐름 실행을 추적하고 관리하는 데 도움이 됩니다. 상태에는 실행 중에 유지되는 추가 데이터(예: 생성된 도시와 재미있는 사실)도 저장할 수 있습니다. Flow를 실행하면 다음과 같은 과정을 따릅니다:
  1. 상태를 위한 고유 ID를 생성
  2. 무작위 도시를 생성하여 상태에 저장
  3. 해당 도시에 대한 재미있는 사실을 생성하여 상태에 저장
  4. 결과를 콘솔에 출력
상태의 고유 ID와 저장된 데이터는 흐름 실행을 추적하고, 작업 간의 컨텍스트를 유지하는 데 유용합니다. 참고: OpenAI API 요청 인증을 위해 OPENAI_API_KEY.env 파일에 설정해야 합니다. 이 키는 필수입니다.

@start()

@start() 데코레이터는 메서드를 Flow의 시작 지점으로 표시하는 데 사용됩니다. Flow가 시작되면 @start()로 데코레이트된 모든 메서드가 병렬로 실행됩니다. 하나의 Flow에서 여러 개의 start 메서드를 가질 수 있으며, Flow가 시작될 때 이들은 모두 실행됩니다.

@listen()

@listen() 데코레이터는 Flow 내에서 다른 태스크의 출력을 수신하는 리스너로 메서드를 표시하는 데 사용됩니다. @listen()으로 데코레이션된 메서드는 지정된 태스크가 출력을 내보낼 때 실행됩니다. 이 메서드는 자신이 리스닝하고 있는 태스크의 출력을 인자로 접근할 수 있습니다.

사용법

@listen() 데코레이터는 여러 가지 방법으로 사용할 수 있습니다:
  1. 메서드 이름으로 리스닝하기: 리스닝하고자 하는 메서드의 이름을 문자열로 전달할 수 있습니다. 해당 메서드가 완료되면, 리스너 메서드가 트리거됩니다.
    Code
    @listen("generate_city")
    def generate_fun_fact(self, random_city):
        # Implementation
    
  2. 메서드 자체로 리스닝하기: 메서드 자체를 전달할 수도 있습니다. 해당 메서드가 완료되면, 리스너 메서드가 트리거됩니다.
    Code
    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        # Implementation
    

Flow 출력

Flow의 출력을 접근하고 다루는 것은 AI 워크플로우를 더 큰 애플리케이션이나 시스템에 통합하는 데 필수적입니다. CrewAI Flow는 최종 출력물을 쉽게 가져오고, 중간 결과에 접근하며, Flow의 전체 상태를 관리할 수 있는 직관적인 메커니즘을 제공합니다.

최종 출력값 가져오기

Flow를 실행하면, 최종 출력값은 마지막으로 완료된 메서드에 의해 결정됩니다. kickoff() 메서드는 이 마지막 메서드의 결과를 반환합니다. 최종 출력값을 확인하는 방법은 다음과 같습니다:
from crewai.flow.flow import Flow, listen, start

class OutputExampleFlow(Flow):
    @start()
    def first_method(self):
        return "Output from first_method"

    @listen(first_method)
    def second_method(self, first_output):
        return f"Second method received: {first_output}"


flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()

print("---- Final Output ----")
print(final_output)
Flow Visual image 이 예제에서 second_method가 마지막으로 완료된 메서드이므로, 해당 메서드의 결과가 Flow의 최종 출력값이 됩니다.
kickoff() 메서드는 이 최종 출력값을 반환하며, 이 값은 콘솔에 출력됩니다.
plot() 메서드는 HTML 파일을 생성하며, 이를 통해 flow를 쉽게 이해할 수 있습니다.

상태에 접근하고 업데이트하기

최종 출력을 가져오는 것 외에도, Flow 내에서 상태(state)에 접근하고 업데이트할 수 있습니다. 상태는 Flow의 다양한 메소드 간 데이터를 저장하고 공유하는 데 사용할 수 있습니다. Flow가 실행된 후에는, 실행 중에 추가되거나 업데이트된 정보를 조회하기 위해 상태에 접근할 수 있습니다. 다음은 상태를 업데이트하고 접근하는 방법의 예시입니다:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StateExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from first_method"
        self.state.counter += 1

    @listen(first_method)
    def second_method(self):
        self.state.message += " - updated by second_method"
        self.state.counter += 1
        return self.state.message

flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
Flow Visual image 이 예시에서 상태는 first_methodsecond_method 모두에 의해 업데이트됩니다. Flow가 실행된 후, 이러한 메소드들에 의해 수행된 업데이트 내용을 확인하려면 최종 상태에 접근할 수 있습니다. 최종 메소드의 출력이 반환되고 상태에 접근할 수 있도록 함으로써, CrewAI Flow는 AI 워크플로우의 결과를 더 큰 애플리케이션이나 시스템에 쉽게 통합할 수 있게 하며, Flow 실행 과정 전반에 걸쳐 상태를 유지하고 접근하면서도 이를 용이하게 만듭니다.

플로우 상태 관리

상태를 효과적으로 관리하는 것은 신뢰할 수 있고 유지 보수가 용이한 AI 워크플로를 구축하는 데 매우 중요합니다. CrewAI 플로우는 비정형 및 정형 상태 관리를 위한 강력한 메커니즘을 제공하여, 개발자가 자신의 애플리케이션에 가장 적합한 접근 방식을 선택할 수 있도록 합니다.

비구조적 상태 관리

비구조적 상태 관리에서는 모든 상태가 Flow 클래스의 state 속성에 저장됩니다.
이 방식은 엄격한 스키마를 정의하지 않고도 개발자가 상태 속성을 즉석에서 추가하거나 수정할 수 있는 유연성을 제공합니다.
비구조적 상태에서도 CrewAI Flows는 각 상태 인스턴스에 대한 고유 식별자(UUID)를 자동으로 생성하고 유지합니다.
Code
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
        # The state automatically includes an 'id' field
        print(f"State ID: {self.state['id']}")
        self.state['counter'] = 0
        self.state['message'] = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated again"

        print(f"State after third_method: {self.state}")


flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image 참고: id 필드는 흐름의 실행 전체에 걸쳐 자동으로 생성되어 보존됩니다. 이를 직접 관리하거나 설정할 필요가 없으며, 새로운 데이터로 상태를 업데이트할 때도 자동으로 유지됩니다. 핵심 포인트:
  • 유연성: self.state에 미리 정해진 제약 없이 속성을 동적으로 추가할 수 있습니다.
  • 단순성: 상태 구조가 최소이거나 크게 달라지는 단순한 워크플로우에 이상적입니다.

구조화된 상태 관리

구조화된 상태 관리는 미리 정의된 스키마를 활용하여 워크플로 전반에 걸쳐 일관성과 타입 안전성을 보장합니다. Pydantic의 BaseModel과 같은 모델을 사용하면 상태의 정확한 형태를 정의할 수 있어, 개발 환경에서 더 나은 검증 및 자동 완성이 가능합니다. CrewAI Flows의 각 상태는 인스턴스 추적 및 관리를 돕기 위해 자동으로 고유 식별자(UUID)를 할당받습니다. 이 ID는 Flow 시스템에 의해 자동으로 생성되고 관리됩니다.
Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    # Note: 'id' field is automatically added to all states
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        # Access the auto-generated ID if needed
        print(f"State ID: {self.state.id}")
        self.state.message = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()
Flow Visual image 핵심 포인트:
  • 정의된 스키마: ExampleState는 상태 구조를 명확히 정의하여 코드 가독성과 유지보수성을 향상시킵니다.
  • 타입 안전성: Pydantic을 활용하면 상태의 속성이 지정된 타입을 준수하도록 보장하여 런타임 오류를 줄일 수 있습니다.
  • 자동 완성: IDE에서 정의된 상태 모델을 기반으로 더 나은 자동 완성과 오류 확인이 가능합니다.

비구조적 상태 관리와 구조적 상태 관리 선택하기

  • 비구조적 상태 관리를 사용할 때:
    • 워크플로의 상태가 단순하거나 매우 동적일 때.
    • 엄격한 상태 정의보다 유연성이 우선시될 때.
    • 스키마 정의의 오버헤드 없이 빠른 프로토타이핑이 필요할 때.
  • 구조적 상태 관리를 사용할 때:
    • 워크플로에 잘 정의되고 일관된 상태 구조가 필요할 때.
    • 애플리케이션의 신뢰성을 위해 타입 안전성과 검증이 중요할 때.
    • 더 나은 개발자 경험을 위해 IDE의 자동 완성 및 타입 체크 기능을 활용하고자 할 때.
CrewAI Flows는 비구조적 및 구조적 상태 관리 옵션을 모두 제공함으로써, 개발자들이 다양한 애플리케이션 요구 사항에 맞춰 유연하면서도 견고한 AI 워크플로를 구축할 수 있도록 지원합니다.

플로우 지속성

@persist 데코레이터는 CrewAI 플로우에서 자동 상태 지속성을 활성화하여, 플로우 상태를 재시작이나 다른 워크플로우 실행 간에도 유지할 수 있도록 합니다. 이 데코레이터는 클래스 수준이나 메서드 수준 모두에 적용할 수 있어, 상태 지속성을 관리하는 데 유연성을 제공합니다.

클래스 레벨 영속성

클래스 레벨에서 @persist 데코레이터를 적용하면 모든 flow 메서드 상태가 자동으로 영속됩니다:
@persist  # 기본적으로 SQLiteFlowPersistence 사용
class MyFlow(Flow[MyState]):
    @start()
    def initialize_flow(self):
        # 이 메서드는 상태가 자동으로 영속됩니다
        self.state.counter = 1
        print("Initialized flow. State ID:", self.state.id)

    @listen(initialize_flow)
    def next_step(self):
        # 상태(self.state.id 포함)는 자동으로 다시 로드됩니다
        self.state.counter += 1
        print("Flow state is persisted. Counter:", self.state.counter)

메서드 수준의 지속성

더 세밀한 제어를 위해, @persist를 특정 메서드에 적용할 수 있습니다:
class AnotherFlow(Flow[dict]):
    @persist  # Persists only this method's state
    @start()
    def begin(self):
        if "runs" not in self.state:
            self.state["runs"] = 0
        self.state["runs"] += 1
        print("Method-level persisted runs:", self.state["runs"])

작동 방식

  1. 고유 상태 식별
    • 각 flow 상태에는 자동으로 고유한 UUID가 할당됩니다.
    • 이 ID는 상태 업데이트 및 메소드 호출 시에도 유지됩니다.
    • 구조화된 상태(Pydantic BaseModel)와 비구조화된 상태(딕셔너리) 모두를 지원합니다.
  2. 기본 SQLite 백엔드
    • SQLiteFlowPersistence는 기본 저장 백엔드입니다.
    • 상태는 자동으로 로컬 SQLite 데이터베이스에 저장됩니다.
    • 데이터베이스 작업 실패 시 명확한 메시지를 제공하는 견고한 오류 처리가 제공됩니다.
  3. 오류 처리
    • 데이터베이스 작업에 대한 포괄적인 오류 메시지가 제공됩니다.
    • 저장 및 로드 중에 상태가 자동으로 검증됩니다.
    • 지속성 작업에 문제가 발생할 경우 명확한 피드백을 제공합니다.

중요한 고려사항

  • 상태 유형: 구조화된(Pydantic BaseModel) 상태와 비구조화된(딕셔너리) 상태 모두 지원됩니다
  • 자동 ID: id 필드는 존재하지 않을 경우 자동으로 추가됩니다
  • 상태 복구: 실패하거나 재시작된 flow는 이전 상태를 자동으로 불러올 수 있습니다
  • 커스텀 구현: 특수한 저장소 요구 사항을 위해 직접 FlowPersistence 구현을 제공할 수 있습니다

기술적 이점

  1. 저수준 접근을 통한 정밀한 제어
    • 고급 사용 사례를 위한 영속성 작업에 대한 직접 접근
    • 메서드 수준의 영속성 데코레이터를 통한 세밀한 제어
    • 내장된 상태 검사 및 디버깅 기능
    • 상태 변경 및 영속성 작업에 대한 완전한 가시성
  2. 향상된 신뢰성
    • 시스템 장애 또는 재시작 후 자동 상태 복구
    • 데이터 무결성을 위한 트랜잭션 기반 상태 업데이트
    • 명확한 오류 메시지를 제공하는 포괄적인 오류 처리
    • 상태 저장 및 로드 작업 시 강력한 검증
  3. 확장 가능한 아키텍처
    • FlowPersistence 인터페이스를 통한 사용자 정의 가능한 영속성 백엔드
    • SQLite를 넘어선 특수 저장 솔루션 지원
    • 구조화된(Pydantic) 상태와 비구조화(dict) 상태 모두와 호환
    • 기존 CrewAI 흐름 패턴과의 원활한 통합
영속성 시스템의 아키텍처는 기술적 정밀성과 맞춤화 옵션을 강조하여, 개발자가 내장된 신뢰성 기능의 이점을 누리면서 상태 관리에 대한 완전한 제어권을 유지할 수 있게 해줍니다.

흐름 제어

조건부 로직: or

Flows에서 or_ 함수는 여러 메서드를 감지하고 지정된 메서드 중 하나에서 출력이 발생하면 리스너 메서드를 트리거합니다.
from crewai.flow.flow import Flow, listen, or_, start

class OrExampleFlow(Flow):

    @start()
    def start_method(self):
        return "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        return "Hello from the second method"

    @listen(or_(start_method, second_method))
    def logger(self, result):
        print(f"Logger: {result}")



flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image 이 Flow를 실행하면, logger 메서드는 start_method 또는 second_method의 출력에 의해 트리거됩니다.
or_ 함수는 여러 메서드를 감지하고 지정된 메서드 중 하나에서 출력이 발생하면 리스너 메서드를 트리거하는 데 사용됩니다.

조건부 로직: and

Flows에서 and_ 함수는 여러 메서드를 리슨하고, 지정된 모든 메서드가 출력을 발생시킬 때만 리스너 메서드가 트리거되도록 합니다.
from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):

    @start()
    def start_method(self):
        self.state["greeting"] = "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        self.state["joke"] = "What do computers eat? Microchips."

    @listen(and_(start_method, second_method))
    def logger(self):
        print("---- Logger ----")
        print(self.state)

flow = AndExampleFlow()
flow.plot()
flow.kickoff()
Flow Visual image 이 Flow를 실행하면, logger 메서드는 start_methodsecond_method가 모두 출력을 발생시켰을 때만 트리거됩니다.
and_ 함수는 여러 메서드를 리슨하고, 지정된 모든 메서드가 출력을 발생시킬 때만 리스너 메서드를 트리거하는 데 사용됩니다.

Router

Flows의 @router() 데코레이터를 사용하면 메서드의 출력값에 따라 조건부 라우팅 로직을 정의할 수 있습니다.
메서드의 출력에 따라 서로 다른 경로를 지정할 수 있어 실행 흐름을 동적으로 제어할 수 있습니다.
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    success_flag: bool = False

class RouterFlow(Flow[ExampleState]):

    @start()
    def start_method(self):
        print("Starting the structured flow")
        random_boolean = random.choice([True, False])
        self.state.success_flag = random_boolean

    @router(start_method)
    def second_method(self):
        if self.state.success_flag:
            return "success"
        else:
            return "failed"

    @listen("success")
    def third_method(self):
        print("Third method running")

    @listen("failed")
    def fourth_method(self):
        print("Fourth method running")


flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image 위 예제에서 start_method는 랜덤 불리언 값을 생성하여 state에 저장합니다.
second_method@router() 데코레이터를 사용해 불리언 값에 따라 조건부 라우팅 로직을 정의합니다.
불리언 값이 True이면 메서드는 "success"를 반환하고, False이면 "failed"를 반환합니다.
third_methodfourth_methodsecond_method의 출력값을 기다렸다가 반환된 값에 따라 실행됩니다.
이 Flow를 실행하면, start_method에서 생성된 랜덤 불리언 값에 따라 출력값이 달라집니다.

플로우에 에이전트 추가하기

에이전트는 플로우에 원활하게 통합할 수 있으며, 단순하고 집중된 작업 실행이 필요할 때 전체 Crew의 경량 대안으로 활용됩니다. 아래는 에이전트를 플로우 내에서 사용하여 시장 조사를 수행하는 예시입니다:
import asyncio
from typing import Any, Dict, List

from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field

from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start


# Define a structured output format
class MarketAnalysis(BaseModel):
    key_trends: List[str] = Field(description="List of identified market trends")
    market_size: str = Field(description="Estimated market size")
    competitors: List[str] = Field(description="Major competitors in the space")


# Define flow state
class MarketResearchState(BaseModel):
    product: str = ""
    analysis: MarketAnalysis | None = None


# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
    @start()
    def initialize_research(self) -> Dict[str, Any]:
        print(f"Starting market research for {self.state.product}")
        return {"product": self.state.product}

    @listen(initialize_research)
    async def analyze_market(self) -> Dict[str, Any]:
        # Create an Agent for market research
        analyst = Agent(
            role="Market Research Analyst",
            goal=f"Analyze the market for {self.state.product}",
            backstory="You are an experienced market analyst with expertise in "
            "identifying market trends and opportunities.",
            tools=[SerperDevTool()],
            verbose=True,
        )

        # Define the research query
        query = f"""
        Research the market for {self.state.product}. Include:
        1. Key market trends
        2. Market size
        3. Major competitors

        Format your response according to the specified structure.
        """

        # Execute the analysis with structured output format
        result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
        if result.pydantic:
            print("result", result.pydantic)
        else:
            print("result", result)

        # Return the analysis to update the state
        return {"analysis": result.pydantic}

    @listen(analyze_market)
    def present_results(self, analysis) -> None:
        print("\nMarket Analysis Results")
        print("=====================")

        if isinstance(analysis, dict):
            # If we got a dict with 'analysis' key, extract the actual analysis object
            market_analysis = analysis.get("analysis")
        else:
            market_analysis = analysis

        if market_analysis and isinstance(market_analysis, MarketAnalysis):
            print("\nKey Market Trends:")
            for trend in market_analysis.key_trends:
                print(f"- {trend}")

            print(f"\nMarket Size: {market_analysis.market_size}")

            print("\nMajor Competitors:")
            for competitor in market_analysis.competitors:
                print(f"- {competitor}")
        else:
            print("No structured analysis data available.")
            print("Raw analysis:", analysis)


# Usage example
async def run_flow():
    flow = MarketResearchFlow()
    flow.plot("MarketResearchFlowPlot")
    result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
    return result


# Run the flow
if __name__ == "__main__":
    asyncio.run(run_flow())
Flow Visual image 이 예시는 플로우에서 에이전트를 사용할 때의 몇 가지 주요 기능을 보여줍니다:
  1. 구조화된 출력: Pydantic 모델을 사용하여 예상 출력 형식(MarketAnalysis)을 정의함으로써 플로우 전체에서 타입 안정성과 구조화된 데이터를 보장합니다.
  2. 상태 관리: 플로우 상태(MarketResearchState)는 단계 간의 컨텍스트를 유지하고 입력과 출력을 모두 저장합니다.
  3. 도구 통합: 에이전트는 기능 강화를 위해 WebsiteSearchTool과 같은 도구를 사용할 수 있습니다.

Flows에 Crews 추가하기

CrewAI에서 여러 crews로 flow를 생성하는 것은 간단합니다. 다음 명령어를 실행하여 여러 crews가 포함된 flow를 생성하는 데 필요한 모든 스캐폴딩이 포함된 새 CrewAI 프로젝트를 생성할 수 있습니다.
crewai create flow name_of_flow
이 명령어는 필요한 폴더 구조를 갖춘 새 CrewAI 프로젝트를 생성합니다. 생성된 프로젝트에는 이미 동작 중인 미리 구축된 crew인 poem_crew가 포함되어 있습니다. 이 crew를 템플릿으로 사용하여 복사, 붙여넣기, 수정함으로써 다른 crew를 만들 수 있습니다.

폴더 구조

crewai create flow name_of_flow 명령을 실행하면 다음과 유사한 폴더 구조를 볼 수 있습니다:
디렉터리/파일설명
name_of_flow/flow의 루트 디렉터리입니다.
├── crews/특정 crew에 대한 디렉터리를 포함합니다.
│ └── poem_crew/”poem_crew”의 설정 및 스크립트가 포함된 디렉터리입니다.
│ ├── config/”poem_crew”의 설정 파일 디렉터리입니다.
│ │ ├── agents.yaml”poem_crew”의 agent를 정의하는 YAML 파일입니다.
│ │ └── tasks.yaml”poem_crew”의 task를 정의하는 YAML 파일입니다.
│ ├── poem_crew.py”poem_crew”의 기능을 위한 스크립트입니다.
├── tools/flow에서 사용되는 추가 도구를 위한 디렉터리입니다.
│ └── custom_tool.py사용자 정의 도구 구현 파일입니다.
├── main.pyflow를 실행하는 메인 스크립트입니다.
├── README.md프로젝트 설명 및 안내 문서입니다.
├── pyproject.toml프로젝트의 종속성 및 설정을 위한 구성 파일입니다.
└── .gitignore버전 관리에서 무시할 파일과 디렉터리를 지정합니다.

크루 빌드하기

crews 폴더에서는 여러 개의 크루를 정의할 수 있습니다. 각 크루는 자체 폴더를 가지며, 설정 파일과 크루 정의 파일을 포함합니다. 예를 들어, poem_crew 폴더에는 다음과 같은 파일이 있습니다:
  • config/agents.yaml: 크루의 agent를 정의합니다.
  • config/tasks.yaml: 크루의 task를 정의합니다.
  • poem_crew.py: agent, task, 그리고 크루 자체를 포함한 crew 정의가 들어 있습니다.
poem_crew를 복사, 붙여넣기, 그리고 편집하여 다른 크루를 생성할 수 있습니다.

main.py에서 Crew 연결하기

main.py 파일은 flow를 생성하고 crew들을 서로 연결하는 곳입니다. Flow 클래스를 사용하고, @start@listen 데코레이터를 사용하여 실행 흐름을 지정하여 flow를 정의할 수 있습니다. 다음은 main.py 파일에서 poem_crew를 연결하는 예제입니다:
Code
#!/usr/bin/env python
from random import randint

from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew

class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""

class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})

        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)

def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot("PoemFlowPlot")

if __name__ == "__main__":
    kickoff()
    plot()
이 예제에서 PoemFlow 클래스는 문장 수를 생성하고, PoemCrew를 사용하여 시를 생성한 후, 시를 파일에 저장하는 flow를 정의합니다. 이 flow는 kickoff() 메서드를 호출하여 시작됩니다. plot() 메서드로 PoemFlowPlot이 생성됩니다. Flow Visual image

플로우 실행하기

(선택 사항) 플로우를 실행하기 전에, 다음 명령어를 실행하여 의존성을 설치할 수 있습니다:
crewai install
모든 의존성이 설치되면, 다음 명령어를 실행하여 가상 환경을 활성화해야 합니다:
source .venv/bin/activate
가상 환경을 활성화한 후, 아래 명령어 중 하나를 실행하여 플로우를 실행할 수 있습니다:
crewai flow kickoff
또는
uv run kickoff
플로우가 실행되면, 콘솔에서 출력을 확인할 수 있습니다.

플롯 플로우

AI 워크플로우를 시각화하면 플로우의 구조와 실행 경로에 대한 중요한 인사이트를 얻을 수 있습니다. CrewAI는 플로우의 인터랙티브 플롯을 생성할 수 있는 강력한 시각화 도구를 제공하여 AI 워크플로우를 보다 쉽게 이해하고 최적화할 수 있도록 도와줍니다.

플롯(Plots)이란 무엇인가요?

CrewAI에서 플롯(Plots)은 AI 워크플로우의 그래픽 표현입니다. 플롯은 다양한 태스크와 그들의 연결, 그리고 태스크 간 데이터 흐름을 시각적으로 보여줍니다. 이러한 시각화는 작업 순서를 이해하고, 병목 현상을 식별하며, 워크플로우 논리가 기대에 부합하는지 확인하는 데 도움이 됩니다.

플롯 생성 방법

CrewAI는 플로우의 플롯을 생성하는 두 가지 편리한 방법을 제공합니다:

옵션 1: plot() 메서드 사용하기

flow 인스턴스와 직접 작업하는 경우, flow 객체에서 plot() 메서드를 호출하여 플롯을 생성할 수 있습니다. 이 메서드는 flow의 인터랙티브 플롯이 포함된 HTML 파일을 생성합니다.
Code
# Assuming you have a flow instance
flow.plot("my_flow_plot")
이렇게 하면 현재 디렉토리에 my_flow_plot.html이라는 파일이 생성됩니다. 이 파일을 웹 브라우저에서 열어 인터랙티브 플롯을 볼 수 있습니다.

옵션 2: 커맨드 라인 사용

구조화된 CrewAI 프로젝트 내에서 작업 중이라면 커맨드 라인을 사용하여 플롯을 생성할 수 있습니다. 이는 전체 플로우 설정을 시각화하고자 하는 대규모 프로젝트에서 특히 유용합니다.
crewai flow plot
이 명령은 플로우의 플롯이 포함된 HTML 파일을 생성하며, 이는 plot() 메서드와 유사합니다. 파일은 프로젝트 디렉터리에 저장되며, 웹 브라우저에서 열어 플로우를 탐색할 수 있습니다.

플롯 이해하기

생성된 플롯은 flow 내의 작업을 나타내는 노드와 실행 흐름을 나타내는 방향성이 있는 엣지를 표시합니다. 플롯은 인터랙티브하게 제공되어, 확대/축소를 하거나 노드 위에 마우스를 올려 추가 정보를 볼 수 있습니다. flow를 시각화하면 워크플로의 구조를 더욱 명확하게 이해할 수 있어 디버깅, 최적화, 그리고 AI 프로세스를 다른 사람들에게 설명하는 데 도움이 됩니다.

결론

플로우를 시각적으로 표현하는 것은 CrewAI의 강력한 기능으로, 복잡한 AI 워크플로우를 설계하고 관리하는 능력을 크게 향상시켜줍니다. plot() 메서드나 커맨드 라인 중 어떤 방법을 사용하더라도, 플롯을 생성하면 워크플로우의 시각적 표현을 얻을 수 있어 개발과 발표 모두에 도움이 됩니다.

다음 단계

추가적인 flow 예제를 살펴보고 싶으시다면, 저희 examples 레포지토리에서 다양한 추천 예제를 확인하실 수 있습니다. 아래는 각각 고유한 사용 사례를 보여주는 네 가지 구체적인 flow 예제로, 현재 문제 유형에 맞는 예시를 찾는 데 도움이 될 것입니다:
  1. 이메일 자동 응답자 Flow: 이 예제는 백그라운드 작업이 계속 실행되면서 이메일 응답을 자동화하는 무한 루프를 보여줍니다. 수동 개입 없이 반복적으로 수행해야 하는 작업에 적합한 사용 사례입니다. 예제 보기
  2. 리드 점수 Flow: 이 flow 예제는 human-in-the-loop 피드백을 추가하고 router를 사용하여 다양한 조건 분기를 처리하는 방법을 보여줍니다. 워크플로우에 동적 의사결정과 인간의 관리·감독을 통합하는 방식을 확인할 수 있는 훌륭한 예시입니다. 예제 보기
  3. 책 집필 Flow: 이 예제는 여러 crew를 연속적으로 연결하는 데 탁월하며, 한 crew의 출력 결과가 다른 crew에 의해 사용됩니다. 구체적으로, 한 crew가 전체 책의 개요를 작성하고, 다른 crew가 그 개요를 바탕으로 챕터를 생성합니다. 결국 모든 것이 연결되어 완성된 책이 만들어집니다. 여러 작업 간 조율이 필요한 복잡한 다단계 프로세스에 적합한 flow입니다. 예제 보기
  4. 미팅 어시스턴트 Flow: 이 flow는 하나의 이벤트가 여러 후속 작업을 트리거하도록 브로드캐스트하는 방법을 보여줍니다. 예를 들어, 미팅이 끝난 후 Trello 보드를 업데이트하고 Slack 메시지를 전송하며 결과를 저장할 수 있습니다. 하나의 이벤트로부터 여러 결과를 처리하는 좋은 예시로, 포괄적인 작업 관리 및 알림 시스템에 이상적입니다. 예제 보기
이 예제들을 통해 반복되는 작업 자동화부터 동적 의사결정과 인간 피드백이 포함된 복잡한 다단계 프로세스 관리에 이르기까지 다양한 사용 사례에서 CrewAI Flows를 어떻게 활용할 수 있는지에 대한 통찰력을 얻을 수 있습니다. 또한, 아래의 CrewAI에서 flows를 사용하는 방법에 대한 YouTube 영상을 확인해보세요!

플로우 실행하기

플로우를 실행하는 방법에는 두 가지가 있습니다:

Flow API 사용하기

플로우를 프로그래밍 방식으로 실행하려면, 플로우 클래스의 인스턴스를 생성하고 kickoff() 메서드를 호출하면 됩니다:
flow = ExampleFlow()
result = flow.kickoff()

CLI 사용하기

버전 0.103.0부터 crewai run 명령어를 사용하여 flow를 실행할 수 있습니다:
crewai run
이 명령어는 프로젝트가 pyproject.toml의 type = "flow" 설정을 기반으로 flow인지 자동으로 감지하여 해당 방식으로 실행합니다. 명령줄에서 flow를 실행하는 권장 방법입니다. 하위 호환성을 위해 다음 명령어도 사용할 수 있습니다:
crewai flow kickoff
하지만 crewai run 명령어가 이제 crew와 flow 모두에 작동하므로 더욱 선호되는 방법입니다.