Coverage for src/keel/consent.py: 100%
124 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Operator consent contracts for live mutating keel runs."""
3from __future__ import annotations
5from collections.abc import Iterable
6from datetime import UTC, datetime
7from typing import Any
# Schema identifiers stamped into the consent and escalation payloads.
SCHEMA_VERSION = "keel.operator-consent.v1"
ESCALATION_SCHEMA_VERSION = "keel.risk-trust-escalation.v1"

# Scopes an operator can approve. The position of each name in this tuple is
# its rank in _SCOPE_ORDER (defined at the end of this section).
CONSENT_SCOPES = (
    "filesystem",
    "git",
    "github",
    "secrets",
    "release",
    "production-adjacent",
)

# Closed vocabularies accepted for the corresponding contract fields.
APPROVAL_SOURCES = ("none", "flag", "env", "config")
CONSENT_MODES = ("explicit", "standing", "agent")
RISK_TIERS = ("tier-1", "tier-2", "tier-3")
TRUST_SIGNALS = ("high", "medium", "low")

# Declared side effect -> consent scopes it requires.
_SIDE_EFFECT_SCOPES: dict[str, tuple[str, ...]] = {
    # Local filesystem writes.
    "capture": ("filesystem",),
    "deferral_queue": ("filesystem",),
    "file_edit": ("filesystem",),
    "report_write": ("filesystem",),
    "session_recap": ("filesystem",),
    "session_report": ("filesystem",),
    # Git operations.
    "git_branch": ("git",),
    "git_commit": ("git",),
    "git_checkout": ("git",),
    "git_push": ("git",),
    "git_worktree": ("filesystem", "git"),
    # GitHub operations.
    "comments": ("github",),
    "issue_close": ("github",),
    "issue_write": ("github",),
    "labels": ("github",),
    "merge": ("github",),
    "pull_request": ("github",),
    "reviews": ("github",),
    # Release and publishing.
    "package_publish": ("release",),
    "release": ("release",),
    # Credentials and secrets.
    "credential_access": ("secrets",),
    "secret_access": ("secrets",),
    # Production-adjacent systems.
    "production_access": ("production-adjacent",),
    "production_write": ("production-adjacent",),
}

# Side effects declared read-only: they map to no consent scope.
_READ_ONLY_SIDE_EFFECTS: tuple[str, ...] = (
    "check_runs",
    "issue_read",
    "pr_read",
)

# Generic runtime capability -> side effects it implies.
_CAPABILITY_SIDE_EFFECTS: dict[str, tuple[str, ...]] = {
    "filesystem-write": ("file_edit",),
    "worktree": ("git_worktree",),
    "release-publish": ("release",),
    "secret-access": ("secret_access",),
    "production-adjacent": ("production_access",),
    "private-setup": ("credential_access",),
}

# Scope name -> rank, following the declaration order of CONSENT_SCOPES.
_SCOPE_ORDER = {name: rank for rank, name in enumerate(CONSENT_SCOPES)}
def side_effect_scopes(side_effects: Iterable[str]) -> tuple[str, ...]:
    """Translate declared side effects into the consent scopes they require.

    Read-only side effects contribute no scope. Every other effect must have
    an entry in the side-effect mapping.

    Returns:
        The distinct required scopes, ordered by ``_sort_scopes``.

    Raises:
        ValueError: If any effect is neither read-only nor mapped. The whole
            input is consumed first; the message names the first offender.
    """
    required: set[str] = set()
    unmapped: list[str] = []
    for name in side_effects:
        if name in _READ_ONLY_SIDE_EFFECTS:
            continue
        try:
            required.update(_SIDE_EFFECT_SCOPES[name])
        except KeyError:
            unmapped.append(name)
    if unmapped:
        raise ValueError(
            f"unknown side effect {unmapped[0]!r}; declare it as read-only or map it to consent"
        )
    return _sort_scopes(required)
def capability_side_effects(capabilities: Iterable[str]) -> tuple[str, ...]:
    """Expand generic runtime capabilities into the side effects they imply.

    Capabilities without an entry in the capability mapping contribute
    nothing. Duplicates are dropped while first-seen order is kept.
    """
    implied = (
        effect
        for capability in capabilities
        for effect in _CAPABILITY_SIDE_EFFECTS.get(capability, ())
    )
    # dict.fromkeys de-duplicates while preserving insertion order.
    return tuple(dict.fromkeys(implied))
def escalation_contract_as_dict() -> dict[str, Any]:
    """Describe the risk x trust escalation contract as a plain dict.

    The result is a static description: the schema version, the accepted risk
    tiers and trust values, the trigger conditions written as text, and the
    enforcement flags. Fresh lists and dicts are built on every call.
    """
    risk_signal = {
        "source": "s5 classify",
        "tiers": list(RISK_TIERS),
    }
    trust_signal = {
        "source": "adapter/runtime trust signal",
        "values": list(TRUST_SIGNALS),
        "not_confidence_alone": True,
    }
    trigger_rules = {
        "irreversible_or_side_effecting": "always gate",
        "repeated_retry": "retry_count >= 2",
        "conflicting_sources": "true",
        "large_diff": "changed_lines >= large_diff_threshold",
    }
    sampling = {
        "deterministic": True,
        "sample_bucket_range": "0..99",
    }
    return {
        "schema_version": ESCALATION_SCHEMA_VERSION,
        "consumer_neutral": True,
        "deterministic": True,
        "stdlib_only": True,
        "signals": {"risk": risk_signal, "trust": trust_signal},
        "triggers": trigger_rules,
        "low_risk_sampling": sampling,
        "enforcement_boundary": "execution-layer",
        "agent_self_approval": False,
    }
def evaluate_escalation(
    *,
    risk_tier: str = "tier-1",
    trust_signal: str = "medium",
    side_effects: Iterable[str] = (),
    retry_count: int = 0,
    conflicting_sources: bool = False,
    changed_lines: int = 0,
    large_diff_threshold: int = 500,
    low_risk_sample_rate: int = 0,
    sample_bucket: int = 0,
) -> dict[str, Any]:
    """Decide whether a run must be escalated to an operator.

    The risk tier and trust signal are normalised, the declared side effects
    are mapped to consent scopes, four boolean triggers are computed, and the
    low-risk sample is evaluated. ``_escalation_reason`` then turns those
    inputs into a decision and a reason.

    Returns:
        A dict carrying the schema version, the normalised risk/trust values,
        ``operator_required`` with the matching ``decision`` and ``reason``,
        the ``triggers`` map, the ``consent_scope`` list and the ``sample``.

    Raises:
        ValueError: If a side effect is neither read-only nor mapped.
    """
    normalized_risk = _risk_tier(risk_tier)
    normalized_trust = _trust_signal(trust_signal)
    scopes = side_effect_scopes(side_effects)

    # A non-positive threshold switches the large-diff trigger off.
    diff_is_large = large_diff_threshold > 0 and changed_lines >= large_diff_threshold
    fired = {
        "irreversible_or_side_effecting": bool(scopes),
        "repeated_retry": retry_count >= 2,
        "conflicting_sources": bool(conflicting_sources),
        "large_diff": diff_is_large,
    }

    sampling = _sample(low_risk_sample_rate, sample_bucket)
    must_escalate, why = _escalation_reason(normalized_risk, normalized_trust, fired, sampling)
    decision = "operator-gate" if must_escalate else "no-escalation"
    return {
        "schema_version": ESCALATION_SCHEMA_VERSION,
        "risk_tier": normalized_risk,
        "trust_signal": normalized_trust,
        "operator_required": must_escalate,
        "decision": decision,
        "reason": why,
        "triggers": fired,
        "consent_scope": list(scopes),
        "sample": sampling,
        "enforcement_boundary": "execution-layer",
        "agent_can_self_approve": False,
    }
def normalize_scopes(scopes: Iterable[str]) -> tuple[str, ...]:
    """Normalize user-supplied consent scopes and reject unknown scope names.

    Each entry may hold several comma-separated scope names. Surrounding
    whitespace is stripped and empty fragments are ignored.

    Args:
        scopes: Scope entries, for example ``["git", "github,release"]``. A
            bare string such as ``"git,github"`` is treated as one entry
            rather than being iterated character by character.

    Returns:
        The distinct scope names, ordered by ``_sort_scopes``.

    Raises:
        ValueError: If any name is not in ``CONSENT_SCOPES``. The message
            names the first unknown scope.
    """
    # A str is itself an iterable of one-character strings. Without this guard
    # "git" would be rejected as the unknown scope 'g'.
    if isinstance(scopes, str):
        scopes = (scopes,)

    names = [
        name
        for raw in scopes
        for name in (part.strip() for part in str(raw).split(","))
        if name
    ]
    unknown = [name for name in names if name not in CONSENT_SCOPES]
    if unknown:
        raise ValueError(
            f"unknown consent scope {unknown[0]!r}; valid: {', '.join(CONSENT_SCOPES)}"
        )
    return _sort_scopes(names)
def build_consent_contract(
    *,
    command: str,
    side_effects: Iterable[str],
    dry_run: bool,
    approved_scopes: Iterable[str] = (),
    approval_source: str = "flag",
    mode: str = "explicit",
    operator: str | None = None,
    target: str | None = None,
    now: datetime | None = None,
    risk_tier: str = "tier-1",
    trust_signal: str = "medium",
    retry_count: int = 0,
    conflicting_sources: bool = False,
    changed_lines: int = 0,
    large_diff_threshold: int = 500,
    low_risk_sample_rate: int = 0,
    sample_bucket: int = 0,
) -> dict[str, Any]:
    """Assemble the JSON-compatible consent block of a command contract.

    The declared side effects determine the required consent scopes. These
    are compared with the operator-approved scopes to derive the missing
    scopes, the consent status, the operator prompt, and, for a fully
    approved live run, a consent record. The risk x trust escalation
    evaluation for the same inputs is embedded under
    ``risk_trust_escalation``.

    Raises:
        ValueError: If a side effect, approved scope, approval source or
            consent mode is unknown. The checks run in that order.
    """
    # Materialise once: the effects are consumed twice below.
    declared_effects = tuple(side_effects)
    consent_scope = side_effect_scopes(declared_effects)
    approved_scope = normalize_scopes(approved_scopes)

    if approval_source not in APPROVAL_SOURCES:
        raise ValueError(
            f"unknown approval source {approval_source!r}; valid: {', '.join(APPROVAL_SOURCES)}"
        )
    if mode not in CONSENT_MODES:
        raise ValueError(f"unknown consent mode {mode!r}; valid: {', '.join(CONSENT_MODES)}")

    # Only approvals for scopes this command actually needs are effective.
    effective_scope = tuple(name for name in consent_scope if name in approved_scope)
    missing_scope = tuple(name for name in consent_scope if name not in effective_scope)

    is_live = not dry_run
    would_require = bool(consent_scope)
    agent_delegated = is_live and mode == "agent" and would_require
    requires = is_live and bool(missing_scope) and not agent_delegated
    approved_live = is_live and would_require and not missing_scope

    status = _status(
        dry_run=dry_run,
        would_require=would_require,
        requires=requires,
        agent_delegated=agent_delegated,
    )

    # Prompt, escalation and record are computed in the order in which they
    # appear in the returned dict.
    prompt = _prompt(command, target, dry_run, consent_scope, missing_scope)
    escalation = evaluate_escalation(
        risk_tier=risk_tier,
        trust_signal=trust_signal,
        side_effects=declared_effects,
        retry_count=retry_count,
        conflicting_sources=conflicting_sources,
        changed_lines=changed_lines,
        large_diff_threshold=large_diff_threshold,
        low_risk_sample_rate=low_risk_sample_rate,
        sample_bucket=sample_bucket,
    )
    if approved_live:
        consent_record = {
            **_record(command, target, effective_scope, operator, dry_run, now),
            "source": approval_source,
        }
        delegated_scopes = list(effective_scope)
    else:
        consent_record = None
        delegated_scopes = []

    return {
        "schema_version": SCHEMA_VERSION,
        "requires_operator_consent": requires,
        "would_require_operator_consent": would_require,
        "status": status,
        "mode": mode,
        "consent_scope": list(consent_scope),
        "approved_scope": list(approved_scope),
        # With nothing approved, the source is reported as "none".
        "approval_source": approval_source if approved_scope else "none",
        "effective_approved_scope": list(effective_scope),
        # A dry run never reports missing scopes.
        "missing_scope": [] if dry_run else list(missing_scope),
        "consent_prompt": prompt,
        "risk_trust_escalation": escalation,
        "delegated_agent_scope": {
            "approved_mutation_scopes": delegated_scopes,
            "scope_expansion_policy": "block-or-escalate",
            "secret_values_permitted_in_prompt": False,
            "secret_access_requires_explicit_scope": "secrets",
        },
        "consent_record": consent_record,
    }
def assert_operator_consent(contract: dict[str, Any]) -> tuple[bool, str]:
    """Return whether a live run may proceed and the human-readable reason.

    Args:
        contract: A consent block. Only ``requires_operator_consent``,
            ``missing_scope`` and ``consent_prompt`` are read.

    Returns:
        ``(True, "operator consent satisfied")`` when the contract does not
        require consent. Otherwise ``(False, message)``, where the message
        embeds the consent prompt and the missing scopes.
    """
    if not contract.get("requires_operator_consent"):
        return True, "operator consent satisfied"

    # A hand-built or deserialised contract may carry null or an empty list
    # here. Neither should crash the gate or leave a dangling "scope: .".
    missing_scopes = contract.get("missing_scope") or ()
    missing = ", ".join(str(scope) for scope in missing_scopes) or "none"
    prompt = contract.get("consent_prompt") or "operator consent is required"
    return False, f"operator consent required: {prompt} Missing approved scope: {missing}."
def _status(*, dry_run: bool, would_require: bool, requires: bool, agent_delegated: bool) -> str:
    """Pick the consent status label for a contract.

    Dry runs are decided first, then agent delegation, then missing consent.
    A live run that reaches the end is "approved" when it has consent scopes
    and read-only otherwise.
    """
    if dry_run:
        if would_require:
            return "not-required-dry-run"
        return "not-required-read-only"
    if agent_delegated:
        return "agent-delegated"
    if requires:
        return "missing"
    if would_require:
        return "approved"
    return "not-required-read-only"
def _prompt(
    command: str,
    target: str | None,
    dry_run: bool,
    consent_scope: tuple[str, ...],
    missing_scope: tuple[str, ...],
) -> str:
    """Render the operator-facing consent prompt.

    A dry run explains which scopes a live run would need. A live run asks
    for approval and lists the required and still-missing scopes. An empty
    scope list is rendered as "none" and a missing target as
    "unspecified target".
    """
    where = target or "unspecified target"
    required = ", ".join(consent_scope) if consent_scope else "none"

    if not dry_run:
        outstanding = ", ".join(missing_scope) if missing_scope else "none"
        return (
            f"Approve command {command!r} for live mode against {where}. "
            f"Required scopes: {required}. Missing approvals: {outstanding}."
        )

    run_kind = "dry-run"
    return (
        f"Command {command!r} is planned for {run_kind} mode against {where}. "
        f"A live run would require operator approval for: {required}."
    )
311def _record(
312 command: str,
313 target: str | None,
314 approved_scope: tuple[str, ...],
315 operator: str | None,
316 dry_run: bool,
317 now: datetime | None,
318) -> dict[str, Any]:
319 timestamp = (now or datetime.now(UTC)).astimezone(UTC)
320 return {
321 "timestamp": timestamp.isoformat().replace("+00:00", "Z"),
322 "operator": operator,
323 "workflow": command,
324 "target": target,
325 "scopes_approved": list(approved_scope),
326 "dry_run": dry_run,
327 "secret_values_recorded": False,
328 }
def _sort_scopes(scopes: Iterable[str]) -> tuple[str, ...]:
    """Return the distinct scopes ordered by rank, then by name.

    Names without a rank in ``_SCOPE_ORDER`` get rank 999, so they sort after
    all ranked scopes, alphabetically among themselves.
    """

    def rank(name: str) -> tuple[int, str]:
        return (_SCOPE_ORDER.get(name, 999), name)

    distinct = set(scopes)
    return tuple(sorted(distinct, key=rank))
def _risk_tier(value: str) -> str:
    """Return ``value`` if it is a known risk tier, otherwise "tier-3"."""
    if value in RISK_TIERS:
        return value
    return "tier-3"
def _trust_signal(value: str) -> str:
    """Return ``value`` if it is a known trust signal, otherwise "low"."""
    if value in TRUST_SIGNALS:
        return value
    return "low"
def _sample(rate: int, bucket: int) -> dict[str, Any]:
    """Evaluate the low-risk sample for a rate and bucket.

    ``rate`` is clamped to 0..100 and ``bucket`` to 0..99. The run is
    selected when the clamped bucket is strictly below the clamped rate.
    """
    clamped_rate = int(rate)
    clamped_rate = 0 if clamped_rate < 0 else clamped_rate
    clamped_rate = 100 if clamped_rate > 100 else clamped_rate

    clamped_bucket = int(bucket)
    clamped_bucket = 0 if clamped_bucket < 0 else clamped_bucket
    clamped_bucket = 99 if clamped_bucket > 99 else clamped_bucket

    return {
        "rate": clamped_rate,
        "bucket": clamped_bucket,
        "selected": clamped_bucket < clamped_rate,
    }
def _escalation_reason(
    risk: str,
    trust: str,
    triggers: dict[str, bool],
    sample: dict[str, Any],
) -> tuple[bool, str]:
    """Return ``(operator_required, reason)`` for the given signals.

    The first matching rule wins, in this order: side-effecting work,
    repeated retry, conflicting sources, large diff, the risk x trust
    combination, and finally the low-risk sample.
    """
    # Trigger key -> reason label, in priority order.
    trigger_reasons = (
        ("irreversible_or_side_effecting", "irreversible-or-side-effecting"),
        ("repeated_retry", "repeated-retry"),
        ("conflicting_sources", "conflicting-sources"),
        ("large_diff", "large-diff"),
    )
    for key, label in trigger_reasons:
        if triggers[key]:
            return True, label

    # Tier-3 escalates unless trust is high; tier-2 only when trust is low.
    tier3_untrusted = risk == "tier-3" and trust != "high"
    tier2_low_trust = risk == "tier-2" and trust == "low"
    if tier3_untrusted or tier2_low_trust:
        return True, "risk-trust"

    if sample["selected"]:
        return True, "low-risk-sample"
    return False, "below-escalation-threshold"