Pipelines
Understanding and utilizing pipelines in the CrewAI framework for efficient multi-stage task processing.
What is a Pipeline?
A pipeline in CrewAI represents a structured workflow that allows for the sequential or parallel execution of multiple crews. It provides a way to organize complex processes involving multiple stages, where the output of one stage can serve as input for subsequent stages.
Key Terminology
Understanding the following terms is crucial for working effectively with pipelines:
- Stage: A distinct part of the pipeline, which can be either sequential (a single crew) or parallel (multiple crews executing concurrently).
- Kickoff: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
- Branch: Parallel executions within a stage (e.g., concurrent crew operations).
- Trace: The journey of an individual input through the entire pipeline, capturing the path and transformations it undergoes.
Example pipeline structure:
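For illustration, here is a minimal sketch of such a structure, using plain strings as stand-ins for real Crew objects (the names crew1 through crew4 are assumptions, not real crews):

```python
# Stand-in sketch: strings in place of real Crew objects.
stages = [
    "crew1",              # Stage 1: sequential (a single crew)
    ["crew2", "crew3"],   # Stage 2: parallel (two branches run concurrently)
    "crew4",              # Stage 3: sequential
]
print(stages)
```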
This represents a pipeline with three stages:
- A sequential stage (crew1)
- A parallel stage with two branches (crew2 and crew3 executing concurrently)
- Another sequential stage (crew4)
Each input creates its own kickoff, flowing through all stages of the pipeline. Multiple kickoffs can be processed concurrently, each following the defined pipeline structure.
Pipeline Attributes
Attribute | Parameters | Description |
---|---|---|
Stages | stages | A list of PipelineStage (crews, lists of crews, or routers) representing the stages to be executed in sequence. |
Creating a Pipeline
When creating a pipeline, you define a series of stages, each consisting of either a single crew or a list of crews for parallel execution. The pipeline ensures that each stage is executed in order, with the output of one stage feeding into the next.
Example: Assembling a Pipeline
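A real assembly would use crewai's Crew and Pipeline classes; the sketch below substitutes minimal stand-in dataclasses (all crew names are illustrative assumptions) to show the shape of the assembly:

```python
from dataclasses import dataclass

@dataclass
class Crew:
    name: str  # stand-in for a fully configured crewai.Crew

@dataclass
class Pipeline:
    stages: list  # each entry is a Crew (sequential) or a list of Crews (parallel)

research_crew = Crew("research")
analysis_crew_1 = Crew("analysis-1")
analysis_crew_2 = Crew("analysis-2")
writing_crew = Crew("writing")

pipeline = Pipeline(
    stages=[
        research_crew,                       # runs first
        [analysis_crew_1, analysis_crew_2],  # then both analyses in parallel
        writing_crew,                        # finally consumes the branch outputs
    ]
)
```

With the real classes, the pipeline passes each stage's output as input to the next stage automatically.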
Pipeline Methods
Method | Description |
---|---|
kickoff | Executes the pipeline, processing all stages and returning the results. This method initiates one or more kickoffs through the pipeline, handling the flow of data between stages. |
process_runs | Runs the pipeline for each input provided, handling the flow and transformation of data between stages. |
Pipeline Output
The output of a pipeline in the CrewAI framework is encapsulated within the PipelineKickoffResult class. This class provides a structured way to access the results of the pipeline’s execution, including various formats such as raw strings, JSON, and Pydantic models.
Pipeline Output Attributes
Attribute | Parameters | Type | Description |
---|---|---|---|
ID | id | UUID4 | A unique identifier for the pipeline output. |
Run Results | run_results | List[PipelineRunResult] | A list of PipelineRunResult objects, each representing the output of a single run through the pipeline. |
Pipeline Output Methods
Method/Property | Description |
---|---|
add_run_result | Adds a PipelineRunResult to the list of run results. |
Pipeline Run Result Attributes
Attribute | Parameters | Type | Description |
---|---|---|---|
ID | id | UUID4 | A unique identifier for the run result. |
Raw | raw | str | The raw output of the final stage in the pipeline kickoff. |
Pydantic | pydantic | Any | A Pydantic model object representing the structured output of the final stage, if applicable. |
JSON Dict | json_dict | Union[Dict[str, Any], None] | A dictionary representing the JSON output of the final stage, if applicable. |
Token Usage | token_usage | Dict[str, UsageMetrics] | A summary of token usage across all stages of the pipeline kickoff. |
Trace | trace | List[Any] | A trace of the journey of inputs through the pipeline kickoff. |
Crews Outputs | crews_outputs | List[CrewOutput] | A list of CrewOutput objects, representing the outputs from each crew in the pipeline kickoff. |
Pipeline Run Result Methods and Properties
Method/Property | Description |
---|---|
json | Returns the JSON string representation of the run result if the output format of the final task is JSON. |
to_dict | Converts the JSON and Pydantic outputs to a dictionary. |
__str__ | Returns the string representation of the run result, prioritizing Pydantic, then JSON, then raw. |
Accessing Pipeline Outputs
Once a pipeline has been executed, its output can be accessed through the PipelineOutput object returned by the process_runs method. The PipelineOutput class provides access to individual PipelineRunResult objects, each representing a single run through the pipeline.
Example
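A sketch of that access pattern, using stand-in classes that mirror the attribute names from the tables above (the real classes come from crewai; the values are made up):

```python
import json
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class PipelineRunResult:
    # Stand-in mirroring the documented attributes.
    raw: str
    json_dict: Optional[dict] = None
    trace: list = field(default_factory=list)

    @property
    def json(self):
        return json.dumps(self.json_dict) if self.json_dict is not None else None

@dataclass
class PipelineOutput:
    run_results: list = field(default_factory=list)

    def add_run_result(self, result):
        self.run_results.append(result)

output = PipelineOutput()
output.add_run_result(
    PipelineRunResult(
        raw="Final summary",
        json_dict={"topic": "AI"},
        trace=["input", "crew1", "crew4"],
    )
)

for run in output.run_results:
    print(run.raw)    # raw output of the final stage
    print(run.json)   # JSON string, when the final task produced JSON
    print(run.trace)  # the input's path through the pipeline
```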
This example demonstrates how to access and work with the pipeline output, including individual run results and their associated data.
Using Pipelines
Pipelines are particularly useful for complex workflows that involve multiple stages of processing, analysis, or content generation. They allow you to:
- Sequence Operations: Execute crews in a specific order, ensuring that the output of one crew is available as input to the next.
- Parallel Processing: Run multiple crews concurrently within a stage for increased efficiency.
- Manage Complex Workflows: Break down large tasks into smaller, manageable steps executed by specialized crews.
Example: Running a Pipeline
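In crewai the call would go through the pipeline's process_runs method; the sketch below imitates its one-kickoff-per-input behavior with plain asyncio stand-ins (crew names and input shapes are assumptions):

```python
import asyncio

async def run_crew(name, data):
    # Stand-in for a crew doing real work on one stage.
    await asyncio.sleep(0)
    return {**data, "last_stage": name}

async def process_runs(stages, inputs):
    """Run one kickoff per input; kickoffs are processed concurrently."""
    async def one_kickoff(data):
        for crew in stages:
            data = await run_crew(crew, data)  # stage output feeds the next stage
        return data
    return await asyncio.gather(*(one_kickoff(i) for i in inputs))

inputs = [{"id": 1}, {"id": 2}]
results = asyncio.run(process_runs(["classify", "summarize"], inputs))
print(results)
```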
Advanced Features
Parallel Execution within Stages
You can define parallel execution within a stage by providing a list of crews, creating multiple branches:
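A minimal sketch of that branch fan-out, with asyncio standing in for crewai's internals and illustrative crew names:

```python
import asyncio

async def crew(name, stage_input):
    await asyncio.sleep(0)  # stand-in for real crew work
    return f"{name}({stage_input})"

async def run_parallel_stage(branches, stage_input):
    # Every crew in the list receives the same stage input and runs concurrently.
    return await asyncio.gather(*(crew(name, stage_input) for name in branches))

branch_outputs = asyncio.run(run_parallel_stage(["crew2", "crew3"], "crew1-output"))
print(branch_outputs)  # → ['crew2(crew1-output)', 'crew3(crew1-output)']
```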
Routers in Pipelines
Routers are a powerful feature in CrewAI pipelines that allow for dynamic decision-making and branching within your workflow. They enable you to direct the flow of execution based on specific conditions or criteria, making your pipelines more flexible and adaptive.
What is a Router?
A router in CrewAI is a special component that can be included as a stage in your pipeline. It evaluates the input data and determines which path the execution should take next. This allows for conditional branching in your pipeline, where different crews or sub-pipelines can be executed based on the router’s decision.
Key Components of a Router
- Routes: A dictionary of named routes, each associated with a condition and a pipeline to execute if the condition is met.
- Default Route: A fallback pipeline that is executed if none of the defined route conditions are met.
Creating a Router
Here’s an example of how to create a router:
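The real feature uses crewai's Router and Route classes; the stand-in sketch below shows only the routing logic, with string labels in place of real pipelines and an assumed urgency threshold of 7:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Route:
    condition: Callable[[dict], bool]
    pipeline: str  # stand-in: a label instead of a real Pipeline

@dataclass
class Router:
    routes: dict
    default: str  # fallback when no route condition matches

    def route(self, data: dict) -> str:
        for route in self.routes.values():  # checked in insertion order
            if route.condition(data):
                return route.pipeline
        return self.default

router = Router(
    routes={
        "urgent": Route(lambda d: d.get("urgency_score", 0) > 7, "urgent_pipeline"),
        "normal": Route(lambda d: "urgency_score" in d, "normal_pipeline"),
    },
    default="classification_pipeline",
)

print(router.route({"urgency_score": 9}))  # urgent_pipeline
print(router.route({"urgency_score": 3}))  # normal_pipeline
print(router.route({}))                    # classification_pipeline
```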
In this example, the router decides between an urgent pipeline and a normal pipeline based on the urgency score of the email. If the urgency score is greater than 7, it routes to the urgent pipeline; otherwise, it uses the normal pipeline. If the input doesn’t include an urgency score, it defaults to just the classification crew.
Benefits of Using Routers
- Dynamic Workflow: Adapt your pipeline’s behavior based on input characteristics or intermediate results.
- Efficiency: Route urgent tasks to quicker processes, reserving more thorough pipelines for less time-sensitive inputs.
- Flexibility: Easily modify or extend your pipeline’s logic without changing the core structure.
- Scalability: Handle a wide range of email types and urgency levels with a single pipeline structure.
Error Handling and Validation
The Pipeline class includes validation mechanisms to ensure the robustness of the pipeline structure:
- Validates that stages contain only Crew instances or lists of Crew instances.
- Prevents double nesting of stages to maintain a clear structure.
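The rules above can be sketched as a small validator (stand-in Crew class; the real checks live inside crewai's Pipeline model):

```python
class Crew:
    """Stand-in for crewai.Crew."""

def validate_stages(stages):
    for stage in stages:
        if isinstance(stage, Crew):
            continue  # a sequential stage: a single crew
        if isinstance(stage, list) and all(isinstance(c, Crew) for c in stage):
            continue  # a parallel stage: a flat list of crews (no double nesting)
        raise ValueError("Each stage must be a Crew or a flat list of Crew instances")

validate_stages([Crew(), [Crew(), Crew()]])  # a valid structure passes silently
try:
    validate_stages([[[Crew()]]])            # a double-nested stage is rejected
except ValueError as err:
    print(err)
```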