Coverage for src/keel/stepverifier.py: 100%

75 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Deterministic backbone step completion verification. 

2 

3Agentic ship steps may be performed by different runtimes, but advancing the 

4backbone must not depend on private prose. This module defines the shared 

5"done" contract for each step, the structured handoff shape between steps, and 

6the fail-closed transition check adapters can run before moving forward. 

7""" 

8 

9from __future__ import annotations 

10 

11from dataclasses import dataclass 

12from typing import Any 

13 

14from . import artifacts, evidence, model, provenance 

15 

# Schema tag placed under "schema_version" in the dicts returned by
# contract_as_dict() and verify_step_completion().
16SCHEMA_VERSION = "keel.step-verification.v1" 

# Schema tag stamped on every handoff by build_handoff(); _check_handoff_schema()
# rejects a handoff whose "schema_version" differs from it.
17HANDOFF_SCHEMA_VERSION = "keel.step-handoff.v1" 

# Alias of the marker owned by the artifacts module; _check_handoff_marker()
# requires it to appear inside the handoff's "rendered" text.
18HANDOFF_MARKER = artifacts.STEP_HANDOFF_MARKER 

# Default status for build_handoff() and the only status that
# _check_handoff_status() accepts.
19COMPLETE_STATUS = "complete" 

20 

21 

@dataclass(frozen=True)
class StepRequirement:
    """Immutable record of the evidence one backbone step must present.

    ``required_evidence`` defaults to an empty tuple, and ``verifier`` defaults
    to the dotted path of this module's ``verify_step_completion``.
    """

    # Identifier and display name of the backbone step.
    step_id: str
    step_name: str
    # Evidence ids that must be present for this step.
    required_evidence: tuple[str, ...] = ()
    # Dotted path naming the function that verifies this requirement.
    verifier: str = "keel.stepverifier.verify_step_completion"

    def as_dict(self) -> dict[str, Any]:
        """Return the requirement as a plain dict, with evidence ids as a list."""
        payload: dict[str, Any] = {}
        payload["step_id"] = self.step_id
        payload["step_name"] = self.step_name
        # Copy the tuple into a fresh list so the output is a plain container.
        payload["required_evidence"] = [*self.required_evidence]
        payload["verifier"] = self.verifier
        return payload

38 

39 

def contract_as_dict(
    review_contract: dict[str, Any],
    *,
    dry_run: bool = False,
    enforced: bool = True,
) -> dict[str, Any]:
    """Describe the step-completion contract as a plain dict.

    The result carries the verification schema version, a set of fixed boolean
    flags, the handoff schema (its version, required fields, renderer path and
    marker), the completion rule text, and one entry per step requirement.

    ``dry_run`` is accepted but discarded: the per-step requirements are always
    computed with ``dry_run=False`` so the contract describes the required
    done-state even for dry-run output.
    """
    del dry_run
    step_entries = []
    for requirement in step_requirements(
        review_contract, dry_run=False, enforced=enforced
    ):
        step_entries.append(requirement.as_dict())

    handoff_schema = {
        "schema_version": HANDOFF_SCHEMA_VERSION,
        "required_fields": [
            "schema_version",
            "step_id",
            "step_name",
            "status",
            "summary",
            "evidence_ids",
            "provenance",
            "rendered",
        ],
        "renderer": "keel.artifacts.render_step_handoff",
        "marker": HANDOFF_MARKER,
    }
    completion_rule = (
        "A step may transition as success only when its structured handoff has "
        "status=complete and every required evidence id for that step is ok."
    )

    contract: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "consumer_neutral": True,
        "deterministic": True,
        "fail_closed": True,
        "dry_run_disables_runtime_gating": True,
        "source": "backbone_plan + evidence",
        "no_premature_termination": True,
    }
    contract["handoff_schema"] = handoff_schema
    contract["completion_rule"] = completion_rule
    contract["steps"] = step_entries
    return contract

78 

79 

def step_requirements(
    review_contract: dict[str, Any],
    *,
    dry_run: bool = False,
    enforced: bool = True,
) -> tuple[StepRequirement, ...]:
    """Build one ``StepRequirement`` per entry of ``model.BACKBONE``.

    Required evidence ids come from ``evidence.required_items`` and are routed
    to steps by id: ``review-verdict-*`` to ``s7``, the exact id
    ``jury-verdict`` to ``s8``, and ``closure-comment-*`` to ``s12``. Every
    other step gets an empty evidence tuple.
    """
    required = evidence.required_items(
        review_contract,
        dry_run=dry_run,
        enforced=enforced,
    )
    ids = [entry.id for entry in required]

    def with_prefix(prefix: str) -> tuple[str, ...]:
        # Keep ids in their original order; only the prefix decides membership.
        return tuple(candidate for candidate in ids if candidate.startswith(prefix))

    evidence_for_step = {
        "s7": with_prefix("review-verdict-"),
        "s8": tuple(candidate for candidate in ids if candidate == "jury-verdict"),
        "s12": with_prefix("closure-comment-"),
    }

    collected = []
    for step in model.BACKBONE:
        collected.append(
            StepRequirement(
                step_id=step.id,
                step_name=step.name,
                required_evidence=evidence_for_step.get(step.id, ()),
            )
        )
    return tuple(collected)

108 

109 

def build_handoff(
    *,
    step_id: str,
    status: str = COMPLETE_STATUS,
    summary: str | None = None,
    evidence_ids: tuple[str, ...] | list[str] = (),
    next_step: str | None = None,
    producer: str | None = None,
    vendor: str | None = None,
    model_name: str | None = None,
    allowed_capabilities: tuple[str, ...] | list[str] = (),
) -> dict[str, Any]:
    """Assemble the structured handoff dict for one step.

    The step is resolved through ``model.get_step``. Evidence ids are kept only
    when they are strings that are non-empty after stripping, and are stored
    stripped. The ``rendered`` field comes from
    ``artifacts.render_step_handoff`` and the ``provenance`` field from
    ``provenance.source_tag``.

    The renderer receives ``summary`` as given, while the returned dict
    substitutes ``"No summary recorded."`` when ``summary`` is falsy.
    """
    step = model.get_step(step_id)

    kept: list[str] = []
    for raw in evidence_ids:
        if not isinstance(raw, str):
            continue
        trimmed = raw.strip()
        if trimmed:
            kept.append(trimmed)
    clean_evidence = tuple(kept)

    rendered = artifacts.render_step_handoff(
        step_id=step.id,
        step_name=step.name,
        status=status,
        summary=summary,
        next_step=next_step,
        evidence_ids=clean_evidence,
    )
    source = provenance.source_tag(
        source_agent=producer,
        step_id=step.id,
        vendor=vendor,
        model=model_name,
        allowed_capabilities=allowed_capabilities,
    )

    recorded_summary = summary if summary else "No summary recorded."
    return {
        "schema_version": HANDOFF_SCHEMA_VERSION,
        "step_id": step.id,
        "step_name": step.name,
        "status": status,
        "summary": recorded_summary,
        "evidence_ids": list(clean_evidence),
        "next_step": next_step,
        "producer": producer,
        "provenance": source,
        "rendered": rendered,
    }

153 

154 

def verify_step_completion(
    *,
    step_id: str,
    handoff: dict[str, Any] | None,
    evidence_report: dict[str, Any] | None,
    review_contract: dict[str, Any],
    dry_run: bool = False,
    enforced: bool = True,
) -> dict[str, Any]:
    """Decide whether a step may be marked complete from structured data only.

    Four checks run in a fixed order: handoff schema, handoff status, handoff
    marker, and required evidence. The reasons of every failed check are
    concatenated into ``missing``; the overall status is ``"pass"`` only when
    that list is empty.

    Raises:
        KeyError: if ``step_id`` has no matching step requirement.
    """
    requirement = _requirement_for(
        step_id,
        review_contract,
        dry_run=dry_run,
        enforced=enforced,
    )

    checks = []
    checks.append(_check_handoff_schema(step_id, handoff))
    checks.append(_check_handoff_status(handoff))
    checks.append(_check_handoff_marker(handoff))
    checks.append(_check_required_evidence(requirement, evidence_report))

    # Only failed checks contribute reasons; passing checks are still reported
    # in full under "checks".
    missing: list[str] = []
    for outcome in checks:
        if outcome["ok"]:
            continue
        missing.extend(outcome["missing"])

    verdict = "fail" if missing else "pass"
    return {
        "schema_version": SCHEMA_VERSION,
        "step_id": step_id,
        "status": verdict,
        "no_premature_termination": True,
        "required_evidence": list(requirement.required_evidence),
        "missing": missing,
        "checks": checks,
    }

192 

193 

def _requirement_for(
    step_id: str,
    review_contract: dict[str, Any],
    *,
    dry_run: bool,
    enforced: bool,
) -> StepRequirement:
    """Return the requirement whose ``step_id`` matches, or raise ``KeyError``.

    All requirements from ``step_requirements`` are indexed by step id first,
    so if two share an id the later one is the one returned.
    """
    indexed: dict[str, StepRequirement] = {}
    for candidate in step_requirements(
        review_contract,
        dry_run=dry_run,
        enforced=enforced,
    ):
        indexed[candidate.step_id] = candidate

    found = indexed.get(step_id)
    if found is None:
        raise KeyError(f"unknown backbone step: {step_id}")
    return found

212 

213 

def _check_handoff_schema(step_id: str, handoff: dict[str, Any] | None) -> dict[str, Any]:
    """Check that the handoff is a dict with the expected schema and step id.

    The first failing condition determines the single reason reported.
    """
    if not isinstance(handoff, dict):
        problem = "handoff missing"
    elif handoff.get("schema_version") != HANDOFF_SCHEMA_VERSION:
        problem = "handoff schema mismatch"
    elif handoff.get("step_id") != step_id:
        problem = "handoff step mismatch"
    else:
        return _check("handoff_schema", True)
    return _check("handoff_schema", False, problem)

222 

223 

def _check_handoff_status(handoff: dict[str, Any] | None) -> dict[str, Any]:
    """Check that the handoff is a dict whose status equals ``COMPLETE_STATUS``."""
    if isinstance(handoff, dict):
        if handoff.get("status") != COMPLETE_STATUS:
            return _check("handoff_status", False, "handoff not complete")
        return _check("handoff_status", True)
    return _check("handoff_status", False, "handoff missing")

230 

231 

def _check_handoff_marker(handoff: dict[str, Any] | None) -> dict[str, Any]:
    """Check that the handoff's rendered text contains ``HANDOFF_MARKER``.

    The check is reported under the name ``"handoff_renderer"``.
    """
    if not isinstance(handoff, dict):
        return _check("handoff_renderer", False, "handoff missing")
    text = handoff.get("rendered")
    # A non-string "rendered" value fails the same way as a missing marker.
    has_marker = isinstance(text, str) and HANDOFF_MARKER in text
    if has_marker:
        return _check("handoff_renderer", True)
    return _check("handoff_renderer", False, "canonical handoff renderer missing")

239 

240 

def _check_required_evidence(
    requirement: StepRequirement,
    evidence_report: dict[str, Any] | None,
) -> dict[str, Any]:
    """Check that every required evidence id has a passing result.

    A result counts only when its ``ok`` value is exactly ``True`` and its
    ``id`` is a string. Steps with no required evidence pass immediately. On
    failure the reasons are the unmet evidence ids, in requirement order.
    """
    needed = requirement.required_evidence
    if not needed:
        return _check("required_evidence", True)

    passed: set[str] = set()
    for entry in _evidence_results(evidence_report):
        if entry.get("ok") is not True:
            continue
        entry_id = entry.get("id")
        if isinstance(entry_id, str):
            passed.add(entry_id)

    unmet = [evidence_id for evidence_id in needed if evidence_id not in passed]
    return _check("required_evidence", not unmet, *unmet)

258 

259 

260def _evidence_results(evidence_report: dict[str, Any] | None) -> tuple[dict[str, Any], ...]: 

261 if not isinstance(evidence_report, dict): 

262 return () 

263 results = evidence_report.get("results") 

264 if not isinstance(results, list): 

265 return () 

266 return tuple(item for item in results if isinstance(item, dict)) 

267 

268 

269def _check(name: str, ok: bool, *missing: str) -> dict[str, Any]: 

270 return { 

271 "name": name, 

272 "ok": ok, 

273 "missing": list(missing), 

274 }