Coverage for src/keel/evidence.py: 100%

324 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Deterministic pre-merge evidence verification. 

2 

3The ship adapter is agentic, but the artifacts it must leave behind are not: 

4reviewer verdict comments/reviews, the optional jury verdict, and the stable 

5closure comment marker. This module keeps the check pure so CI can enforce it 

6without trusting prose in an agent prompt. 

7""" 

8 

9from __future__ import annotations 

10 

11import hashlib 

12import re 

13from collections.abc import Sequence 

14from dataclasses import dataclass 

15from typing import Any 

16 

17from . import agents, closure 

18 

# Schema identifier stamped on gate decisions, contracts, and verify reports.
SCHEMA_VERSION = "keel.evidence.v1"
# Prefix that identifies a PR attribution label (``agent:<vendor>``).
AGENT_LABEL_PREFIX = "agent:"
# Marker substrings searched for inside comment/review bodies.
REVIEW_VERDICT_MARKER = "keel.review-verdict.v1"
JURY_VERDICT_MARKER = "keel.jury-verdict.v1"
# Heading substring that identifies a ship assessment comment.
SHIP_ASSESSMENT_HEADING = "### \U0001f6a2 keel ship"
# Default label that disarms the gate (callers may override ``waiver_label``).
DEFAULT_WAIVER_LABEL = "keel:evidence-waived"
# ``author_association`` values treated as trusted (compared upper-cased).
TRUSTED_AUTHOR_ASSOCIATIONS = frozenset({"OWNER", "MEMBER", "COLLABORATOR"})
# Logins (compared lower-cased) also accepted as ship assessment authors.
TRUSTED_SHIP_ASSESSMENT_BOTS = frozenset({"github-actions", "github-actions[bot]"})

# Matches ``key: value`` fields for the four recognised keys (case-insensitive);
# the value is a single run of non-whitespace characters.
_FIELD_RE = re.compile(r"^\s*(?P<key>reviewer|head|vendor|model)\s*:\s*(?P<value>\S+)\s*$",
                       re.IGNORECASE | re.MULTILINE)
# Matches branch names such as ``fix/issue-12`` or ``feature/issue-12-slug``.
_SHIP_BRANCH_RE = re.compile(r"^(feature|fix|chore|docs|test)/issue-\d+(?:-|$)")

31 

32 

@dataclass(frozen=True)
class EvidenceItem:
    """A single evidence requirement.

    Immutable record holding the requirement's identifier, its kind, whether
    it is required, and a human-readable description.
    """

    id: str
    kind: str
    required: bool
    description: str

    def as_dict(self) -> dict[str, Any]:
        """Return the four fields as a plain dict in declaration order."""
        field_names = ("id", "kind", "required", "description")
        return {name: getattr(self, name) for name in field_names}

47 

48 

49def gate_active(labels: Sequence[str] | None, gate_label: str) -> bool: 

50 """Return whether ``gate_label`` is present in ``labels`` (None/empty -> False). 

51 

52 An empty ``gate_label`` is never active, so a misconfigured (blank) label can 

53 never silently match — the schema also forbids an empty ``evidence_gate_label``. 

54 """ 

55 if not gate_label: 

56 return False 

57 return gate_label in set(labels or ()) 

58 

59 

def gate_decision(
    labels: Sequence[str] | None,
    gate_label: str,
    *,
    waiver_label: str = DEFAULT_WAIVER_LABEL,
    head_ref: str | None = None,
    pr_comments: list[dict[str, Any]] | None = None,
    pr_reviews: list[dict[str, Any]] | None = None,
    ledger_records: Sequence[object] | None = None,
) -> dict[str, Any]:
    """Decide, fail-closed, whether the evidence gate is armed.

    Precedence (first match wins):

    1. the operator waiver label is present -> not enforced, ``waived=True``;
    2. the legacy gate label is present -> enforced;
    3. ``head_ref`` looks like a ship branch -> enforced;
    4. a trusted ship assessment comment exists -> enforced;
    5. a trusted comment/review carries the review-verdict marker -> enforced;
    6. ``ledger_records`` is non-empty -> enforced;
    7. otherwise -> not enforced (``no-ship-provenance``).

    Returns the decision dict built by ``_gate_decision``.
    """
    present_labels = set(labels or ())
    if waiver_label and waiver_label in present_labels:
        return _gate_decision(False, "operator-waiver-label", waiver_label, waived=True)

    comments = pr_comments or []
    reviews = pr_reviews or []
    # Each probe is lazy so later checks only run when earlier ones fail.
    arming_signals = (
        (lambda: gate_active(labels, gate_label), "gate-label", gate_label),
        (
            lambda: head_ref and _SHIP_BRANCH_RE.search(head_ref),
            "ship-branch",
            head_ref,
        ),
        (
            lambda: _has_trusted_ship_assessment(comments),
            "ship-assessment-comment",
            SHIP_ASSESSMENT_HEADING,
        ),
        (
            lambda: _has_trusted_review_marker([*comments, *reviews]),
            "review-verdict-marker",
            REVIEW_VERDICT_MARKER,
        ),
        (lambda: ledger_records, "ship-run-ledger", "ship_run"),
    )
    for probe, reason, source in arming_signals:
        if probe():
            return _gate_decision(True, reason, source)
    return _gate_decision(False, "no-ship-provenance", None)

90 

91 

def _gate_decision(
    enforced: bool,
    reason: str,
    source: str | None,
    *,
    waived: bool = False,
) -> dict[str, Any]:
    """Build the gate-decision payload, stamped with ``SCHEMA_VERSION``."""
    return dict(
        schema_version=SCHEMA_VERSION,
        enforced=enforced,
        waived=waived,
        reason=reason,
        source=source,
    )

106 

107 

def _has_trusted_ship_assessment(items: list[dict[str, Any]]) -> bool:
    """Return True if any item is a ship assessment from an accepted author."""
    for candidate in items:
        if _is_ship_assessment_source(candidate) and _is_ship_assessment(_body(candidate)):
            return True
    return False

113 

114 

def _is_ship_assessment_source(item: dict[str, Any]) -> bool:
    """Return True if the item's author may post a ship assessment.

    Accepts any source that passes the enforced trust check, plus authors
    whose lower-cased login is listed in ``TRUSTED_SHIP_ASSESSMENT_BOTS``.
    """
    if _is_trusted_source(item, enforced=True):
        return True
    author = item.get("user")
    if not isinstance(author, dict):
        return False
    handle = author.get("login")
    if not isinstance(handle, str) or not handle:
        return False
    return handle.lower() in TRUSTED_SHIP_ASSESSMENT_BOTS

121 

122 

def contract_as_dict(
    review_contract: dict[str, Any],
    *,
    dry_run: bool = False,
    enforced: bool = True,
    deferrals: tuple[str, ...] = (),
) -> dict[str, Any]:
    """Describe the evidence contract derived from the review/jury flags.

    ``required`` always lists the requirements as if ``dry_run`` were off,
    while ``active_required`` honours the caller's ``dry_run``; both honour
    ``enforced``.
    """
    distinct_vendors = _require_distinct_vendors(review_contract)
    accepted_sources = {
        "closure": "trusted issue/PR comments carrying keel.closure-comment.v1",
        "review": "trusted PR review/comment carrying keel.review-verdict.v1 and current head",
        "jury": "trusted PR comment carrying keel.jury-verdict.v1 and current head",
    }
    rejected_sources = [
        "pull_request_body",
        "chat_summary",
        "untrusted_public_comment",
        "keel_ship_assessment_comment",
    ]
    deferral_list = list(deferrals)
    full_set = [
        entry.as_dict()
        for entry in required_items(review_contract, dry_run=False, enforced=enforced)
    ]
    active_set = [
        entry.as_dict()
        for entry in required_items(review_contract, dry_run=dry_run, enforced=enforced)
    ]
    return {
        "schema_version": SCHEMA_VERSION,
        "enforced": enforced,
        "source": "review_merge_contract + closure_comment",
        "dry_run_disables_gating": True,
        "fail_closed": True,
        "require_distinct_vendors": distinct_vendors,
        "accepted_sources": accepted_sources,
        "not_accepted": rejected_sources,
        "deferrals": deferral_list,
        "required": full_set,
        "active_required": active_set,
    }

163 

164 

def required_items(
    review_contract: dict[str, Any],
    *,
    dry_run: bool = False,
    enforced: bool = True,
) -> tuple[EvidenceItem, ...]:
    """Return the evidence requirements implied by the review contract.

    Nothing is required for a dry run or when the gate is not enforced.
    Otherwise the result is, in order: the two closure-comment items, one
    ``review-verdict-<n>`` item per configured reviewer (a positive integer
    ``reviewers.count``), and a ``jury-verdict`` item when the jury is enabled
    in ``gating`` mode.
    """
    if dry_run or not enforced:
        return ()

    reviewer_cfg = review_contract.get("reviewers")
    raw_count = reviewer_cfg.get("count") if isinstance(reviewer_cfg, dict) else 0
    # Anything that is not a positive int counts as "no reviewers required".
    count = raw_count if isinstance(raw_count, int) and raw_count > 0 else 0

    jury_cfg = review_contract.get("jury")
    wants_jury = (
        isinstance(jury_cfg, dict)
        and bool(jury_cfg.get("enabled"))
        and jury_cfg.get("mode") == "gating"
    )

    closure_items = (
        EvidenceItem(
            "closure-comment-pr",
            "closure",
            True,
            "PR conversation comment with keel.closure-comment.v1 marker",
        ),
        EvidenceItem(
            "closure-comment-issue",
            "closure",
            True,
            "Linked issue comment with keel.closure-comment.v1 marker",
        ),
    )
    review_items = tuple(
        EvidenceItem(
            f"review-verdict-{ordinal}",
            "review",
            True,
            "Distinct posted s7 reviewer verdict for the current PR",
        )
        for ordinal in range(1, count + 1)
    )
    jury_items: tuple[EvidenceItem, ...] = ()
    if wants_jury:
        jury_items = (
            EvidenceItem(
                "jury-verdict",
                "jury",
                True,
                "Posted gating jury verdict comment for the current PR",
            ),
        )
    return closure_items + review_items + jury_items

212 

213 

def verify(
    review_contract: dict[str, Any],
    *,
    pr_comments: list[dict[str, Any]] | None = None,
    issue_comments: list[dict[str, Any]] | None = None,
    pr_reviews: list[dict[str, Any]] | None = None,
    pr_body: str | None = None,
    pr_labels: Sequence[str] | None = None,
    head_sha: str | None = None,
    ledger_record: dict[str, Any] | None = None,
    dry_run: bool = False,
    enforced: bool = True,
    deferrals: tuple[str, ...] = (),
) -> dict[str, Any]:
    """Check the required evidence artifacts and build a deterministic report.

    With a ``ledger_record`` (the ship_run record for this PR), a closure
    comment only counts when it matches the canonical render of that record;
    without one, the marker alone suffices.

    When the gate is active, ``pr_labels`` are also checked for the mandatory
    ``agent:<vendor>`` attribution label (cross-checked against the ledger's
    implementer vendor when available); see :func:`attribution_check`.

    ``pr_body`` is accepted for interface compatibility but deliberately
    ignored. The report's ``status`` is ``"pass"`` only when no required item
    is missing and no finding has ``severity == "major"``.
    """
    del pr_body  # Explicitly not accepted as evidence.

    pr_thread = pr_comments or []
    issue_thread = issue_comments or []
    review_list = pr_reviews or []

    requirements = required_items(review_contract, dry_run=dry_run, enforced=enforced)
    deferred_keys = set(deferrals)
    tallies = _evidence_counts(
        pr_comments=pr_thread,
        issue_comments=issue_thread,
        pr_reviews=review_list,
        head_sha=head_sha,
        enforced=enforced,
        ledger_record=ledger_record,
    )
    notes = list(
        _run_context_findings(
            pr_comments=pr_thread,
            issue_comments=issue_thread,
            enforced=enforced,
            ledger_record=ledger_record,
        )
    )
    mismatched = _closure_mismatch_scopes(
        pr_comments=pr_thread,
        issue_comments=issue_thread,
        enforced=enforced,
        ledger_record=ledger_record,
    )

    defer_everything = "all" in deferred_keys
    rows = []
    for requirement in requirements:
        found = _is_present(requirement, tallies)
        # A requirement may be deferred by id, by kind, or via the "all" wildcard.
        waived = (
            requirement.id in deferred_keys
            or requirement.kind in deferred_keys
            or defer_everything
        )
        satisfied = found or waived
        rows.append({
            "id": requirement.id,
            "kind": requirement.kind,
            "required": requirement.required,
            "present": found,
            "deferred": waived,
            "ok": satisfied,
            "reason": None if satisfied else _result_reason(requirement, mismatched),
        })
    missing_ids = [row["id"] for row in rows if not row["ok"]]

    vendor_note = _distinct_vendor_finding(
        review_contract,
        items=requirements,
        deferred=deferred_keys,
        pr_comments=pr_thread,
        pr_reviews=review_list,
        head_sha=head_sha,
        enforced=enforced,
    )
    if vendor_note is not None:
        notes.append(vendor_note)

    attribution_note = _attribution_finding(
        pr_labels=pr_labels,
        enforced=enforced and not dry_run,
        ledger_record=ledger_record,
    )
    if attribution_note is not None:
        notes.append(attribution_note)

    blockers = [note for note in notes if note["severity"] == "major"]
    return {
        "schema_version": SCHEMA_VERSION,
        "status": "fail" if missing_ids or blockers else "pass",
        "dry_run": dry_run,
        "enforced": enforced,
        "required_count": len(requirements),
        "missing": missing_ids,
        "results": rows,
        "counts": tallies,
        "findings": notes,
    }

307 

308 

def _require_distinct_vendors(review_contract: dict[str, Any]) -> bool:
    """Return the truthiness of ``reviewers.require_distinct_vendors``.

    False when ``reviewers`` is missing or is not a dict.
    """
    section = review_contract.get("reviewers")
    if not isinstance(section, dict):
        return False
    return bool(section.get("require_distinct_vendors"))

312 

313 

def _distinct_vendor_finding(
    review_contract: dict[str, Any],
    *,
    items: tuple[EvidenceItem, ...],
    deferred: set[str],
    pr_comments: list[dict[str, Any]],
    pr_reviews: list[dict[str, Any]],
    head_sha: str | None,
    enforced: bool,
) -> dict[str, Any] | None:
    """Return a blocking finding when vendor distinctness is required but unmet.

    Returns ``None`` when the contract does not set
    ``reviewers.require_distinct_vendors``, when review evidence is deferred
    (by kind or via ``"all"``), when no non-deferred review item remains, or
    when the check passes.
    """
    if not _require_distinct_vendors(review_contract):
        return None
    # An explicit deferral of review evidence always wins over this knob.
    if "review" in deferred or "all" in deferred:
        return None
    needed = len([
        entry for entry in items
        if entry.kind == "review" and entry.id not in deferred
    ])
    if needed <= 0:
        return None
    by_reviewer = _review_vendor_provenance(
        [*pr_comments, *pr_reviews],
        head_sha=head_sha,
        enforced=enforced,
    )
    outcome = distinct_vendor_check(list(by_reviewer.values()), required_count=needed)
    if outcome["ok"]:
        return None
    return {
        "id": "review-vendor-distinctness",
        "severity": "major",
        "kind": "review",
        "message": f"require_distinct_vendors: {outcome['reason']}.",
    }

351 

352 

# Reason reported when a marker-bearing closure comment disagrees with the ledger.
_CLOSURE_MISMATCH_REASON = "closure comment does not match the ship_run ledger record"

356 

357 

def _result_reason(item: EvidenceItem, mismatch: set[str]) -> str:
    """Explain why ``item`` failed.

    Closure items whose scope is listed in ``mismatch`` get the ledger
    mismatch reason; everything else gets a generic "missing" message.
    """
    scope_by_id = {
        "closure-comment-pr": "pr",
        "closure-comment-issue": "issue",
    }
    scope = scope_by_id.get(item.id)
    if scope is not None and scope in mismatch:
        return _CLOSURE_MISMATCH_REASON
    return f"missing required evidence: {item.id}"

364 

365 

def _closure_mismatch_scopes(
    *,
    pr_comments: list[dict[str, Any]],
    issue_comments: list[dict[str, Any]],
    enforced: bool,
    ledger_record: dict[str, Any] | None,
) -> set[str]:
    """Return the scopes (``"pr"`` / ``"issue"``) whose closures all mismatch the ledger.

    A scope is included only when it has at least one trusted, marker-bearing
    closure comment and none of those matches ``ledger_record``. A stale
    comment next to a correct re-post therefore reports no mismatch. Without
    a ledger record the result is always empty.
    """
    if ledger_record is None:
        return set()
    mismatched: set[str] = set()
    threads = {"pr": pr_comments, "issue": issue_comments}
    for scope_name, thread in threads.items():
        bodies = [
            _body(entry)
            for entry in thread
            if _is_trusted_source(entry, enforced=enforced)
            and _has_closure_marker(_body(entry))
        ]
        if not bodies:
            continue
        if all(not closure_body_matches_record(text, ledger_record) for text in bodies):
            mismatched.add(scope_name)
    return mismatched

394 

395 

def _evidence_counts(
    *,
    pr_comments: list[dict[str, Any]],
    issue_comments: list[dict[str, Any]],
    pr_reviews: list[dict[str, Any]],
    head_sha: str | None = None,
    enforced: bool = True,
    ledger_record: dict[str, Any] | None = None,
) -> dict[str, int]:
    """Count accepted evidence artifacts per category.

    Returns ``closure_pr``, ``closure_issue``, ``review_verdict`` (number of
    distinct reviewer keys across PR comments and reviews), and
    ``jury_verdict`` (counted over PR comments only).
    """
    distinct_reviewers = _review_evidence_keys(
        [*pr_comments, *pr_reviews],
        head_sha=head_sha,
        enforced=enforced,
    )
    closures_on_pr = sum(
        1 for entry in pr_comments
        if _is_closure_comment(entry, enforced=enforced, record=ledger_record)
    )
    closures_on_issue = sum(
        1 for entry in issue_comments
        if _is_closure_comment(entry, enforced=enforced, record=ledger_record)
    )
    jury_posts = sum(
        1 for entry in pr_comments
        if _is_jury_verdict(entry, head_sha=head_sha, enforced=enforced)
    )
    return {
        "closure_pr": closures_on_pr,
        "closure_issue": closures_on_issue,
        "review_verdict": len(distinct_reviewers),
        "jury_verdict": jury_posts,
    }

423 

424 

def _is_present(item: EvidenceItem, counts: dict[str, int]) -> bool:
    """Return whether ``counts`` satisfies the requirement ``item``.

    Closure and jury items need at least one matching artifact. The n-th
    review item (index parsed from the id suffix) needs at least ``n``
    distinct review verdicts. Unknown items are never present.
    """
    closure_counters = {
        "closure-comment-pr": "closure_pr",
        "closure-comment-issue": "closure_issue",
    }
    if item.id in closure_counters:
        return counts[closure_counters[item.id]] >= 1
    if item.kind == "review":
        ordinal = int(item.id.rsplit("-", 1)[1])
        return counts["review_verdict"] >= ordinal
    if item.id == "jury-verdict":
        return counts["jury_verdict"] >= 1
    return False

436 

437 

def _body(item: dict[str, Any]) -> str:
    """Return the item's ``body`` when it is a string, otherwise ``""``."""
    text = item.get("body")
    if isinstance(text, str):
        return text
    return ""

441 

442 

def _has_closure_marker(body: str) -> bool:
    """Return whether ``body`` contains the closure comment marker."""
    marker = closure.COMMENT_MARKER
    return marker in body

445 

446 

def _normalize_closure_body(body: str) -> str:
    """Canonicalise a closure body for content comparison.

    Tolerates harmless formatting drift while staying sensitive to real
    content changes: each line loses its trailing whitespace, consecutive
    blank lines collapse into one, and blank lines at the very start or end
    are removed. Lines are re-joined with ``"\\n"``.
    """
    kept: list[str] = []
    # Start as "previous line was blank" so leading blank lines are dropped.
    previous_blank = True
    for raw in body.splitlines():
        text = raw.rstrip()
        is_blank = text == ""
        if is_blank and previous_blank:
            continue
        kept.append(text)
        previous_blank = is_blank
    while kept and kept[-1] == "":
        kept.pop()
    return "\n".join(kept)

463 

464 

def closure_body_matches_record(body: str, record: dict[str, Any]) -> bool:
    """Return whether ``body`` equals the canonical render of ``record``.

    Both sides are normalised first, so trailing whitespace and blank-line
    drift do not cause a mismatch.
    """
    canonical = _normalize_closure_body(closure.render_closure_comment(record))
    return canonical == _normalize_closure_body(body)

469 

470 

def _is_closure_comment(
    item: dict[str, Any],
    *,
    enforced: bool = True,
    record: dict[str, Any] | None = None,
) -> bool:
    """Return whether ``item`` counts as a closure comment.

    The item must come from a trusted source and carry the closure marker.
    When ``record`` is given, its body must also match the canonical render
    of that record.
    """
    text = _body(item)
    accepted = _is_trusted_source(item, enforced=enforced) and _has_closure_marker(text)
    if not accepted:
        return False
    return record is None or closure_body_matches_record(text, record)

484 

485 

def _run_context_findings(
    *,
    pr_comments: list[dict[str, Any]],
    issue_comments: list[dict[str, Any]],
    enforced: bool,
    ledger_record: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Emit one ``run-context-empty`` finding per degraded closure comment.

    PR comments are scanned before issue comments. The finding is ``major``
    when ``enforced`` and ``minor`` otherwise.
    """
    severity = "major" if enforced else "minor"
    return [
        {
            "id": "run-context-empty",
            "severity": severity,
            "kind": "closure",
            "message": "Closure comment Run context is fully degraded.",
        }
        for entry in (*pr_comments, *issue_comments)
        if _is_closure_comment(entry, enforced=enforced, record=ledger_record)
        and _has_empty_run_context(_body(entry))
    ]

507 

508 

def _has_empty_run_context(body: str) -> bool:
    """Return whether the ``### Run context`` block holds only placeholder values.

    False when the heading is absent. Otherwise True only if the parsed
    fields are exactly the fully degraded set below.
    """
    degraded = {
        "host agent": "unknown",
        "transport": "unknown",
        "profile": "unknown",
        "jury": "off",
        "consent": "unknown (scopes: none)",
    }
    return "### Run context" in body and _run_context_fields(body) == degraded

520 

521 

def _run_context_fields(body: str) -> dict[str, str]:
    """Parse the ``- **Key:** value`` bullets under ``### Run context``.

    Parsing starts after the first line whose stripped text is the heading
    and stops at the next line that starts with ``"### "``. Keys and values
    are stripped and lower-cased. Returns ``{}`` when the heading is absent.
    """
    heading = "### Run context"
    rows = iter(body.splitlines())
    # Advance the shared iterator to just past the heading line.
    for row in rows:
        if row.strip() == heading:
            break
    parsed: dict[str, str] = {}
    for row in rows:
        if row.strip() == heading:
            # A repeated heading keeps the block open.
            continue
        if row.startswith("### "):
            break
        hit = re.match(r"^-\s+\*\*(?P<key>[^*]+):\*\*\s+(?P<value>.+?)\s*$", row)
        if hit:
            parsed[hit.group("key").strip().lower()] = hit.group("value").strip().lower()
    return parsed

537 

538 

def _is_trusted_source(item: dict[str, Any], *, enforced: bool = True) -> bool:
    """Return whether GitHub marks this evidence source as trusted.

    Live GitHub comment/review payloads carry ``author_association``. When
    the field is absent, enforced evidence fails closed (offline fixtures are
    agent-writable and must not manufacture trust), while non-enforced runs
    accept the item. An explicit association is trusted only if its
    upper-cased value is in ``TRUSTED_AUTHOR_ASSOCIATIONS``.
    """
    tag = item.get("author_association")
    if tag is None:
        return not enforced
    return isinstance(tag, str) and tag.upper() in TRUSTED_AUTHOR_ASSOCIATIONS

553 

554 

def _is_ship_assessment(body: str) -> bool:
    """Return whether ``body`` contains either ship assessment signature."""
    signatures = (SHIP_ASSESSMENT_HEADING, "keel ship \u2014")
    return any(signature in body for signature in signatures)

557 

558 

def count_review_verdicts(
    pr_comments: list[dict[str, Any]] | None = None,
    pr_reviews: list[dict[str, Any]] | None = None,
    *,
    head_sha: str | None = None,
    enforced: bool = True,
) -> int:
    """Count the distinct trusted review-verdict reviewers on a PR.

    Uses the same counting as the ``review`` items of the verify report:
    re-posts by the same reviewer collapse to one verdict, and only trusted,
    head-bound verdicts are counted. Capture reconcile reuses this to
    cross-check the ledger's recorded reviewer count.
    """
    candidates = list(pr_comments or [])
    candidates.extend(pr_reviews or [])
    reviewers = _review_evidence_keys(candidates, head_sha=head_sha, enforced=enforced)
    return len(reviewers)

579 

580 

def _review_evidence_keys(
    items: list[dict[str, Any]],
    *,
    head_sha: str | None = None,
    enforced: bool = True,
) -> set[str]:
    """Return the reviewer keys of every accepted review verdict.

    A verdict is accepted when its source is trusted, its body is a
    review-verdict body, and it matches ``head_sha``.
    """
    return {
        _reviewer_key(entry, _body(entry))
        for entry in items
        if _is_trusted_source(entry, enforced=enforced)
        and _is_review_verdict_body(_body(entry))
        and _matches_head(entry, _body(entry), head_sha)
    }

598 

599 

def _review_vendor_provenance(
    items: list[dict[str, Any]],
    *,
    head_sha: str | None = None,
    enforced: bool = True,
) -> dict[str, str | None]:
    """Map each accepted reviewer key to the vendor its verdict declares.

    Values are the lower-cased ``vendor:`` field, or ``None`` when the
    verdict declares none. Acceptance mirrors :func:`_review_evidence_keys`.
    The first accepted verdict per reviewer key wins, so re-posts do not
    inflate the vendor set.
    """
    accepted: dict[str, str | None] = {}
    for entry in items:
        text = _body(entry)
        usable = (
            _is_trusted_source(entry, enforced=enforced)
            and _is_review_verdict_body(text)
            and _matches_head(entry, text, head_sha)
        )
        if not usable:
            continue
        reviewer = _reviewer_key(entry, text)
        if reviewer not in accepted:
            declared = _fields(text).get("vendor")
            accepted[reviewer] = declared.lower() if declared else None
    return accepted

628 

629 

630def distinct_vendor_check( 

631 vendors: Sequence[str | None], 

632 *, 

633 required_count: int, 

634) -> dict[str, Any]: 

635 """Pure vendor-distinctness check over review-verdict provenance. 

636 

637 ``vendors`` is one entry per accepted review verdict: the declared vendor, or 

638 ``None`` when the verdict carries no vendor provenance. The check passes only 

639 when at least ``required_count`` verdicts each declare a vendor and those 

640 vendors are all distinct. It fails when a required verdict is missing vendor 

641 provenance, or when two required verdicts share a vendor. 

642 

643 Returns ``{ok, reason, duplicated, missing_provenance}``. No I/O — fully 

644 unit-testable. A non-positive ``required_count`` always passes (nothing to 

645 require). 

646 """ 

647 if required_count <= 0: 

648 return {"ok": True, "reason": None, "duplicated": [], "missing_provenance": 0} 

649 present = [vendor for vendor in vendors if vendor] 

650 missing = len(vendors) - len(present) 

651 seen: set[str] = set() 

652 duplicated: list[str] = [] 

653 for vendor in present: 

654 if vendor in seen and vendor not in duplicated: 

655 duplicated.append(vendor) 

656 seen.add(vendor) 

657 if len(present) < required_count: 

658 return { 

659 "ok": False, 

660 "reason": "missing vendor provenance on required review verdict(s)", 

661 "duplicated": duplicated, 

662 "missing_provenance": missing, 

663 } 

664 if duplicated: 

665 return { 

666 "ok": False, 

667 "reason": f"review verdicts share a vendor: {', '.join(sorted(duplicated))}", 

668 "duplicated": sorted(duplicated), 

669 "missing_provenance": missing, 

670 } 

671 return {"ok": True, "reason": None, "duplicated": [], "missing_provenance": missing} 

672 

673 

def agent_label_vendors(labels: Sequence[str] | None) -> list[str]:
    """Extract the lower-cased vendor slug of each ``agent:<vendor>`` label.

    Non-string labels, labels without the prefix, and a bare ``agent:`` label
    (blank vendor) are skipped. Input order is preserved and duplicates are
    kept so callers can reason about the raw label set. Pure — no I/O.
    """
    prefix_length = len(AGENT_LABEL_PREFIX)
    slugs = (
        tag[prefix_length:].strip().lower()
        for tag in labels or ()
        if isinstance(tag, str) and tag.startswith(AGENT_LABEL_PREFIX)
    )
    return [slug for slug in slugs if slug]

689 

690 

def ledger_implementer_vendor(ledger_record: dict[str, Any] | None) -> str | None:
    """Return the implementer's vendor slug from a ship_run ``ledger_record``.

    The implementer is read from ``actors.implementer`` and split with
    ``agents.split_delegate``; the first part, stripped and lower-cased, is
    the vendor. Returns ``None`` when there is no record, no ``actors`` dict,
    no string implementer, or the implementer/vendor is blank, so the
    cross-check can degrade to presence-only. Pure — no I/O.
    """
    if not isinstance(ledger_record, dict):
        return None
    actors = ledger_record.get("actors")
    if not isinstance(actors, dict):
        return None
    raw = actors.get("implementer")
    if not isinstance(raw, str):
        return None
    trimmed = raw.strip()
    if not trimmed:
        return None
    vendor_part, _model = agents.split_delegate(trimmed)
    return vendor_part.strip().lower() or None

708 

709 

def attribution_check(
    labels: Sequence[str] | None,
    *,
    implementer_vendor: str | None = None,
) -> dict[str, Any]:
    """Pure attribution-label check over a PR's labels and the ledger implementer.

    Two layers, both fail-closed only on a real contradiction:

    * **Presence** — at least one non-blank ``agent:<vendor>`` label must
      exist; otherwise the reason is ``missing-label``.
    * **Cross-check** — when ``implementer_vendor`` is known, one of the PR's
      ``agent:*`` vendors must equal it (case-insensitively, ignoring
      surrounding whitespace); otherwise the reason is ``vendor-mismatch``.

    An ``implementer_vendor`` that is ``None``, empty, or whitespace-only is
    treated as unknown, so only the presence layer runs and PRs that predate
    attribution recording are not broken.

    Returns ``{ok, reason, label_vendors, implementer_vendor}`` where
    ``implementer_vendor`` is the normalised value (``None`` when unknown).
    No I/O.
    """
    label_vendors = agent_label_vendors(labels)
    # A blank value collapses to None; an empty string could never match a
    # label vendor and would otherwise report a spurious vendor-mismatch.
    implementer = (implementer_vendor or "").strip().lower() or None

    if not label_vendors:
        reason = "missing-label"
    elif implementer is not None and implementer not in label_vendors:
        reason = "vendor-mismatch"
    else:
        reason = None
    return {
        "ok": reason is None,
        "reason": reason,
        "label_vendors": label_vendors,
        "implementer_vendor": implementer,
    }

751 

752 

def _attribution_finding(
    *,
    pr_labels: Sequence[str] | None,
    enforced: bool,
    ledger_record: dict[str, Any] | None,
) -> dict[str, Any] | None:
    """Return a blocking attribution finding, or ``None`` when there is none.

    The check is skipped (``None``) when the gate is not active or when PR
    labels were not supplied (``pr_labels is None``), which keeps callers that
    never pass labels working. Otherwise the presence check always runs, and
    the vendor cross-check runs only when the ledger yields an implementer
    vendor.
    """
    if pr_labels is None or not enforced:
        return None
    outcome = attribution_check(
        pr_labels,
        implementer_vendor=ledger_implementer_vendor(ledger_record),
    )
    if outcome["ok"]:
        return None
    if outcome["reason"] == "missing-label":
        text = "PR is missing a mandatory agent:<vendor> attribution label."
    else:
        listed = ", ".join(outcome["label_vendors"])
        recorded = outcome["implementer_vendor"]
        text = (
            f"PR agent:<vendor> attribution ({listed}) does not match the ship_run "
            f"ledger implementer vendor ({recorded})."
        )
    return {
        "id": "attribution-label",
        "severity": "major",
        "kind": "attribution",
        "message": text,
    }

788 

789 

def _reviewer_key(item: dict[str, Any], body: str) -> str:
    """Return a key identifying who issued a review verdict.

    Preference order: the body's ``reviewer:`` field, then the author's
    login, then a SHA-256 digest of the body. Reviewer and login are
    lower-cased, and each source gets its own prefix.
    """
    named = _fields(body).get("reviewer")
    if named:
        return f"reviewer:{named.lower()}"
    author = item.get("user")
    login = author.get("login") if isinstance(author, dict) else None
    if isinstance(login, str) and login:
        return "user:" + login.lower()
    return "body:" + hashlib.sha256(body.encode("utf-8")).hexdigest()

800 

801 

def _matches_head(item: dict[str, Any], body: str, head_sha: str | None) -> bool:
    """Return whether the evidence is bound to ``head_sha``.

    Without a ``head_sha`` everything matches. Otherwise a ``head:`` field in
    the body decides; if there is none, the item's ``commit_id`` must be a
    string equal to ``head_sha``.
    """
    if not head_sha:
        return True
    declared = _fields(body).get("head")
    if declared:
        return declared == head_sha
    attached = item.get("commit_id")
    return isinstance(attached, str) and attached == head_sha

811 

812 

def _fields(body: str) -> dict[str, str]:
    """Collect the ``key: value`` fields that ``_FIELD_RE`` finds in ``body``.

    Keys are lower-cased; when a key appears more than once the last
    occurrence wins.
    """
    collected: dict[str, str] = {}
    for hit in _FIELD_RE.finditer(body):
        collected[hit.group("key").lower()] = hit.group("value")
    return collected

816 

817 

def _is_review_verdict_body(body: str) -> bool:
    """Return whether ``body`` is a review verdict and nothing else.

    Empty bodies, ship assessments, closure comments, and jury verdicts are
    rejected even if they also contain the review-verdict marker.
    """
    if not body:
        return False
    excluded = (
        _is_ship_assessment(body)
        or _has_closure_marker(body)
        or JURY_VERDICT_MARKER in body
    )
    return not excluded and REVIEW_VERDICT_MARKER in body

824 

825 

def _has_trusted_review_marker(items: list[dict[str, Any]]) -> bool:
    """Return True if any enforced-trusted item carries the review-verdict marker."""
    for candidate in items:
        if not _is_trusted_source(candidate, enforced=True):
            continue
        if REVIEW_VERDICT_MARKER in _body(candidate):
            return True
    return False

831 

832 

def _is_jury_verdict(
    item: dict[str, Any],
    *,
    head_sha: str | None = None,
    enforced: bool = True,
) -> bool:
    """Return whether ``item`` is an accepted jury verdict.

    The source must be trusted. The body must be non-empty, must not be a
    ship assessment or closure comment, must carry the jury-verdict marker,
    and must match ``head_sha``.
    """
    if not _is_trusted_source(item, enforced=enforced):
        return False
    text = _body(item)
    if not text:
        return False
    if _is_ship_assessment(text) or _has_closure_marker(text):
        return False
    return JURY_VERDICT_MARKER in text and _matches_head(item, text, head_sha)