Coverage for src/keel/captureverify.py: 100%
51 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
"""Pure capture reconciliation: cross-check merged PRs against the ledger.

``keel capture-verify`` historically trusted the agent to pass every merged PR
via ``--merged-pr`` and to self-report ``--capture-status applied`` with no
proof. This module hardens that accounting with three additive checks, all
pure data in / pure findings out (no network, subprocess, clock, or random):

1. **missing-marker** — every PR in the derived merged set must have a valid
   capture marker in the ledger. A merged PR with no marker is a finding, so a
   merged PR can no longer silently vanish from capture accounting by being
   omitted from the args.
2. **applied-without-artifact** — an ``applied`` capture must carry a durable
   capture artifact reference (path/hash) in its ledger record. ``applied``
   with no artifact is a finding. ``deferred``/``skipped`` need no artifact.
3. **reviewer-count-mismatch** — the ledger record's ``actors.reviewers`` count
   for a PR is cross-checked against the evidence-side review-verdict count for
   that PR. Recording more reviewers than verdicts posted is a finding.

The CLI does the I/O (transport query for merged PRs, marker/verdict fetch) and
feeds the results here. The base pass/fail semantics of ``verify_session`` are
preserved; these are strictly additional findings.
"""
24from __future__ import annotations
26from typing import Any
28from . import capture, ledger
# Schema identifier stamped into the "schema_version" field of every report
# returned by ``reconcile``.
RECONCILE_SCHEMA_VERSION = "keel.capture-verify-reconcile.v1"

# Finding type tags, one per reconciliation check. They are used both as the
# "type" value of each finding and as the per-type counter keys in the report
# summary, so they must stay stable for report consumers.
FINDING_MISSING_MARKER = "missing-marker"
FINDING_APPLIED_WITHOUT_ARTIFACT = "applied-without-artifact"
FINDING_REVIEWER_COUNT_MISMATCH = "reviewer-count-mismatch"
def reconcile(
    records: list[dict[str, Any]],
    merged_prs: list[int] | tuple[int, ...],
    *,
    verdict_counts: dict[int, int] | None = None,
) -> dict[str, Any]:
    """Reconcile every merged PR against the run ledger and collect findings.

    Args:
        records: The run ledger records.
        merged_prs: The authoritative merged-PR numbers (derived from the
            transport, not the agent's args). Each entry is checked in order.
        verdict_counts: Optional mapping of PR number to the evidence-side
            review-verdict count. A PR that is absent from the mapping skips
            the reviewer cross-check; ``None`` is treated as an empty mapping.

    Returns:
        A report dict with ``schema_version``, ``ok`` (true only when no
        finding was raised), ``merged_prs``, the per-PR ``results``, the
        flattened ``findings`` list and a ``summary`` holding the number of
        PRs checked, the total finding count and one counter per finding type.
    """
    known_counts = verdict_counts or {}
    per_pr = [_reconcile_pr(records, number, known_counts) for number in merged_prs]

    # Flatten the per-PR findings, preserving PR order.
    all_findings: list[dict[str, Any]] = []
    for outcome in per_pr:
        all_findings.extend(outcome["findings"])

    # Every known finding type is reported, including those with zero hits.
    tally = dict.fromkeys(
        (
            FINDING_MISSING_MARKER,
            FINDING_APPLIED_WITHOUT_ARTIFACT,
            FINDING_REVIEWER_COUNT_MISMATCH,
        ),
        0,
    )
    for item in all_findings:
        tally[item["type"]] += 1

    summary: dict[str, Any] = {"checked": len(per_pr), "findings": len(all_findings)}
    summary.update(tally)

    return {
        "schema_version": RECONCILE_SCHEMA_VERSION,
        "ok": not all_findings,
        "merged_prs": list(merged_prs),
        "results": per_pr,
        "findings": all_findings,
        "summary": summary,
    }
def _reconcile_pr(
    records: list[dict[str, Any]],
    pr_number: int,
    verdict_counts: dict[int, int],
) -> dict[str, Any]:
    """Run the three reconciliation checks for one merged PR.

    Args:
        records: The run ledger records.
        pr_number: The merged PR to check.
        verdict_counts: Mapping of PR number to posted review-verdict count;
            when ``pr_number`` is absent the reviewer cross-check is skipped.

    Returns:
        A per-PR result dict: the PR number, ``ok`` (true when no finding was
        raised), the marker status and marker from the capture verification,
        the artifact reference (or ``None``), the recorded reviewer count, the
        posted verdict count (or ``None`` when unknown) and the findings list.
    """
    marker_check = capture._verify_pr(records, pr_number)
    ship_run = ledger.latest_ship_run_for_pr(records, pr_number)
    problems: list[dict[str, Any]] = []

    # Check 1: the merged PR must have a capture marker in the ledger.
    marker_is_missing = not marker_check["ok"] and marker_check["status"] == "missing"
    if marker_is_missing:
        problems.append(
            _finding(
                FINDING_MISSING_MARKER,
                pr_number,
                "merged PR has no capture marker in the ledger",
            )
        )

    # Check 2: an "applied" capture needs an artifact reference on record.
    artifact_ref = _capture_artifact(ship_run)
    claims_applied = marker_check.get("status") == "applied"
    if claims_applied and not artifact_ref:
        problems.append(
            _finding(
                FINDING_APPLIED_WITHOUT_ARTIFACT,
                pr_number,
                "capture status is applied but no capture artifact was recorded",
            )
        )

    # Check 3: the ledger must not record more reviewers than verdicts posted.
    # An unknown verdict count (None) skips the comparison entirely.
    reviewer_total = _recorded_reviewer_count(ship_run)
    posted = verdict_counts.get(pr_number)
    if posted is not None and reviewer_total > posted:
        message = (
            f"ledger records {reviewer_total} reviewer(s) but only "
            f"{posted} review verdict(s) were posted"
        )
        problems.append(
            _finding(
                FINDING_REVIEWER_COUNT_MISMATCH,
                pr_number,
                message,
                recorded_reviewers=reviewer_total,
                posted_verdicts=posted,
            )
        )

    return {
        "pr": pr_number,
        "ok": not problems,
        "marker_status": marker_check["status"],
        "marker": marker_check.get("marker"),
        "artifact": artifact_ref,
        "recorded_reviewers": reviewer_total,
        "posted_verdicts": posted,
        "findings": problems,
    }
132def _finding(finding_type: str, pr_number: int, reason: str, **extra: Any) -> dict[str, Any]:
133 finding = {"type": finding_type, "pr": pr_number, "reason": reason}
134 finding.update(extra)
135 return finding
138def _capture_artifact(record: dict[str, Any] | None) -> str | None:
139 if not isinstance(record, dict):
140 return None
141 block = record.get("capture")
142 if not isinstance(block, dict):
143 return None
144 artifact = block.get("artifact")
145 return artifact if isinstance(artifact, str) and artifact.strip() else None
148def _recorded_reviewer_count(record: dict[str, Any] | None) -> int:
149 if not isinstance(record, dict):
150 return 0
151 actors = record.get("actors")
152 if not isinstance(actors, dict):
153 return 0
154 reviewers = actors.get("reviewers")
155 if not isinstance(reviewers, list):
156 return 0
157 return sum(1 for reviewer in reviewers if isinstance(reviewer, str) and reviewer.strip())