Conditional Tasks
Learn how to use conditional tasks in a crewAI kickoff
Introduction
Conditional Tasks in crewAI allow a workflow to adapt based on the outcome of a previous task. A conditional task carries a condition function that receives the previous task's output and returns a boolean: if it returns true the task executes, and if it returns false the task is skipped. This lets a crew run follow-up work only when it is actually needed.
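To make the skip-or-run behavior concrete, here is a minimal sketch in plain Python that does not use crewAI at all. `SketchTask` and `run_pipeline` are hypothetical names invented for this illustration, and the choice to pass the last produced output forward when a task is skipped is an assumption of the sketch, not a statement about crewAI internals.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class SketchTask:
    """Hypothetical stand-in for a task; this is not a crewAI class."""
    name: str
    run: Callable[[Optional[str]], str]
    # Optional predicate on the previous output; None means "always run"
    condition: Optional[Callable[[Optional[str]], bool]] = None


def run_pipeline(tasks: List[SketchTask]) -> List[str]:
    """Run tasks in order and return the names of the tasks that executed."""
    executed = []
    previous_output = None
    for task in tasks:
        if task.condition is not None and not task.condition(previous_output):
            # Condition returned False: skip the task (assumption: the
            # previous output is carried forward unchanged)
            continue
        previous_output = task.run(previous_output)
        executed.append(task.name)
    return executed


tasks = [
    SketchTask("fetch", run=lambda _: "a,b,c"),
    SketchTask(
        "fetch_more",
        run=lambda prev: prev + ",d,e",
        # Run only when fewer than 5 items came back from the previous task
        condition=lambda prev: len(prev.split(",")) < 5,
    ),
    SketchTask("summarize", run=lambda prev: f"{len(prev.split(','))} items"),
]

executed_names = run_pipeline(tasks)
print(executed_names)
```

Here the first task returns only three items, so the condition holds and the middle task runs. If the first task had already returned five or more items, the middle task would be skipped and the summary step would work from the original output.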
Example Usage
Code
from typing import List
from pydantic import BaseModel
from crewai import Agent, Crew
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.task import Task
from crewai_tools import SerperDevTool

# Define a condition function for the conditional task.
# If it returns False, the task is skipped; if it returns True, the task executes.
def is_data_missing(output: TaskOutput) -> bool:
    # True when fewer than 10 events were fetched, so the conditional task runs
    return len(output.pydantic.events) < 10

# Define the agents
data_fetcher_agent = Agent(
    role="Data Fetcher",
    goal="Fetch data online using Serper tool",
    backstory="Backstory 1",
    verbose=True,
    tools=[SerperDevTool()]
)

data_processor_agent = Agent(
    role="Data Processor",
    goal="Process fetched data",
    backstory="Backstory 2",
    verbose=True
)

summary_generator_agent = Agent(
    role="Summary Generator",
    goal="Generate summary from fetched data",
    backstory="Backstory 3",
    verbose=True
)

class EventOutput(BaseModel):
    events: List[str]

task1 = Task(
    description="Fetch data about events in San Francisco using Serper tool",
    expected_output="List of 10 things to do in SF this week",
    agent=data_fetcher_agent,
    output_pydantic=EventOutput,
)

conditional_task = ConditionalTask(
    description="""
        Check if data is missing. If we have fewer than 10 events,
        fetch more events using Serper tool so that
        we have a total of 10 events in SF this week.
        """,
    expected_output="List of 10 things to do in SF this week",
    condition=is_data_missing,
    agent=data_processor_agent,
)

task3 = Task(
    description="Generate summary of events in San Francisco from fetched data",
    expected_output="A summary of the events in San Francisco this week, based on the fetched data.",
    agent=summary_generator_agent,
)

# Create a crew with the tasks
crew = Crew(
    agents=[data_fetcher_agent, data_processor_agent, summary_generator_agent],
    tasks=[task1, conditional_task, task3],
    verbose=True,
    planning=True
)

# Run the crew
result = crew.kickoff()
print("results", result)
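
Because the condition is an ordinary function, it can be checked in isolation before running a full crew. The following is a rough sketch that assumes the condition only reads `output.pydantic.events`, as in the example above. `make_stub_output` is a hypothetical helper that builds a lightweight stand-in object rather than a real `TaskOutput`, so no agents, API keys, or LLM calls are involved.

```python
from types import SimpleNamespace


def is_data_missing(output) -> bool:
    # Same check as in the example: True when fewer than 10 events were returned
    return len(output.pydantic.events) < 10


def make_stub_output(event_count: int) -> SimpleNamespace:
    """Hypothetical helper: mimics only the `pydantic.events` attribute path."""
    events = [f"event {i}" for i in range(event_count)]
    return SimpleNamespace(pydantic=SimpleNamespace(events=events))


# 3 events: data is missing, so the conditional task would run
too_few = is_data_missing(make_stub_output(3))
# 10 events: nothing is missing, so the conditional task would be skipped
enough = is_data_missing(make_stub_output(10))
print(too_few, enough)
```

A check like this catches an inverted comparison early, which matters because the return value directly decides whether the conditional task executes.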