Coverage for src/keel/dryrunverify.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Pure post-hoc dry-run integrity reconciliation. 

2 

3``--dry-run`` is a promise: a rehearsal redirects every state-changing 

4``git``/``gh`` write to a logged ``DRY-RUN:`` line and mutates nothing (see the 

5ship adapter and :func:`keel.cli`'s dry-run paths). But the promise is prose — 

6nothing *detects* a leak where a dry run actually created a ledger record, a 

7branch, or a PR. That is the audit gap (GAP-14) this module closes. 

8 

9The check is a before/after snapshot diff. The operator captures a snapshot of 

10the relevant artifacts *before* the dry run, runs the dry run, then this module 

11diffs a fresh *after* snapshot against the before one and flags any new artifact 

12attributable to the rehearsed run: 

13 

14* **new ledger record** — a ship_run ``run_id`` equal to the dry run's id that 

15 appears in the after snapshot but not the before one. A dry run must append no 

16 record, so any such record is a leak. 

17* **new branch** — a branch matching the run's issue (``<type>/issue-<N>`` in the 

18 ship-branch vocabulary) present after but not before. 

19* **new PR** — a pull request present after but not before. The CLI scopes the 

20 PR snapshot to the run's issue, so any new entry is attributable. 

21 

22The verdict is **clean** when the after snapshot adds nothing, **violated** when 

23it adds at least one attributable artifact. The findings are an integrity 

24*detection* — the rehearsal already ran, so this cannot prevent a leak, only 

25surface it post-hoc for an operator (hence the audit-grade, advisory framing). 

26 

27**Trust model — the before-snapshot is unauthenticated input.** This check is 

28only as honest as the baseline it is handed: an operator (or agent) that 

29captures the "before" snapshot *after* the leak, or hand-writes a baseline that 

30already lists the branch/PR it intends to leak, gets a clean verdict. It 

31therefore assumes the baseline was captured by an honest party *before* the 

32rehearsal, and provides no defense against an agent forging its own baseline. A 

33stronger future design would have keel itself emit the before-snapshot as a 

34side effect of starting the dry run rather than trusting a supplied file. 

35 

36**Attribution scope (deliberately narrow, to avoid false positives).** The 

37ledger leg keys on an exact ``run_id`` match — a record mis-attributed to a 

38different run_id is out of scope. The branch leg recognises only keel's 

39ship-branch vocabulary (``<type>/issue-<N>``); a leak on a non-conforming branch 

40name with no accompanying PR is not detected. These mirror keel's own write 

41paths, so a leak that *follows keel's conventions* is caught. 

42 

43**The after-snapshot is read fail-closed by the CLI** (a failed git/gh read 

44raises rather than degrading to empty), because an empty-on-error after snapshot 

45would compute ``after − before = ∅`` and wrongly report "clean" — the worst 

46fail direction for a detector. A verifier that cannot observe says "cannot 

47certify", never "clean". 

48 

49Pure data in / structured report out: no network, subprocess, clock, or random. 

50The CLI does the I/O (reads the ledger, lists branches, lists the issue's PRs) 

51and feeds the snapshots here. 

52""" 

53 

54from __future__ import annotations 

55 

56import re 

57from dataclasses import dataclass, field 

58from typing import Any 

59 

# Schema tag emitted under the report's "schema_version" key.
SCHEMA_VERSION = "keel.dryrun-verify.v1"

# Report verdicts: "clean" when reconciliation produced no findings,
# "violated" when it produced at least one.
VERDICT_CLEAN = "clean"
VERDICT_VIOLATED = "violated"

# Values of the "finding" key in each findings entry, one per leak category:
# a new ledger record, a new issue branch, a new pull request.
FINDING_NEW_LEDGER_RECORD = "new-ledger-record"
FINDING_NEW_BRANCH = "new-branch"
FINDING_NEW_PR = "new-pr"

68 

# Branch-type prefixes of keel's ship-branch vocabulary (mirrors
# keel.evidence._SHIP_BRANCH_RE). A rehearsal for issue N must leave behind no
# branch named ``<type>/issue-N``.
_BRANCH_TYPES = ("feature", "fix", "chore", "docs", "test")


def issue_branch_pattern(issue: int) -> re.Pattern[str]:
    """Compile the regex for ship branches that belong to ``issue``.

    The pattern is anchored at the start of the name and accepts
    ``<type>/issue-<N>`` either on its own or followed by ``-<slug>``, where
    ``<type>`` is one of ``_BRANCH_TYPES``. Because the issue number must be
    followed by ``-`` or the end of the string, a branch for issue 51 is never
    attributed to issue 5.

    Args:
        issue: The issue number the dry run was rehearsing.

    Returns:
        The compiled pattern; group 1 captures the branch type.
    """
    alternation = "|".join(_BRANCH_TYPES)
    source = rf"^({alternation})/issue-{issue}(?:-|$)"
    return re.compile(source)

83 

84 

@dataclass(frozen=True)
class ArtifactSnapshot:
    """An immutable snapshot of the artifacts a dry run could leak.

    Two snapshots, one taken before the rehearsal and one after, are diffed
    to find anything the rehearsal created. Every field defaults to an empty
    tuple, so a snapshot can be built from whichever artifact kinds are
    available. All fields are plain values that can be supplied offline,
    which keeps tests deterministic.
    """

    # ship_run ``run_id`` values present in the run ledger.
    ledger_run_ids: tuple[str, ...] = field(default_factory=tuple)
    # Local/remote branch names.
    branches: tuple[str, ...] = field(default_factory=tuple)
    # Pull request numbers (the CLI scopes these to the run's issue).
    pr_numbers: tuple[int, ...] = field(default_factory=tuple)

98 

99 

def reconcile(
    before: ArtifactSnapshot,
    after: ArtifactSnapshot,
    *,
    run_id: str,
    issue: int,
) -> dict[str, Any]:
    """Compare two artifact snapshots and report what the dry run leaked.

    An artifact counts as a leak when it is present in ``after`` but absent
    from ``before`` and is attributable to the rehearsed run:

    * a ledger ``run_id`` equal to ``run_id``;
    * a branch name matching the ship-branch pattern for ``issue``;
    * any pull request number (every new PR is reported).

    Pure: only the arguments are read.

    Args:
        before: Snapshot taken before the rehearsal.
        after: Snapshot taken after the rehearsal.
        run_id: Identifier of the rehearsed run.
        issue: Issue number the rehearsed run targeted.

    Returns:
        A report dict with the schema version, the verdict and its boolean
        ``ok`` mirror, the echoed ``run_id``/``issue``, the raw per-artifact
        additions under ``added``, the flat ``findings`` list (ledger records
        first, then branches, then PRs), and a ``summary`` of counts.
    """
    branch_re = issue_branch_pattern(issue)

    # Raw additions: everything in ``after`` that ``before`` did not have.
    added_run_ids = _added(after.ledger_run_ids, before.ledger_run_ids)
    added_branches = _added(after.branches, before.branches)
    added_prs = _added(after.pr_numbers, before.pr_numbers)

    # Ledger leg: only a record carrying this run's own id is attributable.
    record_leaks = [rid for rid in added_run_ids if rid == run_id]
    record_findings = [
        {
            "finding": FINDING_NEW_LEDGER_RECORD,
            "artifact": rid,
            "message": (
                f"dry run {run_id!r} left a new ledger record (run_id {rid!r}); "
                f"a rehearsal must append no record"
            ),
        }
        for rid in record_leaks
    ]

    # Branch leg: only branches in the ship-branch shape for this issue.
    branch_leaks = [name for name in added_branches if branch_re.search(name)]
    branch_findings = [
        {
            "finding": FINDING_NEW_BRANCH,
            "artifact": name,
            "message": (
                f"dry run {run_id!r} left a new branch {name!r} for issue "
                f"#{issue}; a rehearsal must create no branch"
            ),
        }
        for name in branch_leaks
    ]

    # PR leg: every new PR number is reported, with no further filtering.
    pr_findings = [
        {
            "finding": FINDING_NEW_PR,
            "artifact": number,
            "message": (
                f"dry run {run_id!r} left a new PR #{number} for issue #{issue}; "
                f"a rehearsal must open no PR"
            ),
        }
        for number in added_prs
    ]

    findings: list[dict[str, Any]] = [
        *record_findings,
        *branch_findings,
        *pr_findings,
    ]
    is_clean = not findings

    return {
        "schema_version": SCHEMA_VERSION,
        "verdict": VERDICT_CLEAN if is_clean else VERDICT_VIOLATED,
        "ok": is_clean,
        "run_id": run_id,
        "issue": issue,
        "added": {
            "ledger_run_ids": list(added_run_ids),
            "branches": list(added_branches),
            "pr_numbers": list(added_prs),
        },
        "findings": findings,
        "summary": {
            "new_ledger_records": len(record_leaks),
            "new_branches": len(branch_leaks),
            "new_prs": len(added_prs),
            "findings": len(findings),
        },
    }

171 

172 

173def _added(after: tuple[Any, ...], before: tuple[Any, ...]) -> list[Any]: 

174 """Return items present in ``after`` but not ``before``, in ``after`` order. 

175 

176 Order-preserving and de-duplicated so the report is deterministic regardless 

177 of how the CLI gathered the snapshots. 

178 """ 

179 seen = set(before) 

180 out: list[Any] = [] 

181 emitted: set[Any] = set() 

182 for item in after: 

183 if item not in seen and item not in emitted: 

184 out.append(item) 

185 emitted.add(item) 

186 return out