Coverage for src/keel/ledger.py: 100%
183 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Structured run ledger helpers for keel workflows."""
3from __future__ import annotations
5import json
6from pathlib import Path
7from typing import Any
9from . import capture, redaction
10from . import config as cfg
# Schema identifier stamped on every run-ledger record and on the ledger contract.
LEDGER_SCHEMA_VERSION: str = "keel.run-ledger.v1"
# Schema identifier of the ledger-derived capture-health summary.
CAPTURE_HEALTH_SCHEMA_VERSION: str = "keel.capture-health.v1"
# Ledger location, relative to the project root, used when the policy pack does
# not configure ``reports.run_ledger``.
DEFAULT_LEDGER_PATH: str = ".keel/state/run-ledger.jsonl"
# The only record type this module builds and accepts.
RECORD_TYPE_SHIP_RUN: str = "ship_run"
class LedgerError(ValueError):
    """Signal that ledger data or the configured ledger location is unusable.

    Raised for ledger lines that are not valid JSON or do not match the
    supported schema/record type, and for a configured ledger path that is
    absolute or escapes the project root. Being a :class:`ValueError`
    subclass, it is also caught by existing ``ValueError`` handlers.
    """
def ledger_contract_as_dict(config: cfg.ProjectConfig) -> dict[str, Any]:
    """Describe where the run ledger lives and which schema it follows.

    The contract is project-neutral. It reports the configured ledger path and
    whether that path came from the policy pack or the built-in default. It
    also embeds the redaction, capture, and capture-health contracts that
    ledger readers rely on.
    """
    ledger_path, path_source = configured_ledger_path(config)
    contract: dict[str, Any] = {
        "schema_version": LEDGER_SCHEMA_VERSION,
        "format": "jsonl",
        "path": ledger_path,
        "path_source": path_source,
        "missing_handling": "treat-as-empty",
        "append_owner": ["ship"],
        "readers": ["morning", "wrap", "overnight", "capture-verification", "ledger"],
        "consumer_neutral": True,
    }
    # Nested contracts come from the modules that own those concerns.
    contract["capture_redaction"] = redaction.contract_as_dict(config)
    contract["capture_contract"] = capture.contract_as_dict(config)
    contract["capture_health"] = capture_health_contract_as_dict()
    contract["record_types"] = [RECORD_TYPE_SHIP_RUN]
    return contract
def capture_health_contract_as_dict() -> dict[str, Any]:
    """Describe the capture-health summary derived from the run ledger.

    The contract declares the summary's schema version, its source
    (``ship_run`` ledger records), its readers, how a missing ledger is
    treated, its dry-run guarantees, and the state/item-status vocabularies.
    """
    dry_run_policy = {"no_mutations": True, "safe_reconcile_actions_only": True}
    return dict(
        schema_version=CAPTURE_HEALTH_SCHEMA_VERSION,
        source="run-ledger ship_run records",
        readers=["morning", "wrap", "status", "ledger"],
        consumer_neutral=True,
        missing_ledger_handling="clean-empty-history",
        dry_run=dry_run_policy,
        states=["clean", "needs-reconcile"],
        item_statuses=["applied", "deferred", "skipped", "missing-marker"],
    )
def configured_ledger_path(config: cfg.ProjectConfig) -> tuple[str, str]:
    """Return the configured ledger path and the config source that supplied it.

    The path is read from ``policy_pack.reports.run_ledger``. A non-blank
    string there wins. Its surrounding whitespace is stripped, consistent with
    the blank check, so padding cannot leak into the filesystem path built by
    ``resolve_path``. Anything else falls back to ``DEFAULT_LEDGER_PATH``:
    no policy pack, a ``reports`` section that is not a dict, or a blank or
    non-string value.

    Returns:
        ``(path, source)`` where ``source`` is ``"policy_pack.reports.run_ledger"``
        or ``"default"``.
    """
    pack = config.policy_pack or {}
    section = pack.get("reports")
    reports = section if isinstance(section, dict) else {}
    value = reports.get("run_ledger")
    if isinstance(value, str) and value.strip():
        return value.strip(), "policy_pack.reports.run_ledger"
    return DEFAULT_LEDGER_PATH, "default"
def resolve_path(root: str | Path, config: cfg.ProjectConfig) -> Path:
    """Return the fully resolved ledger location for the project at ``root``.

    Raises:
        LedgerError: if the configured path is absolute, or if, once ``root``
            and the joined path are both resolved, the result lies outside
            ``root``.
    """
    configured, _source = configured_ledger_path(config)
    relative = Path(configured)
    if relative.is_absolute():
        raise LedgerError("run ledger path must be relative to the project root")
    base = Path(root).resolve()
    candidate = (base / relative).resolve()
    # ``relative_to`` raises ValueError when ``candidate`` is not under ``base``.
    try:
        candidate.relative_to(base)
    except ValueError as escape:
        raise LedgerError("run ledger path escapes the project root") from escape
    return candidate
def build_ship_run_record(
    *,
    command: str,
    base_branch: str,
    changed_files: list[str],
    declared_files: list[str] | None = None,
    outcomes: list[Any],
    verdict: Any,
    assessment: Any,
    issue_intake: dict[str, Any] | None = None,
    target: str | None = None,
    run_id: str | None = None,
    issue_number: int | None = None,
    pr_number: int | None = None,
    branch: str | None = None,
    head_sha: str | None = None,
    capture_status: str | None = None,
    capture_reason: str | None = None,
    capture_artifact: str | None = None,
    issue_title: str | None = None,
    issue_labels: list[str] | tuple[str, ...] = (),
    existing_records: list[dict[str, Any]] | None = None,
    config: cfg.ProjectConfig | None = None,
    implementer: str | None = None,
    reviewer_agents: list[str] | None = None,
    tester: str | None = None,
    host_agent: str | None = None,
    transport: str | None = None,
    profile: str | None = None,
    jury_mode: str | None = None,
    consent_status: str | None = None,
    consent_scopes: list[str] | tuple[str, ...] | None = None,
    run_controls: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble one consumer-neutral ``ship_run`` ledger record.

    Inputs are read as follows:

    - each ``outcomes`` item for ``gate``, ``ok``, ``skipped``, ``error`` and
      ``findings`` (only the findings count is stored);
    - ``verdict`` for ``blocked`` and ``counts``;
    - ``assessment`` for ``tier``, ``reviewers``, ``window_open``, ``ci_ok``,
      ``merge.action``/``merge.reason``, ``halted`` and ``bypassed_window``.

    Optional scalar arguments are stored as given. The ``declared`` and
    ``run_context`` blocks come from ``_declared_block`` and ``_run_context``.
    The ``capture`` block comes from ``capture.record_marker``, which receives
    ``existing_records`` (an empty list when omitted).
    """
    changes = {"file_count": len(changed_files), "files": list(changed_files)}
    declared = _declared_block(declared_files)

    gate_rows: list[dict[str, Any]] = []
    for outcome in outcomes:
        gate_rows.append(
            {
                "gate": outcome.gate,
                "ok": outcome.ok,
                "skipped": outcome.skipped,
                "error": outcome.error,
                "finding_count": len(outcome.findings),
            }
        )

    verdict_block = {"blocked": verdict.blocked, "counts": dict(verdict.counts)}
    assessment_block = {
        "tier": assessment.tier,
        "reviewers": assessment.reviewers,
        "window_open": assessment.window_open,
        "ci_ok": assessment.ci_ok,
        "merge": {
            "action": assessment.merge.action,
            "reason": assessment.merge.reason,
        },
        "halted": assessment.halted,
        "bypassed_window": assessment.bypassed_window,
    }
    actors = {
        "implementer": implementer,
        "reviewers": list(reviewer_agents or ()),
        "tester": tester,
    }
    run_context = _run_context(
        host_agent=host_agent,
        transport=transport,
        profile=profile,
        jury_mode=jury_mode,
        consent_status=consent_status,
        consent_scopes=consent_scopes,
    )
    capture_marker = capture.record_marker(
        pr_number=pr_number,
        status=capture_status,
        reason=capture_reason,
        artifact=capture_artifact,
        title=issue_title,
        labels=issue_labels,
        changed_files=changed_files,
        existing_records=existing_records or [],
        config=config,
    )

    return {
        "schema_version": LEDGER_SCHEMA_VERSION,
        "record_type": RECORD_TYPE_SHIP_RUN,
        "command": command,
        "run_id": run_id,
        "target": target,
        "issue": {"number": issue_number},
        "pull_request": {"number": pr_number},
        "git": {"base_branch": base_branch, "branch": branch, "head_sha": head_sha},
        "changes": changes,
        "declared": declared,
        "gates": gate_rows,
        "verdict": verdict_block,
        "assessment": assessment_block,
        "actors": actors,
        "run_context": run_context,
        "run_controls": run_controls,
        "issue_intake": issue_intake,
        "capture": capture_marker,
    }
191def _declared_block(declared_files: list[str] | None) -> dict[str, Any] | None:
192 """Build the implementer's declared-scope block, or ``None`` when unset.
194 ``declared_files`` is the implementer's contract of which files the change is
195 *supposed* to touch (distinct from the observed ``changes`` diff). When the
196 implementer does not declare a scope, the block is omitted so readers can
197 degrade to advisory back-compat behavior.
198 """
199 if declared_files is None:
200 return None
201 files = [str(path) for path in declared_files]
202 return {"file_count": len(files), "files": files}
def declared_files_for_record(record: dict[str, Any]) -> list[str] | None:
    """Extract the declared file list from a ``ship_run`` ``record``.

    Returns the entries of ``record["declared"]["files"]`` as strings. Returns
    ``None`` when ``declared`` is not a dict or its ``files`` entry is not a
    list, so ``scope-verify`` can degrade to an advisory pass.
    """
    declared = record.get("declared")
    files = declared.get("files") if isinstance(declared, dict) else None
    if isinstance(files, list):
        return [str(entry) for entry in files]
    return None
def _run_context(
    *,
    host_agent: str | None,
    transport: str | None,
    profile: str | None,
    jury_mode: str | None,
    consent_status: str | None,
    consent_scopes: list[str] | tuple[str, ...] | None,
) -> dict[str, Any]:
    """Build the consumer-neutral preflight run-context block.

    Each scalar is kept only if it is a non-blank string and becomes ``None``
    otherwise, so the closure renderer can show ``unknown``/``none`` without a
    schema change. Consent is summarized as a status plus the list of approved
    scopes, stringified, with blank entries dropped.
    """
    approved_scopes = [text for text in map(str, consent_scopes or ()) if text.strip()]

    def _or_none(value: str | None) -> str | None:
        # Blank or non-string values collapse to None.
        return value if _nonblank(value) else None

    return {
        "host_agent": _or_none(host_agent),
        "transport": _or_none(transport),
        "profile": _or_none(profile),
        "jury_mode": _or_none(jury_mode),
        "consent": {"status": _or_none(consent_status), "scopes": approved_scopes},
    }
249def _nonblank(value: Any) -> bool:
250 return isinstance(value, str) and bool(value.strip())
def encode_record(record: dict[str, Any]) -> str:
    """Serialize ``record`` as one compact, key-sorted, newline-terminated JSON line.

    Raises:
        LedgerError: if the record fails schema validation.
    """
    _validate_record(record)
    line = json.dumps(record, separators=(",", ":"), sort_keys=True)
    return f"{line}\n"
def parse_records(text: str) -> list[dict[str, Any]]:
    """Parse ledger JSONL text into validated records.

    Records are split on line feeds only (``\\n``, ``\\r\\n`` or a bare
    ``\\r``). ``str.splitlines`` is deliberately avoided: it also breaks on
    U+0085, U+2028 and U+2029. Those characters may legally appear unescaped
    inside a JSON string, and splitting on them would cut one valid record
    into two invalid halves. Blank lines are skipped. Line numbers in error
    messages are 1-based.

    Raises:
        LedgerError: for a line that is not valid JSON or is not a supported
            ledger record.
    """
    records: list[dict[str, Any]] = []
    # CR/LF never appear raw inside a JSON string, so normalizing them is safe.
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    for line_number, raw in enumerate(lines, start=1):
        if not raw.strip():
            continue
        try:
            record = json.loads(raw)
        except json.JSONDecodeError as exc:
            raise LedgerError(f"line {line_number}: invalid JSON") from exc
        _validate_record(record, line_number=line_number)
        records.append(record)
    return records
def read_records(path: str | Path) -> list[dict[str, Any]]:
    """Read and validate every record in the ledger file at ``path``.

    A missing ledger is a valid empty history and yields ``[]``. The read is
    attempted directly rather than checking ``exists()`` first. This way a
    ledger removed between a check and the read still reads as empty instead
    of raising. A path under a non-directory component is also treated as
    missing.

    Raises:
        LedgerError: if the file exists but contains an invalid line.
    """
    try:
        text = Path(path).read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return []
    return parse_records(text)
def latest_ship_run_for_pr(
    records: list[dict[str, Any]],
    pr_number: int,
) -> dict[str, Any] | None:
    """Return the most recent ``ship_run`` record for pull request ``pr_number``.

    Records are appended chronologically, so the last match in ``records`` is
    the newest. A record whose ``pull_request`` is not a dict is treated as
    having no PR number. Returns ``None`` when nothing matches.
    """

    def _is_match(record: dict[str, Any]) -> bool:
        if record.get("record_type") != RECORD_TYPE_SHIP_RUN:
            return False
        pull_request = record.get("pull_request")
        number = pull_request.get("number") if isinstance(pull_request, dict) else None
        return number == pr_number

    matches = [record for record in records if _is_match(record)]
    return matches[-1] if matches else None
def record_gates_passed(record: dict[str, Any]) -> bool:
    """Decide whether a ``ship_run`` record's gates amount to a clean pass.

    A pass requires all of the following:

    - ``verdict.blocked`` is exactly ``False``;
    - ``gates`` is a non-empty list;
    - every gate is a dict with no truthy ``error`` whose ``ok`` or
      ``skipped`` is exactly ``True``.

    Any malformed or missing piece counts as "not a pass", so a corrupt record
    can never authorize a merge.
    """
    verdict = record.get("verdict")
    blocked = verdict.get("blocked") if isinstance(verdict, dict) else None
    if blocked is not False:
        return False
    gates = record.get("gates")
    if not (isinstance(gates, list) and gates):
        return False
    return all(
        isinstance(gate, dict)
        and not gate.get("error")
        and (gate.get("ok") is True or gate.get("skipped") is True)
        for gate in gates
    )
def gates_pass_for_head(
    records: list[dict[str, Any]],
    pr_number: int,
    head_sha: str,
) -> tuple[bool, dict[str, Any] | None]:
    """Look for a passing gates run recorded against exactly ``head_sha``.

    A record qualifies only if all of the following hold:

    - it is a ``ship_run`` for ``pr_number``;
    - its ``git.head_sha`` equals ``head_sha``;
    - its gates pass :func:`record_gates_passed`.

    The last qualifying record in ``records`` is returned. A non-string or
    blank ``head_sha`` never qualifies, so an unknown head cannot be vouched
    for by an older green run. The function reads only its arguments and
    performs no I/O.

    Returns:
        ``(True, record)`` for the latest qualifying record, else ``(False, None)``.
    """
    if not (isinstance(head_sha, str) and head_sha.strip()):
        return False, None

    def _recorded_for_head(record: dict[str, Any]) -> bool:
        if record.get("record_type") != RECORD_TYPE_SHIP_RUN:
            return False
        pull_request = record.get("pull_request")
        pr = pull_request.get("number") if isinstance(pull_request, dict) else None
        if pr != pr_number:
            return False
        git = record.get("git")
        sha = git.get("head_sha") if isinstance(git, dict) else None
        return sha == head_sha

    passing = [
        record
        for record in records
        if _recorded_for_head(record) and record_gates_passed(record)
    ]
    if not passing:
        return False, None
    return True, passing[-1]
def capture_health_summary(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Roll ledger records up into the capture-health report.

    Each record becomes one item via ``_capture_health_item``. The report:

    - tallies item statuses and learning decisions;
    - groups skipped items by reason (``"unspecified"`` when none was
      recorded), sorted by reason;
    - flattens every item's reconcile actions into one list;
    - reports ``"needs-reconcile"`` when any item needs reconciling, and
      ``"clean"`` otherwise.

    The returned actions are dry-run suggestions; this function mutates nothing.
    """
    items = [_capture_health_item(record) for record in records]
    counts = dict.fromkeys(
        (
            "applied",
            "marker_only",
            "create_learning",
            "duplicate_learning",
            "deferred",
            "skipped",
            "missing_marker",
            "needs_reconcile",
        ),
        0,
    )
    # Item status -> counter key; any other status counts as a missing marker.
    status_keys = {"applied": "applied", "deferred": "deferred", "skipped": "skipped"}
    # Learning decision -> counter key; other decisions are not tallied.
    learning_keys = {
        "marker-only": "marker_only",
        "create-learning": "create_learning",
        "duplicate": "duplicate_learning",
    }
    skipped_by_reason: dict[str, int] = {}
    for item in items:
        status = item["status"]
        counts[status_keys.get(status, "missing_marker")] += 1
        if status == "skipped":
            reason_key = item["reason"] or "unspecified"
            skipped_by_reason[reason_key] = skipped_by_reason.get(reason_key, 0) + 1
        decision = item["learning_decision"]
        # Guard the lookup: a corrupt decision may be an unhashable JSON value.
        learning_key = learning_keys.get(decision) if isinstance(decision, str) else None
        if learning_key is not None:
            counts[learning_key] += 1
        if item["needs_reconcile"]:
            counts["needs_reconcile"] += 1

    all_actions = [action for item in items for action in item["reconcile_actions"]]
    overall = "needs-reconcile" if counts["needs_reconcile"] else "clean"
    return {
        "schema_version": CAPTURE_HEALTH_SCHEMA_VERSION,
        "status": overall,
        "record_count": len(records),
        "counts": counts,
        "skipped_by_reason": dict(sorted(skipped_by_reason.items())),
        "items": items,
        "reconcile_actions": all_actions,
        "dry_run": {
            "no_mutations": True,
            "description": "Morning and wrap surface these actions; they do not mutate "
            "ledger, GitHub, or capture destinations.",
        },
    }
def append_record(path: str | Path, record: dict[str, Any]) -> None:
    """Append one validated JSONL record to the ledger at ``path``.

    The record is validated and encoded *before* the filesystem is touched. An
    invalid record therefore raises ``LedgerError`` without leaving behind new
    parent directories or an empty ledger file. For a valid record, missing
    parent directories are created and the line is appended in UTF-8.

    Raises:
        LedgerError: if ``record`` fails schema validation.
    """
    line = encode_record(record)
    ledger_path = Path(path)
    ledger_path.parent.mkdir(parents=True, exist_ok=True)
    with ledger_path.open("a", encoding="utf-8") as handle:
        handle.write(line)
def sanitize_record(
    record: dict[str, Any],
    config: cfg.ProjectConfig | None = None,
) -> dict[str, Any]:
    """Redact ``record`` with the configured capture policy before it is persisted.

    Returns a new dict holding the sanitized values plus a ``redaction`` key
    carrying the redaction audit reported by ``redaction.sanitize``.
    """
    policy = redaction.policy_from_config(config)
    outcome = redaction.sanitize(record, policy)
    return dict(outcome.value, redaction=outcome.audit)
def _validate_record(record: Any, *, line_number: int | None = None) -> None:
    """Raise ``LedgerError`` unless ``record`` is a supported ledger object.

    Checks, in order, that the record is a dict, carries the current
    ``schema_version``, and is a ``ship_run`` record. When ``line_number`` is
    given, it prefixes the error message.
    """
    location = "" if line_number is None else f"line {line_number}: "
    problem: str | None = None
    if not isinstance(record, dict):
        problem = "record must be an object"
    elif record.get("schema_version") != LEDGER_SCHEMA_VERSION:
        problem = "unsupported schema_version"
    elif record.get("record_type") != RECORD_TYPE_SHIP_RUN:
        problem = "unsupported record_type"
    if problem is not None:
        raise LedgerError(location + problem)
def _capture_health_item(record: dict[str, Any]) -> dict[str, Any]:
    """Summarize the capture marker of one ledger record for capture-health readers.

    The record's ``capture`` block supplies ``status``, ``marker``, a reason
    (``marker_reason``, falling back to ``reason``) and the ``learning``
    sub-block. The item is classified with ``_capture_health_status``. A
    dry-run reconcile action is attached when the item is ``missing-marker``
    or ``deferred``.

    A ``capture``, ``learning``, ``issue`` or ``pull_request`` value that is
    not a dict is treated as empty instead of raising. Previously a truthy
    non-dict ``issue``/``pull_request`` raised ``AttributeError`` and aborted
    the whole capture-health summary.
    """

    def _mapping(value: Any) -> dict[str, Any]:
        # Anything other than a dict (None, strings, lists, ...) reads as empty.
        return value if isinstance(value, dict) else {}

    block = _mapping(record.get("capture"))
    status = block.get("status")
    marker = block.get("marker")
    reason = block.get("marker_reason") or block.get("reason")
    learning = _mapping(block.get("learning"))
    item_status = _capture_health_status(status, marker)
    needs_reconcile = item_status in {"missing-marker", "deferred"}
    pr_number = _mapping(record.get("pull_request")).get("number")
    item = {
        "run_id": record.get("run_id"),
        "issue": _mapping(record.get("issue")).get("number"),
        "pull_request": pr_number,
        "status": item_status,
        "capture_status": status,
        "marker": marker if isinstance(marker, str) and marker.strip() else None,
        "reason": reason if isinstance(reason, str) and reason.strip() else None,
        "learning_decision": learning.get("decision"),
        "learning_reason": learning.get("reason"),
        "needs_reconcile": needs_reconcile,
        "reconcile_actions": [],
    }
    if needs_reconcile:
        item["reconcile_actions"].append(_capture_reconcile_action(pr_number, item_status))
    return item
469def _capture_health_status(status: Any, marker: Any) -> str:
470 if not isinstance(marker, str) or not marker.strip():
471 return "missing-marker"
472 if status == "deferred":
473 return "deferred"
474 if status == "skipped":
475 return "skipped"
476 return "applied"
479def _capture_reconcile_action(pr_number: Any, status: str) -> dict[str, Any]:
480 return {
481 "type": "capture-reconcile",
482 "reason": status,
483 "pr": pr_number if isinstance(pr_number, int) else None,
484 "command": (
485 f"keel capture-reconcile .keel/project.yaml --root . --merged-pr {pr_number}"
486 if isinstance(pr_number, int)
487 else "keel capture-reconcile .keel/project.yaml --root ."
488 ),
489 "dry_run": True,
490 }