Coverage for src/keel/consentverify.py: 100%
65 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Pure consent-boundary reconciliation: observed PR side effects vs approved scopes.
keel's consent scopes (see :mod:`keel.consent`) gate the CLI *contract* that an
agent renders before a live run — they do not gate the side effects themselves.
Every real mutation (git push, ``gh pr create``/``comment``/``merge``, label
writes) is executed by the agent directly and never passes a consent check, and
the consent ``status``/``scopes`` recorded on the ledger are whatever the agent
passed. There is no deterministic process that checks the side effects actually
*observed* on a PR against the scopes that were *approved*.

This module is that process, in its lowest-friction core-pure form: a post-hoc
reconcile. Given the side effects observed on a PR (the PR exists, comments were
posted, it was merged, labels were written) and the approved consent scopes from
the ledger's consent record, it maps each observed effect to its required scopes
(reusing :func:`keel.consent.side_effect_scopes` — no parallel vocabulary) and
flags any observed mutation not covered by an approved scope.

Two verdict classes, fail-closed only on a real boundary breach:

* **advisory** — no consent record exists to reconcile against (a pre-consent or
  agent-self-reported PR). Back-compat: nothing to check, so nothing fails.
* **pass** / **fail** — a consent record exists. ``fail`` when an observed effect
  requires a scope the record never approved; ``pass`` otherwise.

Pure data in / structured report out: no network, subprocess, clock, or random.
The CLI does the I/O (transport observation of PR state, comments, merged,
labels; ledger consent record) and feeds the booleans/scopes here.
28"""
30from __future__ import annotations
32from dataclasses import dataclass
33from typing import Any
35from . import consent
# Identifier stamped on every reconcile report under "schema_version".
SCHEMA_VERSION = "keel.consent-verify.v1"

# The three verdict strings a reconcile report can carry.
VERDICT_ADVISORY = "advisory"
VERDICT_PASS = "pass"
VERDICT_FAIL = "fail"

# Primary side effect for each observed-effect kind, named in the
# consent.side_effect vocabulary. Required scopes are never listed here
# directly: they are looked up via ``consent.side_effect_scopes`` so this table
# cannot drift from the canonical consent vocabulary.
_EFFECT_SIDE_EFFECTS: dict[str, str] = {
    # An existing PR implies a pushed branch (git push, scope ``git``) and an
    # opened PR (gh pr create, scope ``github``). The push is the primary entry;
    # the create is added through ``_EFFECT_EXTRA_SIDE_EFFECTS``.
    "pr_exists": "git_push",
    "pr_created": "pull_request",
    "comment": "comments",
    "merged": "merge",
    "label": "labels",
}

# Additional side effects implied by an effect kind on top of its primary one.
# Only ``pr_exists`` has any: existence implies both the push and the create.
_EFFECT_EXTRA_SIDE_EFFECTS: dict[str, tuple[str, ...]] = {
    "pr_exists": ("pull_request",),
}

# Every recognised effect kind, in the table's declaration order.
OBSERVED_EFFECT_KINDS: tuple[str, ...] = tuple(_EFFECT_SIDE_EFFECTS.keys())
@dataclass(frozen=True)
class ObservedEffects:
    """Immutable record of which mutating side effects were seen on one PR.

    Every flag defaults to ``False`` (effect not observed). ``pr_exists`` is the
    baseline observation — an existing PR implies a branch push and a
    ``gh pr create`` — while ``commented``, ``merged`` and ``labeled`` record
    the heavier mutations layered on top. All fields are plain booleans, so
    instances can be built offline for deterministic tests.
    """

    pr_exists: bool = False
    commented: bool = False
    merged: bool = False
    labeled: bool = False

    def as_kinds(self) -> tuple[str, ...]:
        """Return the effect-kind name of every set flag, in a fixed order.

        The order is always ``pr_exists``, ``comment``, ``merged``, ``label``;
        unset flags are simply omitted.
        """
        # (flag value, effect-kind name) pairs; the tuple order is the output
        # order. Note the kind names differ from the field names for
        # ``commented`` -> "comment" and ``labeled`` -> "label".
        flag_to_kind = (
            (self.pr_exists, "pr_exists"),
            (self.commented, "comment"),
            (self.merged, "merged"),
            (self.labeled, "label"),
        )
        return tuple(kind for is_set, kind in flag_to_kind if is_set)
def required_scopes_for_effect(effect_kind: str) -> tuple[str, ...]:
    """Look up the consent scopes that one observed ``effect_kind`` requires.

    The effect kind is translated to its side effect(s) and those are handed to
    :func:`keel.consent.side_effect_scopes`; whatever that function returns is
    returned unchanged, so the scope names always come from the canonical
    consent vocabulary.

    Raises:
        ValueError: ``effect_kind`` is not one of ``OBSERVED_EFFECT_KINDS``.
            Failing loudly keeps a typo from being scored as "no scopes
            required", which would wrongly pass reconciliation.
    """
    if effect_kind not in _EFFECT_SIDE_EFFECTS:
        raise ValueError(
            f"unknown observed effect {effect_kind!r}; "
            f"valid: {', '.join(OBSERVED_EFFECT_KINDS)}"
        )
    primary = _EFFECT_SIDE_EFFECTS[effect_kind]
    # Most kinds have no extras; ``pr_exists`` adds the PR-create side effect.
    extras = _EFFECT_EXTRA_SIDE_EFFECTS.get(effect_kind, ())
    return consent.side_effect_scopes((primary, *extras))
def scope_effect_table() -> dict[str, list[str]]:
    """Build the full observed-effect -> required-scopes table.

    One entry per name in ``OBSERVED_EFFECT_KINDS``, in that order, each value a
    fresh list of the scopes :func:`required_scopes_for_effect` reports. The
    table is surfaced in the command contract and docs so operators can audit
    how each observed mutation is scored without reading the code.
    """
    table: dict[str, list[str]] = {}
    for effect_kind in OBSERVED_EFFECT_KINDS:
        table[effect_kind] = list(required_scopes_for_effect(effect_kind))
    return table
def reconcile(
    observed: ObservedEffects,
    approved_scopes: list[str] | tuple[str, ...] | None,
    *,
    has_consent_record: bool,
) -> dict[str, Any]:
    """Compare the side effects observed on a PR with the approved scopes.

    Args:
        observed: The mutations seen on the PR.
        approved_scopes: Scopes approved by the ledger's consent record.
            ``None`` or an empty sequence is treated as "nothing approved";
            the value is passed through ``consent.normalize_scopes`` before use.
        has_consent_record: Whether any consent record exists for the PR. When
            ``False`` there is nothing to reconcile against: no effect is
            reported as uncovered and the verdict is ``advisory`` (back-compat
            for pre-consent PRs), whatever was observed.

    Returns:
        A report dict with ``schema_version``, ``verdict``, ``ok`` (true unless
        the verdict is ``fail``), ``has_consent_record``, ``approved_scopes``,
        ``observed_effects``, the per-effect ``effects`` coverage list, the
        ``uncovered`` list (effect, required and missing scopes, message) and a
        ``summary`` of counts. Pure — reads only its arguments.
    """
    approved = consent.normalize_scopes(approved_scopes or ())
    observed_kinds = observed.as_kinds()

    effects: list[dict[str, Any]] = []
    uncovered: list[dict[str, Any]] = []
    for kind in observed_kinds:
        required = required_scopes_for_effect(kind)
        missing = tuple(scope for scope in required if scope not in approved)
        entry: dict[str, Any] = {
            "effect": kind,
            "required_scopes": list(required),
            "missing_scopes": list(missing),
            "covered": not missing,
        }
        effects.append(entry)
        # Coverage is always recorded per effect, but a gap only becomes a
        # finding when there is a consent record to hold it against.
        if entry["covered"] or not has_consent_record:
            continue
        finding: dict[str, Any] = {
            "effect": kind,
            "required_scopes": list(required),
            "missing_scopes": list(missing),
            "message": (
                f"mutation {kind} not covered by approved consent scopes "
                f"(requires {', '.join(required)}; missing {', '.join(missing)})"
            ),
        }
        uncovered.append(finding)

    verdict = _verdict(has_consent_record=has_consent_record, uncovered=uncovered)
    covered_count = len([entry for entry in effects if entry["covered"]])
    summary = {
        "observed": len(effects),
        "covered": covered_count,
        "uncovered": len(uncovered),
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "verdict": verdict,
        "ok": verdict != VERDICT_FAIL,
        "has_consent_record": has_consent_record,
        "approved_scopes": list(approved),
        "observed_effects": list(observed_kinds),
        "effects": effects,
        "uncovered": uncovered,
        "summary": summary,
    }
def _verdict(*, has_consent_record: bool, uncovered: list[dict[str, Any]]) -> str:
    """Pick the report verdict from record presence and the uncovered findings.

    Without a consent record the verdict is always advisory. With one, any
    uncovered finding yields fail and an empty list yields pass.
    """
    if has_consent_record:
        if uncovered:
            return VERDICT_FAIL
        return VERDICT_PASS
    return VERDICT_ADVISORY
def consent_record_from_ledger(
    record: dict[str, Any] | None,
) -> tuple[bool, tuple[str, ...]]:
    """Extract ``(has_record, approved_scopes)`` from a ship_run ledger record.

    The ledger stores consent under ``run_context.consent`` as a ``status`` plus
    the approved mutation ``scopes`` (see :func:`keel.ledger._run_context`). A
    consent record is considered to *exist* only when ``status`` is a non-blank
    string — a missing record, a missing/empty ``run_context``, a missing
    ``consent`` block, or a blank status all degrade to "no record" so the
    verdict falls back to advisory rather than failing a pre-consent PR.

    Args:
        record: The ledger record, or ``None``/any non-dict when there is none.

    Returns:
        ``(has_record, scopes)``. ``scopes`` is a tuple of the stringified,
        non-blank entries of ``consent["scopes"]`` in their original order. It
        is empty when ``scopes`` is absent or is neither a list nor a tuple (a
        bare string is deliberately not treated as a sequence of scopes).
        Scopes are returned even when ``has_record`` is ``False``.

    Pure — no I/O.
    """
    if not isinstance(record, dict):
        return False, ()
    run_context = record.get("run_context")
    if not isinstance(run_context, dict):
        return False, ()
    consent_block = run_context.get("consent")
    if not isinstance(consent_block, dict):
        return False, ()

    status = consent_block.get("status")
    has_record = isinstance(status, str) and bool(status.strip())

    raw_scopes = consent_block.get("scopes")
    # Accept tuples as well as lists: an in-memory record may carry a tuple, and
    # dropping it would report "nothing approved" and fail a valid consent.
    if not isinstance(raw_scopes, (list, tuple)):
        return has_record, ()

    # A null entry is an absent scope, not a scope literally named "None".
    # Each remaining entry is stringified once and blank ones are discarded.
    stringified = (str(scope) for scope in raw_scopes if scope is not None)
    scopes = tuple(text for text in stringified if text.strip())
    return has_record, scopes