O checkpointing esta em versao inicial. As APIs podem mudar em versoes futuras.
Visao Geral
O checkpointing salva automaticamente o estado de execucao durante uma execucao. Se uma crew, flow ou agente falhar no meio da execucao, voce pode restaurar a partir do ultimo checkpoint e retomar sem reexecutar o trabalho ja concluido.
Inicio Rapido
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # usa padroes: ./.checkpoints, em task_completed
)
result = crew.kickoff()
Os arquivos de checkpoint sao gravados em ./.checkpoints/ apos cada tarefa concluida.
Configuracao
Use CheckpointConfig para controle total:
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
directory="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
Campos do CheckpointConfig
| Campo | Tipo | Padrao | Descricao |
|---|
directory | str | "./.checkpoints" | Caminho para os arquivos de checkpoint |
on_events | list[str] | ["task_completed"] | Tipos de evento que acionam um checkpoint |
provider | BaseProvider | JsonProvider() | Backend de armazenamento |
max_checkpoints | int | None | None | Maximo de arquivos a manter; os mais antigos sao removidos primeiro |
Heranca e Desativacao
O campo checkpoint em Crew, Flow e Agent aceita CheckpointConfig, True, False ou None:
| Valor | Comportamento |
|---|
None (padrao) | Herda do pai. Um agente herda a configuracao da crew. |
True | Ativa com padroes. |
False | Desativacao explicita. Interrompe a heranca do pai. |
CheckpointConfig(...) | Configuracao personalizada. |
crew = Crew(
agents=[
Agent(role="Researcher", ...), # herda checkpoint da crew
Agent(role="Writer", ..., checkpoint=False), # desativado, sem checkpoints
],
tasks=[...],
checkpoint=True,
)
Retomando a partir de um Checkpoint
# Restaurar e retomar
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff() # retoma a partir da ultima tarefa concluida
A crew restaurada pula tarefas ja concluidas e retoma a partir da primeira incompleta.
Funciona em Crew, Flow e Agent
Crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(directory="./crew_cp"),
)
Gatilho padrao: task_completed (um checkpoint por tarefa finalizada).
Flow
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
directory="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# Retomar
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
Agent
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
directory="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
Provedores de Armazenamento
O CrewAI inclui dois provedores de armazenamento para checkpoints.
JsonProvider (padrao)
Grava cada checkpoint como um arquivo JSON separado.
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
directory="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
SqliteProvider
Armazena todos os checkpoints em um unico arquivo SQLite.
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
directory="./.checkpoints.db",
provider=SqliteProvider(max_checkpoints=50),
),
)
Ao usar SqliteProvider, o campo directory e o caminho do arquivo de banco de dados, nao um diretorio.
Tipos de Evento
O campo on_events aceita qualquer combinacao de strings de tipo de evento. Escolhas comuns:
| Caso de Uso | Eventos |
|---|
| Apos cada tarefa (Crew) | ["task_completed"] |
| Apos cada metodo do flow | ["method_execution_finished"] |
| Apos execucao do agente | ["agent_execution_completed"], ["lite_agent_execution_completed"] |
| Apenas na conclusao da crew | ["crew_kickoff_completed"] |
| Apos cada chamada LLM | ["llm_call_completed"] |
| Em tudo | ["*"] |
Usar ["*"] ou eventos de alta frequencia como llm_call_completed gravara muitos arquivos de checkpoint e pode impactar o desempenho. Use max_checkpoints para limitar o uso de disco.
Checkpointing Manual
Para controle total, registre seu proprio handler de evento e chame state.checkpoint() diretamente:
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# Handler sincrono
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
# Handler assincrono
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
O argumento state e o RuntimeState passado automaticamente pelo barramento de eventos quando seu handler aceita 3 parametros. Voce pode registrar handlers em qualquer tipo de evento listado na documentacao de Event Listeners.
O checkpointing e best-effort: se uma gravacao de checkpoint falhar, o erro e registrado no log, mas a execucao continua sem interrupcao.