Coverage for src/keel/review.py: 100%
105 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
"""Pure orchestration for ``keel review`` — the evidence-bundle orchestrator.

The host agent runs the actual reviewers and produces the review *content*. This
module takes that supplied content and deterministically decides what to render
and where to post it: one head-pinned reviewer verdict per supplied review, an
optional closure comment posted to both the PR and the linked issue, and the
run-id sub-keys that bind each post to a stable, idempotent comment.

Everything here is pure — no network, no subprocess, no clock, no randomness.
Rendering uses the already-pure artifact/closure renderers. The CLI handler owns
the head-SHA fetch, the actual posting, and the optional re-verify.
"""
14from __future__ import annotations
16from dataclasses import dataclass, field
17from typing import Any
19from . import artifacts, closure, evidence
21SCHEMA_VERSION = "keel.review.v1"
class ReviewError(ValueError):
    """Signal a malformed review bundle or one with fewer reviews than required.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    also catch this.
    """
@dataclass(frozen=True)
class ReviewItem:
    """A single reviewer verdict parsed from the host agent's payload."""

    # Who reviewed and what they concluded.
    reviewer: str
    verdict: str
    # Optional free-text scope; ``None`` when not supplied.
    scope: str | None
    # Finding objects, kept as an immutable sequence of dicts.
    findings: tuple[dict[str, Any], ...]
    # Optional free-text testing notes; ``None`` when not supplied.
    testing: str | None
    # Optional provenance of the reviewer; both default to ``None``.
    vendor: str | None = None
    model: str | None = None
@dataclass(frozen=True)
class PostTarget:
    """One planned post: a rendered body, its destination, and its run-id sub-key."""

    # Artifact name, e.g. "review-verdict" or "closure-comment".
    artifact: str
    # Destination kind ("pr" or "issue" in this module) and its number.
    target_kind: str
    target_number: int
    # Run-id sub-key and marker carried alongside the rendered body.
    run_id: str
    marker: str
    body: str

    def as_dict(self) -> dict[str, Any]:
        """Return the post as a plain dict, nesting the destination under ``target``."""
        destination = {"kind": self.target_kind, "number": self.target_number}
        payload: dict[str, Any] = dict(
            artifact=self.artifact,
            target=destination,
            run_id=self.run_id,
            marker=self.marker,
            body=self.body,
        )
        return payload
@dataclass(frozen=True)
class ReviewPlan:
    """The deterministic outcome of planning: inputs echoed back plus the posts to make."""

    # Pull request the plan is for, and the linked issue if there is one.
    pull_request: int
    issue: int | None
    # Head commit SHA handed to the verdict renderer; may be ``None``.
    head_sha: str | None
    # Base run id from which the per-post sub-keys are derived.
    run_id: str
    tier: int | None
    # Minimum number of reviews demanded versus the number actually supplied.
    required_count: int
    supplied_count: int
    # Planned posts, in the order they were built; empty by default.
    posts: tuple[PostTarget, ...] = field(default_factory=tuple)

    def as_dict(self) -> dict[str, Any]:
        """Return the plan as a plain dict, tagged with the module schema version."""
        serialised_posts = [planned.as_dict() for planned in self.posts]
        payload: dict[str, Any] = dict(
            schema_version=SCHEMA_VERSION,
            pull_request=self.pull_request,
            issue=self.issue,
            head_sha=self.head_sha,
            run_id=self.run_id,
            tier=self.tier,
            required_count=self.required_count,
            supplied_count=self.supplied_count,
            posts=serialised_posts,
        )
        return payload
def parse_reviews(raw: object) -> tuple[ReviewItem, ...]:
    """Turn a decoded ``--reviews`` JSON payload into validated ``ReviewItem`` records.

    Args:
        raw: The decoded JSON value; must be a list of review objects.

    Returns:
        One ``ReviewItem`` per entry, in payload order.

    Raises:
        ReviewError: If ``raw`` is not a list, or any entry fails validation.
    """
    if isinstance(raw, list):
        # Entries are validated lazily in order, so the first bad one raises.
        return tuple(_parse_review(entry, index) for index, entry in enumerate(raw))
    raise ReviewError("reviews file must contain a JSON array of review objects")
def _parse_review(entry: object, index: int) -> ReviewItem:
    """Validate one review entry and build its ``ReviewItem``.

    Args:
        entry: The decoded JSON value for a single review.
        index: Zero-based position of the entry; error messages report it
            one-based.

    Returns:
        A ``ReviewItem`` with ``reviewer`` and ``verdict`` stripped of
        surrounding whitespace; ``scope`` and ``testing`` are passed through
        unchanged.

    Raises:
        ReviewError: If the entry is not an object, ``reviewer`` or ``verdict``
            is missing/blank/non-string, or an optional field has the wrong type.
    """
    if not isinstance(entry, dict):
        raise ReviewError(f"review #{index + 1} must be a JSON object")

    # Required fields: must be strings with non-whitespace content.
    raw_reviewer = entry.get("reviewer")
    if not (isinstance(raw_reviewer, str) and raw_reviewer.strip()):
        raise ReviewError(f"review #{index + 1} requires a non-empty 'reviewer' string")
    raw_verdict = entry.get("verdict")
    if not (isinstance(raw_verdict, str) and raw_verdict.strip()):
        raise ReviewError(f"review #{index + 1} requires a non-empty 'verdict' string")

    # Optional free-text fields: absent is fine, but a non-string is rejected.
    scope_text = entry.get("scope")
    if not (scope_text is None or isinstance(scope_text, str)):
        raise ReviewError(f"review #{index + 1} 'scope' must be a string when present")
    testing_text = entry.get("testing")
    if not (testing_text is None or isinstance(testing_text, str)):
        raise ReviewError(f"review #{index + 1} 'testing' must be a string when present")

    vendor_name = _parse_provenance_field(entry.get("vendor"), index, "vendor")
    model_name = _parse_provenance_field(entry.get("model"), index, "model")
    parsed_findings = _parse_findings(entry.get("findings"), index)

    return ReviewItem(
        reviewer=raw_reviewer.strip(),
        verdict=raw_verdict.strip(),
        scope=scope_text,
        findings=parsed_findings,
        testing=testing_text,
        vendor=vendor_name,
        model=model_name,
    )
128def _parse_provenance_field(value: object, index: int, name: str) -> str | None:
129 """Parse an optional ``vendor``/``model`` provenance string from a review entry."""
130 if value is None:
131 return None
132 if not isinstance(value, str):
133 raise ReviewError(f"review #{index + 1} '{name}' must be a string when present")
134 cleaned = value.strip()
135 return cleaned or None
138def _parse_findings(raw: object, index: int) -> tuple[dict[str, Any], ...]:
139 if raw is None:
140 return ()
141 if not isinstance(raw, list):
142 raise ReviewError(f"review #{index + 1} 'findings' must be a list when present")
143 findings: list[dict[str, Any]] = []
144 for finding_index, finding in enumerate(raw):
145 if not isinstance(finding, dict):
146 raise ReviewError(
147 f"review #{index + 1} finding #{finding_index + 1} must be a JSON object"
148 )
149 findings.append(dict(finding))
150 return tuple(findings)
def review_run_id(run_id: str, reviewer: str) -> str:
    """Build the per-reviewer run-id sub-key: ``<run-id>:rv-<reviewer-slug>``.

    The reviewer name is passed through ``artifacts.slug`` before being appended.
    """
    reviewer_slug = artifacts.slug(reviewer)
    return f"{run_id}:rv-{reviewer_slug}"
def closure_run_id(run_id: str) -> str:
    """Build the run-id sub-key for the closure comment: ``<run-id>:closure``."""
    sub_key = f"{run_id}:closure"
    return sub_key
def build_review_plan(
    reviews: tuple[ReviewItem, ...],
    *,
    required_count: int,
    head_sha: str | None,
    pull_request: int,
    issue: int | None,
    run_id: str,
    tier: int | None,
    closure_record: dict[str, Any] | None = None,
) -> ReviewPlan:
    """Check the bundle against the required review count and plan every post.

    Each supplied review is rendered as a verdict and planned as a post on the
    pull request. When a closure record is given, its posts are appended after
    the verdict posts.

    Args:
        reviews: Parsed reviews to render.
        required_count: Minimum number of reviews; supplying more is allowed.
        head_sha: Head commit SHA handed to the verdict renderer.
        pull_request: Pull request number that receives the verdict posts.
        issue: Linked issue number, or ``None`` when there is none.
        run_id: Base run id from which per-post sub-keys are derived.
        tier: Tier value recorded on the plan as-is.
        closure_record: Optional closure record to render and post.

    Returns:
        A ``ReviewPlan`` holding the inputs and the ordered posts.

    Raises:
        ReviewError: If fewer reviews were supplied than ``required_count``, or
            the closure record is not a dict.
    """
    supplied = len(reviews)
    if supplied < required_count:
        raise ReviewError(
            f"supplied {supplied} review(s) but tier requires at least "
            f"{required_count}; refusing to under-post evidence"
        )

    def _verdict_post(item: ReviewItem) -> PostTarget:
        # Render first, then bind the body to the PR under the reviewer's sub-key.
        rendered = artifacts.render_review_verdict(
            reviewer=item.reviewer,
            head_sha=head_sha,
            verdict=item.verdict,
            scope=item.scope,
            findings=list(item.findings),
            testing=item.testing,
            vendor=item.vendor,
            model=item.model,
        )
        return PostTarget(
            artifact="review-verdict",
            target_kind="pr",
            target_number=pull_request,
            run_id=review_run_id(run_id, item.reviewer),
            marker=evidence.REVIEW_VERDICT_MARKER,
            body=rendered,
        )

    # NOTE(review): reviewers whose names slug to the same value share a run-id
    # sub-key here — confirm the poster is expected to cope with that.
    planned = [_verdict_post(item) for item in reviews]

    if closure_record is not None:
        planned += _closure_posts(
            closure_record,
            pull_request=pull_request,
            issue=issue,
            run_id=run_id,
        )

    return ReviewPlan(
        pull_request=pull_request,
        issue=issue,
        head_sha=head_sha,
        run_id=run_id,
        tier=tier,
        required_count=required_count,
        supplied_count=supplied,
        posts=tuple(planned),
    )
def _closure_posts(
    closure_record: dict[str, Any],
    *,
    pull_request: int,
    issue: int | None,
    run_id: str,
) -> list[PostTarget]:
    """Render the closure comment once and plan a post for each destination.

    The pull request always gets a post; the issue gets one only when ``issue``
    is not ``None``. Every post shares the same body and closure run-id sub-key.

    Raises:
        ReviewError: If ``closure_record`` is not a dict.
    """
    if not isinstance(closure_record, dict):
        raise ReviewError("closure file must contain a JSON object")

    rendered = closure.render_closure_comment(closure_record)
    shared_run_id = closure_run_id(run_id)

    # PR first, then the linked issue when there is one.
    destinations: list[tuple[str, int]] = [("pr", pull_request)]
    if issue is not None:
        destinations.append(("issue", issue))

    return [
        PostTarget(
            artifact="closure-comment",
            target_kind=kind,
            target_number=number,
            run_id=shared_run_id,
            marker=closure.COMMENT_MARKER,
            body=rendered,
        )
        for kind, number in destinations
    ]