Coverage for src/keel/consent.py: 100%

124 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Operator consent contracts for live mutating keel runs.""" 

2 

3from __future__ import annotations 

4 

5from collections.abc import Iterable 

6from datetime import UTC, datetime 

7from typing import Any 

8 

# Schema identifiers stamped into the emitted consent and escalation payloads.
SCHEMA_VERSION = "keel.operator-consent.v1"
ESCALATION_SCHEMA_VERSION = "keel.risk-trust-escalation.v1"

# Consent scopes an operator can approve. The tuple order doubles as the
# canonical ordering used when scope lists are sorted (see _SCOPE_ORDER).
CONSENT_SCOPES = (
    "filesystem",
    "git",
    "github",
    "secrets",
    "release",
    "production-adjacent",
)

# Closed vocabularies validated or normalized by the helpers in this module.
APPROVAL_SOURCES = ("none", "flag", "env", "config")
CONSENT_MODES = ("explicit", "standing", "agent")
RISK_TIERS = ("tier-1", "tier-2", "tier-3")
TRUST_SIGNALS = ("high", "medium", "low")

# Declared side effect -> consent scopes that must be approved for it.
_SIDE_EFFECT_SCOPES: dict[str, tuple[str, ...]] = {
    # Local filesystem writes.
    "capture": ("filesystem",),
    "deferral_queue": ("filesystem",),
    "file_edit": ("filesystem",),
    "report_write": ("filesystem",),
    "session_recap": ("filesystem",),
    "session_report": ("filesystem",),
    # Git operations.
    "git_branch": ("git",),
    "git_commit": ("git",),
    "git_checkout": ("git",),
    "git_push": ("git",),
    "git_worktree": ("filesystem", "git"),
    # GitHub mutations.
    "comments": ("github",),
    "issue_close": ("github",),
    "issue_write": ("github",),
    "labels": ("github",),
    "merge": ("github",),
    "pull_request": ("github",),
    "reviews": ("github",),
    # Release and publishing.
    "package_publish": ("release",),
    "release": ("release",),
    # Secrets and credentials.
    "credential_access": ("secrets",),
    "secret_access": ("secrets",),
    # Production-adjacent access.
    "production_access": ("production-adjacent",),
    "production_write": ("production-adjacent",),
}

# Side effects that never need consent; they are skipped, not mapped.
_READ_ONLY_SIDE_EFFECTS: tuple[str, ...] = (
    "check_runs",
    "issue_read",
    "pr_read",
)

# Generic runtime capability -> side effects it implies.
_CAPABILITY_SIDE_EFFECTS: dict[str, tuple[str, ...]] = {
    "filesystem-write": ("file_edit",),
    "worktree": ("git_worktree",),
    "release-publish": ("release",),
    "secret-access": ("secret_access",),
    "production-adjacent": ("production_access",),
    "private-setup": ("credential_access",),
}

# Rank of each scope within CONSENT_SCOPES, used as the primary sort key.
_SCOPE_ORDER = {name: rank for rank, name in enumerate(CONSENT_SCOPES)}

69 

70 

def side_effect_scopes(side_effects: Iterable[str]) -> tuple[str, ...]:
    """Translate declared side effects into the consent scopes they need.

    Read-only side effects contribute no scope. Every other name must have an
    entry in the side-effect table.

    Args:
        side_effects: Declared side-effect names. The iterable is consumed in
            full before any error is raised.

    Returns:
        The distinct required scopes, ordered by ``_sort_scopes``.

    Raises:
        ValueError: If a name is neither read-only nor mapped; the message
            names the first such entry encountered.
    """
    required: set[str] = set()
    unknown: list[str] = []
    for name in side_effects:
        if name in _READ_ONLY_SIDE_EFFECTS:
            continue
        scopes_for_name = _SIDE_EFFECT_SCOPES.get(name)
        if scopes_for_name is not None:
            required.update(scopes_for_name)
        else:
            unknown.append(name)
    if not unknown:
        return _sort_scopes(required)
    raise ValueError(
        f"unknown side effect {unknown[0]!r}; declare it as read-only or map it to consent"
    )

88 

89 

def capability_side_effects(capabilities: Iterable[str]) -> tuple[str, ...]:
    """Return the consent side effects implied by generic runtime capabilities.

    Capabilities without an entry in the capability table contribute nothing.
    Duplicates are dropped while keeping first-seen order.
    """
    implied = (
        effect
        for capability in capabilities
        for effect in _CAPABILITY_SIDE_EFFECTS.get(capability, ())
    )
    # dict.fromkeys de-duplicates while preserving insertion order.
    return tuple(dict.fromkeys(implied))

96 

97 

def escalation_contract_as_dict() -> dict[str, Any]:
    """Return the deterministic risk x trust escalation contract.

    The result is a freshly built, JSON-compatible dict describing the signal
    vocabularies, the trigger rules, and the sampling range.
    """
    signals = {
        "risk": {
            "source": "s5 classify",
            "tiers": [*RISK_TIERS],
        },
        "trust": {
            "source": "adapter/runtime trust signal",
            "values": [*TRUST_SIGNALS],
            "not_confidence_alone": True,
        },
    }
    triggers = {
        "irreversible_or_side_effecting": "always gate",
        "repeated_retry": "retry_count >= 2",
        "conflicting_sources": "true",
        "large_diff": "changed_lines >= large_diff_threshold",
    }
    low_risk_sampling = {
        "deterministic": True,
        "sample_bucket_range": "0..99",
    }
    return {
        "schema_version": ESCALATION_SCHEMA_VERSION,
        "consumer_neutral": True,
        "deterministic": True,
        "stdlib_only": True,
        "signals": signals,
        "triggers": triggers,
        "low_risk_sampling": low_risk_sampling,
        "enforcement_boundary": "execution-layer",
        "agent_self_approval": False,
    }

129 

130 

def evaluate_escalation(
    *,
    risk_tier: str = "tier-1",
    trust_signal: str = "medium",
    side_effects: Iterable[str] = (),
    retry_count: int = 0,
    conflicting_sources: bool = False,
    changed_lines: int = 0,
    large_diff_threshold: int = 500,
    low_risk_sample_rate: int = 0,
    sample_bucket: int = 0,
) -> dict[str, Any]:
    """Decide whether operator escalation is required from risk x trust signals.

    Args:
        risk_tier: Risk tier; unrecognized values are coerced by ``_risk_tier``.
        trust_signal: Trust signal; unrecognized values are coerced by
            ``_trust_signal``.
        side_effects: Declared side effects; any that map to a consent scope
            set the "irreversible_or_side_effecting" trigger.
        retry_count: Number of retries; two or more sets "repeated_retry".
        conflicting_sources: Truthy to set "conflicting_sources".
        changed_lines: Size of the diff.
        large_diff_threshold: "large_diff" fires when this is positive and
            ``changed_lines`` reaches it; a non-positive value disables it.
        low_risk_sample_rate: Sampling rate passed to ``_sample``.
        sample_bucket: Sampling bucket passed to ``_sample``.

    Returns:
        A JSON-compatible dict with the normalized signals, the trigger flags,
        the sampling result, and the final decision and reason.

    Raises:
        ValueError: Propagated from ``side_effect_scopes`` for an unknown
            side effect.
    """
    tier = _risk_tier(risk_tier)
    trust = _trust_signal(trust_signal)
    scopes = side_effect_scopes(side_effects)

    # Evaluated in the same order as the keys appear in the payload.
    has_side_effects = bool(scopes)
    retried = retry_count >= 2
    sources_conflict = bool(conflicting_sources)
    diff_is_large = large_diff_threshold > 0 and changed_lines >= large_diff_threshold
    fired = {
        "irreversible_or_side_effecting": has_side_effects,
        "repeated_retry": retried,
        "conflicting_sources": sources_conflict,
        "large_diff": diff_is_large,
    }

    sampling = _sample(low_risk_sample_rate, sample_bucket)
    must_escalate, why = _escalation_reason(tier, trust, fired, sampling)
    if must_escalate:
        decision = "operator-gate"
    else:
        decision = "no-escalation"

    return {
        "schema_version": ESCALATION_SCHEMA_VERSION,
        "risk_tier": tier,
        "trust_signal": trust,
        "operator_required": must_escalate,
        "decision": decision,
        "reason": why,
        "triggers": fired,
        "consent_scope": list(scopes),
        "sample": sampling,
        "enforcement_boundary": "execution-layer",
        "agent_can_self_approve": False,
    }

168 

169 

def normalize_scopes(scopes: Iterable[str]) -> tuple[str, ...]:
    """Normalize user-supplied consent scopes and reject unknown scope names.

    Each entry may itself be a comma-separated list. Surrounding whitespace is
    stripped and empty parts are ignored.

    Args:
        scopes: Scope entries, or a single bare string such as ``"git,github"``
            (for example a raw flag or environment value).

    Returns:
        The distinct valid scopes, ordered by ``_sort_scopes``.

    Raises:
        ValueError: If any part is not in ``CONSENT_SCOPES``; the message names
            the first unknown part and lists the valid scopes.
    """
    # A bare string is an iterable of characters, so iterating it directly
    # would validate "git" as "g", "i", "t". Treat it as one entry instead.
    entries = (scopes,) if isinstance(scopes, str) else scopes
    normalized: list[str] = []
    unknown: list[str] = []
    for raw in entries:
        for part in str(raw).split(","):
            scope = part.strip()
            if not scope:
                continue
            if scope in CONSENT_SCOPES:
                normalized.append(scope)
            else:
                unknown.append(scope)
    if unknown:
        raise ValueError(
            f"unknown consent scope {unknown[0]!r}; valid: {', '.join(CONSENT_SCOPES)}"
        )
    return _sort_scopes(normalized)

188 

189 

def build_consent_contract(
    *,
    command: str,
    side_effects: Iterable[str],
    dry_run: bool,
    approved_scopes: Iterable[str] = (),
    approval_source: str = "flag",
    mode: str = "explicit",
    operator: str | None = None,
    target: str | None = None,
    now: datetime | None = None,
    risk_tier: str = "tier-1",
    trust_signal: str = "medium",
    retry_count: int = 0,
    conflicting_sources: bool = False,
    changed_lines: int = 0,
    large_diff_threshold: int = 500,
    low_risk_sample_rate: int = 0,
    sample_bucket: int = 0,
) -> dict[str, Any]:
    """Build a JSON-compatible consent block for a command contract.

    Args:
        command: Command or workflow name; used in the prompt and the record.
        side_effects: Declared side effects; they determine the required scopes.
        dry_run: When truthy, consent is never required and nothing is recorded.
        approved_scopes: Scopes the operator approved; normalized by
            ``normalize_scopes``.
        approval_source: Origin of the approval; must be in ``APPROVAL_SOURCES``.
            Reported as "none" when no scope was approved.
        mode: Consent mode; must be in ``CONSENT_MODES``. In "agent" mode a live
            mutating run is reported as delegated and does not require consent,
            even when scopes are missing.
        operator: Operator identity stored in the consent record.
        target: Target description; used in the prompt and the record.
        now: Timestamp for the consent record, forwarded to ``_record``.
        risk_tier, trust_signal, retry_count, conflicting_sources,
        changed_lines, large_diff_threshold, low_risk_sample_rate,
        sample_bucket: Forwarded unchanged to ``evaluate_escalation``.

    Returns:
        The consent contract dict. ``consent_record`` is populated only for a
        live run that needs consent and has every required scope approved.

    Raises:
        ValueError: For an unknown side effect, scope, approval source, or mode.
    """
    declared_effects = tuple(side_effects)
    consent_scope = side_effect_scopes(declared_effects)
    approved_scope = normalize_scopes(approved_scopes)

    if approval_source not in APPROVAL_SOURCES:
        raise ValueError(
            f"unknown approval source {approval_source!r}; valid: {', '.join(APPROVAL_SOURCES)}"
        )
    if mode not in CONSENT_MODES:
        raise ValueError(f"unknown consent mode {mode!r}; valid: {', '.join(CONSENT_MODES)}")

    # Split the required scopes into approved and still-missing in one pass.
    granted: list[str] = []
    lacking: list[str] = []
    for scope in consent_scope:
        (granted if scope in approved_scope else lacking).append(scope)
    effective_approved_scope = tuple(granted)
    missing_scope = tuple(lacking)

    live = not dry_run
    would_require = bool(consent_scope)
    agent_delegated = live and would_require and mode == "agent"
    requires = live and bool(missing_scope) and not agent_delegated
    approved_live = live and would_require and not missing_scope

    status = _status(
        dry_run=dry_run,
        would_require=would_require,
        requires=requires,
        agent_delegated=agent_delegated,
    )
    consent_prompt = _prompt(command, target, dry_run, consent_scope, missing_scope)
    escalation = evaluate_escalation(
        risk_tier=risk_tier,
        trust_signal=trust_signal,
        side_effects=declared_effects,
        retry_count=retry_count,
        conflicting_sources=conflicting_sources,
        changed_lines=changed_lines,
        large_diff_threshold=large_diff_threshold,
        low_risk_sample_rate=low_risk_sample_rate,
        sample_bucket=sample_bucket,
    )

    # A record (and delegated mutation scopes) exists only for approved live runs.
    if approved_live:
        delegated_scopes = list(effective_approved_scope)
        consent_record = {
            **_record(command, target, effective_approved_scope, operator, dry_run, now),
            "source": approval_source,
        }
    else:
        delegated_scopes = []
        consent_record = None

    return {
        "schema_version": SCHEMA_VERSION,
        "requires_operator_consent": requires,
        "would_require_operator_consent": would_require,
        "status": status,
        "mode": mode,
        "consent_scope": list(consent_scope),
        "approved_scope": list(approved_scope),
        "approval_source": approval_source if approved_scope else "none",
        "effective_approved_scope": list(effective_approved_scope),
        "missing_scope": list(missing_scope) if live else [],
        "consent_prompt": consent_prompt,
        "risk_trust_escalation": escalation,
        "delegated_agent_scope": {
            "approved_mutation_scopes": delegated_scopes,
            "scope_expansion_policy": "block-or-escalate",
            "secret_values_permitted_in_prompt": False,
            "secret_access_requires_explicit_scope": "secrets",
        },
        "consent_record": consent_record,
    }

268 

269 

def assert_operator_consent(contract: dict[str, Any]) -> tuple[bool, str]:
    """Report whether a live run may proceed, with a human-readable reason.

    Args:
        contract: A consent contract dict; only ``requires_operator_consent``,
            ``missing_scope`` and ``consent_prompt`` are read.

    Returns:
        ``(True, reason)`` when consent is not required, otherwise
        ``(False, reason)`` where the reason embeds the prompt and the missing
        scopes.
    """
    if not contract.get("requires_operator_consent"):
        return True, "operator consent satisfied"
    missing = ", ".join(contract.get("missing_scope", []))
    # Fall back to a generic sentence when the contract carries no prompt.
    prompt = contract.get("consent_prompt") or "operator consent is required"
    return False, f"operator consent required: {prompt} Missing approved scope: {missing}."

277 

278 

def _status(*, dry_run: bool, would_require: bool, requires: bool, agent_delegated: bool) -> str:
    """Map the consent flags to a status label.

    Precedence: dry run first, then agent delegation, then missing consent;
    otherwise the run is approved, or read-only when no scope is needed.
    """
    if dry_run:
        if would_require:
            return "not-required-dry-run"
        return "not-required-read-only"
    if agent_delegated:
        return "agent-delegated"
    if requires:
        return "missing"
    if would_require:
        return "approved"
    return "not-required-read-only"

287 

288 

289def _prompt( 

290 command: str, 

291 target: str | None, 

292 dry_run: bool, 

293 consent_scope: tuple[str, ...], 

294 missing_scope: tuple[str, ...], 

295) -> str: 

296 mode = "dry-run" if dry_run else "live" 

297 target_text = target or "unspecified target" 

298 scope_text = ", ".join(consent_scope) if consent_scope else "none" 

299 if dry_run: 

300 return ( 

301 f"Command {command!r} is planned for {mode} mode against {target_text}. " 

302 f"A live run would require operator approval for: {scope_text}." 

303 ) 

304 missing_text = ", ".join(missing_scope) if missing_scope else "none" 

305 return ( 

306 f"Approve command {command!r} for live mode against {target_text}. " 

307 f"Required scopes: {scope_text}. Missing approvals: {missing_text}." 

308 ) 

309 

310 

311def _record( 

312 command: str, 

313 target: str | None, 

314 approved_scope: tuple[str, ...], 

315 operator: str | None, 

316 dry_run: bool, 

317 now: datetime | None, 

318) -> dict[str, Any]: 

319 timestamp = (now or datetime.now(UTC)).astimezone(UTC) 

320 return { 

321 "timestamp": timestamp.isoformat().replace("+00:00", "Z"), 

322 "operator": operator, 

323 "workflow": command, 

324 "target": target, 

325 "scopes_approved": list(approved_scope), 

326 "dry_run": dry_run, 

327 "secret_values_recorded": False, 

328 } 

329 

330 

def _sort_scopes(scopes: Iterable[str]) -> tuple[str, ...]:
    """Return the distinct scopes in canonical order.

    Known scopes sort by their rank in ``_SCOPE_ORDER``; unknown ones sort
    after them (rank 999), alphabetically.
    """

    def rank(scope: str) -> tuple[int, str]:
        return (_SCOPE_ORDER.get(scope, 999), scope)

    distinct = set(scopes)
    return tuple(sorted(distinct, key=rank))

333 

334 

def _risk_tier(value: str) -> str:
    """Return ``value`` if it is a known risk tier, otherwise "tier-3"."""
    if value in RISK_TIERS:
        return value
    # Unrecognized input is coerced to the last tier listed in RISK_TIERS.
    return "tier-3"

337 

338 

def _trust_signal(value: str) -> str:
    """Return ``value`` if it is a known trust signal, otherwise "low"."""
    if value in TRUST_SIGNALS:
        return value
    # Unrecognized input is coerced to "low".
    return "low"

341 

342 

def _sample(rate: int, bucket: int) -> dict[str, Any]:
    """Clamp the sampling inputs and report whether the bucket is selected.

    ``rate`` is clamped to 0..100 and ``bucket`` to 0..99. The bucket is
    selected when it is strictly below the rate, so rate 0 never selects and
    rate 100 always does.
    """
    sample_rate = max(0, min(100, int(rate)))
    sample_bucket = max(0, min(99, int(bucket)))
    selected = sample_bucket < sample_rate
    return {
        "rate": sample_rate,
        "bucket": sample_bucket,
        "selected": selected,
    }

351 

352 

def _escalation_reason(
    risk: str,
    trust: str,
    triggers: dict[str, bool],
    sample: dict[str, Any],
) -> tuple[bool, str]:
    """Pick the first applicable escalation reason.

    Hard triggers are checked first in a fixed order, then the risk x trust
    combination, then the low-risk sample.

    Returns:
        ``(True, reason)`` when escalation is required, otherwise
        ``(False, "below-escalation-threshold")``.
    """
    # (trigger key, reported reason), in priority order.
    ordered_triggers = (
        ("irreversible_or_side_effecting", "irreversible-or-side-effecting"),
        ("repeated_retry", "repeated-retry"),
        ("conflicting_sources", "conflicting-sources"),
        ("large_diff", "large-diff"),
    )
    for key, reason in ordered_triggers:
        if triggers[key]:
            return True, reason

    # tier-3 escalates unless trust is high; tier-2 only when trust is low.
    tier3_gate = risk == "tier-3" and trust != "high"
    tier2_gate = risk == "tier-2" and trust == "low"
    if tier3_gate or tier2_gate:
        return True, "risk-trust"

    if sample["selected"]:
        return True, "low-risk-sample"
    return False, "below-escalation-threshold"