Coverage for src/keel/ledger.py: 100%

183 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Structured run ledger helpers for keel workflows.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from pathlib import Path 

7from typing import Any 

8 

9from . import capture, redaction 

10from . import config as cfg 

11 

# Schema identifier stamped on every ledger record and checked when validating.
LEDGER_SCHEMA_VERSION = "keel.run-ledger.v1"
# Schema identifier carried by the capture-health contract and summary payloads.
CAPTURE_HEALTH_SCHEMA_VERSION = "keel.capture-health.v1"
# Ledger location used when the policy pack does not configure ``run_ledger``.
DEFAULT_LEDGER_PATH = ".keel/state/run-ledger.jsonl"
# The only record type the record validator accepts.
RECORD_TYPE_SHIP_RUN = "ship_run"

16 

17 

class LedgerError(ValueError):
    """Raised when a ledger file cannot be decoded as the stable schema.

    Also raised by ``resolve_path`` when the configured ledger path is
    absolute or resolves outside the project root. Subclasses ``ValueError``
    so callers catching ``ValueError`` also catch ledger failures.
    """

20 

21 

def ledger_contract_as_dict(config: cfg.ProjectConfig) -> dict[str, Any]:
    """Describe the run ledger's storage location and schema as a plain dict.

    The path and its source come from ``configured_ledger_path``; the capture
    redaction, capture, and capture-health sub-contracts are embedded as-is
    from their respective ``contract_as_dict`` helpers.
    """
    ledger_path, path_source = configured_ledger_path(config)
    contract: dict[str, Any] = {
        "schema_version": LEDGER_SCHEMA_VERSION,
        "format": "jsonl",
        "path": ledger_path,
        "path_source": path_source,
        "missing_handling": "treat-as-empty",
        "append_owner": ["ship"],
        "readers": ["morning", "wrap", "overnight", "capture-verification", "ledger"],
        "consumer_neutral": True,
    }
    # Nested contracts are attached in the same key order as the flat fields
    # above so the resulting mapping keeps a stable layout.
    contract["capture_redaction"] = redaction.contract_as_dict(config)
    contract["capture_contract"] = capture.contract_as_dict(config)
    contract["capture_health"] = capture_health_contract_as_dict()
    contract["record_types"] = [RECORD_TYPE_SHIP_RUN]
    return contract

39 

40 

def capture_health_contract_as_dict() -> dict[str, Any]:
    """Describe the capture-health summary that is derived from the ledger.

    The returned mapping is built fresh on every call, so callers may mutate
    it without affecting later callers.
    """
    dry_run_guarantees = {
        "no_mutations": True,
        "safe_reconcile_actions_only": True,
    }
    overall_states = ["clean", "needs-reconcile"]
    per_item_statuses = ["applied", "deferred", "skipped", "missing-marker"]
    return {
        "schema_version": CAPTURE_HEALTH_SCHEMA_VERSION,
        "source": "run-ledger ship_run records",
        "readers": ["morning", "wrap", "status", "ledger"],
        "consumer_neutral": True,
        "missing_ledger_handling": "clean-empty-history",
        "dry_run": dry_run_guarantees,
        "states": overall_states,
        "item_statuses": per_item_statuses,
    }

56 

57 

def configured_ledger_path(config: cfg.ProjectConfig) -> tuple[str, str]:
    """Look up the ledger path and report which setting supplied it.

    Returns ``(path, source)``. The policy pack's ``reports.run_ledger`` value
    wins when it is a non-blank string (returned exactly as configured);
    otherwise the default path is returned with the source ``"default"``.
    """
    policy_pack = config.policy_pack or {}
    reports_section = policy_pack.get("reports")
    if not isinstance(reports_section, dict):
        # A missing or malformed ``reports`` section behaves like an empty one.
        reports_section = {}
    candidate = reports_section.get("run_ledger")
    if not isinstance(candidate, str) or not candidate.strip():
        return DEFAULT_LEDGER_PATH, "default"
    return candidate, "policy_pack.reports.run_ledger"

66 

67 

def resolve_path(root: str | Path, config: cfg.ProjectConfig) -> Path:
    """Turn the configured ledger path into an absolute path inside ``root``.

    Raises:
        LedgerError: if the configured path is absolute, or if it resolves to
            a location outside the resolved project root.
    """
    configured, _source = configured_ledger_path(config)
    relative = Path(configured)
    if relative.is_absolute():
        raise LedgerError("run ledger path must be relative to the project root")

    project_root = Path(root).resolve()
    candidate = (project_root / relative).resolve()
    try:
        # ``relative_to`` raises ValueError when ``candidate`` is not under the
        # root, which is how ``..`` segments and symlink escapes are detected.
        candidate.relative_to(project_root)
    except ValueError as exc:
        raise LedgerError("run ledger path escapes the project root") from exc
    else:
        return candidate

81 

82 

def build_ship_run_record(
    *,
    command: str,
    base_branch: str,
    changed_files: list[str],
    declared_files: list[str] | None = None,
    outcomes: list[Any],
    verdict: Any,
    assessment: Any,
    issue_intake: dict[str, Any] | None = None,
    target: str | None = None,
    run_id: str | None = None,
    issue_number: int | None = None,
    pr_number: int | None = None,
    branch: str | None = None,
    head_sha: str | None = None,
    capture_status: str | None = None,
    capture_reason: str | None = None,
    capture_artifact: str | None = None,
    issue_title: str | None = None,
    issue_labels: list[str] | tuple[str, ...] = (),
    existing_records: list[dict[str, Any]] | None = None,
    config: cfg.ProjectConfig | None = None,
    implementer: str | None = None,
    reviewer_agents: list[str] | None = None,
    tester: str | None = None,
    host_agent: str | None = None,
    transport: str | None = None,
    profile: str | None = None,
    jury_mode: str | None = None,
    consent_status: str | None = None,
    consent_scopes: list[str] | tuple[str, ...] | None = None,
    run_controls: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble one ``ship_run`` ledger record from keyword-only inputs.

    Gate outcomes, the verdict, and the assessment are read attribute by
    attribute into plain dicts. The declared-scope, run-context, and capture
    sections are delegated to ``_declared_block``, ``_run_context``, and
    ``capture.record_marker``. ``changed_files`` is copied, so the record does
    not alias the caller's list.
    """
    git_section = {
        "base_branch": base_branch,
        "branch": branch,
        "head_sha": head_sha,
    }
    changes_section = {
        "file_count": len(changed_files),
        "files": list(changed_files),
    }
    declared_section = _declared_block(declared_files)

    # One summary row per gate outcome; only the number of findings is kept.
    gate_rows = []
    for outcome in outcomes:
        gate_rows.append(
            {
                "gate": outcome.gate,
                "ok": outcome.ok,
                "skipped": outcome.skipped,
                "error": outcome.error,
                "finding_count": len(outcome.findings),
            }
        )

    verdict_section = {
        "blocked": verdict.blocked,
        "counts": dict(verdict.counts),
    }
    assessment_section = {
        "tier": assessment.tier,
        "reviewers": assessment.reviewers,
        "window_open": assessment.window_open,
        "ci_ok": assessment.ci_ok,
        "merge": {
            "action": assessment.merge.action,
            "reason": assessment.merge.reason,
        },
        "halted": assessment.halted,
        "bypassed_window": assessment.bypassed_window,
    }
    actors_section = {
        "implementer": implementer,
        "reviewers": list(reviewer_agents or ()),
        "tester": tester,
    }
    run_context_section = _run_context(
        host_agent=host_agent,
        transport=transport,
        profile=profile,
        jury_mode=jury_mode,
        consent_status=consent_status,
        consent_scopes=consent_scopes,
    )
    capture_section = capture.record_marker(
        pr_number=pr_number,
        status=capture_status,
        reason=capture_reason,
        artifact=capture_artifact,
        title=issue_title,
        labels=issue_labels,
        changed_files=changed_files,
        existing_records=existing_records or [],
        config=config,
    )

    return {
        "schema_version": LEDGER_SCHEMA_VERSION,
        "record_type": RECORD_TYPE_SHIP_RUN,
        "command": command,
        "run_id": run_id,
        "target": target,
        "issue": {"number": issue_number},
        "pull_request": {"number": pr_number},
        "git": git_section,
        "changes": changes_section,
        "declared": declared_section,
        "gates": gate_rows,
        "verdict": verdict_section,
        "assessment": assessment_section,
        "actors": actors_section,
        "run_context": run_context_section,
        "run_controls": run_controls,
        "issue_intake": issue_intake,
        "capture": capture_section,
    }

189 

190 

191def _declared_block(declared_files: list[str] | None) -> dict[str, Any] | None: 

192 """Build the implementer's declared-scope block, or ``None`` when unset. 

193 

194 ``declared_files`` is the implementer's contract of which files the change is 

195 *supposed* to touch (distinct from the observed ``changes`` diff). When the 

196 implementer does not declare a scope, the block is omitted so readers can 

197 degrade to advisory back-compat behavior. 

198 """ 

199 if declared_files is None: 

200 return None 

201 files = [str(path) for path in declared_files] 

202 return {"file_count": len(files), "files": files} 

203 

204 

def declared_files_for_record(record: dict[str, Any]) -> list[str] | None:
    """Extract the declared file list stored on a ship_run ``record``.

    Returns ``None`` when ``declared`` is absent or not a dict, or when its
    ``files`` entry is not a list. Otherwise returns a new list with each
    entry coerced to ``str``.
    """
    declared = record.get("declared")
    files = declared.get("files") if isinstance(declared, dict) else None
    if isinstance(files, list):
        return list(map(str, files))
    return None

218 

219 

def _run_context(
    *,
    host_agent: str | None,
    transport: str | None,
    profile: str | None,
    jury_mode: str | None,
    consent_status: str | None,
    consent_scopes: list[str] | tuple[str, ...] | None,
) -> dict[str, Any]:
    """Build the preflight run-context block stored on a ledger record.

    Each scalar is kept only when it is a non-blank string and becomes
    ``None`` otherwise. Consent is summarized as a status plus the list of
    scopes; scopes are coerced with ``str`` and blank ones are dropped.
    """
    approved_scopes = []
    for scope in consent_scopes or ():
        scope_text = str(scope)
        if scope_text.strip():
            approved_scopes.append(scope_text)

    def _scalar(value: Any) -> Any:
        # Blank or non-string values are recorded as ``None``.
        return value if _nonblank(value) else None

    return {
        "host_agent": _scalar(host_agent),
        "transport": _scalar(transport),
        "profile": _scalar(profile),
        "jury_mode": _scalar(jury_mode),
        "consent": {
            "status": _scalar(consent_status),
            "scopes": approved_scopes,
        },
    }

247 

248 

249def _nonblank(value: Any) -> bool: 

250 return isinstance(value, str) and bool(value.strip()) 

251 

252 

def encode_record(record: dict[str, Any]) -> str:
    """Serialize one validated record as a single newline-terminated JSON line.

    Keys are sorted and separators are compact, so equal records always
    encode to the same text. Validation runs first and raises ``LedgerError``
    for a record that does not match the supported schema.
    """
    _validate_record(record)
    payload = json.dumps(record, separators=(",", ":"), sort_keys=True)
    return f"{payload}\n"

257 

258 

def parse_records(text: str) -> list[dict[str, Any]]:
    """Parse ledger JSONL text into validated records.

    Lines are delimited only by a line feed, a carriage return followed by a
    line feed, or a lone carriage return. Blank lines are skipped. Line
    numbers in error messages are 1-based and count skipped blank lines.

    Raises:
        LedgerError: if a non-blank line is not valid JSON, or if the decoded
            value fails record validation.
    """
    # ``str.splitlines()`` would also break on U+2028, U+2029, U+0085 and a few
    # other characters. Those may appear unescaped inside a JSON string, so
    # using it could cut a valid record in half and report it as invalid JSON.
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    records: list[dict[str, Any]] = []
    for line_number, raw in enumerate(normalized.split("\n"), start=1):
        if not raw.strip():
            continue
        try:
            record = json.loads(raw)
        except json.JSONDecodeError as exc:
            raise LedgerError(f"line {line_number}: invalid JSON") from exc
        _validate_record(record, line_number=line_number)
        records.append(record)
    return records

272 

273 

def read_records(path: str | Path) -> list[dict[str, Any]]:
    """Read a ledger file; a missing ledger is a valid empty history.

    Returns the parsed records, or ``[]`` when the file (or one of its parent
    directories) does not exist. Parsing errors propagate from
    ``parse_records``.
    """
    try:
        # Read directly instead of checking ``exists()`` first: the file could
        # disappear between the check and the read, which would turn a
        # legitimately missing ledger into an unexpected exception.
        text = Path(path).read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return []
    return parse_records(text)

280 

281 

def latest_ship_run_for_pr(
    records: list[dict[str, Any]],
    pr_number: int,
) -> dict[str, Any] | None:
    """Pick the most recently appended ship_run record for ``pr_number``.

    ``records`` is assumed to be in append (chronological) order, so the last
    matching entry is the newest. Returns ``None`` when nothing matches.
    """

    def _recorded_pr(entry: dict[str, Any]) -> Any:
        # A missing or non-dict ``pull_request`` block yields ``None``.
        pull_request = entry.get("pull_request")
        return pull_request.get("number") if isinstance(pull_request, dict) else None

    matches = [
        entry
        for entry in records
        if entry.get("record_type") == RECORD_TYPE_SHIP_RUN
        and _recorded_pr(entry) == pr_number
    ]
    return matches[-1] if matches else None

300 

301 

def record_gates_passed(record: dict[str, Any]) -> bool:
    """Decide whether a ship_run record represents a clean gates pass.

    The verdict must be a dict whose ``blocked`` is exactly ``False``, and
    ``gates`` must be a non-empty list in which every entry is a dict with no
    truthy ``error`` and with ``ok`` or ``skipped`` exactly ``True``. Any
    missing or malformed piece yields ``False``, so a corrupt record is never
    treated as a pass.
    """

    def _gate_is_clean(gate: Any) -> bool:
        if not isinstance(gate, dict) or gate.get("error"):
            return False
        return gate.get("ok") is True or gate.get("skipped") is True

    verdict = record.get("verdict")
    unblocked = isinstance(verdict, dict) and verdict.get("blocked") is False
    if not unblocked:
        return False

    gates = record.get("gates")
    if not (isinstance(gates, list) and gates):
        return False
    return all(_gate_is_clean(gate) for gate in gates)

324 

325 

def gates_pass_for_head(
    records: list[dict[str, Any]],
    pr_number: int,
    head_sha: str,
) -> tuple[bool, dict[str, Any] | None]:
    """Look for a passing gates run recorded for ``pr_number`` at ``head_sha``.

    Returns ``(matched, record)``. ``matched`` is ``True`` only if some
    ship_run record for the PR has exactly this ``head_sha`` and satisfies
    ``record_gates_passed``; the last such record in ``records`` is returned.
    A blank or non-string ``head_sha`` never matches, so an unknown head
    cannot be approved by an older green run. No I/O is performed.
    """
    head_is_known = isinstance(head_sha, str) and bool(head_sha.strip())
    if not head_is_known:
        return False, None

    def _targets_head(entry: dict[str, Any]) -> bool:
        # True when ``entry`` is a ship_run for this PR at exactly this head.
        if entry.get("record_type") != RECORD_TYPE_SHIP_RUN:
            return False
        pull_request = entry.get("pull_request")
        number = pull_request.get("number") if isinstance(pull_request, dict) else None
        if number != pr_number:
            return False
        git = entry.get("git")
        recorded_sha = git.get("head_sha") if isinstance(git, dict) else None
        return recorded_sha == head_sha

    latest: dict[str, Any] | None = None
    for entry in records:
        if _targets_head(entry) and record_gates_passed(entry):
            latest = entry
    return (latest is not None), latest

357 

358 

def capture_health_summary(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate per-record capture health into one summary payload.

    Each record is reduced to an item by ``_capture_health_item``. The summary
    carries per-status and per-learning-decision counts, a reason breakdown
    for skipped items (sorted by reason), the items themselves, and the
    flattened reconcile actions. The overall status is ``"needs-reconcile"``
    when at least one item needs reconciling, otherwise ``"clean"``.
    """
    items = list(map(_capture_health_item, records))
    counts = dict.fromkeys(
        (
            "applied",
            "marker_only",
            "create_learning",
            "duplicate_learning",
            "deferred",
            "skipped",
            "missing_marker",
            "needs_reconcile",
        ),
        0,
    )
    # Learning decisions are compared with ``==`` rather than looked up in a
    # dict, because the recorded decision value is not guaranteed hashable.
    learning_buckets = (
        ("marker-only", "marker_only"),
        ("create-learning", "create_learning"),
        ("duplicate", "duplicate_learning"),
    )
    skipped_by_reason: dict[str, int] = {}
    reconcile_actions = []

    for item in items:
        status = item["status"]
        learning = item["learning_decision"]

        # Any status other than the three known ones counts as a missing marker.
        status_bucket = status if status in ("applied", "deferred", "skipped") else "missing_marker"
        counts[status_bucket] += 1
        if status_bucket == "skipped":
            reason_key = item["reason"] or "unspecified"
            skipped_by_reason[reason_key] = skipped_by_reason.get(reason_key, 0) + 1

        for decision, bucket in learning_buckets:
            if learning == decision:
                counts[bucket] += 1
                break

        if item["needs_reconcile"]:
            counts["needs_reconcile"] += 1
        reconcile_actions.extend(item["reconcile_actions"])

    overall = "needs-reconcile" if counts["needs_reconcile"] else "clean"
    return {
        "schema_version": CAPTURE_HEALTH_SCHEMA_VERSION,
        "status": overall,
        "record_count": len(records),
        "counts": counts,
        "skipped_by_reason": dict(sorted(skipped_by_reason.items())),
        "items": items,
        "reconcile_actions": reconcile_actions,
        "dry_run": {
            "no_mutations": True,
            "description": "Morning and wrap surface these actions; they do not mutate "
            "ledger, GitHub, or capture destinations.",
        },
    }

411 

412 

def append_record(path: str | Path, record: dict[str, Any]) -> None:
    """Append one validated JSONL record, creating parent directories as needed.

    The record is validated and encoded before the filesystem is touched, so
    an invalid record raises ``LedgerError`` without creating directories or
    an empty ledger file.
    """
    # Encode first: opening in append mode creates the file, so doing this
    # after the open would leave side effects behind when validation fails.
    line = encode_record(record)
    ledger_path = Path(path)
    ledger_path.parent.mkdir(parents=True, exist_ok=True)
    with ledger_path.open("a", encoding="utf-8") as handle:
        handle.write(line)

419 

420 

def sanitize_record(
    record: dict[str, Any],
    config: cfg.ProjectConfig | None = None,
) -> dict[str, Any]:
    """Redact a ledger record and attach the redaction audit to the copy.

    The policy is derived from ``config``. The returned dict is a shallow copy
    of the sanitized value with a ``redaction`` key holding the audit; the
    object returned by ``redaction.sanitize`` is not modified here.
    """
    policy = redaction.policy_from_config(config)
    outcome = redaction.sanitize(record, policy)
    return dict(outcome.value, redaction=outcome.audit)

430 

431 

def _validate_record(record: Any, *, line_number: int | None = None) -> None:
    """Check that ``record`` is a dict with the supported schema and type.

    Only the container type, ``schema_version``, and ``record_type`` are
    checked. When ``line_number`` is given, error messages are prefixed with
    ``line N: ``.

    Raises:
        LedgerError: on the first check that fails.
    """
    where = "" if line_number is None else f"line {line_number}: "
    if not isinstance(record, dict):
        raise LedgerError(f"{where}record must be an object")

    schema_supported = record.get("schema_version") == LEDGER_SCHEMA_VERSION
    if not schema_supported:
        raise LedgerError(f"{where}unsupported schema_version")

    type_supported = record.get("record_type") == RECORD_TYPE_SHIP_RUN
    if not type_supported:
        raise LedgerError(f"{where}unsupported record_type")

440 

441 

def _capture_health_item(record: dict[str, Any]) -> dict[str, Any]:
    """Reduce one ledger record to its capture-health item.

    The item reports the derived health status, the raw capture status, the
    marker and reason (kept only when they are non-blank strings), the
    learning decision and reason, and whether the record needs reconciling.
    Items with status ``missing-marker`` or ``deferred`` need reconciling and
    carry exactly one reconcile action.

    The ``capture``, ``learning``, ``pull_request``, and ``issue`` blocks are
    each treated as empty when they are missing or not a dict, so a malformed
    record degrades to ``None`` fields instead of raising.
    """
    capture_block = record.get("capture")
    block = capture_block if isinstance(capture_block, dict) else {}
    status = block.get("status")
    marker = block.get("marker")
    reason = block.get("marker_reason") or block.get("reason")
    learning_block = block.get("learning")
    learning = learning_block if isinstance(learning_block, dict) else {}
    item_status = _capture_health_status(status, marker)
    needs_reconcile = item_status in {"missing-marker", "deferred"}
    # Record validation only checks schema_version and record_type, so these
    # nested blocks can hold any JSON value; guard before calling ``.get``.
    pull_request = record.get("pull_request")
    pr_number = pull_request.get("number") if isinstance(pull_request, dict) else None
    issue = record.get("issue")
    issue_number = issue.get("number") if isinstance(issue, dict) else None
    item = {
        "run_id": record.get("run_id"),
        "issue": issue_number,
        "pull_request": pr_number,
        "status": item_status,
        "capture_status": status,
        "marker": marker if isinstance(marker, str) and marker.strip() else None,
        "reason": reason if isinstance(reason, str) and reason.strip() else None,
        "learning_decision": learning.get("decision"),
        "learning_reason": learning.get("reason"),
        "needs_reconcile": needs_reconcile,
        "reconcile_actions": [],
    }
    if needs_reconcile:
        item["reconcile_actions"].append(_capture_reconcile_action(pr_number, item_status))
    return item

467 

468 

469def _capture_health_status(status: Any, marker: Any) -> str: 

470 if not isinstance(marker, str) or not marker.strip(): 

471 return "missing-marker" 

472 if status == "deferred": 

473 return "deferred" 

474 if status == "skipped": 

475 return "skipped" 

476 return "applied" 

477 

478 

479def _capture_reconcile_action(pr_number: Any, status: str) -> dict[str, Any]: 

480 return { 

481 "type": "capture-reconcile", 

482 "reason": status, 

483 "pr": pr_number if isinstance(pr_number, int) else None, 

484 "command": ( 

485 f"keel capture-reconcile .keel/project.yaml --root . --merged-pr {pr_number}" 

486 if isinstance(pr_number, int) 

487 else "keel capture-reconcile .keel/project.yaml --root ." 

488 ), 

489 "dry_run": True, 

490 }