Coverage for src/keel/dryrunverify.py: 100%
44 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
"""Pure post-hoc dry-run integrity reconciliation.

``--dry-run`` is a promise: a rehearsal redirects every state-changing
``git``/``gh`` write to a logged ``DRY-RUN:`` line and mutates nothing (see the
ship adapter and :func:`keel.cli`'s dry-run paths). But the promise is prose —
nothing *detects* a leak where a dry run actually created a ledger record, a
branch, or a PR. That is the audit gap (GAP-14) this module closes.

The check is a before/after snapshot diff. The operator captures a snapshot of
the relevant artifacts *before* the dry run, runs the dry run, then this module
diffs a fresh *after* snapshot against the before one and flags any new artifact
attributable to the rehearsed run:

* **new ledger record** — a ship_run ``run_id`` equal to the dry run's id that
  appears in the after snapshot but not the before one. A dry run must append no
  record, so any such record is a leak.
* **new branch** — a branch matching the run's issue (``<type>/issue-<N>`` in the
  ship-branch vocabulary) present after but not before.
* **new PR** — a pull request present after but not before. The CLI scopes the
  PR snapshot to the run's issue, so any new entry is attributable.

The verdict is **clean** when the after snapshot adds nothing, **violated** when
it adds at least one attributable artifact. The findings are an integrity
*detection* — the rehearsal already ran, so this cannot prevent a leak, only
surface it post-hoc for an operator (hence the audit-grade, advisory framing).

**Trust model — the before-snapshot is unauthenticated input.** This check is
only as honest as the baseline it is handed: an operator (or agent) that
captures the "before" snapshot *after* the leak, or hand-writes a baseline that
already lists the branch/PR it intends to leak, gets a clean verdict. It
therefore assumes the baseline was captured by an honest party *before* the
rehearsal, and provides no defense against an agent forging its own baseline. A
stronger future design would have keel itself emit the before-snapshot as a
side effect of starting the dry run rather than trusting a supplied file.

**Attribution scope (deliberately narrow, to avoid false positives).** The
ledger leg keys on an exact ``run_id`` match — a record mis-attributed to a
different run_id is out of scope. The branch leg recognises only keel's
ship-branch vocabulary (``<type>/issue-<N>``); a leak on a non-conforming branch
name with no accompanying PR is not detected. These mirror keel's own write
paths, so a leak that *follows keel's conventions* is caught.

**The after-snapshot is read fail-closed by the CLI** (a failed git/gh read
raises rather than degrading to empty), because an empty-on-error after snapshot
would compute ``after − before = ∅`` and wrongly report "clean" — the worst
fail direction for a detector. A verifier that cannot observe says "cannot
certify", never "clean".

Pure data in / structured report out: no network, subprocess, clock, or random.
The CLI does the I/O (reads the ledger, lists branches, lists the issue's PRs)
and feeds the snapshots here.
"""
54from __future__ import annotations
56import re
57from dataclasses import dataclass, field
58from typing import Any
# Version tag stamped into every report under the ``schema_version`` key.
SCHEMA_VERSION = "keel.dryrun-verify.v1"

# The two possible values of a report's ``verdict`` field.
VERDICT_CLEAN = "clean"
VERDICT_VIOLATED = "violated"

# The ``finding`` kinds a report entry can carry, one per artifact leg.
FINDING_NEW_LEDGER_RECORD = "new-ledger-record"
FINDING_NEW_BRANCH = "new-branch"
FINDING_NEW_PR = "new-pr"
# The ship-branch type prefixes (mirrors keel.evidence._SHIP_BRANCH_RE). A dry
# run for issue N must not create a branch named ``<type>/issue-N``.
_BRANCH_TYPES = ("feature", "fix", "chore", "docs", "test")


def issue_branch_pattern(issue: int) -> re.Pattern[str]:
    """Return the compiled branch pattern that a dry run for ``issue`` must not create.

    Matches ``<type>/issue-<N>`` optionally followed by ``-<slug>`` — the same
    shape keel's ship-branch detection recognises, pinned to the specific issue
    number so a branch for a *different* issue is never mis-attributed.

    ``issue`` is interpolated into the pattern as-is; ``<type>`` is any entry of
    ``_BRANCH_TYPES``. The pattern is anchored at the start of the name, so a
    name carrying any leading prefix does not match.
    """
    types = "|".join(_BRANCH_TYPES)
    # ``(?:-|$)`` terminates the issue number: without it the pattern for
    # issue 1 would also match ``issue-12``.
    return re.compile(rf"^({types})/issue-{issue}(?:-|$)")
@dataclass(frozen=True)
class ArtifactSnapshot:
    """A snapshot of the artifacts a dry run could leak.

    ``ledger_run_ids`` are the ship_run ``run_id`` values present in the run
    ledger; ``branches`` are local/remote branch names; ``pr_numbers`` are pull
    request numbers (CLI-scoped to the run's issue). All offline-supplyable so
    tests are deterministic.

    Every field defaults to an empty tuple, so ``ArtifactSnapshot()`` is the
    "nothing present" snapshot. Instances are frozen.
    """

    # ship_run ``run_id`` values present in the run ledger.
    ledger_run_ids: tuple[str, ...] = field(default_factory=tuple)
    # Local/remote branch names.
    branches: tuple[str, ...] = field(default_factory=tuple)
    # Pull request numbers (CLI-scoped to the run's issue).
    pr_numbers: tuple[int, ...] = field(default_factory=tuple)
def reconcile(
    before: ArtifactSnapshot,
    after: ArtifactSnapshot,
    *,
    run_id: str,
    issue: int,
) -> dict[str, Any]:
    """Diff a before/after snapshot pair and flag dry-run integrity leaks.

    Flags a new ledger record whose ``run_id`` equals ``run_id``, any new branch
    matching ``issue``'s ship-branch pattern, and any new PR. ``before``/``after``
    are the artifact snapshots taken around the rehearsed run. Returns a
    structured report: the per-artifact additions, a flat findings list, the
    verdict, and a summary. Pure — reads only its arguments.

    The report's ``added`` section lists *every* addition (including run ids and
    branches that are not attributable to this run), whereas ``findings`` and
    the ``summary`` counts cover only the attributable ones. ``verdict`` is
    ``VERDICT_VIOLATED`` when there is at least one finding, otherwise
    ``VERDICT_CLEAN``; ``ok`` is true exactly when the verdict is clean.
    """
    pattern = issue_branch_pattern(issue)
    new_run_ids = _added(after.ledger_run_ids, before.ledger_run_ids)
    new_branches = _added(after.branches, before.branches)
    new_prs = _added(after.pr_numbers, before.pr_numbers)

    # Narrow the raw additions to those attributable to the rehearsed run:
    # an exact run_id match for the ledger, the issue's branch pattern for
    # branches. Every new PR is reported as-is.
    leaked_records = [rid for rid in new_run_ids if rid == run_id]
    leaked_branches = [name for name in new_branches if pattern.search(name)]

    # Findings are emitted in a fixed order: ledger records, branches, PRs.
    findings: list[dict[str, Any]] = [
        {
            "finding": FINDING_NEW_LEDGER_RECORD,
            "artifact": rid,
            "message": (
                f"dry run {run_id!r} left a new ledger record (run_id {rid!r}); "
                f"a rehearsal must append no record"
            ),
        }
        for rid in leaked_records
    ]
    findings.extend(
        {
            "finding": FINDING_NEW_BRANCH,
            "artifact": name,
            "message": (
                f"dry run {run_id!r} left a new branch {name!r} for issue "
                f"#{issue}; a rehearsal must create no branch"
            ),
        }
        for name in leaked_branches
    )
    findings.extend(
        {
            "finding": FINDING_NEW_PR,
            "artifact": number,
            "message": (
                f"dry run {run_id!r} left a new PR #{number} for issue #{issue}; "
                f"a rehearsal must open no PR"
            ),
        }
        for number in new_prs
    )

    verdict = VERDICT_VIOLATED if findings else VERDICT_CLEAN
    return {
        "schema_version": SCHEMA_VERSION,
        "verdict": verdict,
        "ok": verdict == VERDICT_CLEAN,
        "run_id": run_id,
        "issue": issue,
        "added": {
            "ledger_run_ids": list(new_run_ids),
            "branches": list(new_branches),
            "pr_numbers": list(new_prs),
        },
        "findings": findings,
        "summary": {
            "new_ledger_records": len(leaked_records),
            "new_branches": len(leaked_branches),
            "new_prs": len(new_prs),
            "findings": len(findings),
        },
    }
173def _added(after: tuple[Any, ...], before: tuple[Any, ...]) -> list[Any]:
174 """Return items present in ``after`` but not ``before``, in ``after`` order.
176 Order-preserving and de-duplicated so the report is deterministic regardless
177 of how the CLI gathered the snapshots.
178 """
179 seen = set(before)
180 out: list[Any] = []
181 emitted: set[Any] = set()
182 for item in after:
183 if item not in seen and item not in emitted:
184 out.append(item)
185 emitted.add(item)
186 return out