Coverage for src/keel/checkpoint.py: 100%
203 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Resumable checkpoint helpers for keel work runs."""
3from __future__ import annotations
5import json
6from pathlib import Path
7from typing import Any
9from . import config as cfg
10from . import model
# Identity of the on-disk record and where it lives when nothing is configured.
CHECKPOINT_SCHEMA_VERSION = "keel.checkpoint.v1"
DEFAULT_CHECKPOINT_PATH = ".keel/state/checkpoint.json"
RECORD_TYPE_RUN_CHECKPOINT = "run_checkpoint"

# Closed vocabularies checked by validate_checkpoint().
COMMANDS = ("ship", "work-block", "overnight")
STEP_IDS = tuple(entry.id for entry in model.BACKBONE)
MERGE_STATES = ("not-started", "pending", "merged", "failed", "skipped")
CAPTURE_STATES = ("not-started", "applied", "deferred", "skipped", "failed")
CLOSE_STATES = ("not-started", "closed", "failed")

# Closed vocabularies checked by resume_plan_as_dict().
LIVE_PR_STATES = ("unknown", "missing", "open", "merged", "closed")
LIVE_WORKTREE_STATES = ("unknown", "present", "missing")

# Step id -> action recorded under ``resume.action``. Membership in this
# mapping is also what the module reports as a "safe boundary".
_IDEMPOTENT_STEPS = dict(
    s0="load-config",
    s1="select-or-reconcile-issue",
    s2="ensure-branch-and-worktree",
    s3="rerun-guards",
    s4="continue-implementation",
    s5="reclassify-diff",
    s6="recheck-ci",
    s7="rerun-review",
    s8="rerun-test",
    s9="continue-fixloop",
    s10="revalidate-merge-state",
    s11="run-or-verify-capture",
    s12="run-or-verify-close",
)
class CheckpointError(ValueError):
    """Signal that checkpoint data or arguments do not fit the stable schema.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    keep working.
    """
def checkpoint_contract_as_dict(config: cfg.ProjectConfig) -> dict[str, Any]:
    """Return the project-neutral checkpoint storage and resume contract.

    Args:
        config: Project configuration; only consulted (through
            ``configured_checkpoint_path``) for the checkpoint location.

    Returns:
        A dict describing the schema version, storage format, resolved path
        and where that path came from, the commands that own checkpoint
        writes, and one entry per backbone step with its resume action.
    """
    path, source = configured_checkpoint_path(config)
    steps = [
        {
            "step_id": step.id,
            "step_name": step.name,
            "safe_boundary": step.id in _IDEMPOTENT_STEPS,
            "resume_action": _IDEMPOTENT_STEPS.get(step.id),
        }
        for step in model.BACKBONE
    ]
    return {
        "schema_version": CHECKPOINT_SCHEMA_VERSION,
        "format": "json",
        "path": path,
        "path_source": source,
        "missing_handling": "no-checkpoint",
        # Derived from COMMANDS so the advertised writers cannot drift from
        # the commands validate_checkpoint() accepts in a record.
        "write_owner": list(COMMANDS),
        "resume_command": "resume",
        "checkpoint_command": "checkpoint",
        "steps": steps,
        "consumer_neutral": True,
    }
def configured_checkpoint_path(config: cfg.ProjectConfig) -> tuple[str, str]:
    """Look up the checkpoint path and report where it came from.

    Returns ``(path, source)``. The policy pack value at
    ``reports.checkpoint`` wins when it is a non-blank string (it is returned
    as written, not stripped); otherwise the module default is used with
    source ``"default"``.
    """
    policy = config.policy_pack or {}
    section = policy.get("reports")
    if not isinstance(section, dict):
        section = {}
    candidate = section.get("checkpoint")
    if not isinstance(candidate, str) or not candidate.strip():
        return DEFAULT_CHECKPOINT_PATH, "default"
    return candidate, "policy_pack.reports.checkpoint"
def resolve_path(root: str | Path, config: cfg.ProjectConfig) -> Path:
    """Turn the configured checkpoint path into an absolute path under ``root``.

    Raises:
        CheckpointError: if the configured path is absolute, or if after
            resolution it no longer lies inside the resolved ``root``.
    """
    configured, _source = configured_checkpoint_path(config)
    relative = Path(configured)
    if relative.is_absolute():
        raise CheckpointError("checkpoint path must be relative to the project root")

    base = Path(root).resolve()
    candidate = (base / relative).resolve()
    try:
        # relative_to() raises ValueError when candidate is outside base.
        candidate.relative_to(base)
    except ValueError as exc:
        raise CheckpointError("checkpoint path escapes the project root") from exc
    return candidate
def build_checkpoint_record(
    *,
    run_id: str,
    command: str,
    current_step: str,
    base_branch: str,
    target: str | None = None,
    issue_queue: list[int] | None = None,
    active_issue: int | None = None,
    branch: str | None = None,
    worktree: str | None = None,
    pull_request: int | None = None,
    head_sha: str | None = None,
    completed_steps: list[str] | None = None,
    last_gate: str | None = None,
    last_review: str | None = None,
    last_check: str | None = None,
    jury_mode: str | None = None,
    merge_state: str = "not-started",
    capture_state: str = "not-started",
    close_state: str = "not-started",
    stop_reason: str | None = None,
) -> dict[str, Any]:
    """Assemble a checkpoint record from keyword arguments and validate it.

    The list arguments are copied, so the record never aliases the caller's
    lists. The ``resume`` section is derived from ``current_step``.

    Raises:
        CheckpointError: if the assembled record fails ``validate_checkpoint``.
    """
    queue_section = {
        "issues": list(issue_queue) if issue_queue else [],
        "active_issue": active_issue,
    }
    position_section = {
        "current_step": current_step,
        "completed_steps": list(completed_steps) if completed_steps else [],
    }
    identifier_section = {
        "base_branch": base_branch,
        "branch": branch,
        "worktree": worktree,
        "pull_request": pull_request,
        "head_sha": head_sha,
    }
    state_section = {
        "last_gate": last_gate,
        "last_review": last_review,
        "last_check": last_check,
        "jury_mode": jury_mode,
        "merge": merge_state,
        "capture": capture_state,
        "close": close_state,
        "stop_reason": stop_reason,
    }
    resume_section = {
        "safe_boundary": current_step in _IDEMPOTENT_STEPS,
        "action": _IDEMPOTENT_STEPS.get(current_step),
        "must_reconcile_live_state": True,
        "repeat_policy": {
            "merge": "never-repeat-if-live-pr-merged",
            "comments": "idempotent-anchor-or-skip",
            "worktree": "refuse-unrelated-existing-path",
        },
    }
    checkpoint = {
        "schema_version": CHECKPOINT_SCHEMA_VERSION,
        "record_type": RECORD_TYPE_RUN_CHECKPOINT,
        "run_id": run_id,
        "command": command,
        "target": target,
        "queue": queue_section,
        "position": position_section,
        "identifiers": identifier_section,
        "state": state_section,
        "resume": resume_section,
    }
    validate_checkpoint(checkpoint)
    return checkpoint
def encode_checkpoint(record: dict[str, Any]) -> str:
    """Serialize a validated checkpoint to JSON text.

    Keys are sorted and the output is indented by two spaces and ends with a
    newline. Raises ``CheckpointError`` if ``record`` is not a valid checkpoint.
    """
    validate_checkpoint(record)
    payload = json.dumps(record, sort_keys=True, indent=2)
    return f"{payload}\n"
def parse_checkpoint(text: str) -> dict[str, Any]:
    """Decode JSON text into a checkpoint record and validate it.

    Raises:
        CheckpointError: if ``text`` is not valid JSON (message
            ``"invalid JSON"``) or the decoded value fails validation.
    """
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError as exc:
        raise CheckpointError("invalid JSON") from exc
    else:
        validate_checkpoint(decoded)
        return decoded
def read_checkpoint(path: str | Path) -> dict[str, Any] | None:
    """Read a checkpoint; a missing checkpoint means there is no resumable run.

    Args:
        path: Location of the checkpoint file.

    Returns:
        The parsed and validated record, or ``None`` when the file does not
        exist.

    Raises:
        CheckpointError: if the file exists but does not hold a valid
            checkpoint (raised by ``parse_checkpoint``).
    """
    try:
        # Read directly rather than testing exists() first: the file can
        # vanish between a check and the read.
        text = Path(path).read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # NotADirectoryError covers a parent component that is a regular
        # file, which the earlier exists() test also reported as missing.
        return None
    return parse_checkpoint(text)
def write_checkpoint(path: str | Path, record: dict[str, Any]) -> None:
    """Write one validated checkpoint, replacing the previous resume point.

    The record is validated and encoded before the filesystem is touched, so
    an invalid record leaves no trace. The text is written to a sibling
    ``<name>.tmp`` file and then renamed over the target, so an interrupted
    write leaves the previous checkpoint in place rather than a truncated one.

    Args:
        path: Destination file; parent directories are created as needed.
        record: Checkpoint record to persist.

    Raises:
        CheckpointError: if ``record`` is not a valid checkpoint.
        OSError: if the directory or file cannot be written.
    """
    checkpoint_path = Path(path)
    payload = encode_checkpoint(record)
    checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so the rename stays on one filesystem.
    staging = checkpoint_path.parent / (checkpoint_path.name + ".tmp")
    try:
        staging.write_text(payload, encoding="utf-8")
        staging.replace(checkpoint_path)
    finally:
        # No-op after a successful replace; removes the leftover on failure.
        staging.unlink(missing_ok=True)
def resume_plan_as_dict(
    record: dict[str, Any] | None,
    *,
    live_pr_state: str = "unknown",
    live_worktree_state: str = "unknown",
) -> dict[str, Any]:
    """Compute a dry-run resume plan from a checkpoint and reported live state.

    Args:
        record: The current checkpoint, or ``None`` when there is none.
        live_pr_state: One of ``LIVE_PR_STATES``.
        live_worktree_state: One of ``LIVE_WORKTREE_STATES``.

    Returns:
        A dict with ``status``, ``can_resume``, ``reason``, ``next_step``,
        ``resume_action``, ``reconcile`` and ``warnings``; when a record was
        supplied it is echoed back under ``checkpoint``.

    Raises:
        CheckpointError: for an unsupported live state or an invalid record.
    """
    if live_pr_state not in LIVE_PR_STATES:
        raise CheckpointError("unsupported live_pr_state")
    if live_worktree_state not in LIVE_WORKTREE_STATES:
        raise CheckpointError("unsupported live_worktree_state")

    if record is None:
        return {
            "status": "no-checkpoint",
            "can_resume": False,
            "reason": "checkpoint file is missing",
            "next_step": None,
            "resume_action": None,
            "reconcile": _live_state(live_pr_state, live_worktree_state),
            "warnings": [],
        }

    validate_checkpoint(record)
    recorded = record["state"]
    ids = record["identifiers"]
    step = record["position"]["current_step"]

    # Default outcome: continue from the recorded step. The branches below
    # overwrite individual fields; key order is fixed here.
    plan: dict[str, Any] = {
        "status": "ready",
        "can_resume": True,
        "reason": "resume from the recorded safe boundary",
        "next_step": step,
        "resume_action": record["resume"]["action"],
        "checkpoint": record,
        "reconcile": _live_state(live_pr_state, live_worktree_state),
        "warnings": [],
    }

    def refuse(reason: str, hint: str) -> None:
        # Checkpoint and live state disagree: block the resume with a hint.
        plan.update(status="ambiguous", can_resume=False, reason=reason)
        plan["warnings"].append(hint)

    unfinished = {"not-started", "failed"}
    if live_pr_state == "merged" or recorded["merge"] == "merged":
        # A merge that already happened is never repeated; only the
        # post-merge steps can remain.
        if recorded["capture"] in unfinished:
            plan.update(
                status="needs-capture",
                next_step="s11",
                resume_action=_IDEMPOTENT_STEPS["s11"],
                reason="merge is complete; resume at capture without repeating merge",
            )
        elif recorded["close"] in unfinished:
            plan.update(
                status="needs-close",
                next_step="s12",
                resume_action=_IDEMPOTENT_STEPS["s12"],
                reason="capture is complete or skipped; resume at close",
            )
        else:
            plan.update(
                status="complete",
                can_resume=False,
                next_step=None,
                resume_action=None,
                reason="merge, capture, and close are already complete",
            )
    elif live_pr_state == "closed" and ids.get("pull_request"):
        refuse(
            "checkpoint references a pull request that live state reports closed",
            "reopen the PR or reconcile the checkpoint before resuming",
        )
    elif live_worktree_state == "missing" and ids.get("worktree"):
        refuse(
            "checkpoint references a worktree that live state reports missing",
            "recreate or reconcile the recorded worktree before resuming",
        )
    elif live_pr_state == "missing" and ids.get("pull_request"):
        refuse(
            "checkpoint references a pull request that live state reports missing",
            "verify whether the PR was deleted or the checkpoint is stale",
        )
    elif step == "s6":
        plan.update(
            status="waiting-on-ci",
            reason="resume by rechecking CI before review, test, or merge",
        )
    elif ids.get("pull_request"):
        plan.update(
            status="pr-open",
            reason="resume against the recorded pull request after refreshing live state",
        )
    return plan
# Values that covering_checkpoint() reports under "status".
COVERAGE_STATES = ("covered", "missing", "stale-step")


def covering_checkpoint(
    record: dict[str, Any] | None,
    run_id: str,
    expected_step: str,
) -> dict[str, Any]:
    """Report whether ``record`` covers ``run_id`` at ``expected_step``.

    Performs no I/O. ``record`` is the current checkpoint, or ``None`` when
    there is none; ``expected_step`` must be one of ``STEP_IDS``.

    The ``status`` in the result is:

    * ``covered`` -- the checkpoint belongs to ``run_id`` and either lists
      ``expected_step`` in ``completed_steps`` or has a ``current_step`` at
      or after it in ``STEP_IDS`` order;
    * ``missing`` -- there is no checkpoint, or it belongs to another run;
    * ``stale-step`` -- the checkpoint belongs to ``run_id`` but has not
      reached ``expected_step``.

    Raises:
        CheckpointError: if ``expected_step`` is not a backbone step id or
            ``record`` fails validation.
    """
    if expected_step not in STEP_IDS:
        raise CheckpointError("expected_step must be a backbone step id")
    target_index = STEP_IDS.index(expected_step)
    gap = f"no current checkpoint for run {run_id} at step {expected_step}"

    if record is None:
        return {
            "status": "missing",
            "covered": False,
            "run_id": run_id,
            "expected_step": expected_step,
            "checkpoint_run_id": None,
            "checkpoint_step": None,
            "reason": gap,
        }

    validate_checkpoint(record)
    owner = record.get("run_id")
    where = record["position"]
    at_step = where["current_step"]
    done = where.get("completed_steps", [])

    def verdict(status: str, reason: str) -> dict[str, Any]:
        # Identification fields first, then the outcome.
        return {
            "run_id": run_id,
            "expected_step": expected_step,
            "checkpoint_run_id": owner,
            "checkpoint_step": at_step,
            "status": status,
            "covered": status == "covered",
            "reason": reason,
        }

    if owner != run_id:
        return verdict("missing", f"{gap} (checkpoint is for run {owner})")
    # The index lookup only runs when the step is not already listed as done.
    if expected_step not in done and STEP_IDS.index(at_step) < target_index:
        return verdict("stale-step", f"{gap} (run is at {at_step})")
    return verdict("covered", f"checkpoint for run {run_id} reached step {expected_step}")
def find_orphans(
    *,
    live_branches: list[str] | None = None,
    live_pull_requests: list[int] | None = None,
    checkpoint_record: dict[str, Any] | None = None,
    ledger_records: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Return live branches/PRs with no covering checkpoint or ledger record.

    Performs no I/O. An *orphan* is a branch or pull request that exists on
    the git/transport side but is referenced by neither the current
    checkpoint nor any ledger record, i.e. keel has no covering state for it
    (the GAP-13 hazard from the git side). Advisory only.

    Args:
        live_branches: Branch names reported by the live side.
        live_pull_requests: Pull request numbers reported by the live side.
        checkpoint_record: Current checkpoint, if any. It is not validated
            here; unusable parts are ignored.
        ledger_records: Ledger records; entries that are not dicts are
            ignored.

    Returns:
        A dict with the orphaned ``branches`` and ``pull_requests`` (input
        order preserved), their combined ``orphan_count``, and the sorted
        ``known_branches`` / ``known_pull_requests`` they were checked against.
    """
    known_branches, known_prs = _known_references(checkpoint_record, ledger_records)
    orphan_branches = [
        branch for branch in (live_branches or []) if branch not in known_branches
    ]
    orphan_prs = [pr for pr in (live_pull_requests or []) if pr not in known_prs]
    return {
        "branches": orphan_branches,
        "pull_requests": orphan_prs,
        "orphan_count": len(orphan_branches) + len(orphan_prs),
        "known_branches": sorted(known_branches),
        "known_pull_requests": sorted(known_prs),
    }


def _known_references(
    checkpoint_record: dict[str, Any] | None,
    ledger_records: list[dict[str, Any]] | None,
) -> tuple[set[str], set[int]]:
    """Collect the branch names and PR numbers keel already has state for.

    The checkpoint contributes ``identifiers.branch`` and
    ``identifiers.pull_request``; each ledger record contributes
    ``git.branch`` and ``pull_request.number``. Values of the wrong shape are
    skipped rather than raising, since the caller is advisory.
    """
    branches: set[str] = set()
    prs: set[int] = set()

    def remember(branch: Any, number: Any) -> None:
        if isinstance(branch, str) and branch:
            branches.add(branch)
        # bool is a subclass of int and True == 1, so a stray ``true`` would
        # otherwise make PR #1 look covered.
        if isinstance(number, int) and not isinstance(number, bool):
            prs.add(number)

    if isinstance(checkpoint_record, dict):
        identifiers = checkpoint_record.get("identifiers")
        if isinstance(identifiers, dict):
            remember(identifiers.get("branch"), identifiers.get("pull_request"))

    for entry in ledger_records or []:
        if not isinstance(entry, dict):
            continue
        git = entry.get("git")
        pr_entry = entry.get("pull_request")
        remember(
            git.get("branch") if isinstance(git, dict) else None,
            pr_entry.get("number") if isinstance(pr_entry, dict) else None,
        )
    return branches, prs
def validate_checkpoint(record: Any) -> None:
    """Validate the stable checkpoint shape.

    Checks, in order: the record is a dict; ``schema_version``,
    ``record_type`` and ``command`` hold supported values; ``queue.issues``
    is a list; ``position.current_step`` is a known step and
    ``position.completed_steps`` lists only backbone step ids;
    ``identifiers`` contains ``base_branch``; the ``state`` merge/capture/
    close values are supported; and ``resume`` carries the action matching
    the current step plus a ``repeat_policy`` dict.

    Args:
        record: Any decoded value, typically the result of ``json.loads``.

    Raises:
        CheckpointError: on the first rule that is violated.
    """
    if not isinstance(record, dict):
        raise CheckpointError("checkpoint must be an object")
    if record.get("schema_version") != CHECKPOINT_SCHEMA_VERSION:
        raise CheckpointError("unsupported schema_version")
    if record.get("record_type") != RECORD_TYPE_RUN_CHECKPOINT:
        raise CheckpointError("unsupported record_type")
    if record.get("command") not in COMMANDS:
        raise CheckpointError("unsupported command")

    queue = record.get("queue")
    if not isinstance(queue, dict) or not isinstance(queue.get("issues"), list):
        raise CheckpointError("queue must be an object with issues")

    position = record.get("position")
    current_step = position.get("current_step") if isinstance(position, dict) else None
    # Decoded JSON can put a list or object here. Dict membership on an
    # unhashable value raises TypeError, so require a string first and keep
    # every rejection a CheckpointError.
    if not isinstance(current_step, str) or current_step not in _IDEMPOTENT_STEPS:
        raise CheckpointError("unsupported current_step")
    completed = position.get("completed_steps")
    if not isinstance(completed, list) or any(step not in STEP_IDS for step in completed):
        raise CheckpointError("unsupported completed_steps")

    identifiers = record.get("identifiers")
    if not isinstance(identifiers, dict) or "base_branch" not in identifiers:
        raise CheckpointError("identifiers must include base_branch")

    state = record.get("state")
    if not isinstance(state, dict):
        raise CheckpointError("state must be an object")
    if state.get("merge") not in MERGE_STATES:
        raise CheckpointError("unsupported merge state")
    if state.get("capture") not in CAPTURE_STATES:
        raise CheckpointError("unsupported capture state")
    if state.get("close") not in CLOSE_STATES:
        raise CheckpointError("unsupported close state")

    resume = record.get("resume")
    if not isinstance(resume, dict):
        raise CheckpointError("resume must be an object")
    if resume.get("action") != _IDEMPOTENT_STEPS[current_step]:
        raise CheckpointError("resume action does not match current_step")
    if not isinstance(resume.get("repeat_policy"), dict):
        raise CheckpointError("resume repeat_policy must be an object")
def _live_state(live_pr_state: str, live_worktree_state: str) -> dict[str, str]:
    """Package the supplied live states into the ``reconcile`` section of a plan."""
    snapshot = dict(pull_request=live_pr_state, worktree=live_worktree_state)
    snapshot["source"] = "live-git-github-state-or-adapter-supplied-dry-run-state"
    return snapshot
453 }