Skip to main content
Checkpointing is in early release. APIs may change in future versions.

Overview

Checkpointing automatically saves execution state during a run. If a crew, flow, or agent fails mid-execution, you can restore from the last checkpoint and resume without re-running completed work.

Quick Start

from crewai import Crew, CheckpointConfig

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=True,  # uses defaults: ./.checkpoints, on task_completed
)
result = crew.kickoff()
Checkpoint files are written to ./.checkpoints/ after each completed task.

Configuration

Use CheckpointConfig for full control:
from crewai import Crew, CheckpointConfig

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        directory="./my_checkpoints",
        on_events=["task_completed", "crew_kickoff_completed"],
        max_checkpoints=5,
    ),
)

CheckpointConfig Fields

FieldTypeDefaultDescription
directorystr"./.checkpoints"Filesystem path for checkpoint files
on_eventslist[str]["task_completed"]Event types that trigger a checkpoint
providerBaseProviderJsonProvider()Storage backend
max_checkpointsint | NoneNoneMax files to keep; oldest pruned first

Inheritance and Opt-Out

The checkpoint field on Crew, Flow, and Agent accepts CheckpointConfig, True, False, or None:
ValueBehavior
None (default)Inherit from parent. An agent inherits its crew’s config.
TrueEnable with defaults.
FalseExplicit opt-out. Stops inheritance from parent.
CheckpointConfig(...)Custom configuration.
crew = Crew(
    agents=[
        Agent(role="Researcher", ...),                  # inherits crew's checkpoint
        Agent(role="Writer", ..., checkpoint=False),     # opted out, no checkpoints
    ],
    tasks=[...],
    checkpoint=True,
)

Resuming from a Checkpoint

# Restore and resume
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff()  # picks up from last completed task
The restored crew skips already-completed tasks and resumes from the first incomplete one.

Works on Crew, Flow, and Agent

Crew

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task, review_task],
    checkpoint=CheckpointConfig(directory="./crew_cp"),
)
Default trigger: task_completed (one checkpoint per finished task).

Flow

from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig

class MyFlow(Flow):
    @start()
    def step_one(self):
        return "data"

    @listen(step_one)
    def step_two(self, data):
        return process(data)

flow = MyFlow(
    checkpoint=CheckpointConfig(
        directory="./flow_cp",
        on_events=["method_execution_finished"],
    ),
)
result = flow.kickoff()

# Resume
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()

Agent

agent = Agent(
    role="Researcher",
    goal="Research topics",
    backstory="Expert researcher",
    checkpoint=CheckpointConfig(
        directory="./agent_cp",
        on_events=["lite_agent_execution_completed"],
    ),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])

Storage Providers

CrewAI ships with two checkpoint storage providers.

JsonProvider (default)

Writes each checkpoint as a separate JSON file. Simple, human-readable, easy to inspect.
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        directory="./my_checkpoints",
        provider=JsonProvider(),       # this is the default
        max_checkpoints=5,             # prunes oldest files
    ),
)
Files are named <timestamp>_<uuid>.json inside the directory.

SqliteProvider

Stores all checkpoints in a single SQLite database file. Better for high-frequency checkpointing and avoids many small files.
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        directory="./.checkpoints.db",
        provider=SqliteProvider(max_checkpoints=50),
    ),
)
SqliteProvider accepts its own max_checkpoints parameter that prunes old rows via SQL. WAL journal mode is enabled for concurrent read access.
When using SqliteProvider, the directory field is the database file path, not a directory. The max_checkpoints on CheckpointConfig controls filesystem pruning (for JsonProvider), while SqliteProvider.max_checkpoints controls row pruning in the database.

Event Types

The on_events field accepts any combination of event type strings. Common choices:
Use CaseEvents
After each task (Crew)["task_completed"]
After each flow method["method_execution_finished"]
After agent execution["agent_execution_completed"], ["lite_agent_execution_completed"]
On crew completion only["crew_kickoff_completed"]
After every LLM call["llm_call_completed"]
On everything["*"]
Using ["*"] or high-frequency events like llm_call_completed will write many checkpoint files and may impact performance. Use max_checkpoints to limit disk usage.

Manual Checkpointing

For full control, register your own event handler and call state.checkpoint() directly:
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent

# Sync handler
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
    path = state.checkpoint("./my_checkpoints")
    print(f"Saved checkpoint: {path}")

# Async handler
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
    path = await state.acheckpoint("./my_checkpoints")
    print(f"Saved checkpoint: {path}")
The state argument is the RuntimeState passed automatically by the event bus when your handler accepts 3 parameters. You can register handlers on any event type listed in the Event Listeners documentation. Checkpointing is best-effort: if a checkpoint write fails, the error is logged but execution continues uninterrupted.