from crewai import Agent, Task, Crew
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ArticleState(BaseModel):
    """Structured state shared by the steps of the article pipeline.

    Every field defaults to an empty string, so the model can be created
    without arguments and filled in as the pipeline progresses.
    """

    # Subject the article is about.
    topic: str = ""

    # Raw text produced by the research step.
    research: str = ""

    # Intermediate draft of the article -- presumably the pre-edit text;
    # confirm against the flow that fills this model.
    draft: str = ""

    # Raw text of the finished, edited article.
    final_article: str = ""
class ArticleFlow(Flow[ArticleState]):
    """Two-stage article pipeline: a research crew, then a writing crew.

    The flow starts with ``run_research_crew``; ``run_writing_crew`` listens
    for its completion. Results are stored on ``self.state`` (``research``,
    ``draft`` and ``final_article``) as each stage finishes.
    """

    # Model identifier handed to every agent. Kept in one place so a subclass
    # can switch models without editing each Agent definition.
    DEFAULT_LLM = "gpt-4o"

    @start()
    def run_research_crew(self) -> str:
        """Run a single-agent research crew on ``self.state.topic``.

        Returns:
            The raw research text, which is also stored in
            ``self.state.research``.

        Raises:
            ValueError: If ``self.state.topic`` is empty or only whitespace.
        """
        # Fail fast: a blank topic would still trigger LLM calls and yield
        # meaningless research.
        if not self.state.topic.strip():
            raise ValueError(
                "ArticleFlow requires a non-empty 'topic' in its state "
                "before kickoff."
            )

        researcher = Agent(
            role="Senior Research Analyst",
            goal=f"Produce comprehensive research on: {self.state.topic}",
            backstory="You're a veteran analyst known for thorough, "
                      "well-sourced research reports.",
            llm=self.DEFAULT_LLM,
        )
        research_task = Task(
            description=f"Research '{self.state.topic}' thoroughly. "
                        "Cover key trends, data points, and expert opinions.",
            expected_output="A detailed research brief with sources.",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[research_task])
        result = crew.kickoff()

        self.state.research = result.raw
        return result.raw

    @listen(run_research_crew)
    def run_writing_crew(self, research_output: str) -> str:
        """Run a writer + editor crew that turns the research into an article.

        Args:
            research_output: Value returned by ``run_research_crew``. If it
                is empty, ``self.state.research`` is used instead.

        Returns:
            The raw text of the edited article, which is also stored in
            ``self.state.final_article``. The writer's pre-edit output is
            stored in ``self.state.draft`` when the crew reports it.
        """
        # Prefer the value handed over by the previous step; fall back to the
        # state copy so the method still works if it is invoked without it.
        research = research_output or self.state.research

        writer = Agent(
            role="Technical Writer",
            goal="Write a compelling article based on provided research.",
            backstory="You turn complex research into engaging, clear prose.",
            llm=self.DEFAULT_LLM,
        )
        editor = Agent(
            role="Senior Editor",
            goal="Review and polish articles for publication quality.",
            backstory="20 years of editorial experience at top tech publications.",
            llm=self.DEFAULT_LLM,
        )

        write_task = Task(
            description=f"Write an article based on this research:\n{research}",
            expected_output="A well-structured draft article.",
            agent=writer,
        )
        edit_task = Task(
            description="Review, fact-check, and polish the draft article.",
            expected_output="A publication-ready article.",
            agent=editor,
            # State the dependency on the draft explicitly instead of relying
            # on implicit task-to-task chaining.
            context=[write_task],
        )

        crew = Crew(agents=[writer, editor], tasks=[write_task, edit_task])
        result = crew.kickoff()

        # The first task output is the writer's draft (tasks are listed
        # writer first); keep it so the pre-edit text is not lost.
        if result.tasks_output:
            self.state.draft = result.tasks_output[0].raw

        self.state.final_article = result.raw
        return result.raw
# Run the full pipeline
def main() -> None:
    """Run the article flow for a sample topic and print the final article."""
    flow = ArticleFlow()
    # Seed the flow's state through kickoff's ``inputs`` rather than mutating
    # ``flow.state`` before the run starts.
    flow.kickoff(inputs={"topic": "The Future of Edge AI"})
    print(flow.state.final_article)


# Guard the entry point so importing this module does not trigger LLM calls.
if __name__ == "__main__":
    main()