Coverage for src/keel/runcontrols.py: 100%
92 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Deterministic run budgets, step caps, and oscillation detection."""
3from __future__ import annotations
5from dataclasses import dataclass
6from typing import Any
8from . import artifacts, model
10SCHEMA_VERSION = "keel.run-controls.v1"
11DEFAULT_RUN_BUDGET = 250
12DEFAULT_STEP_CAP = 1
13DEFAULT_FIXLOOP_CAP = 3
14DEFAULT_REVIEWER_CAP = 3
15DEFAULT_TEST_CAP = 2
16DEFAULT_IDENTICAL_THRESHOLD = 3
17DEFAULT_ALTERNATION_WINDOW = 4
@dataclass(frozen=True)
class HaltReason:
    """Immutable record of a single hard-halt decision.

    Fields are plain values (strings or ints); ``action`` defaults to
    ``"halt"`` when the caller does not supply one.
    """

    control: str
    reason: str
    scope: str
    observed: int | str
    limit: int | str
    action: str = "halt"

    def as_dict(self) -> dict[str, Any]:
        """Return the six fields plus a ``rendered`` entry.

        ``rendered`` is whatever ``artifacts.render_run_control_halt``
        returns when called with the same six values as keyword arguments.
        """
        payload: dict[str, Any] = {
            "control": self.control,
            "reason": self.reason,
            "scope": self.scope,
            "observed": self.observed,
            "limit": self.limit,
            "action": self.action,
        }
        # The renderer's keyword names match the field names one-to-one.
        payload["rendered"] = artifacts.render_run_control_halt(**payload)
        return payload
def contract_as_dict() -> dict[str, Any]:
    """Describe the run-control contract as a plain dictionary.

    The result combines fixed flags, the three hard-halt reason names, and
    the module defaults for budget, step caps, and oscillation detection,
    together with the renderer name and halt marker from ``artifacts``.
    """
    budget_section = {
        "unit": "work_unit",
        "default_max_work_units": DEFAULT_RUN_BUDGET,
        "breach_policy": "halt-fail-closed",
    }
    step_cap_section = {
        "applies_to_slots": list(model.SLOTS),
        "default_max_iterations": DEFAULT_STEP_CAP,
        "overrides": _default_step_caps(),
        "breach_policy": "halt-fail-closed",
    }
    oscillation_section = {
        "identical_action_threshold": DEFAULT_IDENTICAL_THRESHOLD,
        "alternating_diff_window": DEFAULT_ALTERNATION_WINDOW,
        "breach_policy": "halt-fail-closed",
    }
    renderer_section = {
        "halt_reason": "keel.artifacts.render_run_control_halt",
        "marker": artifacts.RUN_CONTROL_HALT_MARKER,
    }
    fail_soft_note = (
        "soft failures are recorded by their owning step; only budget, cap, or "
        "oscillation breaches produce a hard halt"
    )

    contract: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "consumer_neutral": True,
        "deterministic": True,
        "stdlib_only": True,
        "fail_closed": True,
        "wall_clock_timeouts": False,
        "hard_halts": [
            "run-budget-exceeded",
            "step-cap-exceeded",
            "oscillation-detected",
        ],
        "fail_soft_preserved": fail_soft_note,
        "budget": budget_section,
        "step_caps": step_cap_section,
        "oscillation": oscillation_section,
        "renderer": renderer_section,
    }
    return contract
def evaluate_run_controls(
    events: list[dict[str, Any]] | tuple[dict[str, Any], ...],
    *,
    max_work_units: int = DEFAULT_RUN_BUDGET,
    default_step_cap: int = DEFAULT_STEP_CAP,
    step_caps: dict[str, int] | None = None,
    identical_action_threshold: int = DEFAULT_IDENTICAL_THRESHOLD,
    alternating_diff_window: int = DEFAULT_ALTERNATION_WINDOW,
) -> dict[str, Any]:
    """Run the budget, step-cap, and oscillation checks over ``events``.

    Entries of ``events`` that are not dicts are ignored. The checks run in
    the order budget, step cap, oscillation, and the first one that yields
    a halt reason wins. The returned dict carries the schema version, a
    ``"halt"``/``"pass"`` status, the serialized reason (or ``None``), and a
    summary with the event count and total work units.
    """
    records = [_normalize_event(item) for item in events if isinstance(item, dict)]

    # Later checks are only evaluated when every earlier one passed.
    halt = _budget_halt(records, max_work_units)
    if not halt:
        halt = _step_cap_halt(
            records,
            default_step_cap,
            step_caps or _default_step_caps(),
        )
    if not halt:
        halt = _oscillation_halt(
            records,
            identical_action_threshold=identical_action_threshold,
            alternating_diff_window=alternating_diff_window,
        )

    halted = halt is not None
    summary = {
        "event_count": len(records),
        "work_units": sum(record["work_units"] for record in records),
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "status": "halt" if halt else "pass",
        "hard_halt": halted,
        "fail_closed": halted,
        "reason": halt.as_dict() if halt else None,
        "summary": summary,
    }
def _default_step_caps() -> dict[str, int]:
    """Return a fresh mapping of per-scope iteration caps.

    A new dict is built on every call, so callers may mutate the result.
    Both ``"tester"`` and ``"test"`` map to the same test cap.
    """
    return dict(
        fixloop=DEFAULT_FIXLOOP_CAP,
        reviewers=DEFAULT_REVIEWER_CAP,
        tester=DEFAULT_TEST_CAP,
        test=DEFAULT_TEST_CAP,
    )
def _normalize_event(event: dict[str, Any]) -> dict[str, Any]:
    """Project a raw event onto the fixed set of keys the checks read.

    The five text fields go through ``_string``, ``work_units`` through
    ``_positive_int`` with a default of 1, and ``soft_failure`` is coerced
    with ``bool``. Keys missing from ``event`` are treated as ``None``.
    """
    text_fields = (
        "step_id",
        "slot",
        "action",
        "output_fingerprint",
        "diff_fingerprint",
    )
    cleaned: dict[str, Any] = {name: _string(event.get(name)) for name in text_fields}
    cleaned["work_units"] = _positive_int(event.get("work_units"), default=1)
    cleaned["soft_failure"] = bool(event.get("soft_failure"))
    return cleaned
def _budget_halt(events: list[dict[str, Any]], max_work_units: int) -> HaltReason | None:
    """Return a run-budget halt when total work units exceed the budget.

    ``max_work_units`` falls back to ``DEFAULT_RUN_BUDGET`` when it is not
    a positive int. Spending exactly the budget is still a pass.
    """
    cap = _positive_int(max_work_units, default=DEFAULT_RUN_BUDGET)
    spent = 0
    for entry in events:
        spent += entry["work_units"]
    if spent <= cap:
        return None
    return HaltReason(
        control="run-budget",
        reason="run-budget-exceeded",
        scope="run",
        observed=spent,
        limit=cap,
    )
def _step_cap_halt(
    events: list[dict[str, Any]],
    default_step_cap: int,
    step_caps: dict[str, int],
) -> HaltReason | None:
    """Return a step-cap halt for the first scope that runs too many times.

    Each event is attributed to its ``slot``, or to its ``step_id`` when the
    slot is empty; events with neither are not counted. The halt is raised
    at the first event that pushes a scope's count past its limit.
    """
    fallback_cap = _positive_int(default_step_cap, default=DEFAULT_STEP_CAP)
    seen: dict[str, int] = {}
    for entry in events:
        key = entry["slot"] or entry["step_id"]
        if not key:
            continue
        tally = seen.get(key, 0) + 1
        seen[key] = tally
        cap = _step_limit(key, step_caps, fallback_cap)
        if tally <= cap:
            continue
        return HaltReason(
            control="step-cap",
            reason="step-cap-exceeded",
            scope=key,
            observed=tally,
            limit=cap,
        )
    return None
def _oscillation_halt(
    events: list[dict[str, Any]],
    *,
    identical_action_threshold: int,
    alternating_diff_window: int,
) -> HaltReason | None:
    """Check for repeated identical actions, then for alternating diffs.

    Both tuning values fall back to their module defaults when they are not
    positive ints. The alternating-diff check only runs when the
    repeated-action check found nothing.
    """
    streak_limit = _positive_int(
        identical_action_threshold,
        default=DEFAULT_IDENTICAL_THRESHOLD,
    )
    streak_halt = _repeated_identical_action(events, threshold=streak_limit)
    if streak_halt:
        return streak_halt

    diff_window = _positive_int(
        alternating_diff_window,
        default=DEFAULT_ALTERNATION_WINDOW,
    )
    return _alternating_diff(events, window=diff_window)
def _repeated_identical_action(
    events: list[dict[str, Any]],
    *,
    threshold: int,
) -> HaltReason | None:
    """Return a halt once the same action repeats ``threshold`` times in a row.

    Two events match when their step id, slot, action, and output
    fingerprint are all equal. An event with all four fields empty breaks
    the streak. A ``threshold`` of 1 or less is replaced by
    ``DEFAULT_IDENTICAL_THRESHOLD``.
    """
    limit = threshold if threshold > 1 else DEFAULT_IDENTICAL_THRESHOLD
    run_length = 0
    last_key: tuple[str, str, str, str] | None = None
    for entry in events:
        key = (
            entry["step_id"],
            entry["slot"],
            entry["action"],
            entry["output_fingerprint"],
        )
        if not any(key):
            # Nothing identifies this event, so it cannot extend a streak.
            last_key, run_length = None, 0
            continue
        if key == last_key:
            run_length += 1
        else:
            run_length = 1
        last_key = key
        if run_length < limit:
            continue
        return HaltReason(
            control="oscillation",
            reason="repeated-identical-action",
            scope=entry["slot"] or entry["step_id"] or "run",
            observed=run_length,
            limit=limit,
        )
    return None
def _alternating_diff(events: list[dict[str, Any]], *, window: int) -> HaltReason | None:
    """Return a halt when the most recent diff fingerprints oscillate.

    Only events with a non-empty ``diff_fingerprint`` are considered, and
    only the last ``window`` of them are inspected. A ``window`` that is
    below 4 or odd is replaced by ``DEFAULT_ALTERNATION_WINDOW``.

    The tail counts as oscillating when either:

    * its first half equals its second half and that half holds more than
      one distinct fingerprint (a repeated cycle), or
    * it is a strict two-value alternation (``A, B, A, B, ...``).

    The second test matters when ``window // 2`` is odd (for example a
    window of 6): an ``A, B`` alternation then splits into ``A, B, A`` and
    ``B, A, B``, which the half comparison alone would never match. For
    windows divisible by 4 the two tests agree on alternations.

    Returns ``None`` when there are fewer than ``window`` fingerprints or
    no oscillation is found.
    """
    if window < 4 or window % 2:
        window = DEFAULT_ALTERNATION_WINDOW
    diffs = [event["diff_fingerprint"] for event in events if event["diff_fingerprint"]]
    if len(diffs) < window:
        return None
    tail = diffs[-window:]
    half = window // 2
    left = tail[:half]
    right = tail[half:]

    repeats_half_cycle = left == right and len(set(left)) > 1
    # Period-2 pattern: every entry equals the one two rounds earlier, and
    # the two alternating values really are different.
    ping_pong = tail[0] != tail[1] and all(
        tail[index] == tail[index + 2] for index in range(window - 2)
    )
    if not (repeats_half_cycle or ping_pong):
        return None
    return HaltReason(
        control="oscillation",
        reason="alternating-diff-fingerprint",
        scope="diff",
        observed=",".join(tail),
        limit=f"no {window}-round alternation",
    )
260def _string(value: Any) -> str:
261 return value.strip() if isinstance(value, str) and value.strip() else ""
264def _positive_int(value: Any, *, default: int) -> int:
265 return value if isinstance(value, int) and value > 0 else default
def _step_limit(scope: str, step_caps: dict[str, int], default_limit: int) -> int:
    """Resolve the iteration cap that applies to ``scope``.

    Precedence: a positive integer configured in ``step_caps`` for the
    scope, then the built-in cap for that scope from
    ``_default_step_caps``, then ``default_limit``.

    Booleans are not accepted as configured caps: ``bool`` is a subclass of
    ``int``, so ``True`` would otherwise be returned as the limit.
    """
    configured = step_caps.get(scope)
    if (
        isinstance(configured, int)
        and not isinstance(configured, bool)
        and configured > 0
    ):
        return configured
    # Built-in caps are only looked up when no valid override exists.
    return _default_step_caps().get(scope, default_limit)