Pular para o conteúdo principal
O checkpointing esta em versao inicial. As APIs podem mudar em versoes futuras.

Visao Geral

O checkpointing salva automaticamente o estado de execucao durante uma execucao. Se uma crew, flow ou agente falhar no meio da execucao, voce pode restaurar a partir do ultimo checkpoint e retomar sem reexecutar o trabalho ja concluido.

Inicio Rapido

from crewai import Crew, CheckpointConfig

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=True,  # usa padroes: ./.checkpoints, em task_completed
)
result = crew.kickoff()
Os arquivos de checkpoint sao gravados em ./.checkpoints/ apos cada tarefa concluida.

Configuracao

Use CheckpointConfig para controle total:
from crewai import Crew, CheckpointConfig

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        directory="./my_checkpoints",
        on_events=["task_completed", "crew_kickoff_completed"],
        max_checkpoints=5,
    ),
)

Campos do CheckpointConfig

CampoTipoPadraoDescricao
directorystr"./.checkpoints"Caminho para os arquivos de checkpoint
on_eventslist[str]["task_completed"]Tipos de evento que acionam um checkpoint
providerBaseProviderJsonProvider()Backend de armazenamento
max_checkpointsint | NoneNoneMaximo de arquivos a manter; os mais antigos sao removidos primeiro

Heranca e Desativacao

O campo checkpoint em Crew, Flow e Agent aceita CheckpointConfig, True, False ou None:
ValorComportamento
None (padrao)Herda do pai. Um agente herda a configuracao da crew.
TrueAtiva com padroes.
FalseDesativacao explicita. Interrompe a heranca do pai.
CheckpointConfig(...)Configuracao personalizada.
crew = Crew(
    agents=[
        Agent(role="Researcher", ...),                  # herda checkpoint da crew
        Agent(role="Writer", ..., checkpoint=False),     # desativado, sem checkpoints
    ],
    tasks=[...],
    checkpoint=True,
)

Retomando a partir de um Checkpoint

# Restaurar e retomar
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff()  # retoma a partir da ultima tarefa concluida
A crew restaurada pula tarefas ja concluidas e retoma a partir da primeira incompleta.

Funciona em Crew, Flow e Agent

Crew

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task, review_task],
    checkpoint=CheckpointConfig(directory="./crew_cp"),
)
Gatilho padrao: task_completed (um checkpoint por tarefa finalizada).

Flow

from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig

class MyFlow(Flow):
    @start()
    def step_one(self):
        return "data"

    @listen(step_one)
    def step_two(self, data):
        return process(data)

flow = MyFlow(
    checkpoint=CheckpointConfig(
        directory="./flow_cp",
        on_events=["method_execution_finished"],
    ),
)
result = flow.kickoff()

# Retomar
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()

Agent

agent = Agent(
    role="Researcher",
    goal="Research topics",
    backstory="Expert researcher",
    checkpoint=CheckpointConfig(
        directory="./agent_cp",
        on_events=["lite_agent_execution_completed"],
    ),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])

Provedores de Armazenamento

O CrewAI inclui dois provedores de armazenamento para checkpoints.

JsonProvider (padrao)

Grava cada checkpoint como um arquivo JSON separado.
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        directory="./my_checkpoints",
        provider=JsonProvider(),
        max_checkpoints=5,
    ),
)

SqliteProvider

Armazena todos os checkpoints em um unico arquivo SQLite.
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        directory="./.checkpoints.db",
        provider=SqliteProvider(max_checkpoints=50),
    ),
)
Ao usar SqliteProvider, o campo directory e o caminho do arquivo de banco de dados, nao um diretorio.

Tipos de Evento

O campo on_events aceita qualquer combinacao de strings de tipo de evento. Escolhas comuns:
Caso de UsoEventos
Apos cada tarefa (Crew)["task_completed"]
Apos cada metodo do flow["method_execution_finished"]
Apos execucao do agente["agent_execution_completed"], ["lite_agent_execution_completed"]
Apenas na conclusao da crew["crew_kickoff_completed"]
Apos cada chamada LLM["llm_call_completed"]
Em tudo["*"]
Usar ["*"] ou eventos de alta frequencia como llm_call_completed gravara muitos arquivos de checkpoint e pode impactar o desempenho. Use max_checkpoints para limitar o uso de disco.

Checkpointing Manual

Para controle total, registre seu proprio handler de evento e chame state.checkpoint() diretamente:
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent

# Handler sincrono
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
    path = state.checkpoint("./my_checkpoints")
    print(f"Checkpoint salvo: {path}")

# Handler assincrono
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
    path = await state.acheckpoint("./my_checkpoints")
    print(f"Checkpoint salvo: {path}")
O argumento state e o RuntimeState passado automaticamente pelo barramento de eventos quando seu handler aceita 3 parametros. Voce pode registrar handlers em qualquer tipo de evento listado na documentacao de Event Listeners. O checkpointing e best-effort: se uma gravacao de checkpoint falhar, o erro e registrado no log, mas a execucao continua sem interrupcao.