Coverage for src/keel/evidence.py: 100%
324 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Deterministic pre-merge evidence verification.
3The ship adapter is agentic, but the artifacts it must leave behind are not:
4reviewer verdict comments/reviews, the optional jury verdict, and the stable
5closure comment marker. This module keeps the check pure so CI can enforce it
6without trusting prose in an agent prompt.
7"""
9from __future__ import annotations
11import hashlib
12import re
13from collections.abc import Sequence
14from dataclasses import dataclass
15from typing import Any
17from . import agents, closure
19SCHEMA_VERSION = "keel.evidence.v1"
20AGENT_LABEL_PREFIX = "agent:"
21REVIEW_VERDICT_MARKER = "keel.review-verdict.v1"
22JURY_VERDICT_MARKER = "keel.jury-verdict.v1"
23SHIP_ASSESSMENT_HEADING = "### \U0001f6a2 keel ship"
24DEFAULT_WAIVER_LABEL = "keel:evidence-waived"
25TRUSTED_AUTHOR_ASSOCIATIONS = frozenset({"OWNER", "MEMBER", "COLLABORATOR"})
26TRUSTED_SHIP_ASSESSMENT_BOTS = frozenset({"github-actions", "github-actions[bot]"})
28_FIELD_RE = re.compile(r"^\s*(?P<key>reviewer|head|vendor|model)\s*:\s*(?P<value>\S+)\s*$",
29 re.IGNORECASE | re.MULTILINE)
30_SHIP_BRANCH_RE = re.compile(r"^(feature|fix|chore|docs|test)/issue-\d+(?:-|$)")
33@dataclass(frozen=True)
34class EvidenceItem:
35 id: str
36 kind: str
37 required: bool
38 description: str
40 def as_dict(self) -> dict[str, Any]:
41 return {
42 "id": self.id,
43 "kind": self.kind,
44 "required": self.required,
45 "description": self.description,
46 }
49def gate_active(labels: Sequence[str] | None, gate_label: str) -> bool:
50 """Return whether ``gate_label`` is present in ``labels`` (None/empty -> False).
52 An empty ``gate_label`` is never active, so a misconfigured (blank) label can
53 never silently match — the schema also forbids an empty ``evidence_gate_label``.
54 """
55 if not gate_label:
56 return False
57 return gate_label in set(labels or ())
60def gate_decision(
61 labels: Sequence[str] | None,
62 gate_label: str,
63 *,
64 waiver_label: str = DEFAULT_WAIVER_LABEL,
65 head_ref: str | None = None,
66 pr_comments: list[dict[str, Any]] | None = None,
67 pr_reviews: list[dict[str, Any]] | None = None,
68 ledger_records: Sequence[object] | None = None,
69) -> dict[str, Any]:
70 """Return the fail-closed evidence-gate arming decision.
72 Ship provenance arms the gate by default. The only disarm path is an explicit
73 waiver label applied by an operator; the legacy gate label remains an
74 additional arming signal for already-installed workflows.
75 """
76 label_set = set(labels or ())
77 if waiver_label and waiver_label in label_set:
78 return _gate_decision(False, "operator-waiver-label", waiver_label, waived=True)
79 if gate_active(labels, gate_label):
80 return _gate_decision(True, "gate-label", gate_label)
81 if head_ref and _SHIP_BRANCH_RE.search(head_ref):
82 return _gate_decision(True, "ship-branch", head_ref)
83 if _has_trusted_ship_assessment(pr_comments or []):
84 return _gate_decision(True, "ship-assessment-comment", SHIP_ASSESSMENT_HEADING)
85 if _has_trusted_review_marker([*(pr_comments or []), *(pr_reviews or [])]):
86 return _gate_decision(True, "review-verdict-marker", REVIEW_VERDICT_MARKER)
87 if ledger_records:
88 return _gate_decision(True, "ship-run-ledger", "ship_run")
89 return _gate_decision(False, "no-ship-provenance", None)
92def _gate_decision(
93 enforced: bool,
94 reason: str,
95 source: str | None,
96 *,
97 waived: bool = False,
98) -> dict[str, Any]:
99 return {
100 "schema_version": SCHEMA_VERSION,
101 "enforced": enforced,
102 "waived": waived,
103 "reason": reason,
104 "source": source,
105 }
108def _has_trusted_ship_assessment(items: list[dict[str, Any]]) -> bool:
109 return any(
110 _is_ship_assessment_source(item) and _is_ship_assessment(_body(item))
111 for item in items
112 )
115def _is_ship_assessment_source(item: dict[str, Any]) -> bool:
116 if _is_trusted_source(item, enforced=True):
117 return True
118 user = item.get("user") if isinstance(item.get("user"), dict) else {}
119 login = user.get("login") if isinstance(user.get("login"), str) else None
120 return bool(login and login.lower() in TRUSTED_SHIP_ASSESSMENT_BOTS)
123def contract_as_dict(
124 review_contract: dict[str, Any],
125 *,
126 dry_run: bool = False,
127 enforced: bool = True,
128 deferrals: tuple[str, ...] = (),
129) -> dict[str, Any]:
130 """Return the required evidence set derived from review/jury flags."""
131 return {
132 "schema_version": SCHEMA_VERSION,
133 "enforced": enforced,
134 "source": "review_merge_contract + closure_comment",
135 "dry_run_disables_gating": True,
136 "fail_closed": True,
137 "require_distinct_vendors": _require_distinct_vendors(review_contract),
138 "accepted_sources": {
139 "closure": (
140 "trusted issue/PR comments carrying keel.closure-comment.v1"
141 ),
142 "review": (
143 "trusted PR review/comment carrying keel.review-verdict.v1 and current head"
144 ),
145 "jury": "trusted PR comment carrying keel.jury-verdict.v1 and current head",
146 },
147 "not_accepted": [
148 "pull_request_body",
149 "chat_summary",
150 "untrusted_public_comment",
151 "keel_ship_assessment_comment",
152 ],
153 "deferrals": list(deferrals),
154 "required": [
155 item.as_dict()
156 for item in required_items(review_contract, dry_run=False, enforced=enforced)
157 ],
158 "active_required": [
159 item.as_dict()
160 for item in required_items(review_contract, dry_run=dry_run, enforced=enforced)
161 ],
162 }
165def required_items(
166 review_contract: dict[str, Any],
167 *,
168 dry_run: bool = False,
169 enforced: bool = True,
170) -> tuple[EvidenceItem, ...]:
171 """Return the tier/flag-derived evidence requirements."""
172 if dry_run or not enforced:
173 return ()
174 reviewers = review_contract.get("reviewers")
175 reviewer_count = reviewers.get("count") if isinstance(reviewers, dict) else 0
176 reviewer_count = reviewer_count if isinstance(reviewer_count, int) and reviewer_count > 0 else 0
177 jury = review_contract.get("jury")
178 jury_required = (
179 isinstance(jury, dict)
180 and bool(jury.get("enabled"))
181 and jury.get("mode") == "gating"
182 )
183 items: list[EvidenceItem] = [
184 EvidenceItem(
185 "closure-comment-pr",
186 "closure",
187 True,
188 "PR conversation comment with keel.closure-comment.v1 marker",
189 ),
190 EvidenceItem(
191 "closure-comment-issue",
192 "closure",
193 True,
194 "Linked issue comment with keel.closure-comment.v1 marker",
195 ),
196 ]
197 for index in range(1, reviewer_count + 1):
198 items.append(EvidenceItem(
199 f"review-verdict-{index}",
200 "review",
201 True,
202 "Distinct posted s7 reviewer verdict for the current PR",
203 ))
204 if jury_required:
205 items.append(EvidenceItem(
206 "jury-verdict",
207 "jury",
208 True,
209 "Posted gating jury verdict comment for the current PR",
210 ))
211 return tuple(items)
214def verify(
215 review_contract: dict[str, Any],
216 *,
217 pr_comments: list[dict[str, Any]] | None = None,
218 issue_comments: list[dict[str, Any]] | None = None,
219 pr_reviews: list[dict[str, Any]] | None = None,
220 pr_body: str | None = None,
221 pr_labels: Sequence[str] | None = None,
222 head_sha: str | None = None,
223 ledger_record: dict[str, Any] | None = None,
224 dry_run: bool = False,
225 enforced: bool = True,
226 deferrals: tuple[str, ...] = (),
227) -> dict[str, Any]:
228 """Verify required evidence artifacts and return a deterministic report.
230 When ``ledger_record`` is the ship_run record for this PR, a closure comment
231 only counts when its content matches the canonical render of that record
232 (closure-comment fidelity). Without a record the marker-only behavior holds.
234 When the gate is active, ``pr_labels`` are additionally checked for the
235 mandatory ``agent:<vendor>`` attribution label (and cross-checked against the
236 ledger implementer vendor when a record is present); see
237 :func:`attribution_check`.
238 """
239 del pr_body # Explicitly not accepted as evidence.
240 items = required_items(review_contract, dry_run=dry_run, enforced=enforced)
241 deferred = set(deferrals)
242 counts = _evidence_counts(
243 pr_comments=pr_comments or [],
244 issue_comments=issue_comments or [],
245 pr_reviews=pr_reviews or [],
246 head_sha=head_sha,
247 enforced=enforced,
248 ledger_record=ledger_record,
249 )
250 findings = _run_context_findings(
251 pr_comments=pr_comments or [],
252 issue_comments=issue_comments or [],
253 enforced=enforced,
254 ledger_record=ledger_record,
255 )
256 mismatch = _closure_mismatch_scopes(
257 pr_comments=pr_comments or [],
258 issue_comments=issue_comments or [],
259 enforced=enforced,
260 ledger_record=ledger_record,
261 )
262 results = []
263 for item in items:
264 present = _is_present(item, counts)
265 is_deferred = item.id in deferred or item.kind in deferred or "all" in deferred
266 ok = present or is_deferred
267 results.append({
268 "id": item.id,
269 "kind": item.kind,
270 "required": item.required,
271 "present": present,
272 "deferred": is_deferred,
273 "ok": ok,
274 "reason": None if ok else _result_reason(item, mismatch),
275 })
276 missing = [result["id"] for result in results if not result["ok"]]
277 distinct = _distinct_vendor_finding(
278 review_contract,
279 items=items,
280 deferred=deferred,
281 pr_comments=pr_comments or [],
282 pr_reviews=pr_reviews or [],
283 head_sha=head_sha,
284 enforced=enforced,
285 )
286 if distinct is not None:
287 findings = [*findings, distinct]
288 attribution = _attribution_finding(
289 pr_labels=pr_labels,
290 enforced=enforced and not dry_run,
291 ledger_record=ledger_record,
292 )
293 if attribution is not None:
294 findings = [*findings, attribution]
295 blocking_findings = [finding for finding in findings if finding["severity"] == "major"]
296 return {
297 "schema_version": SCHEMA_VERSION,
298 "status": "pass" if not missing and not blocking_findings else "fail",
299 "dry_run": dry_run,
300 "enforced": enforced,
301 "required_count": len(items),
302 "missing": missing,
303 "results": results,
304 "counts": counts,
305 "findings": findings,
306 }
309def _require_distinct_vendors(review_contract: dict[str, Any]) -> bool:
310 reviewers = review_contract.get("reviewers")
311 return bool(reviewers.get("require_distinct_vendors")) if isinstance(reviewers, dict) else False
314def _distinct_vendor_finding(
315 review_contract: dict[str, Any],
316 *,
317 items: tuple[EvidenceItem, ...],
318 deferred: set[str],
319 pr_comments: list[dict[str, Any]],
320 pr_reviews: list[dict[str, Any]],
321 head_sha: str | None,
322 enforced: bool,
323) -> dict[str, Any] | None:
324 """Return a blocking finding when the optional vendor-distinctness check fails.
326 Off by default: ``None`` unless ``reviewers.require_distinct_vendors`` is set
327 on the contract. Skipped when review evidence is deferred so the knob never
328 overrides an explicit deferral.
329 """
330 if not _require_distinct_vendors(review_contract):
331 return None
332 if "review" in deferred or "all" in deferred:
333 return None
334 required = sum(1 for item in items if item.kind == "review" and item.id not in deferred)
335 if required <= 0:
336 return None
337 provenance = _review_vendor_provenance(
338 [*pr_comments, *pr_reviews],
339 head_sha=head_sha,
340 enforced=enforced,
341 )
342 result = distinct_vendor_check(list(provenance.values()), required_count=required)
343 if result["ok"]:
344 return None
345 return {
346 "id": "review-vendor-distinctness",
347 "severity": "major",
348 "kind": "review",
349 "message": f"require_distinct_vendors: {result['reason']}.",
350 }
353_CLOSURE_MISMATCH_REASON = (
354 "closure comment does not match the ship_run ledger record"
355)
358def _result_reason(item: EvidenceItem, mismatch: set[str]) -> str:
359 if item.id == "closure-comment-pr" and "pr" in mismatch:
360 return _CLOSURE_MISMATCH_REASON
361 if item.id == "closure-comment-issue" and "issue" in mismatch:
362 return _CLOSURE_MISMATCH_REASON
363 return f"missing required evidence: {item.id}"
366def _closure_mismatch_scopes(
367 *,
368 pr_comments: list[dict[str, Any]],
369 issue_comments: list[dict[str, Any]],
370 enforced: bool,
371 ledger_record: dict[str, Any] | None,
372) -> set[str]:
373 """Return scopes ({"pr"}/{"issue"}) where a marker closure mismatched the ledger.
375 A scope is reported only when a trusted marker-bearing closure exists but none
376 of them match the record — so a stale comment alongside a correct re-post does
377 not produce a misleading mismatch reason.
378 """
379 if ledger_record is None:
380 return set()
381 scopes: set[str] = set()
382 for scope, comments in (("pr", pr_comments), ("issue", issue_comments)):
383 markered = [
384 comment for comment in comments
385 if _is_trusted_source(comment, enforced=enforced)
386 and _has_closure_marker(_body(comment))
387 ]
388 if markered and not any(
389 closure_body_matches_record(_body(comment), ledger_record)
390 for comment in markered
391 ):
392 scopes.add(scope)
393 return scopes
396def _evidence_counts(
397 *,
398 pr_comments: list[dict[str, Any]],
399 issue_comments: list[dict[str, Any]],
400 pr_reviews: list[dict[str, Any]],
401 head_sha: str | None = None,
402 enforced: bool = True,
403 ledger_record: dict[str, Any] | None = None,
404) -> dict[str, int]:
405 review_keys = _review_evidence_keys(
406 [*pr_comments, *pr_reviews],
407 head_sha=head_sha,
408 enforced=enforced,
409 )
410 return {
411 "closure_pr": sum(
412 _is_closure_comment(comment, enforced=enforced, record=ledger_record)
413 for comment in pr_comments
414 ),
415 "closure_issue": sum(
416 _is_closure_comment(comment, enforced=enforced, record=ledger_record)
417 for comment in issue_comments
418 ),
419 "review_verdict": len(review_keys),
420 "jury_verdict": sum(_is_jury_verdict(comment, head_sha=head_sha, enforced=enforced)
421 for comment in pr_comments),
422 }
425def _is_present(item: EvidenceItem, counts: dict[str, int]) -> bool:
426 if item.id == "closure-comment-pr":
427 return counts["closure_pr"] >= 1
428 if item.id == "closure-comment-issue":
429 return counts["closure_issue"] >= 1
430 if item.kind == "review":
431 index = int(item.id.rsplit("-", 1)[1])
432 return counts["review_verdict"] >= index
433 if item.id == "jury-verdict":
434 return counts["jury_verdict"] >= 1
435 return False
438def _body(item: dict[str, Any]) -> str:
439 body = item.get("body")
440 return body if isinstance(body, str) else ""
443def _has_closure_marker(body: str) -> bool:
444 return closure.COMMENT_MARKER in body
447def _normalize_closure_body(body: str) -> str:
448 """Normalize a closure body for content comparison.
450 Robust to harmless formatting drift but sensitive to real content changes:
451 trailing whitespace is stripped per line, runs of blank lines collapse to a
452 single blank line, and leading/trailing blank lines are dropped.
453 """
454 lines = [line.rstrip() for line in body.splitlines()]
455 normalized: list[str] = []
456 for line in lines:
457 if not line and (not normalized or not normalized[-1]):
458 continue
459 normalized.append(line)
460 while normalized and not normalized[-1]:
461 normalized.pop()
462 return "\n".join(normalized)
465def closure_body_matches_record(body: str, record: dict[str, Any]) -> bool:
466 """Return whether ``body`` matches the canonical render of ``record``."""
467 expected = closure.render_closure_comment(record)
468 return _normalize_closure_body(body) == _normalize_closure_body(expected)
471def _is_closure_comment(
472 item: dict[str, Any],
473 *,
474 enforced: bool = True,
475 record: dict[str, Any] | None = None,
476) -> bool:
477 if not _is_trusted_source(item, enforced=enforced):
478 return False
479 if not _has_closure_marker(_body(item)):
480 return False
481 if record is None:
482 return True
483 return closure_body_matches_record(_body(item), record)
486def _run_context_findings(
487 *,
488 pr_comments: list[dict[str, Any]],
489 issue_comments: list[dict[str, Any]],
490 enforced: bool,
491 ledger_record: dict[str, Any] | None = None,
492) -> list[dict[str, Any]]:
493 comments = [*pr_comments, *issue_comments]
494 findings: list[dict[str, Any]] = []
495 for item in comments:
496 if not _is_closure_comment(item, enforced=enforced, record=ledger_record):
497 continue
498 body = _body(item)
499 if _has_empty_run_context(body):
500 findings.append({
501 "id": "run-context-empty",
502 "severity": "major" if enforced else "minor",
503 "kind": "closure",
504 "message": "Closure comment Run context is fully degraded.",
505 })
506 return findings
509def _has_empty_run_context(body: str) -> bool:
510 if "### Run context" not in body:
511 return False
512 fields = _run_context_fields(body)
513 return fields == {
514 "host agent": "unknown",
515 "transport": "unknown",
516 "profile": "unknown",
517 "jury": "off",
518 "consent": "unknown (scopes: none)",
519 }
522def _run_context_fields(body: str) -> dict[str, str]:
523 fields: dict[str, str] = {}
524 in_block = False
525 for line in body.splitlines():
526 if line.strip() == "### Run context":
527 in_block = True
528 continue
529 if in_block and line.startswith("### "):
530 break
531 if not in_block:
532 continue
533 match = re.match(r"^-\s+\*\*(?P<key>[^*]+):\*\*\s+(?P<value>.+?)\s*$", line)
534 if match:
535 fields[match.group("key").strip().lower()] = match.group("value").strip().lower()
536 return fields
539def _is_trusted_source(item: dict[str, Any], *, enforced: bool = True) -> bool:
540 """Return whether GitHub marks this evidence source as trusted.
542 Live GitHub comment/review payloads include ``author_association``. Enforced
543 evidence fails closed when that field is absent because offline fixtures are
544 agent-writable and must not manufacture trust. Untrusted explicit
545 associations fail closed even if the author type is ``Bot``.
546 """
547 association = item.get("author_association")
548 if association is None:
549 return not enforced
550 if isinstance(association, str) and association.upper() in TRUSTED_AUTHOR_ASSOCIATIONS:
551 return True
552 return False
555def _is_ship_assessment(body: str) -> bool:
556 return SHIP_ASSESSMENT_HEADING in body or "keel ship \u2014" in body
559def count_review_verdicts(
560 pr_comments: list[dict[str, Any]] | None = None,
561 pr_reviews: list[dict[str, Any]] | None = None,
562 *,
563 head_sha: str | None = None,
564 enforced: bool = True,
565) -> int:
566 """Count distinct trusted review-verdict reviewers for a PR.
568 This is the same evidence-side counting the verify report uses for the
569 ``review`` items: it collapses idempotent re-posts by the same reviewer to
570 one verdict and only counts trusted, head-bound verdicts. Reused by capture
571 reconcile to cross-check the ledger's recorded reviewer count.
572 """
573 keys = _review_evidence_keys(
574 [*(pr_comments or []), *(pr_reviews or [])],
575 head_sha=head_sha,
576 enforced=enforced,
577 )
578 return len(keys)
581def _review_evidence_keys(
582 items: list[dict[str, Any]],
583 *,
584 head_sha: str | None = None,
585 enforced: bool = True,
586) -> set[str]:
587 keys: set[str] = set()
588 for item in items:
589 if not _is_trusted_source(item, enforced=enforced):
590 continue
591 body = _body(item)
592 if not _is_review_verdict_body(body):
593 continue
594 if not _matches_head(item, body, head_sha):
595 continue
596 keys.add(_reviewer_key(item, body))
597 return keys
600def _review_vendor_provenance(
601 items: list[dict[str, Any]],
602 *,
603 head_sha: str | None = None,
604 enforced: bool = True,
605) -> dict[str, str | None]:
606 """Map each accepted review-verdict reviewer-key to its declared vendor.
608 The value is the lower-cased ``vendor:`` provenance for that verdict, or
609 ``None`` when the verdict carries no vendor field. Keys mirror
610 :func:`_review_evidence_keys`, so duplicate reviewer-keys collapse to one
611 entry (idempotent re-posts do not inflate the vendor set).
612 """
613 provenance: dict[str, str | None] = {}
614 for item in items:
615 if not _is_trusted_source(item, enforced=enforced):
616 continue
617 body = _body(item)
618 if not _is_review_verdict_body(body):
619 continue
620 if not _matches_head(item, body, head_sha):
621 continue
622 key = _reviewer_key(item, body)
623 if key in provenance:
624 continue
625 vendor = _fields(body).get("vendor")
626 provenance[key] = vendor.lower() if vendor else None
627 return provenance
630def distinct_vendor_check(
631 vendors: Sequence[str | None],
632 *,
633 required_count: int,
634) -> dict[str, Any]:
635 """Pure vendor-distinctness check over review-verdict provenance.
637 ``vendors`` is one entry per accepted review verdict: the declared vendor, or
638 ``None`` when the verdict carries no vendor provenance. The check passes only
639 when at least ``required_count`` verdicts each declare a vendor and those
640 vendors are all distinct. It fails when a required verdict is missing vendor
641 provenance, or when two required verdicts share a vendor.
643 Returns ``{ok, reason, duplicated, missing_provenance}``. No I/O — fully
644 unit-testable. A non-positive ``required_count`` always passes (nothing to
645 require).
646 """
647 if required_count <= 0:
648 return {"ok": True, "reason": None, "duplicated": [], "missing_provenance": 0}
649 present = [vendor for vendor in vendors if vendor]
650 missing = len(vendors) - len(present)
651 seen: set[str] = set()
652 duplicated: list[str] = []
653 for vendor in present:
654 if vendor in seen and vendor not in duplicated:
655 duplicated.append(vendor)
656 seen.add(vendor)
657 if len(present) < required_count:
658 return {
659 "ok": False,
660 "reason": "missing vendor provenance on required review verdict(s)",
661 "duplicated": duplicated,
662 "missing_provenance": missing,
663 }
664 if duplicated:
665 return {
666 "ok": False,
667 "reason": f"review verdicts share a vendor: {', '.join(sorted(duplicated))}",
668 "duplicated": sorted(duplicated),
669 "missing_provenance": missing,
670 }
671 return {"ok": True, "reason": None, "duplicated": [], "missing_provenance": missing}
674def agent_label_vendors(labels: Sequence[str] | None) -> list[str]:
675 """Return the lower-cased vendor slugs from every ``agent:<vendor>`` label.
677 A blank vendor (a bare ``agent:`` label) is ignored. Order is preserved and
678 duplicates are kept so callers can reason about the raw label set; this is a
679 pure helper with no I/O.
680 """
681 vendors: list[str] = []
682 for label in labels or ():
683 if not isinstance(label, str) or not label.startswith(AGENT_LABEL_PREFIX):
684 continue
685 vendor = label[len(AGENT_LABEL_PREFIX):].strip().lower()
686 if vendor:
687 vendors.append(vendor)
688 return vendors
691def ledger_implementer_vendor(ledger_record: dict[str, Any] | None) -> str | None:
692 """Return the implementer's vendor slug from a ship_run ``ledger_record``.
694 The ledger stores the effective implementer as a codename or ``vendor:model``
695 string under ``actors.implementer``; the vendor is the part before the first
696 ``:``. Returns ``None`` when no record, no implementer, or a blank implementer
697 is recorded so the cross-check can degrade to presence-only. Pure — no I/O.
698 """
699 if not isinstance(ledger_record, dict):
700 return None
701 actors = ledger_record.get("actors")
702 implementer = actors.get("implementer") if isinstance(actors, dict) else None
703 if not isinstance(implementer, str) or not implementer.strip():
704 return None
705 vendor, _ = agents.split_delegate(implementer.strip())
706 vendor = vendor.strip().lower()
707 return vendor or None
710def attribution_check(
711 labels: Sequence[str] | None,
712 *,
713 implementer_vendor: str | None = None,
714) -> dict[str, Any]:
715 """Pure attribution-label check over a PR's labels and the ledger implementer.
717 Two layers, both fail-closed only on a real contradiction:
719 * **Presence** — at least one non-blank ``agent:<vendor>`` label must exist.
720 Missing one is a ``missing-label`` finding.
721 * **Cross-check** — when ``implementer_vendor`` is known (a ship_run record
722 recorded an implementer), one of the PR's ``agent:*`` vendors must match it.
723 A mismatch is a ``vendor-mismatch`` finding. When ``implementer_vendor`` is
724 ``None`` (no record / no implementer) only the presence layer runs, so PRs
725 that predate attribution recording are not broken.
727 Returns ``{ok, reason, label_vendors, implementer_vendor}``. No I/O.
728 """
729 label_vendors = agent_label_vendors(labels)
730 implementer = implementer_vendor.strip().lower() if implementer_vendor else None
731 if not label_vendors:
732 return {
733 "ok": False,
734 "reason": "missing-label",
735 "label_vendors": label_vendors,
736 "implementer_vendor": implementer,
737 }
738 if implementer is not None and implementer not in label_vendors:
739 return {
740 "ok": False,
741 "reason": "vendor-mismatch",
742 "label_vendors": label_vendors,
743 "implementer_vendor": implementer,
744 }
745 return {
746 "ok": True,
747 "reason": None,
748 "label_vendors": label_vendors,
749 "implementer_vendor": implementer,
750 }
753def _attribution_finding(
754 *,
755 pr_labels: Sequence[str] | None,
756 enforced: bool,
757 ledger_record: dict[str, Any] | None,
758) -> dict[str, Any] | None:
759 """Return a blocking attribution finding when the gate is active, else ``None``.
761 Only runs when the evidence gate is active (``enforced``) *and* PR labels were
762 actually fetched (``pr_labels is not None``): the presence check is cheap and
763 default-on, while the vendor cross-check engages only when the ledger recorded
764 an implementer vendor. Degrades gracefully — labels not available skips the
765 check entirely, no record means presence-only, and a gate-inactive run skips
766 the check (back-compat with callers that never pass labels).
767 """
768 if not enforced or pr_labels is None:
769 return None
770 implementer_vendor = ledger_implementer_vendor(ledger_record)
771 result = attribution_check(pr_labels, implementer_vendor=implementer_vendor)
772 if result["ok"]:
773 return None
774 if result["reason"] == "missing-label":
775 message = "PR is missing a mandatory agent:<vendor> attribution label."
776 else:
777 message = (
778 "PR agent:<vendor> attribution "
779 f"({', '.join(result['label_vendors'])}) does not match the ship_run "
780 f"ledger implementer vendor ({result['implementer_vendor']})."
781 )
782 return {
783 "id": "attribution-label",
784 "severity": "major",
785 "kind": "attribution",
786 "message": message,
787 }
790def _reviewer_key(item: dict[str, Any], body: str) -> str:
791 fields = _fields(body)
792 reviewer = fields.get("reviewer")
793 if reviewer:
794 return f"reviewer:{reviewer.lower()}"
795 user = item.get("user")
796 if isinstance(user, dict) and isinstance(user.get("login"), str) and user["login"]:
797 return f"user:{user['login'].lower()}"
798 digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
799 return f"body:{digest}"
802def _matches_head(item: dict[str, Any], body: str, head_sha: str | None) -> bool:
803 if not head_sha:
804 return True
805 fields = _fields(body)
806 recorded = fields.get("head")
807 if recorded:
808 return recorded == head_sha
809 commit_id = item.get("commit_id")
810 return isinstance(commit_id, str) and commit_id == head_sha
813def _fields(body: str) -> dict[str, str]:
814 return {match.group("key").lower(): match.group("value")
815 for match in _FIELD_RE.finditer(body)}
818def _is_review_verdict_body(body: str) -> bool:
819 if not body or _is_ship_assessment(body) or _has_closure_marker(body):
820 return False
821 if JURY_VERDICT_MARKER in body:
822 return False
823 return REVIEW_VERDICT_MARKER in body
826def _has_trusted_review_marker(items: list[dict[str, Any]]) -> bool:
827 return any(
828 _is_trusted_source(item, enforced=True) and REVIEW_VERDICT_MARKER in _body(item)
829 for item in items
830 )
833def _is_jury_verdict(
834 item: dict[str, Any],
835 *,
836 head_sha: str | None = None,
837 enforced: bool = True,
838) -> bool:
839 if not _is_trusted_source(item, enforced=enforced):
840 return False
841 body = _body(item)
842 if not body or _is_ship_assessment(body) or _has_closure_marker(body):
843 return False
844 return JURY_VERDICT_MARKER in body and _matches_head(item, body, head_sha)