Coverage for src/keel/captureverify.py: 100%

51 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Pure capture reconciliation: cross-check merged PRs against the ledger. 

2 

3``keel capture-verify`` historically trusted the agent to pass every merged PR 

4via ``--merged-pr`` and to self-report ``--capture-status applied`` with no 

5proof. This module hardens that accounting with three additive checks, all 

6pure data in / pure findings out (no network, subprocess, clock, or random): 

7 

81. **missing-marker** — every PR in the derived merged set must have a valid 

9 capture marker in the ledger. A merged PR with no marker is a finding, so a 

10 merged PR can no longer silently vanish from capture accounting by being 

11 omitted from the args. 

122. **applied-without-artifact** — an ``applied`` capture must carry a durable 

13 capture artifact reference (path/hash) in its ledger record. ``applied`` 

14 with no artifact is a finding. ``deferred``/``skipped`` need no artifact. 

153. **reviewer-count-mismatch** — the ledger record's ``actors.reviewers`` count 

16 for a PR is cross-checked against the evidence-side review-verdict count for 

17 that PR. Recording more reviewers than verdicts posted is a finding. 

18 

19The CLI does the I/O (transport query for merged PRs, marker/verdict fetch) and 

20feeds the results here. The base pass/fail semantics of ``verify_session`` are 

21preserved; these are strictly additional findings. 

22""" 

23 

24from __future__ import annotations 

25 

26from typing import Any 

27 

28from . import capture, ledger 

29 

# Identifier emitted under the "schema_version" key of every reconcile() report.
30RECONCILE_SCHEMA_VERSION = "keel.capture-verify-reconcile.v1" 

31 

# Finding type tags. Each one is used both as the "type" value of an individual
# finding and as a per-type counter key inside the report's "summary" block.
32FINDING_MISSING_MARKER = "missing-marker" 

33FINDING_APPLIED_WITHOUT_ARTIFACT = "applied-without-artifact" 

34FINDING_REVIEWER_COUNT_MISMATCH = "reviewer-count-mismatch" 

35 

36 

def reconcile(
    records: list[dict[str, Any]],
    merged_prs: list[int] | tuple[int, ...],
    *,
    verdict_counts: dict[int, int] | None = None,
) -> dict[str, Any]:
    """Reconcile the authoritative merged-PR set against the run ledger.

    Args:
        records: The run ledger records to check against.
        merged_prs: The merged PR numbers to reconcile, processed in the order
            given (the caller is expected to derive these from the transport
            rather than from agent-supplied arguments).
        verdict_counts: Optional mapping of PR number to the number of review
            verdicts posted for it. A PR absent from the mapping (or a
            ``None``/empty mapping) skips the reviewer-count comparison.

    Returns:
        A report dict with ``schema_version``, ``ok`` (true only when no
        finding was raised), ``merged_prs``, the per-PR ``results``, a flat
        ``findings`` list, and a ``summary`` holding the number of PRs
        checked, the total finding count, and one counter per finding type.
    """
    known_verdicts = verdict_counts or {}

    per_pr: list[dict[str, Any]] = []
    flat_findings: list[dict[str, Any]] = []
    # Every known finding type is pre-seeded so the summary always carries all
    # three counters, even when a type never fires.
    tally = {
        FINDING_MISSING_MARKER: 0,
        FINDING_APPLIED_WITHOUT_ARTIFACT: 0,
        FINDING_REVIEWER_COUNT_MISMATCH: 0,
    }

    for pr in merged_prs:
        outcome = _reconcile_pr(records, pr, known_verdicts)
        per_pr.append(outcome)
        for item in outcome["findings"]:
            flat_findings.append(item)
            tally[item["type"]] += 1

    # Fixed leading keys first, then the per-type counters.
    summary: dict[str, Any] = {
        "checked": len(per_pr),
        "findings": len(flat_findings),
    }
    summary.update(tally)

    return {
        "schema_version": RECONCILE_SCHEMA_VERSION,
        "ok": not flat_findings,
        "merged_prs": list(merged_prs),
        "results": per_pr,
        "findings": flat_findings,
        "summary": summary,
    }

76 

77 

def _reconcile_pr(
    records: list[dict[str, Any]],
    pr_number: int,
    verdict_counts: dict[int, int],
) -> dict[str, Any]:
    """Run the three reconciliation checks for a single merged PR.

    Args:
        records: The run ledger records.
        pr_number: The merged PR being checked.
        verdict_counts: Mapping of PR number to posted review-verdict count;
            a PR missing from it skips the reviewer-count comparison.

    Returns:
        A per-PR result dict: ``pr``, ``ok`` (no findings), ``marker_status``,
        ``marker``, ``artifact``, ``recorded_reviewers``, ``posted_verdicts``
        and the list of ``findings`` raised for this PR.
    """
    # NOTE(review): the marker verification is read as a mapping with "ok",
    # "status" and an optional "marker" key — confirm against capture._verify_pr.
    marker_check = capture._verify_pr(records, pr_number)
    ship_run = ledger.latest_ship_run_for_pr(records, pr_number)
    problems: list[dict[str, Any]] = []

    # Check 1: the merged PR must have a capture marker at all.
    marker_is_missing = not marker_check["ok"] and marker_check["status"] == "missing"
    if marker_is_missing:
        problems.append(
            _finding(
                FINDING_MISSING_MARKER,
                pr_number,
                "merged PR has no capture marker in the ledger",
            )
        )

    # Check 2: an "applied" capture needs a recorded artifact reference.
    artifact_ref = _capture_artifact(ship_run)
    claims_applied = marker_check.get("status") == "applied"
    if claims_applied and not artifact_ref:
        problems.append(
            _finding(
                FINDING_APPLIED_WITHOUT_ARTIFACT,
                pr_number,
                "capture status is applied but no capture artifact was recorded",
            )
        )

    # Check 3: the ledger must not record more reviewers than verdicts posted.
    # An unknown verdict count (None) skips the comparison entirely.
    recorded_reviewers = _recorded_reviewer_count(ship_run)
    verdicts = verdict_counts.get(pr_number)
    if verdicts is not None:
        if recorded_reviewers > verdicts:
            mismatch_reason = (
                f"ledger records {recorded_reviewers} reviewer(s) but only "
                f"{verdicts} review verdict(s) were posted"
            )
            problems.append(
                _finding(
                    FINDING_REVIEWER_COUNT_MISMATCH,
                    pr_number,
                    mismatch_reason,
                    recorded_reviewers=recorded_reviewers,
                    posted_verdicts=verdicts,
                )
            )

    return {
        "pr": pr_number,
        "ok": not problems,
        "marker_status": marker_check["status"],
        "marker": marker_check.get("marker"),
        "artifact": artifact_ref,
        "recorded_reviewers": recorded_reviewers,
        "posted_verdicts": verdicts,
        "findings": problems,
    }

130 

131 

132def _finding(finding_type: str, pr_number: int, reason: str, **extra: Any) -> dict[str, Any]: 

133 finding = {"type": finding_type, "pr": pr_number, "reason": reason} 

134 finding.update(extra) 

135 return finding 

136 

137 

138def _capture_artifact(record: dict[str, Any] | None) -> str | None: 

139 if not isinstance(record, dict): 

140 return None 

141 block = record.get("capture") 

142 if not isinstance(block, dict): 

143 return None 

144 artifact = block.get("artifact") 

145 return artifact if isinstance(artifact, str) and artifact.strip() else None 

146 

147 

148def _recorded_reviewer_count(record: dict[str, Any] | None) -> int: 

149 if not isinstance(record, dict): 

150 return 0 

151 actors = record.get("actors") 

152 if not isinstance(actors, dict): 

153 return 0 

154 reviewers = actors.get("reviewers") 

155 if not isinstance(reviewers, list): 

156 return 0 

157 return sum(1 for reviewer in reviewers if isinstance(reviewer, str) and reviewer.strip())