Coverage for src/keel/review.py: 100%

105 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Pure orchestration for ``keel review`` — the evidence-bundle orchestrator. 

2 

3The host agent runs the actual reviewers and produces the review *content*. This 

4module takes that supplied content and deterministically decides what to render 

5and where to post it: one head-pinned reviewer verdict per supplied review, an 

6optional closure comment posted to both the PR and the linked issue, and the 

7run-id sub-keys that bind each post to a stable, idempotent comment. 

8 

9Everything here is pure — no network, no subprocess, no clock, no randomness. 

10Rendering uses the already-pure artifact/closure renderers. The CLI handler owns 

11the head-SHA fetch, the actual posting, and the optional re-verify. 

12""" 

13 

14from __future__ import annotations 

15 

16from dataclasses import dataclass, field 

17from typing import Any 

18 

19from . import artifacts, closure, evidence 

20 

# Identifier emitted under the "schema_version" key of a serialized ReviewPlan.
SCHEMA_VERSION = "keel.review.v1"

22 

23 

class ReviewError(ValueError):
    """Signal that a supplied review bundle cannot be accepted.

    Used for structurally invalid payloads as well as for bundles that carry
    fewer reviews than the required count.
    """

26 

27 

@dataclass(frozen=True)
class ReviewItem:
    """Immutable record of a single reviewer verdict handed over by the host agent."""

    # Reviewer name and verdict text.
    reviewer: str
    verdict: str
    # Optional free-text scope of the review.
    scope: str | None
    # One dict per finding; the tuple keeps the record itself immutable.
    findings: tuple[dict[str, Any], ...]
    # Optional free-text testing notes.
    testing: str | None
    # Optional provenance of the reviewer.
    vendor: str | None = None
    model: str | None = None

39 

40 

@dataclass(frozen=True)
class PostTarget:
    """One planned post: a rendered body tied to a destination and a run-id sub-key."""

    # Artifact label, e.g. "review-verdict" or "closure-comment".
    artifact: str
    # Destination kind ("pr" or "issue") and its number.
    target_kind: str
    target_number: int
    # Run-id sub-key identifying this post.
    run_id: str
    # Marker string associated with the artifact type.
    marker: str
    # Fully rendered comment body.
    body: str

    def as_dict(self) -> dict[str, Any]:
        """Return a plain-dict form with the destination nested under ``target``."""
        destination = {"kind": self.target_kind, "number": self.target_number}
        return dict(
            artifact=self.artifact,
            target=destination,
            run_id=self.run_id,
            marker=self.marker,
            body=self.body,
        )

60 

61 

@dataclass(frozen=True)
class ReviewPlan:
    """Immutable result of planning: the review context plus every planned post."""

    # Pull request the verdicts are posted to, and the optional linked issue.
    pull_request: int
    issue: int | None
    # Head commit the verdicts are rendered against, when known.
    head_sha: str | None
    # Base run id from which the per-post sub-keys are derived.
    run_id: str
    tier: int | None
    # Minimum number of reviews demanded vs. number actually supplied.
    required_count: int
    supplied_count: int
    # Planned posts, in the order they were built.
    posts: tuple[PostTarget, ...] = field(default_factory=tuple)

    def as_dict(self) -> dict[str, Any]:
        """Return a plain-dict form of the plan, tagged with ``SCHEMA_VERSION``."""
        serialized_posts = [planned.as_dict() for planned in self.posts]
        return {
            "schema_version": SCHEMA_VERSION,
            "pull_request": self.pull_request,
            "issue": self.issue,
            "head_sha": self.head_sha,
            "run_id": self.run_id,
            "tier": self.tier,
            "required_count": self.required_count,
            "supplied_count": self.supplied_count,
            "posts": serialized_posts,
        }

87 

88 

def parse_reviews(raw: object) -> tuple[ReviewItem, ...]:
    """Turn a decoded ``--reviews`` JSON payload into validated ``ReviewItem`` records.

    Raises:
        ReviewError: if ``raw`` is not a list, or if any entry fails validation.
    """
    if not isinstance(raw, list):
        raise ReviewError("reviews file must contain a JSON array of review objects")
    # Entries are validated in order; the first invalid one aborts the parse.
    return tuple(
        _parse_review(candidate, position) for position, candidate in enumerate(raw)
    )

97 

98 

def _parse_review(entry: object, index: int) -> ReviewItem:
    """Validate one review entry and convert it into a ``ReviewItem``.

    ``index`` is the zero-based position of the entry; error messages report it
    one-based. ``reviewer`` and ``verdict`` are required non-blank strings and
    are stored stripped; ``scope`` and ``testing`` are optional strings stored
    unchanged.

    Raises:
        ReviewError: if the entry or any of its fields has the wrong shape.
    """
    ordinal = index + 1
    if not isinstance(entry, dict):
        raise ReviewError(f"review #{ordinal} must be a JSON object")

    reviewer = entry.get("reviewer")
    if not (isinstance(reviewer, str) and reviewer.strip()):
        raise ReviewError(f"review #{ordinal} requires a non-empty 'reviewer' string")

    verdict = entry.get("verdict")
    if not (isinstance(verdict, str) and verdict.strip()):
        raise ReviewError(f"review #{ordinal} requires a non-empty 'verdict' string")

    scope = entry.get("scope")
    if not (scope is None or isinstance(scope, str)):
        raise ReviewError(f"review #{ordinal} 'scope' must be a string when present")

    testing = entry.get("testing")
    if not (testing is None or isinstance(testing, str)):
        raise ReviewError(f"review #{ordinal} 'testing' must be a string when present")

    # Provenance fields and findings are validated by their dedicated helpers.
    vendor = _parse_provenance_field(entry.get("vendor"), index, "vendor")
    model = _parse_provenance_field(entry.get("model"), index, "model")
    findings = _parse_findings(entry.get("findings"), index)

    return ReviewItem(
        reviewer=reviewer.strip(),
        verdict=verdict.strip(),
        scope=scope,
        findings=findings,
        testing=testing,
        vendor=vendor,
        model=model,
    )

126 

127 

def _parse_provenance_field(value: object, index: int, name: str) -> str | None:
    """Normalise an optional provenance string (``vendor`` or ``model``).

    Returns ``None`` when the value is absent or blank after stripping, and the
    stripped text otherwise.

    Raises:
        ReviewError: if the value is present but not a string.
    """
    if value is None:
        return None
    if isinstance(value, str):
        # A whitespace-only value collapses to None.
        return value.strip() or None
    raise ReviewError(f"review #{index + 1} '{name}' must be a string when present")

136 

137 

def _parse_findings(raw: object, index: int) -> tuple[dict[str, Any], ...]:
    """Validate the optional ``findings`` list of a review entry.

    A missing list yields an empty tuple. Each finding must be a dict and is
    shallow-copied into the result.

    Raises:
        ReviewError: if ``raw`` is not a list, or a finding is not a dict.
    """
    if raw is None:
        return ()
    if not isinstance(raw, list):
        raise ReviewError(f"review #{index + 1} 'findings' must be a list when present")

    copies: list[dict[str, Any]] = []
    for position, candidate in enumerate(raw, start=1):
        if isinstance(candidate, dict):
            copies.append(dict(candidate))
            continue
        raise ReviewError(
            f"review #{index + 1} finding #{position} must be a JSON object"
        )
    return tuple(copies)

151 

152 

def review_run_id(run_id: str, reviewer: str) -> str:
    """Derive the per-reviewer sub-key ``<run-id>:rv-<reviewer-slug>``."""
    reviewer_slug = artifacts.slug(reviewer)
    return f"{run_id}:rv-{reviewer_slug}"

156 

157 

def closure_run_id(run_id: str) -> str:
    """Derive the sub-key used for the closure-comment artifact: ``<run-id>:closure``."""
    suffix = "closure"
    return f"{run_id}:{suffix}"

161 

162 

def build_review_plan(
    reviews: tuple[ReviewItem, ...],
    *,
    required_count: int,
    head_sha: str | None,
    pull_request: int,
    issue: int | None,
    run_id: str,
    tier: int | None,
    closure_record: dict[str, Any] | None = None,
) -> ReviewPlan:
    """Check the bundle size and assemble the ordered list of planned posts.

    The bundle is rejected when it holds fewer reviews than ``required_count``;
    an equal or larger number is accepted. Every review becomes one
    ``review-verdict`` post on the pull request, in the order supplied. When a
    ``closure_record`` is given, its posts are appended after the verdicts.

    Raises:
        ReviewError: if fewer reviews are supplied than required, or the
            closure record is rejected.
    """
    supplied = len(reviews)
    if required_count > supplied:
        raise ReviewError(
            f"supplied {supplied} review(s) but tier requires at least "
            f"{required_count}; refusing to under-post evidence"
        )

    planned: list[PostTarget] = []
    for review in reviews:
        rendered = artifacts.render_review_verdict(
            reviewer=review.reviewer,
            head_sha=head_sha,
            verdict=review.verdict,
            scope=review.scope,
            findings=list(review.findings),
            testing=review.testing,
            vendor=review.vendor,
            model=review.model,
        )
        # NOTE(review): reviewers whose names slug identically receive the same
        # sub-key here — confirm how the poster handles such a collision.
        verdict_post = PostTarget(
            artifact="review-verdict",
            target_kind="pr",
            target_number=pull_request,
            run_id=review_run_id(run_id, review.reviewer),
            marker=evidence.REVIEW_VERDICT_MARKER,
            body=rendered,
        )
        planned.append(verdict_post)

    if closure_record is not None:
        planned += _closure_posts(
            closure_record,
            pull_request=pull_request,
            issue=issue,
            run_id=run_id,
        )

    return ReviewPlan(
        pull_request=pull_request,
        issue=issue,
        head_sha=head_sha,
        run_id=run_id,
        tier=tier,
        required_count=required_count,
        supplied_count=supplied,
        posts=tuple(planned),
    )

227 

228 

def _closure_posts(
    closure_record: dict[str, Any],
    *,
    pull_request: int,
    issue: int | None,
    run_id: str,
) -> list[PostTarget]:
    """Render the closure comment once and plan a post for each destination.

    The pull request always receives the comment; the issue receives the same
    body under the same sub-key only when ``issue`` is not ``None``.

    Raises:
        ReviewError: if ``closure_record`` is not a dict.
    """
    if not isinstance(closure_record, dict):
        raise ReviewError("closure file must contain a JSON object")

    rendered = closure.render_closure_comment(closure_record)
    sub_key = closure_run_id(run_id)

    # PR first, then the linked issue when there is one.
    destinations: list[tuple[str, int]] = [("pr", pull_request)]
    if issue is not None:
        destinations.append(("issue", issue))

    return [
        PostTarget(
            artifact="closure-comment",
            target_kind=kind,
            target_number=number,
            run_id=sub_key,
            marker=closure.COMMENT_MARKER,
            body=rendered,
        )
        for kind, number in destinations
    ]