# Example: Content Production Pipeline combining Crews and Flows
import json
from typing import List, Dict

from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel
class ContentState(BaseModel):
    """State shared by the steps of the content production flow.

    Every field has an empty default so the flow can start from a blank
    state and fill the values in step by step.
    """

    # Brief for the piece: subject, intended readers, and format.
    topic: str = ""
    target_audience: str = ""
    content_type: str = ""

    # Outline for the piece; parsed JSON when the crew returns JSON,
    # otherwise the raw outline text stored under the "sections" key.
    outline: Dict = {}

    # Text of the piece: an intermediate draft and the edited final version.
    draft_content: str = ""
    final_content: str = ""

    # SEO rating stored by the SEO step (the prompt asks for a 1-100 score).
    seo_score: int = 0
# Number of leading characters of the final content sent for SEO analysis.
_SEO_CONTENT_PREVIEW_CHARS = 1000

# Prompt for the SEO step. Doubled braces are literal braces after .format().
_SEO_PROMPT_TEMPLATE = """
Analyze this content for SEO effectiveness for the keyword "{topic}".
Rate it on a scale of 1-100 and provide 3 specific recommendations for improvement.
Content: {content}... (truncated for brevity)
Format your response as JSON with the following structure:
{{
"score": 85,
"recommendations": [
"Recommendation 1",
"Recommendation 2",
"Recommendation 3"
]
}}
"""


def _parse_json_object(text):
    """Parse *text* as a JSON object.

    Surrounding whitespace and a single enclosing Markdown code fence
    (``` or ```json) are removed before parsing, because model replies are
    often wrapped that way.

    Returns:
        The decoded ``dict``, or ``None`` when *text* is not a string, is not
        valid JSON, or decodes to something other than a JSON object.
    """
    if not isinstance(text, str):
        return None

    candidate = text.strip()
    if candidate.startswith("```"):
        # Drop the opening fence line, then the closing fence if present.
        _, _, candidate = candidate.partition("\n")
        candidate = candidate.rstrip()
        if candidate.endswith("```"):
            candidate = candidate[:-3]

    try:
        parsed = json.loads(candidate)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    return parsed if isinstance(parsed, dict) else None


class ContentProductionFlow(Flow[ContentState]):
    """Flow that plans, writes, edits and SEO-scores one piece of content.

    Steps run in order: ``initialize_project`` -> ``create_outline`` ->
    ``write_content`` -> ``optimize_for_seo``. Each step records its result
    on ``self.state`` (a ``ContentState``).
    """

    @start()
    def initialize_project(self):
        """Set the topic, audience and content type for this run.

        Returns:
            The status string ``"Project initialized"``.
        """
        self.state.topic = "Sustainable Investing"
        self.state.target_audience = "Millennial Investors"
        self.state.content_type = "Blog Post"
        return "Project initialized"

    @listen(initialize_project)
    def create_outline(self, _):
        """Run a research crew and store the resulting outline on the state.

        Args:
            _: Output of the previous step; unused.

        Returns:
            The status string ``"Outline created"``.
        """
        researcher = Agent(
            role="Content Researcher",
            goal=f"Research {self.state.topic} for {self.state.target_audience}",
            backstory="You are an expert researcher with deep knowledge of content creation.",
        )
        outliner = Agent(
            role="Content Strategist",
            goal=f"Create an engaging outline for a {self.state.content_type}",
            backstory="You excel at structuring content for maximum engagement.",
        )

        research_task = Task(
            description=f"Research {self.state.topic} focusing on what would interest {self.state.target_audience}",
            expected_output="Comprehensive research notes with key points and statistics",
            agent=researcher,
        )
        outline_task = Task(
            description=f"Create an outline for a {self.state.content_type} about {self.state.topic}",
            expected_output="Detailed content outline with sections and key points",
            agent=outliner,
            context=[research_task],
        )

        outline_crew = Crew(
            agents=[researcher, outliner],
            tasks=[research_task, outline_task],
            process=Process.sequential,
            verbose=True,
        )
        result = outline_crew.kickoff()

        # Keep `outline` a dict in every case: use the parsed JSON object when
        # there is one, otherwise wrap the raw text.
        outline = _parse_json_object(result.raw)
        if outline is None:
            outline = {"sections": result.raw}
        self.state.outline = outline
        return "Outline created"

    @listen(create_outline)
    def write_content(self, _):
        """Run a writer/editor crew and store the draft and final content.

        Args:
            _: Output of the previous step; unused.

        Returns:
            The status string ``"Content created"``.
        """
        writer = Agent(
            role="Content Writer",
            goal=f"Write engaging content for {self.state.target_audience}",
            backstory="You are a skilled writer who creates compelling content.",
        )
        editor = Agent(
            role="Content Editor",
            goal="Ensure content is polished, accurate, and engaging",
            backstory="You have a keen eye for detail and a talent for improving content.",
        )

        writing_task = Task(
            description=f"Write a {self.state.content_type} about {self.state.topic} following this outline: {self.state.outline}",
            expected_output="Complete draft content in markdown format",
            agent=writer,
        )
        editing_task = Task(
            description="Edit and improve the draft content for clarity, engagement, and accuracy",
            expected_output="Polished final content in markdown format",
            agent=editor,
            context=[writing_task],
        )

        writing_crew = Crew(
            agents=[writer, editor],
            tasks=[writing_task, editing_task],
            process=Process.sequential,
            verbose=True,
        )
        result = writing_crew.kickoff()

        # The writing task is listed first, so its output is the unedited
        # draft. NOTE(review): assumes `tasks_output` follows task order --
        # confirm against the crewAI CrewOutput documentation.
        tasks_output = getattr(result, "tasks_output", None) or []
        if tasks_output:
            self.state.draft_content = tasks_output[0].raw

        self.state.final_content = result.raw
        return "Content created"

    @listen(write_content)
    def optimize_for_seo(self, _):
        """Ask an LLM to rate the final content for SEO and store the score.

        Args:
            _: Output of the previous step; unused.

        Returns:
            The parsed analysis dict from the model, or a fallback dict with
            a score of 50 when the reply is not a JSON object.
        """
        # Kept as a function-level import, as in the original step.
        from crewai import LLM

        llm = LLM(model="openai/gpt-4o-mini")
        prompt = _SEO_PROMPT_TEMPLATE.format(
            topic=self.state.topic,
            content=self.state.final_content[:_SEO_CONTENT_PREVIEW_CHARS],
        )
        seo_analysis = llm.call(prompt)

        analysis = _parse_json_object(seo_analysis)
        if analysis is None:
            self.state.seo_score = 50
            return {"score": 50, "recommendations": ["Unable to parse SEO analysis"]}

        # The model may send the score as a string or float; store an int and
        # treat a missing or non-numeric score as 0.
        try:
            score = int(analysis.get("score", 0))
        except (TypeError, ValueError, OverflowError):
            score = 0
        self.state.seo_score = score
        return analysis
# Run the flow: build an instance and execute it via kickoff().
# These statements are at module top level, so they run whenever this file
# is executed or imported.
# NOTE(review): `result` is presumably the value returned by the last step
# (optimize_for_seo) -- confirm against the crewAI Flow documentation.
content_flow = ContentProductionFlow()
result = content_flow.kickoff()