Coverage for src/keel/checkpoint.py: 100%

203 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Resumable checkpoint helpers for keel work runs.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from pathlib import Path 

7from typing import Any 

8 

9from . import config as cfg 

10from . import model 

11 

# Identity of the on-disk record: schema tag, default location, record type.
CHECKPOINT_SCHEMA_VERSION = "keel.checkpoint.v1"
DEFAULT_CHECKPOINT_PATH = ".keel/state/checkpoint.json"
RECORD_TYPE_RUN_CHECKPOINT = "run_checkpoint"

# Commands accepted in a record's "command" field.
COMMANDS = ("ship", "work-block", "overnight")
# Backbone step ids, in the order the model declares them.
STEP_IDS = tuple(backbone_step.id for backbone_step in model.BACKBONE)
# Closed vocabularies for the recorded merge / capture / close state.
MERGE_STATES = ("not-started", "pending", "merged", "failed", "skipped")
CAPTURE_STATES = ("not-started", "applied", "deferred", "skipped", "failed")
CLOSE_STATES = ("not-started", "closed", "failed")
# Closed vocabularies for the live state supplied when planning a resume.
LIVE_PR_STATES = ("unknown", "missing", "open", "merged", "closed")
LIVE_WORKTREE_STATES = ("unknown", "present", "missing")

23 

# Maps each step id that is a safe resume boundary to the action taken when a
# run is resumed at that step.
_IDEMPOTENT_STEPS = dict(
    s0="load-config",
    s1="select-or-reconcile-issue",
    s2="ensure-branch-and-worktree",
    s3="rerun-guards",
    s4="continue-implementation",
    s5="reclassify-diff",
    s6="recheck-ci",
    s7="rerun-review",
    s8="rerun-test",
    s9="continue-fixloop",
    s10="revalidate-merge-state",
    s11="run-or-verify-capture",
    s12="run-or-verify-close",
)

39 

40 

class CheckpointError(ValueError):
    """Signal that a checkpoint does not conform to the stable schema.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    keep working.
    """

43 

44 

def checkpoint_contract_as_dict(config: cfg.ProjectConfig) -> dict[str, Any]:
    """Describe where checkpoints are stored and how each step resumes.

    The result carries the schema version, the configured checkpoint path and
    where that path came from, and one entry per backbone step stating
    whether the step is a safe resume boundary and which action resumes it.
    """
    location, origin = configured_checkpoint_path(config)
    step_rows = [
        {
            "step_id": backbone_step.id,
            "step_name": backbone_step.name,
            "safe_boundary": backbone_step.id in _IDEMPOTENT_STEPS,
            "resume_action": _IDEMPOTENT_STEPS.get(backbone_step.id),
        }
        for backbone_step in model.BACKBONE
    ]
    contract: dict[str, Any] = {
        "schema_version": CHECKPOINT_SCHEMA_VERSION,
        "format": "json",
        "path": location,
        "path_source": origin,
        "missing_handling": "no-checkpoint",
        "write_owner": ["ship", "work-block", "overnight"],
        "resume_command": "resume",
        "checkpoint_command": "checkpoint",
        "steps": step_rows,
        "consumer_neutral": True,
    }
    return contract

68 

69 

def configured_checkpoint_path(config: cfg.ProjectConfig) -> tuple[str, str]:
    """Return ``(path, source)`` for the checkpoint file.

    The path comes from ``policy_pack["reports"]["checkpoint"]`` when that
    entry is a non-blank string; otherwise the module default is used.
    ``source`` names which of the two supplied the value.
    """
    policy = config.policy_pack or {}
    report_section = policy.get("reports")
    if not isinstance(report_section, dict):
        report_section = {}
    candidate = report_section.get("checkpoint")
    if not isinstance(candidate, str) or not candidate.strip():
        return DEFAULT_CHECKPOINT_PATH, "default"
    # The configured string is returned as written (not stripped).
    return candidate, "policy_pack.reports.checkpoint"

78 

79 

def resolve_path(root: str | Path, config: cfg.ProjectConfig) -> Path:
    """Turn the configured checkpoint path into an absolute path under ``root``.

    Raises ``CheckpointError`` when the configured path is absolute, or when
    the resolved location falls outside the resolved ``root``.
    """
    configured, _source = configured_checkpoint_path(config)
    relative = Path(configured)
    if relative.is_absolute():
        raise CheckpointError("checkpoint path must be relative to the project root")
    anchor = Path(root).resolve()
    target = (anchor / relative).resolve()
    try:
        # relative_to raises ValueError when target is not inside anchor.
        target.relative_to(anchor)
    except ValueError as exc:
        raise CheckpointError("checkpoint path escapes the project root") from exc
    else:
        return target

93 

94 

def build_checkpoint_record(
    *,
    run_id: str,
    command: str,
    current_step: str,
    base_branch: str,
    target: str | None = None,
    issue_queue: list[int] | None = None,
    active_issue: int | None = None,
    branch: str | None = None,
    worktree: str | None = None,
    pull_request: int | None = None,
    head_sha: str | None = None,
    completed_steps: list[str] | None = None,
    last_gate: str | None = None,
    last_review: str | None = None,
    last_check: str | None = None,
    jury_mode: str | None = None,
    merge_state: str = "not-started",
    capture_state: str = "not-started",
    close_state: str = "not-started",
    stop_reason: str | None = None,
) -> dict[str, Any]:
    """Assemble a checkpoint record from keyword arguments and validate it.

    The record groups its fields into ``queue``, ``position``,
    ``identifiers``, ``state`` and ``resume`` sections. ``issue_queue`` and
    ``completed_steps`` are copied into new lists. The ``resume`` section is
    derived from ``current_step``. Raises ``CheckpointError`` when
    ``validate_checkpoint`` rejects the assembled record.
    """
    queue_section = {
        "issues": list(issue_queue or ()),
        "active_issue": active_issue,
    }
    position_section = {
        "current_step": current_step,
        "completed_steps": list(completed_steps or ()),
    }
    identifier_section = {
        "base_branch": base_branch,
        "branch": branch,
        "worktree": worktree,
        "pull_request": pull_request,
        "head_sha": head_sha,
    }
    state_section = {
        "last_gate": last_gate,
        "last_review": last_review,
        "last_check": last_check,
        "jury_mode": jury_mode,
        "merge": merge_state,
        "capture": capture_state,
        "close": close_state,
        "stop_reason": stop_reason,
    }
    resume_section = {
        "safe_boundary": current_step in _IDEMPOTENT_STEPS,
        "action": _IDEMPOTENT_STEPS.get(current_step),
        "must_reconcile_live_state": True,
        "repeat_policy": {
            "merge": "never-repeat-if-live-pr-merged",
            "comments": "idempotent-anchor-or-skip",
            "worktree": "refuse-unrelated-existing-path",
        },
    }
    checkpoint: dict[str, Any] = {
        "schema_version": CHECKPOINT_SCHEMA_VERSION,
        "record_type": RECORD_TYPE_RUN_CHECKPOINT,
        "run_id": run_id,
        "command": command,
        "target": target,
        "queue": queue_section,
        "position": position_section,
        "identifiers": identifier_section,
        "state": state_section,
        "resume": resume_section,
    }
    validate_checkpoint(checkpoint)
    return checkpoint

163 

164 

def encode_checkpoint(record: dict[str, Any]) -> str:
    """Serialize a validated checkpoint as JSON text.

    Keys are sorted, indentation is two spaces, and the text ends with a
    newline. Raises ``CheckpointError`` if the record fails validation.
    """
    validate_checkpoint(record)
    body = json.dumps(record, sort_keys=True, indent=2)
    return f"{body}\n"

169 

170 

def parse_checkpoint(text: str) -> dict[str, Any]:
    """Decode JSON text into a checkpoint record and validate it.

    Raises ``CheckpointError`` when the text is not valid JSON or the decoded
    value fails ``validate_checkpoint``.
    """
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError as exc:
        raise CheckpointError("invalid JSON") from exc
    else:
        validate_checkpoint(decoded)
        return decoded

179 

180 

def read_checkpoint(path: str | Path) -> dict[str, Any] | None:
    """Read and validate the checkpoint stored at ``path``.

    Returns ``None`` when there is no file at ``path``, meaning there is no
    resumable run. Otherwise returns the parsed record; ``CheckpointError``
    from ``parse_checkpoint`` propagates when the content is invalid.
    """
    checkpoint_path = Path(path)
    try:
        # Read directly instead of checking exists() first: the file can be
        # removed between the check and the read, and that case must still
        # report "no checkpoint" rather than raise.
        text = checkpoint_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # NotADirectoryError: a parent component is a regular file, which the
        # previous exists() check also reported as "missing".
        return None
    return parse_checkpoint(text)

187 

188 

def write_checkpoint(path: str | Path, record: dict[str, Any]) -> None:
    """Write one validated checkpoint, replacing the previous resume point.

    The record is encoded (and therefore validated) before anything touches
    the filesystem, so an invalid record raises ``CheckpointError`` without
    creating directories or files. The payload is written to a sibling
    ``<name>.tmp`` file and then renamed over ``path``, so an interrupted
    write cannot leave a truncated checkpoint in place of the previous one.
    """
    payload = encode_checkpoint(record)
    checkpoint_path = Path(path)
    checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so the rename stays on one filesystem.
    staging_path = checkpoint_path.with_name(checkpoint_path.name + ".tmp")
    try:
        staging_path.write_text(payload, encoding="utf-8")
        staging_path.replace(checkpoint_path)
    except OSError:
        # Do not leave a stray staging file behind after a failed write.
        staging_path.unlink(missing_ok=True)
        raise

194 

195 

def resume_plan_as_dict(
    record: dict[str, Any] | None,
    *,
    live_pr_state: str = "unknown",
    live_worktree_state: str = "unknown",
) -> dict[str, Any]:
    """Build a dry-run resume plan from a checkpoint and supplied live state.

    ``record`` is the current checkpoint, or ``None`` when there is none.
    The plan reports a ``status``, whether the run ``can_resume``, the
    ``next_step`` and ``resume_action``, the live state echoed under
    ``reconcile``, and any ``warnings``. Raises ``CheckpointError`` for an
    unsupported live state value or an invalid record.
    """
    if live_pr_state not in LIVE_PR_STATES:
        raise CheckpointError("unsupported live_pr_state")
    if live_worktree_state not in LIVE_WORKTREE_STATES:
        raise CheckpointError("unsupported live_worktree_state")
    if record is None:
        return {
            "status": "no-checkpoint",
            "can_resume": False,
            "reason": "checkpoint file is missing",
            "next_step": None,
            "resume_action": None,
            "reconcile": _live_state(live_pr_state, live_worktree_state),
            "warnings": [],
        }
    validate_checkpoint(record)

    run_state = record["state"]
    ids = record["identifiers"]
    recorded_step = record["position"]["current_step"]
    recorded_pr = ids.get("pull_request")
    recorded_worktree = ids.get("worktree")
    notes: list[str] = []

    # Default plan: resume exactly where the checkpoint says the run stopped.
    plan: dict[str, Any] = {
        "status": "ready",
        "can_resume": True,
        "reason": "resume from the recorded safe boundary",
        "next_step": recorded_step,
        "resume_action": record["resume"]["action"],
        "checkpoint": record,
        "reconcile": _live_state(live_pr_state, live_worktree_state),
        "warnings": notes,
    }

    unfinished = {"not-started", "failed"}
    if live_pr_state == "merged" or run_state["merge"] == "merged":
        # Merge already happened (live or recorded): only capture/close remain.
        if run_state["capture"] in unfinished:
            plan.update(
                status="needs-capture",
                next_step="s11",
                resume_action=_IDEMPOTENT_STEPS["s11"],
                reason="merge is complete; resume at capture without repeating merge",
            )
        elif run_state["close"] in unfinished:
            plan.update(
                status="needs-close",
                next_step="s12",
                resume_action=_IDEMPOTENT_STEPS["s12"],
                reason="capture is complete or skipped; resume at close",
            )
        else:
            plan.update(
                status="complete",
                can_resume=False,
                next_step=None,
                resume_action=None,
                reason="merge, capture, and close are already complete",
            )
    elif live_pr_state == "closed" and recorded_pr:
        plan.update(
            status="ambiguous",
            can_resume=False,
            reason="checkpoint references a pull request that live state reports closed",
        )
        notes.append("reopen the PR or reconcile the checkpoint before resuming")
    elif live_worktree_state == "missing" and recorded_worktree:
        plan.update(
            status="ambiguous",
            can_resume=False,
            reason="checkpoint references a worktree that live state reports missing",
        )
        notes.append("recreate or reconcile the recorded worktree before resuming")
    elif live_pr_state == "missing" and recorded_pr:
        plan.update(
            status="ambiguous",
            can_resume=False,
            reason="checkpoint references a pull request that live state reports missing",
        )
        notes.append("verify whether the PR was deleted or the checkpoint is stale")
    elif recorded_step == "s6":
        plan.update(
            status="waiting-on-ci",
            reason="resume by rechecking CI before review, test, or merge",
        )
    elif recorded_pr:
        plan.update(
            status="pr-open",
            reason="resume against the recorded pull request after refreshing live state",
        )

    return plan

277 

278 

# Status values that covering_checkpoint can report.
COVERAGE_STATES = (
    "covered",
    "missing",
    "stale-step",
)

280 

281 

def covering_checkpoint(
    record: dict[str, Any] | None,
    run_id: str,
    expected_step: str,
) -> dict[str, Any]:
    """Report whether ``record`` covers run ``run_id`` at ``expected_step``.

    Performs no I/O. ``record`` is the current checkpoint, or ``None`` when
    no checkpoint exists; ``expected_step`` must be a backbone step id,
    otherwise ``CheckpointError`` is raised. The returned ``status`` is:

    * ``covered`` - the checkpoint is for ``run_id`` and either lists
      ``expected_step`` in ``completed_steps`` or has a ``current_step`` at
      or past it in backbone order;
    * ``missing`` - there is no checkpoint, or it belongs to another run;
    * ``stale-step`` - the checkpoint is for ``run_id`` but has not reached
      ``expected_step``.
    """
    if expected_step not in STEP_IDS:
        raise CheckpointError("expected_step must be a backbone step id")
    wanted_position = STEP_IDS.index(expected_step)
    # Shared prefix of every "not covered" reason.
    gap = f"no current checkpoint for run {run_id} at step {expected_step}"
    if record is None:
        return {
            "status": "missing",
            "covered": False,
            "run_id": run_id,
            "expected_step": expected_step,
            "checkpoint_run_id": None,
            "checkpoint_step": None,
            "reason": gap,
        }
    validate_checkpoint(record)
    recorded_run = record.get("run_id")
    where = record["position"]
    recorded_step = where["current_step"]
    finished = where.get("completed_steps", [])
    verdict: dict[str, Any] = {
        "run_id": run_id,
        "expected_step": expected_step,
        "checkpoint_run_id": recorded_run,
        "checkpoint_step": recorded_step,
    }
    if recorded_run != run_id:
        verdict.update(
            status="missing",
            covered=False,
            reason=f"{gap} (checkpoint is for run {recorded_run})",
        )
        return verdict
    # Not reached: the step is neither completed nor at/behind current_step.
    if expected_step not in finished and STEP_IDS.index(recorded_step) < wanted_position:
        verdict.update(
            status="stale-step",
            covered=False,
            reason=f"{gap} (run is at {recorded_step})",
        )
        return verdict
    verdict.update(
        status="covered",
        covered=True,
        reason=f"checkpoint for run {run_id} reached step {expected_step}",
    )
    return verdict

351 

352 

def find_orphans(
    *,
    live_branches: list[str] | None = None,
    live_pull_requests: list[int] | None = None,
    checkpoint_record: dict[str, Any] | None = None,
    ledger_records: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Return live branches/PRs with no covering checkpoint or ledger record.

    Pure and deterministic - no I/O. An *orphan* is a branch or pull request
    that exists on the git/transport side but is referenced by neither the
    current checkpoint nor any ledger record, i.e. keel has no covering state
    for it (the GAP-13 hazard from the git side). Advisory only.

    The result lists the orphan ``branches`` and ``pull_requests`` in input
    order, their combined ``orphan_count``, and the sorted sets of known
    references they were compared against.
    """
    known_branches, known_prs = _known_references(checkpoint_record, ledger_records)
    orphan_branches = [
        branch for branch in (live_branches or []) if branch not in known_branches
    ]
    orphan_prs = [pr for pr in (live_pull_requests or []) if pr not in known_prs]
    return {
        "branches": orphan_branches,
        "pull_requests": orphan_prs,
        "orphan_count": len(orphan_branches) + len(orphan_prs),
        "known_branches": sorted(known_branches),
        "known_pull_requests": sorted(known_prs),
    }


def _known_references(
    checkpoint_record: dict[str, Any] | None,
    ledger_records: list[dict[str, Any]] | None,
) -> tuple[set[str], set[int]]:
    """Collect branch names and PR numbers referenced by keel's own records.

    The checkpoint contributes ``identifiers.branch`` and
    ``identifiers.pull_request``; each ledger record contributes
    ``git.branch`` and ``pull_request.number``. Malformed entries (non-dict
    containers, empty branch names, non-integer PR numbers) are skipped
    rather than raised on, because these inputs are not validated here.
    """
    branches: set[str] = set()
    prs: set[int] = set()

    def _is_pr_number(value: Any) -> bool:
        # bool is a subclass of int; a JSON true/false is not a PR number.
        return isinstance(value, int) and not isinstance(value, bool)

    if isinstance(checkpoint_record, dict):
        identifiers = checkpoint_record.get("identifiers")
        # Guarded the same way as the ledger "git" entry below: the section
        # may be null or of the wrong type in an unvalidated record.
        if isinstance(identifiers, dict):
            branch = identifiers.get("branch")
            if isinstance(branch, str) and branch:
                branches.add(branch)
            pull_request = identifiers.get("pull_request")
            if _is_pr_number(pull_request):
                prs.add(pull_request)

    for record in ledger_records or []:
        if not isinstance(record, dict):
            continue
        git = record.get("git")
        if isinstance(git, dict):
            branch = git.get("branch")
            if isinstance(branch, str) and branch:
                branches.add(branch)
        pr_entry = record.get("pull_request")
        if isinstance(pr_entry, dict):
            number = pr_entry.get("number")
            if _is_pr_number(number):
                prs.add(number)
    return branches, prs

406 

407 

def validate_checkpoint(record: Any) -> None:
    """Validate the stable checkpoint shape.

    Checks, in order: the record is a dict with the supported schema version,
    record type and command; ``queue`` is a dict holding an ``issues`` list;
    ``position`` names a supported ``current_step`` and a ``completed_steps``
    list of backbone step ids; ``identifiers`` includes ``base_branch``;
    ``state`` carries supported merge/capture/close values; and ``resume``
    holds the action matching ``current_step`` plus a ``repeat_policy`` dict.

    Returns ``None`` on success and raises ``CheckpointError`` on the first
    failed check.
    """
    if not isinstance(record, dict):
        raise CheckpointError("checkpoint must be an object")
    if record.get("schema_version") != CHECKPOINT_SCHEMA_VERSION:
        raise CheckpointError("unsupported schema_version")
    if record.get("record_type") != RECORD_TYPE_RUN_CHECKPOINT:
        raise CheckpointError("unsupported record_type")
    if record.get("command") not in COMMANDS:
        raise CheckpointError("unsupported command")
    queue = record.get("queue")
    if not isinstance(queue, dict) or not isinstance(queue.get("issues"), list):
        raise CheckpointError("queue must be an object with issues")
    position = record.get("position")
    if not isinstance(position, dict):
        raise CheckpointError("unsupported current_step")
    current_step = position.get("current_step")
    # Check the type before the dict membership test: a decoded JSON list or
    # object is unhashable and would raise TypeError instead of the
    # CheckpointError that callers expect from validation.
    if not isinstance(current_step, str) or current_step not in _IDEMPOTENT_STEPS:
        raise CheckpointError("unsupported current_step")
    completed = position.get("completed_steps")
    if not isinstance(completed, list) or any(step not in STEP_IDS for step in completed):
        raise CheckpointError("unsupported completed_steps")
    identifiers = record.get("identifiers")
    if not isinstance(identifiers, dict) or "base_branch" not in identifiers:
        raise CheckpointError("identifiers must include base_branch")
    state = record.get("state")
    if not isinstance(state, dict):
        raise CheckpointError("state must be an object")
    if state.get("merge") not in MERGE_STATES:
        raise CheckpointError("unsupported merge state")
    if state.get("capture") not in CAPTURE_STATES:
        raise CheckpointError("unsupported capture state")
    if state.get("close") not in CLOSE_STATES:
        raise CheckpointError("unsupported close state")
    resume = record.get("resume")
    if not isinstance(resume, dict):
        raise CheckpointError("resume must be an object")
    if resume.get("action") != _IDEMPOTENT_STEPS[current_step]:
        raise CheckpointError("resume action does not match current_step")
    if not isinstance(resume.get("repeat_policy"), dict):
        raise CheckpointError("resume repeat_policy must be an object")

446 

447 

def _live_state(live_pr_state: str, live_worktree_state: str) -> dict[str, str]:
    """Package the supplied live PR and worktree states with a source label."""
    snapshot = {
        "pull_request": live_pr_state,
        "worktree": live_worktree_state,
    }
    snapshot["source"] = "live-git-github-state-or-adapter-supplied-dry-run-state"
    return snapshot