Checkpointing is in early release. APIs may change in future versions.
Overview
Checkpointing automatically saves execution state during a run. If a crew, flow, or agent fails mid-execution, you can restore from the last checkpoint and resume without re-running completed work.
Quick Start
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # uses defaults: ./.checkpoints, on task_completed
)
result = crew.kickoff()
Checkpoint files are written to ./.checkpoints/ after each completed task.
Configuration
Use CheckpointConfig for full control:
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
directory="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
CheckpointConfig Fields
| Field | Type | Default | Description |
|---|
directory | str | "./.checkpoints" | Filesystem path for checkpoint files |
on_events | list[str] | ["task_completed"] | Event types that trigger a checkpoint |
provider | BaseProvider | JsonProvider() | Storage backend |
max_checkpoints | int | None | None | Max files to keep; oldest pruned first |
Inheritance and Opt-Out
The checkpoint field on Crew, Flow, and Agent accepts CheckpointConfig, True, False, or None:
| Value | Behavior |
|---|
None (default) | Inherit from parent. An agent inherits its crew’s config. |
True | Enable with defaults. |
False | Explicit opt-out. Stops inheritance from parent. |
CheckpointConfig(...) | Custom configuration. |
crew = Crew(
agents=[
Agent(role="Researcher", ...), # inherits crew's checkpoint
Agent(role="Writer", ..., checkpoint=False), # opted out, no checkpoints
],
tasks=[...],
checkpoint=True,
)
Resuming from a Checkpoint
# Restore and resume
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff() # picks up from last completed task
The restored crew skips already-completed tasks and resumes from the first incomplete one.
Works on Crew, Flow, and Agent
Crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(directory="./crew_cp"),
)
Default trigger: task_completed (one checkpoint per finished task).
Flow
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
directory="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# Resume
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
Agent
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
directory="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
Storage Providers
CrewAI ships with two checkpoint storage providers.
JsonProvider (default)
Writes each checkpoint as a separate JSON file. Simple, human-readable, easy to inspect.
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
directory="./my_checkpoints",
provider=JsonProvider(), # this is the default
max_checkpoints=5, # prunes oldest files
),
)
Files are named <timestamp>_<uuid>.json inside the directory.
SqliteProvider
Stores all checkpoints in a single SQLite database file. Better for high-frequency checkpointing and avoids many small files.
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
directory="./.checkpoints.db",
provider=SqliteProvider(max_checkpoints=50),
),
)
SqliteProvider accepts its own max_checkpoints parameter that prunes old rows via SQL. WAL journal mode is enabled for concurrent read access.
When using SqliteProvider, the directory field is the database file path, not a directory. The max_checkpoints on CheckpointConfig controls filesystem pruning (for JsonProvider), while SqliteProvider.max_checkpoints controls row pruning in the database.
Event Types
The on_events field accepts any combination of event type strings. Common choices:
| Use Case | Events |
|---|
| After each task (Crew) | ["task_completed"] |
| After each flow method | ["method_execution_finished"] |
| After agent execution | ["agent_execution_completed"], ["lite_agent_execution_completed"] |
| On crew completion only | ["crew_kickoff_completed"] |
| After every LLM call | ["llm_call_completed"] |
| On everything | ["*"] |
Using ["*"] or high-frequency events like llm_call_completed will write many checkpoint files and may impact performance. Use max_checkpoints to limit disk usage.
Manual Checkpointing
For full control, register your own event handler and call state.checkpoint() directly:
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# Sync handler
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
# Async handler
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
The state argument is the RuntimeState passed automatically by the event bus when your handler accepts 3 parameters. You can register handlers on any event type listed in the Event Listeners documentation.
Checkpointing is best-effort: if a checkpoint write fails, the error is logged but execution continues uninterrupted.