Coverage for src/keel/consentverify.py: 100%

65 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Pure consent-boundary reconciliation: observed PR side effects vs approved scopes. 

2 

3keel's consent scopes (see :mod:`keel.consent`) gate the CLI *contract* that an 

4agent renders before a live run — they do not gate the side effects themselves. 

5Every real mutation (git push, ``gh pr create``/``comment``/``merge``, label 

6writes) is executed by the agent directly and never passes a consent check, and 

7the consent ``status``/``scopes`` recorded on the ledger are whatever the agent 

8passed. There is no deterministic process that checks the side effects actually 

9*observed* on a PR against the scopes that were *approved*. 

10 

11This module is that process, in its lowest-friction core-pure form: a post-hoc 

12reconcile. Given the side effects observed on a PR (the PR exists, comments were 

13posted, it was merged, labels were written) and the approved consent scopes from 

14the ledger's consent record, it maps each observed effect to its required scopes 

15(reusing :func:`keel.consent.side_effect_scopes` — no parallel vocabulary) and 

16flags any observed mutation not covered by an approved scope. 

17 

18Two verdicts, fail-closed only on a real boundary breach: 

19 

20* **advisory** — no consent record exists to reconcile against (a pre-consent or 

21 agent-self-reported PR). Back-compat: nothing to check, so nothing fails. 

22* **pass** / **fail** — a consent record exists. ``fail`` when an observed effect 

23 requires a scope the record never approved; ``pass`` otherwise. 

24 

25Pure data in / structured report out: no network, subprocess, clock, or random. 

26The CLI does the I/O (transport observation of PR state, comments, merged, 

27labels; ledger consent record) and feeds the booleans/scopes here. 

28""" 

29 

30from __future__ import annotations 

31 

32from dataclasses import dataclass 

33from typing import Any 

34 

35from . import consent 

36 

37SCHEMA_VERSION = "keel.consent-verify.v1" 

38 

39VERDICT_ADVISORY = "advisory" 

40VERDICT_PASS = "pass" 

41VERDICT_FAIL = "fail" 

42 

43# Observed-effect flag name -> the consent.side_effect vocabulary entry it maps 

44# to. Each side-effect resolves through ``consent.side_effect_scopes`` so the 

45# required-scope set always tracks the canonical consent vocabulary. 

46_EFFECT_SIDE_EFFECTS: dict[str, str] = { 

47 # The PR existing at all means a branch was pushed and a PR opened: git push 

48 # (scope ``git``) plus the gh pr create (scope ``github``). 

49 "pr_exists": "git_push", 

50 "pr_created": "pull_request", 

51 "comment": "comments", 

52 "merged": "merge", 

53 "label": "labels", 

54} 

55 

56# ``pr_exists`` expands to two side effects (push + open) because a PR's mere 

57# existence implies both a ``git`` push and a ``github`` create. 

58_EFFECT_EXTRA_SIDE_EFFECTS: dict[str, tuple[str, ...]] = { 

59 "pr_exists": ("pull_request",), 

60} 

61 

62OBSERVED_EFFECT_KINDS: tuple[str, ...] = tuple(_EFFECT_SIDE_EFFECTS) 

63 

64 

65@dataclass(frozen=True) 

66class ObservedEffects: 

67 """The mutating side effects observed on one PR (each defaults to absent). 

68 

69 ``pr_exists`` is the baseline: a PR that exists implies a branch push and a 

70 ``gh pr create``. ``merged``/``commented``/``labeled`` layer on the heavier 

71 mutations. All offline-supplyable so tests are deterministic. 

72 """ 

73 

74 pr_exists: bool = False 

75 commented: bool = False 

76 merged: bool = False 

77 labeled: bool = False 

78 

79 def as_kinds(self) -> tuple[str, ...]: 

80 """Return the observed effect-kind names in a stable order.""" 

81 kinds: list[str] = [] 

82 if self.pr_exists: 

83 kinds.append("pr_exists") 

84 if self.commented: 

85 kinds.append("comment") 

86 if self.merged: 

87 kinds.append("merged") 

88 if self.labeled: 

89 kinds.append("label") 

90 return tuple(kinds) 

91 

92 

93def required_scopes_for_effect(effect_kind: str) -> tuple[str, ...]: 

94 """Return the consent scopes an observed ``effect_kind`` requires. 

95 

96 Resolves through :func:`keel.consent.side_effect_scopes`, so the mapping 

97 reuses the canonical scope vocabulary rather than inventing a parallel one. 

98 Raises ``ValueError`` for an unknown effect kind so a typo can never silently 

99 map to "no scopes required" (which would wrongly pass reconciliation). 

100 """ 

101 side_effect = _EFFECT_SIDE_EFFECTS.get(effect_kind) 

102 if side_effect is None: 

103 raise ValueError( 

104 f"unknown observed effect {effect_kind!r}; " 

105 f"valid: {', '.join(OBSERVED_EFFECT_KINDS)}" 

106 ) 

107 side_effects = (side_effect, *_EFFECT_EXTRA_SIDE_EFFECTS.get(effect_kind, ())) 

108 return consent.side_effect_scopes(side_effects) 

109 

110 

111def scope_effect_table() -> dict[str, list[str]]: 

112 """Return the deterministic observed-effect -> required-scope mapping table. 

113 

114 Surfaced in the command contract and docs so operators can audit exactly how 

115 each observed mutation is scored without reading the code. 

116 """ 

117 return {kind: list(required_scopes_for_effect(kind)) for kind in OBSERVED_EFFECT_KINDS} 

118 

119 

120def reconcile( 

121 observed: ObservedEffects, 

122 approved_scopes: list[str] | tuple[str, ...] | None, 

123 *, 

124 has_consent_record: bool, 

125) -> dict[str, Any]: 

126 """Reconcile observed PR side effects against approved consent scopes. 

127 

128 ``observed`` are the mutations seen on the PR. ``approved_scopes`` are the 

129 scopes the ledger's consent record approved. ``has_consent_record`` is 

130 whether a consent record exists at all for the PR — when ``False`` there is 

131 nothing to reconcile against, so the verdict is ``advisory`` (back-compat for 

132 pre-consent PRs) regardless of what was observed. 

133 

134 Returns a structured report: the per-effect coverage, a flat list of uncovered 

135 mutations (each naming the effect and the missing scope), the verdict, and a 

136 summary. Pure — reads only its arguments. 

137 """ 

138 approved = consent.normalize_scopes(approved_scopes or ()) 

139 effects = [] 

140 uncovered: list[dict[str, Any]] = [] 

141 for kind in observed.as_kinds(): 

142 required = required_scopes_for_effect(kind) 

143 missing = tuple(scope for scope in required if scope not in approved) 

144 covered = not missing 

145 effects.append({ 

146 "effect": kind, 

147 "required_scopes": list(required), 

148 "missing_scopes": list(missing), 

149 "covered": covered, 

150 }) 

151 if not covered and has_consent_record: 

152 uncovered.append({ 

153 "effect": kind, 

154 "required_scopes": list(required), 

155 "missing_scopes": list(missing), 

156 "message": ( 

157 f"mutation {kind} not covered by approved consent scopes " 

158 f"(requires {', '.join(required)}; missing {', '.join(missing)})" 

159 ), 

160 }) 

161 verdict = _verdict(has_consent_record=has_consent_record, uncovered=uncovered) 

162 return { 

163 "schema_version": SCHEMA_VERSION, 

164 "verdict": verdict, 

165 "ok": verdict != VERDICT_FAIL, 

166 "has_consent_record": has_consent_record, 

167 "approved_scopes": list(approved), 

168 "observed_effects": list(observed.as_kinds()), 

169 "effects": effects, 

170 "uncovered": uncovered, 

171 "summary": { 

172 "observed": len(effects), 

173 "covered": sum(1 for effect in effects if effect["covered"]), 

174 "uncovered": len(uncovered), 

175 }, 

176 } 

177 

178 

179def _verdict(*, has_consent_record: bool, uncovered: list[dict[str, Any]]) -> str: 

180 if not has_consent_record: 

181 return VERDICT_ADVISORY 

182 return VERDICT_FAIL if uncovered else VERDICT_PASS 

183 

184 

185def consent_record_from_ledger( 

186 record: dict[str, Any] | None, 

187) -> tuple[bool, tuple[str, ...]]: 

188 """Extract ``(has_record, approved_scopes)`` from a ship_run ledger record. 

189 

190 The ledger stores consent under ``run_context.consent`` as a ``status`` plus 

191 the approved mutation ``scopes`` (see :func:`keel.ledger._run_context`). A 

192 consent record is considered to *exist* only when ``status`` is a non-blank 

193 string — a missing record, a missing/empty ``run_context``, or a blank status 

194 all degrade to "no record" so the verdict falls back to advisory rather than 

195 failing a pre-consent PR. Pure — no I/O. 

196 """ 

197 if not isinstance(record, dict): 

198 return False, () 

199 run_context = record.get("run_context") 

200 consent_block = run_context.get("consent") if isinstance(run_context, dict) else None 

201 if not isinstance(consent_block, dict): 

202 return False, () 

203 status = consent_block.get("status") 

204 has_record = isinstance(status, str) and bool(status.strip()) 

205 raw_scopes = consent_block.get("scopes") 

206 scopes = ( 

207 tuple(str(scope) for scope in raw_scopes if str(scope).strip()) 

208 if isinstance(raw_scopes, list) 

209 else () 

210 ) 

211 return has_record, scopes