Coverage for src/keel/runcontrols.py: 100%

92 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Deterministic run budgets, step caps, and oscillation detection.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from typing import Any 

7 

8from . import artifacts, model 

9 

# Identifier emitted under "schema_version" by contract_as_dict() and
# evaluate_run_controls().
10SCHEMA_VERSION = "keel.run-controls.v1" 

# Default total of work units a run may consume before the budget halt fires.
11DEFAULT_RUN_BUDGET = 250 

# Default number of events allowed per slot/step when no override applies.
12DEFAULT_STEP_CAP = 1 

# Per-scope overrides returned by _default_step_caps(): "fixloop" scope.
13DEFAULT_FIXLOOP_CAP = 3 

# Per-scope override for the "reviewers" scope.
14DEFAULT_REVIEWER_CAP = 3 

# Per-scope override shared by the "tester" and "test" scopes.
15DEFAULT_TEST_CAP = 2 

# Consecutive identical events that count as a repeated-action oscillation.
16DEFAULT_IDENTICAL_THRESHOLD = 3 

# Number of trailing diff fingerprints inspected for alternation.
17DEFAULT_ALTERNATION_WINDOW = 4 

18 

19 

@dataclass(frozen=True)
class HaltReason:
    """Immutable record of a single hard halt raised by a run control.

    Holds the control that tripped, the machine-readable reason, the scope it
    applies to, and the observed value next to the limit it breached.
    """

    control: str
    reason: str
    scope: str
    observed: int | str
    limit: int | str
    action: str = "halt"

    def as_dict(self) -> dict[str, Any]:
        """Return the six fields as a dict, plus a ``rendered`` entry.

        ``rendered`` is whatever ``artifacts.render_run_control_halt`` returns
        when called with those same six fields as keyword arguments.
        """
        payload: dict[str, Any] = {
            "control": self.control,
            "reason": self.reason,
            "scope": self.scope,
            "observed": self.observed,
            "limit": self.limit,
            "action": self.action,
        }
        # The renderer takes exactly the field names used as dict keys, so the
        # payload doubles as its keyword arguments.
        rendered = artifacts.render_run_control_halt(**payload)
        return {**payload, "rendered": rendered}

48 

49 

def contract_as_dict() -> dict[str, Any]:
    """Describe the pure-core run-control contract for agentic commands.

    Returns a freshly built dict on every call: the schema version, a set of
    boolean contract flags, the list of hard-halt reasons, and one section
    each for the budget, step caps, oscillation detection and the renderer.
    """
    halt_policy = "halt-fail-closed"

    budget_section = {
        "unit": "work_unit",
        "default_max_work_units": DEFAULT_RUN_BUDGET,
        "breach_policy": halt_policy,
    }
    step_cap_section = {
        # Copied into a list so callers never receive model.SLOTS itself.
        "applies_to_slots": list(model.SLOTS),
        "default_max_iterations": DEFAULT_STEP_CAP,
        "overrides": _default_step_caps(),
        "breach_policy": halt_policy,
    }
    oscillation_section = {
        "identical_action_threshold": DEFAULT_IDENTICAL_THRESHOLD,
        "alternating_diff_window": DEFAULT_ALTERNATION_WINDOW,
        "breach_policy": halt_policy,
    }
    renderer_section = {
        "halt_reason": "keel.artifacts.render_run_control_halt",
        "marker": artifacts.RUN_CONTROL_HALT_MARKER,
    }
    fail_soft_note = (
        "soft failures are recorded by their owning step; only budget, cap, or "
        "oscillation breaches produce a hard halt"
    )

    return {
        "schema_version": SCHEMA_VERSION,
        "consumer_neutral": True,
        "deterministic": True,
        "stdlib_only": True,
        "fail_closed": True,
        "wall_clock_timeouts": False,
        "hard_halts": [
            "run-budget-exceeded",
            "step-cap-exceeded",
            "oscillation-detected",
        ],
        "fail_soft_preserved": fail_soft_note,
        "budget": budget_section,
        "step_caps": step_cap_section,
        "oscillation": oscillation_section,
        "renderer": renderer_section,
    }

89 

90 

def evaluate_run_controls(
    events: list[dict[str, Any]] | tuple[dict[str, Any], ...],
    *,
    max_work_units: int = DEFAULT_RUN_BUDGET,
    default_step_cap: int = DEFAULT_STEP_CAP,
    step_caps: dict[str, int] | None = None,
    identical_action_threshold: int = DEFAULT_IDENTICAL_THRESHOLD,
    alternating_diff_window: int = DEFAULT_ALTERNATION_WINDOW,
) -> dict[str, Any]:
    """Check a run's events against every run control.

    Controls are tried in a fixed order (budget, step cap, oscillation) and
    the first one that reports a halt wins; later controls are not evaluated.

    Args:
        events: Recorded events. Entries that are not dicts are ignored.
        max_work_units: Work-unit budget for the whole run.
        default_step_cap: Cap applied to scopes without a specific override.
        step_caps: Per-scope caps; a falsy value selects the built-in ones.
        identical_action_threshold: Streak length treated as a repeat.
        alternating_diff_window: Trailing diff fingerprints to inspect.

    Returns:
        A dict with ``schema_version``, ``status`` ("halt" or "pass"),
        ``hard_halt``, ``fail_closed``, ``reason`` (the halt as a dict, or
        ``None``) and a ``summary`` of event count and total work units.
    """
    normalized = [_normalize_event(item) for item in events if isinstance(item, dict)]

    reason = _budget_halt(normalized, max_work_units)
    if not reason:
        reason = _step_cap_halt(
            normalized,
            default_step_cap,
            step_caps or _default_step_caps(),
        )
    if not reason:
        reason = _oscillation_halt(
            normalized,
            identical_action_threshold=identical_action_threshold,
            alternating_diff_window=alternating_diff_window,
        )

    halted = reason is not None
    summary = {
        "event_count": len(normalized),
        "work_units": sum(item["work_units"] for item in normalized),
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "status": "halt" if reason else "pass",
        "hard_halt": halted,
        "fail_closed": halted,
        "reason": reason.as_dict() if reason else None,
        "summary": summary,
    }

122 

123 

def _default_step_caps() -> dict[str, int]:
    """Return a new dict of the built-in per-scope iteration caps."""
    return dict(
        fixloop=DEFAULT_FIXLOOP_CAP,
        reviewers=DEFAULT_REVIEWER_CAP,
        tester=DEFAULT_TEST_CAP,
        test=DEFAULT_TEST_CAP,
    )

131 

132 

def _normalize_event(event: dict[str, Any]) -> dict[str, Any]:
    """Project a raw event onto the fixed set of keys the controls read.

    The five text fields go through ``_string``, ``work_units`` goes through
    ``_positive_int`` with a default of 1, and ``soft_failure`` is coerced
    with ``bool``. Keys missing from ``event`` are read as ``None``.
    """
    text_keys = (
        "step_id",
        "slot",
        "action",
        "output_fingerprint",
        "diff_fingerprint",
    )
    cleaned: dict[str, Any] = {key: _string(event.get(key)) for key in text_keys}
    cleaned["work_units"] = _positive_int(event.get("work_units"), default=1)
    cleaned["soft_failure"] = bool(event.get("soft_failure"))
    return cleaned

143 

144 

def _budget_halt(events: list[dict[str, Any]], max_work_units: int) -> HaltReason | None:
    """Return a run-budget halt when total work units exceed the budget.

    A ``max_work_units`` that is not a positive int falls back to
    ``DEFAULT_RUN_BUDGET``. Spending exactly the budget is not a breach.
    """
    budget = _positive_int(max_work_units, default=DEFAULT_RUN_BUDGET)
    spent = sum(item["work_units"] for item in events)
    if spent <= budget:
        return None
    return HaltReason(
        control="run-budget",
        reason="run-budget-exceeded",
        scope="run",
        observed=spent,
        limit=budget,
    )

157 

158 

def _step_cap_halt(
    events: list[dict[str, Any]],
    default_step_cap: int,
    step_caps: dict[str, int],
) -> HaltReason | None:
    """Return a step-cap halt for the first scope that runs too many times.

    Each event is counted against its ``slot``, or against its ``step_id``
    when the slot is empty; events with neither are skipped. The halt is
    raised on the first event that pushes a scope past its limit.
    """
    fallback_cap = _positive_int(default_step_cap, default=DEFAULT_STEP_CAP)
    tally: dict[str, int] = {}
    for item in events:
        scope = item["slot"] or item["step_id"]
        if scope:
            seen = tally.get(scope, 0) + 1
            tally[scope] = seen
            cap = _step_limit(scope, step_caps, fallback_cap)
            if seen > cap:
                return HaltReason(
                    control="step-cap",
                    reason="step-cap-exceeded",
                    scope=scope,
                    observed=seen,
                    limit=cap,
                )
    return None

181 

182 

def _oscillation_halt(
    events: list[dict[str, Any]],
    *,
    identical_action_threshold: int,
    alternating_diff_window: int,
) -> HaltReason | None:
    """Run both oscillation detectors and return the first halt found.

    Repeated identical actions are checked first; the alternating-diff check
    only runs when that finds nothing. Each setting that is not a positive
    int is replaced by its module default before use.
    """
    streak_limit = _positive_int(
        identical_action_threshold,
        default=DEFAULT_IDENTICAL_THRESHOLD,
    )
    return _repeated_identical_action(events, threshold=streak_limit) or _alternating_diff(
        events,
        window=_positive_int(
            alternating_diff_window,
            default=DEFAULT_ALTERNATION_WINDOW,
        ),
    )

205 

206 

207def _repeated_identical_action( 

208 events: list[dict[str, Any]], 

209 *, 

210 threshold: int, 

211) -> HaltReason | None: 

212 if threshold <= 1: 

213 threshold = DEFAULT_IDENTICAL_THRESHOLD 

214 streak = 0 

215 previous: tuple[str, str, str, str] | None = None 

216 for event in events: 

217 current = ( 

218 event["step_id"], 

219 event["slot"], 

220 event["action"], 

221 event["output_fingerprint"], 

222 ) 

223 if not any(current): 

224 previous = None 

225 streak = 0 

226 continue 

227 streak = streak + 1 if current == previous else 1 

228 previous = current 

229 if streak >= threshold: 

230 return HaltReason( 

231 control="oscillation", 

232 reason="repeated-identical-action", 

233 scope=event["slot"] or event["step_id"] or "run", 

234 observed=streak, 

235 limit=threshold, 

236 ) 

237 return None 

238 

239 

240def _alternating_diff(events: list[dict[str, Any]], *, window: int) -> HaltReason | None: 

241 if window < 4 or window % 2: 

242 window = DEFAULT_ALTERNATION_WINDOW 

243 diffs = [event["diff_fingerprint"] for event in events if event["diff_fingerprint"]] 

244 if len(diffs) < window: 

245 return None 

246 tail = diffs[-window:] 

247 left = tail[: window // 2] 

248 right = tail[window // 2:] 

249 if left == right and len(set(left)) > 1: 

250 return HaltReason( 

251 control="oscillation", 

252 reason="alternating-diff-fingerprint", 

253 scope="diff", 

254 observed=",".join(tail), 

255 limit=f"no {window}-round alternation", 

256 ) 

257 return None 

258 

259 

260def _string(value: Any) -> str: 

261 return value.strip() if isinstance(value, str) and value.strip() else "" 

262 

263 

264def _positive_int(value: Any, *, default: int) -> int: 

265 return value if isinstance(value, int) and value > 0 else default 

266 

267 

def _step_limit(scope: str, step_caps: dict[str, int], default_limit: int) -> int:
    """Resolve the iteration cap that applies to ``scope``.

    A positive int configured for the scope in ``step_caps`` wins. Otherwise
    the built-in cap for that scope is used, and ``default_limit`` applies
    when the scope has no built-in cap either.
    """
    fallback = _default_step_caps().get(scope, default_limit)
    return _positive_int(step_caps.get(scope), default=fallback)