플로우에서 State의 힘 이해하기
State 관리는 모든 고급 AI 워크플로우의 중추입니다. CrewAI Flows에서 state 시스템은 컨텍스트를 유지하고, 단계 간 데이터를 공유하며, 복잡한 애플리케이션 로직을 구축할 수 있도록 해줍니다. State 관리에 능숙해지는 것은 신뢰할 수 있고, 유지보수가 용이하며, 강력한 AI 애플리케이션을 만들기 위해 필수적입니다.
이 가이드는 CrewAI Flows에서 state를 관리하는 데 꼭 알아야 할 기본 개념부터 고급 기법까지, 실용적인 코드 예제와 함께 단계별로 안내합니다.
상태 관리가 중요한 이유
효과적인 상태 관리는 다음을 가능하게 합니다:
- 실행 단계 간의 컨텍스트 유지 - 워크플로의 다양한 단계 간에 정보를 원활하게 전달할 수 있습니다.
- 복잡한 조건부 논리 구성 - 누적된 데이터를 기반으로 의사 결정을 내릴 수 있습니다.
- 지속적인 애플리케이션 생성 - 워크플로 진행 상황을 저장하고 복원할 수 있습니다.
- 에러를 우아하게 처리 - 더 견고한 애플리케이션을 위한 복구 패턴을 구현할 수 있습니다.
- 애플리케이션 확장 - 적절한 데이터 조직을 통해 복잡한 워크플로를 지원할 수 있습니다.
- 대화형 애플리케이션 활성화 - 컨텍스트 기반 AI 상호작용을 위해 대화 내역을 저장하고 접근할 수 있습니다.
이러한 기능을 효과적으로 활용하는 방법을 살펴보겠습니다.
상태 관리 기본 사항
Flow 상태 라이프사이클
CrewAI Flow에서 상태는 예측 가능한 라이프사이클을 따릅니다:
- 초기화 - flow가 생성될 때, 상태는 초기화됩니다(빈 딕셔너리 또는 Pydantic 모델 인스턴스로)
- 수정 - flow 메서드는 실행되는 동안 상태에 접근하고 이를 수정합니다
- 전달 - 상태는 flow 메서드들 사이에 자동으로 전달됩니다
- 영속화 (선택 사항) - 상태는 스토리지에 저장될 수 있고 나중에 다시 불러올 수 있습니다
- 완료 - 최종 상태는 모든 실행된 메서드의 누적 변경 사항을 반영합니다
이 라이프사이클을 이해하는 것은 효과적인 flow를 설계하는 데 매우 중요합니다.
상태 관리의 두 가지 접근 방식
CrewAI에서는 흐름에서 상태를 관리하는 두 가지 방법을 제공합니다:
- 비구조적 상태 - 유연성을 위해 딕셔너리와 유사한 객체 사용
- 구조적 상태 - 타입 안전성과 검증을 위해 Pydantic 모델 사용
각 접근 방식을 자세히 살펴보겠습니다.
비구조적 상태 관리
비구조적 상태는 사전(dictionary)과 유사한 방식을 사용하여, 단순한 애플리케이션에 유연성과 단순성을 제공합니다.
작동 방식
비구조화된 상태의 경우:
self.state
를 통해 상태에 접근하며, 이는 딕셔너리처럼 동작합니다
- 언제든지 키를 자유롭게 추가, 수정, 삭제할 수 있습니다
- 모든 상태는 모든 flow 메서드에서 자동으로 사용할 수 있습니다
기본 예제
다음은 비구조적 상태 관리를 보여주는 간단한 예제입니다:
from crewai.flow.flow import Flow, listen, start
class UnstructuredStateFlow(Flow):
@start()
def initialize_data(self):
print("Initializing flow data")
# Add key-value pairs to state
self.state["user_name"] = "Alex"
self.state["preferences"] = {
"theme": "dark",
"language": "English"
}
self.state["items"] = []
# The flow state automatically gets a unique ID
print(f"Flow ID: {self.state['id']}")
return "Initialized"
@listen(initialize_data)
def process_data(self, previous_result):
print(f"Previous step returned: {previous_result}")
# Access and modify state
user = self.state["user_name"]
print(f"Processing data for {user}")
# Add items to a list in state
self.state["items"].append("item1")
self.state["items"].append("item2")
# Add a new key-value pair
self.state["processed"] = True
return "Processed"
@listen(process_data)
def generate_summary(self, previous_result):
# Access multiple state values
user = self.state["user_name"]
theme = self.state["preferences"]["theme"]
items = self.state["items"]
processed = self.state.get("processed", False)
summary = f"User {user} has {len(items)} items with {theme} theme. "
summary += "Data is processed." if processed else "Data is not processed."
return summary
# Run the flow
flow = UnstructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
비구조적 상태를 사용할 때
비구조적 상태는 다음과 같은 경우에 이상적입니다:
- 빠른 프로토타이핑 및 간단한 플로우
- 동적으로 변화하는 상태 요구
- 구조가 사전에 알려지지 않을 수 있는 경우
- 간단한 상태 요구가 있는 플로우
비구조적 상태는 유연하지만, 타입 검사 및 스키마 검증이 없기 때문에 복잡한 애플리케이션에서 오류가 발생할 수 있습니다.
구조화된 상태 관리
구조화된 상태는 Pydantic 모델을 사용하여 flow의 상태에 대한 스키마를 정의함으로써 타입 안전성, 검증, 그리고 더 나은 개발자 경험을 제공합니다.
작동 방식
구조화된 상태에서는:
- 상태 구조를 나타내는 Pydantic 모델을 정의합니다.
- 이 모델 타입을 유형 매개변수로 Flow 클래스에 전달합니다.
self.state
를 통해 상태에 접근할 수 있으며, 이는 Pydantic 모델 인스턴스처럼 동작합니다.
- 모든 필드는 정의된 타입에 따라 검증됩니다.
- IDE 자동 완성 및 타입 체크 지원을 받을 수 있습니다.
기본 예제
구조화된 상태 관리를 구현하는 방법은 다음과 같습니다:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
# Define your state model
class UserPreferences(BaseModel):
theme: str = "light"
language: str = "English"
class AppState(BaseModel):
user_name: str = ""
preferences: UserPreferences = UserPreferences()
items: List[str] = []
processed: bool = False
completion_percentage: float = 0.0
# Create a flow with typed state
class StructuredStateFlow(Flow[AppState]):
@start()
def initialize_data(self):
print("Initializing flow data")
# Set state values (type-checked)
self.state.user_name = "Taylor"
self.state.preferences.theme = "dark"
# The ID field is automatically available
print(f"Flow ID: {self.state.id}")
return "Initialized"
@listen(initialize_data)
def process_data(self, previous_result):
print(f"Processing data for {self.state.user_name}")
# Modify state (with type checking)
self.state.items.append("item1")
self.state.items.append("item2")
self.state.processed = True
self.state.completion_percentage = 50.0
return "Processed"
@listen(process_data)
def generate_summary(self, previous_result):
# Access state (with autocompletion)
summary = f"User {self.state.user_name} has {len(self.state.items)} items "
summary += f"with {self.state.preferences.theme} theme. "
summary += "Data is processed." if self.state.processed else "Data is not processed."
summary += f" Completion: {self.state.completion_percentage}%"
return summary
# Run the flow
flow = StructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
구조화된 상태의 이점
구조화된 상태를 사용하면 여러 가지 장점이 있습니다:
- 타입 안정성 - 개발 단계에서 타입 오류를 잡을 수 있습니다
- 자체 문서화 - 상태 모델이 어떤 데이터가 사용 가능한지 명확히 문서화합니다
- 검증 - 데이터 타입과 제약 조건을 자동으로 검증합니다
- IDE 지원 - 자동 완성과 인라인 문서화를 받을 수 있습니다
- 기본값 - 누락된 데이터에 대한 대체값을 쉽게 정의할 수 있습니다
구조화된 상태를 사용할 때
구조화된 상태는 다음과 같은 경우에 권장됩니다:
- 명확하게 정의된 데이터 스키마를 가진 복잡한 플로우
- 여러 개발자가 동일한 코드를 작업하는 팀 프로젝트
- 데이터 검증이 중요한 애플리케이션
- 특정 데이터 타입 및 제약 조건을 강제로 적용해야 하는 플로우
자동 상태 ID
비구조화 상태와 구조화 상태 모두 상태 인스턴스를 추적하고 관리하는 데 도움이 되는 고유한 식별자(UUID)를 자동으로 부여받습니다.
작동 방식
- 비구조화 state의 경우, ID는
self.state["id"]
로 접근할 수 있습니다.
- 구조화 state의 경우, ID는
self.state.id
로 접근할 수 있습니다.
- 이 ID는 flow가 생성될 때 자동으로 생성됩니다.
- ID는 flow의 생명주기 동안 동일하게 유지됩니다.
- ID는 추적, 로깅, 저장된 state의 조회에 사용할 수 있습니다.
이 UUID는 persistence를 구현하거나 여러 flow 실행을 추적할 때 특히 유용합니다.
동적 상태 업데이트
구조화된 상태를 사용하든 비구조화된 상태를 사용하든, flow의 실행 중 언제든지 상태를 동적으로 업데이트할 수 있습니다.
단계 간 데이터 전달
Flow 메서드는 값을 반환할 수 있으며, 이러한 반환값은 리스닝 메서드의 인자로 전달됩니다:
from crewai.flow.flow import Flow, listen, start
class DataPassingFlow(Flow):
@start()
def generate_data(self):
# This return value will be passed to listening methods
return "Generated data"
@listen(generate_data)
def process_data(self, data_from_previous_step):
print(f"Received: {data_from_previous_step}")
# You can modify the data and pass it along
processed_data = f"{data_from_previous_step} - processed"
# Also update state
self.state["last_processed"] = processed_data
return processed_data
@listen(process_data)
def finalize_data(self, processed_data):
print(f"Received processed data: {processed_data}")
# Access both the passed data and state
last_processed = self.state.get("last_processed", "")
return f"Final: {processed_data} (from state: {last_processed})"
이 패턴을 사용하면 직접적인 데이터 전달과 state 업데이트를 결합하여 최대한 유연하게 작업할 수 있습니다.
플로우 상태 지속
CrewAI의 가장 강력한 기능 중 하나는 실행 간에 플로우 상태를 지속할 수 있다는 점입니다. 이를 통해 중단, 재개, 심지어 실패 후에도 복구할 수 있는 워크플로우를 구현할 수 있습니다.
@persist() 데코레이터
@persist()
데코레이터는 상태 지속을 자동화하여 flow의 상태를 실행의 주요 지점마다 저장합니다.
클래스 수준 지속성
클래스 수준에서 @persist()
를 적용하면 모든 메서드 실행 후 상태가 저장됩니다:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
value: int = 0
@persist() # Apply to the entire flow class
class PersistentCounterFlow(Flow[CounterState]):
@start()
def increment(self):
self.state.value += 1
print(f"Incremented to {self.state.value}")
return self.state.value
@listen(increment)
def double(self, value):
self.state.value = value * 2
print(f"Doubled to {self.state.value}")
return self.state.value
# First run
flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")
# Second run - state is automatically loaded
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff()
print(f"Second run result: {result2}") # Will be higher due to persisted state
메서드 수준 지속성
더 세밀한 제어를 위해 @persist()
를 특정 메서드에 적용할 수 있습니다:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
class SelectivePersistFlow(Flow):
@start()
def first_step(self):
self.state["count"] = 1
return "First step"
@persist() # Only persist after this method
@listen(first_step)
def important_step(self, prev_result):
self.state["count"] += 1
self.state["important_data"] = "This will be persisted"
return "Important step completed"
@listen(important_step)
def final_step(self, prev_result):
self.state["count"] += 1
return f"Complete with count {self.state['count']}"
고급 상태 패턴
상태 기반 조건부 로직
state를 사용하여 flow에서 복잡한 조건부 로직을 구현할 수 있습니다:
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class PaymentState(BaseModel):
amount: float = 0.0
is_approved: bool = False
retry_count: int = 0
class PaymentFlow(Flow[PaymentState]):
@start()
def process_payment(self):
# Simulate payment processing
self.state.amount = 100.0
self.state.is_approved = self.state.amount < 1000
return "Payment processed"
@router(process_payment)
def check_approval(self, previous_result):
if self.state.is_approved:
return "approved"
elif self.state.retry_count < 3:
return "retry"
else:
return "rejected"
@listen("approved")
def handle_approval(self):
return f"Payment of ${self.state.amount} approved!"
@listen("retry")
def handle_retry(self):
self.state.retry_count += 1
print(f"Retrying payment (attempt {self.state.retry_count})...")
# Could implement retry logic here
return "Retry initiated"
@listen("rejected")
def handle_rejection(self):
return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."
복잡한 상태 변환 처리
복잡한 상태 변환의 경우, 전용 메서드를 만들어 처리할 수 있습니다.
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
from typing import List, Dict
class UserData(BaseModel):
name: str
active: bool = True
login_count: int = 0
class ComplexState(BaseModel):
users: Dict[str, UserData] = {}
active_user_count: int = 0
class TransformationFlow(Flow[ComplexState]):
@start()
def initialize(self):
# Add some users
self.add_user("alice", "Alice")
self.add_user("bob", "Bob")
self.add_user("charlie", "Charlie")
return "Initialized"
@listen(initialize)
def process_users(self, _):
# Increment login counts
for user_id in self.state.users:
self.increment_login(user_id)
# Deactivate one user
self.deactivate_user("bob")
# Update active count
self.update_active_count()
return f"Processed {len(self.state.users)} users"
# Helper methods for state transformations
def add_user(self, user_id: str, name: str):
self.state.users[user_id] = UserData(name=name)
self.update_active_count()
def increment_login(self, user_id: str):
if user_id in self.state.users:
self.state.users[user_id].login_count += 1
def deactivate_user(self, user_id: str):
if user_id in self.state.users:
self.state.users[user_id].active = False
self.update_active_count()
def update_active_count(self):
self.state.active_user_count = sum(
1 for user in self.state.users.values() if user.active
)
이와 같은 헬퍼 메서드 생성 패턴은 flow 메서드를 깔끔하게 유지하면서 복잡한 상태 조작을 가능하게 해줍니다.
Crews로 상태 관리하기
CrewAI에서 가장 강력한 패턴 중 하나는 flow 상태 관리와 crew 실행을 결합하는 것입니다.
크루에 상태 전달하기
플로우 상태를 사용하여 크루에 매개변수를 전달할 수 있습니다:
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel
class ResearchState(BaseModel):
topic: str = ""
depth: str = "medium"
results: str = ""
class ResearchFlow(Flow[ResearchState]):
@start()
def get_parameters(self):
# In a real app, this might come from user input
self.state.topic = "Artificial Intelligence Ethics"
self.state.depth = "deep"
return "Parameters set"
@listen(get_parameters)
def execute_research(self, _):
# Create agents
researcher = Agent(
role="Research Specialist",
goal=f"Research {self.state.topic} in {self.state.depth} detail",
backstory="You are an expert researcher with a talent for finding accurate information."
)
writer = Agent(
role="Content Writer",
goal="Transform research into clear, engaging content",
backstory="You excel at communicating complex ideas clearly and concisely."
)
# Create tasks
research_task = Task(
description=f"Research {self.state.topic} with {self.state.depth} analysis",
expected_output="Comprehensive research notes in markdown format",
agent=researcher
)
writing_task = Task(
description=f"Create a summary on {self.state.topic} based on the research",
expected_output="Well-written article in markdown format",
agent=writer,
context=[research_task]
)
# Create and run crew
research_crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
verbose=True
)
# Run crew and store result in state
result = research_crew.kickoff()
self.state.results = result.raw
return "Research completed"
@listen(execute_research)
def summarize_results(self, _):
# Access the stored results
result_length = len(self.state.results)
return f"Research on {self.state.topic} completed with {result_length} characters of results."
State에서 Crew 출력 처리하기
Crew가 완료되면, 해당 출력을 처리하여 flow state에 저장할 수 있습니다:
@listen(execute_crew)
def process_crew_results(self, _):
# Parse the raw results (assuming JSON output)
import json
try:
results_dict = json.loads(self.state.raw_results)
self.state.processed_results = {
"title": results_dict.get("title", ""),
"main_points": results_dict.get("main_points", []),
"conclusion": results_dict.get("conclusion", "")
}
return "Results processed successfully"
except json.JSONDecodeError:
self.state.error = "Failed to parse crew results as JSON"
return "Error processing results"
상태 관리 모범 사례
1. 상태를 집중적으로 유지하세요
상태를 설계할 때 꼭 필요한 내용만 포함하도록 하세요:
# Too broad
class BloatedState(BaseModel):
user_data: Dict = {}
system_settings: Dict = {}
temporary_calculations: List = []
debug_info: Dict = {}
# ...many more fields
# Better: Focused state
class FocusedState(BaseModel):
user_id: str
preferences: Dict[str, str]
completion_status: Dict[str, bool]
2. 복잡한 플로우를 위한 구조화된 상태 사용
플로우의 복잡도가 증가할수록 구조화된 상태의 가치는 점점 커집니다:
# Simple flow can use unstructured state
class SimpleGreetingFlow(Flow):
@start()
def greet(self):
self.state["name"] = "World"
return f"Hello, {self.state['name']}!"
# Complex flow benefits from structured state
class UserRegistrationState(BaseModel):
username: str
email: str
verification_status: bool = False
registration_date: datetime = Field(default_factory=datetime.now)
last_login: Optional[datetime] = None
class RegistrationFlow(Flow[UserRegistrationState]):
# Methods with strongly-typed state access
3. 문서 상태 전이
복잡한 흐름의 경우, 실행 중에 상태가 어떻게 변하는지 문서화하세요:
@start()
def initialize_order(self):
"""
Initialize order state with empty values.
State before: {}
State after: {order_id: str, items: [], status: 'new'}
"""
self.state.order_id = str(uuid.uuid4())
self.state.items = []
self.state.status = "new"
return "Order initialized"
4. 상태 오류를 정상적으로 처리하기
상태 접근에 대한 오류 처리를 구현하세요:
@listen(previous_step)
def process_data(self, _):
try:
# Try to access a value that might not exist
user_preference = self.state.preferences.get("theme", "default")
except (AttributeError, KeyError):
# Handle the error gracefully
self.state.errors = self.state.get("errors", [])
self.state.errors.append("Failed to access preferences")
user_preference = "default"
return f"Used preference: {user_preference}"
5. 상태를 사용하여 진행 상황 추적
긴 실행 흐름에서 진행 상황을 추적하기 위해 상태를 활용하세요:
class ProgressTrackingFlow(Flow):
@start()
def initialize(self):
self.state["total_steps"] = 3
self.state["current_step"] = 0
self.state["progress"] = 0.0
self.update_progress()
return "Initialized"
def update_progress(self):
"""Helper method to calculate and update progress"""
if self.state.get("total_steps", 0) > 0:
self.state["progress"] = (self.state.get("current_step", 0) /
self.state["total_steps"]) * 100
print(f"Progress: {self.state['progress']:.1f}%")
@listen(initialize)
def step_one(self, _):
# Do work...
self.state["current_step"] = 1
self.update_progress()
return "Step 1 complete"
# Additional steps...
6. 가능한 경우 불변(Immutable) 연산 사용하기
특히 구조화된 상태에서는 명확성을 위해 불변 연산을 선호하세요:
# 리스트를 즉시 수정하는 대신:
self.state.items.append(new_item) # 변경 가능한 연산
# 새로운 상태를 생성하는 것을 고려하세요:
from pydantic import BaseModel
from typing import List
class ItemState(BaseModel):
items: List[str] = []
class ImmutableFlow(Flow[ItemState]):
@start()
def add_item(self):
# 추가된 항목과 함께 새로운 리스트 생성
self.state.items = [*self.state.items, "new item"]
return "Item added"
플로우 상태 디버깅
상태 변경 로깅
개발할 때 상태 변화를 추적하기 위해 로깅을 추가하세요:
import logging
logging.basicConfig(level=logging.INFO)
class LoggingFlow(Flow):
def log_state(self, step_name):
logging.info(f"State after {step_name}: {self.state}")
@start()
def initialize(self):
self.state["counter"] = 0
self.log_state("initialize")
return "Initialized"
@listen(initialize)
def increment(self, _):
self.state["counter"] += 1
self.log_state("increment")
return f"Incremented to {self.state['counter']}"
상태 시각화
디버깅을 위해 상태를 시각화하는 메서드를 추가할 수 있습니다:
def visualize_state(self):
"""Create a simple visualization of the current state"""
import json
from rich.console import Console
from rich.panel import Panel
console = Console()
if hasattr(self.state, "model_dump"):
# Pydantic v2
state_dict = self.state.model_dump()
elif hasattr(self.state, "dict"):
# Pydantic v1
state_dict = self.state.dict()
else:
# Unstructured state
state_dict = dict(self.state)
# Remove id for cleaner output
if "id" in state_dict:
state_dict.pop("id")
state_json = json.dumps(state_dict, indent=2, default=str)
console.print(Panel(state_json, title="Current Flow State"))
CrewAI Flows에서 상태 관리를 마스터하면 컨텍스트를 유지하고, 복잡한 결정을 내리며, 일관된 결과를 제공하는 정교하고 견고한 AI 애플리케이션을 구축할 수 있는 힘을 얻게 됩니다.
비구조화 상태든 구조화 상태든 적절한 상태 관리 방식을 구현하면 유지 관리가 용이하고, 확장 가능하며, 실제 문제를 효과적으로 해결할 수 있는 플로우를 만들 수 있습니다.
더 복잡한 플로우를 개발할수록 좋은 상태 관리는 유연성과 구조성 사이의 올바른 균형을 찾는 것임을 기억하세요. 이를 통해 코드가 강력하면서도 이해하기 쉬워집니다.
이제 CrewAI Flows에서 상태 관리의 개념과 실습을 마스터하셨습니다! 이 지식을 통해 컨텍스트를 효과적으로 유지하고, 단계 간 데이터를 공유하며, 정교한 애플리케이션 로직을 구현하는 견고한 AI 워크플로우를 만들 수 있습니다.
다음 단계
- flow에서 구조화된 state와 비구조화된 state를 모두 실험해 보세요
- 장기 실행 워크플로를 위해 state 영속성을 구현해 보세요
- 첫 crew 만들기를 탐색하여 crew와 flow가 어떻게 함께 작동하는지 확인해 보세요
- 더 고급 기능을 원한다면 Flow 참고 문서를 확인해 보세요