Coverage for src/ai_jury/findings.py: 100%
112 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
1"""Machine-readable finding schema and parser.
3Reviewer/chair output is human-readable markdown, which is hard to dedupe, score,
4gate in CI, or turn into inline comments. This module defines a structured
5``Finding`` schema and a tolerant parser that extracts findings from an agent's
6raw output (a fenced ``json`` code block).
7"""
8from __future__ import annotations
10import json
11import re
12from dataclasses import dataclass
# Canonical severity labels, listed from most severe to least severe.
SEVERITIES: tuple[str, ...] = ("critical", "major", "minor", "nit", "info")
# Confidence labels a finding may carry.
CONFIDENCES: tuple[str, ...] = ("high", "medium", "low")

# Verification verdict statuses (issue #3).
VERDICT_STATUSES: tuple[str, ...] = ("verified", "unsupported", "needs_human_decision")

# Rank of each severity, taken from its position in SEVERITIES
# (0 = most severe); useful for ranking/sorting.
SEVERITY_ORDER: dict[str, int] = dict(zip(SEVERITIES, range(len(SEVERITIES))))

# Legacy severity names translated to their canonical equivalents.
_SEVERITY_ALIASES: dict[str, str] = {"blocker": "critical"}

# Fallbacks used when a value is missing or not recognised.
_DEFAULT_SEVERITY = "info"
_DEFAULT_CONFIDENCE = "medium"

# Matches a fenced ```json ... ``` block; the language tag is matched
# case-insensitively and the block body is captured as group 1.
_JSON_BLOCK_RE = re.compile(r"```[ \t]*json[ \t]*\r?\n(.*?)```", re.DOTALL | re.IGNORECASE)
def _normalize_severity(value: object) -> str:
    """Map an arbitrary value onto a canonical severity label.

    Strings are stripped and lower-cased, then legacy aliases are translated.
    Anything that is not a string, or is not a known severity after that,
    falls back to the default severity.
    """
    if not isinstance(value, str):
        return _DEFAULT_SEVERITY
    cleaned = value.strip().lower()
    canonical = _SEVERITY_ALIASES.get(cleaned, cleaned)
    return canonical if canonical in SEVERITY_ORDER else _DEFAULT_SEVERITY
def _normalize_confidence(value: object) -> str:
    """Map an arbitrary value onto a known confidence label.

    Strings are stripped and lower-cased before the lookup. Non-strings and
    unrecognised labels fall back to the default confidence.
    """
    if not isinstance(value, str):
        return _DEFAULT_CONFIDENCE
    cleaned = value.strip().lower()
    return cleaned if cleaned in CONFIDENCES else _DEFAULT_CONFIDENCE
@dataclass
class Finding:
    """A single structured review finding.

    Attributes:
        severity: Severity label; normalized in ``__post_init__`` (aliases
            translated, unknown values replaced by the default severity).
        file: Path of the file the finding refers to.
        claim: The statement the reviewer is making.
        line: Line number, or ``None`` when absent or not convertible to int.
        evidence: Supporting evidence text.
        suggested_fix: Proposed remedy text.
        confidence: Confidence label; normalized in ``__post_init__``.
        reviewer: Identifier of the reviewer that produced the finding.
    """

    severity: str
    file: str
    claim: str
    line: int | None = None
    evidence: str = ""
    suggested_fix: str = ""
    confidence: str = _DEFAULT_CONFIDENCE
    reviewer: str = ""

    def __post_init__(self) -> None:
        """Normalize severity/confidence and coerce ``line`` to ``int | None``."""
        self.severity = _normalize_severity(self.severity)
        self.confidence = _normalize_confidence(self.confidence)
        # bool is a subclass of int, so True/False would otherwise be coerced
        # to 1/0; treat them as "no line" instead.
        if self.line is None or isinstance(self.line, bool):
            self.line = None
            return
        try:
            self.line = int(self.line)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")); json.loads produces inf for
            # ``Infinity`` and for out-of-range literals such as ``1e999``.
            self.line = None

    @classmethod
    def from_obj(cls, obj: dict, reviewer: str) -> Finding:
        """Build a Finding from a decoded JSON object, forcing ``reviewer``.

        Missing keys and JSON ``null`` values both become empty strings, so a
        ``null`` field never turns into the literal text ``"None"``. Severity
        and confidence are then normalized by ``__post_init__``, which maps
        empty or unknown values onto the defaults.

        Args:
            obj: Decoded JSON object describing one finding.
            reviewer: Reviewer identity to stamp on the finding; any
                ``reviewer`` key inside ``obj`` is ignored.

        Returns:
            The constructed ``Finding``.
        """

        def text(key: str) -> str:
            value = obj.get(key)
            return "" if value is None else str(value)

        return cls(
            severity=text("severity"),
            file=text("file"),
            claim=text("claim"),
            line=obj.get("line"),
            evidence=text("evidence"),
            suggested_fix=text("suggested_fix"),
            confidence=text("confidence"),
            reviewer=reviewer,
        )
def parse_findings(text: str, reviewer: str) -> tuple[list[Finding], list[str]]:
    """Extract structured findings from an agent's raw output.

    The agent is asked to emit a fenced ```json block holding a JSON array of
    finding objects. We locate the *last* such block, decode it, and build
    Finding objects (forcing ``reviewer`` to preserve identity).

    Args:
        text: Raw agent output to search for a fenced ``json`` block.
        reviewer: Reviewer identity stamped on every finding and used as the
            prefix of every warning message.

    Returns:
        A ``(findings, warnings)`` pair. Array items that are not JSON
        objects, or that cannot be converted, are skipped with a warning.

    Never raises. On a malformed/wrong-typed ``json`` block, returns
    ``([], [warning])``. A legitimately missing block yields ``([], [])``.
    """
    if not text:
        return [], []

    blocks = _JSON_BLOCK_RE.findall(text)
    if not blocks:
        return [], []

    raw = blocks[-1].strip()
    try:
        data = json.loads(raw)
    except (ValueError, TypeError, RecursionError) as exc:
        # RecursionError: the decoder recurses on nested arrays/objects, so
        # pathologically deep input must not escape as an exception.
        return [], [f"{reviewer}: malformed or missing structured findings ({exc})"]

    if not isinstance(data, list):
        return [], [
            f"{reviewer}: malformed or missing structured findings "
            f"(expected a JSON array, got {type(data).__name__})"
        ]

    findings: list[Finding] = []
    warnings: list[str] = []
    for i, obj in enumerate(data):
        if not isinstance(obj, dict):
            warnings.append(
                f"{reviewer}: malformed or missing structured findings "
                f"(item {i} is {type(obj).__name__}, expected object)"
            )
            continue
        try:
            finding = Finding.from_obj(obj, reviewer)
        except (TypeError, ValueError, OverflowError) as exc:
            # Keeps the "never raises" contract if a field value cannot be
            # converted (e.g. a ``line`` of Infinity): skip the item, warn.
            warnings.append(
                f"{reviewer}: malformed or missing structured findings "
                f"(item {i} could not be converted: {exc})"
            )
            continue
        findings.append(finding)
    return findings, warnings
def _coerce_line(value: object) -> int | None:
    """Convert a decoded JSON value to a line number, or ``None``.

    ``None`` and booleans (bool is an int subclass) are treated as "no line".
    Anything ``int()`` cannot convert also yields ``None``; that includes
    infinite floats, which ``json.loads`` produces for ``Infinity`` and for
    out-of-range literals such as ``1e999`` and which make ``int()`` raise
    ``OverflowError``.
    """
    if value is None or isinstance(value, bool):
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None
def _normalize_status(value: object) -> str:
    """Map an arbitrary value onto a known verdict status.

    Strings are stripped and lower-cased, and hyphens and spaces are turned
    into underscores before the lookup. Non-strings and unrecognised statuses
    become ``"needs_human_decision"``.
    """
    if not isinstance(value, str):
        return "needs_human_decision"
    # One pass replacing both "-" and " " with "_".
    candidate = value.strip().lower().translate(str.maketrans("- ", "__"))
    return candidate if candidate in VERDICT_STATUSES else "needs_human_decision"
@dataclass
class Verdict:
    """A verifier's judgement on a candidate finding.

    Every field has a default, so ``Verdict()`` is a valid, empty verdict
    whose status is ``"needs_human_decision"``.
    """

    # File the judged finding refers to, if given.
    file: str | None = None
    # Line the judged finding refers to, if given.
    line: int | None = None
    # Claim text of the judged finding.
    claim: str = ""
    # Verdict status label.
    status: str = "needs_human_decision"
    # Verifier's explanation for the status.
    reasoning: str = ""
def parse_verdicts(text: str, verifier: str = "") -> tuple[list[Verdict], list[str]]:
    """Extract verification verdicts from a verifier's raw output.

    The verifier is asked to emit a fenced ```json block holding a JSON array of
    verdict objects. We locate the *last* such block and decode it. A JSON
    object wrapping the array under a ``verdicts`` (or ``findings``) key is
    also accepted.

    Args:
        text: Raw verifier output to search for a fenced ``json`` block.
        verifier: Label used as the prefix of warning messages; defaults to
            ``"verifier"`` when empty.

    Returns:
        A ``(verdicts, warnings)`` pair. Array items that are not JSON objects
        are skipped with a warning.

    Never raises; on malformed input returns ``([], [warning])``.
    """
    label = verifier or "verifier"
    if not text:
        return [], [f"{label}: no verdicts (empty output)"]

    blocks = _JSON_BLOCK_RE.findall(text)
    if not blocks:
        return [], [f"{label}: no JSON verdicts block found"]

    raw = blocks[-1].strip()
    try:
        data = json.loads(raw)
    except (ValueError, TypeError, RecursionError) as exc:
        # RecursionError: the decoder recurses on nested arrays/objects, so
        # pathologically deep input must not escape as an exception.
        return [], [f"{label}: malformed verdicts JSON ({exc})"]

    if isinstance(data, dict):
        data = data.get("verdicts", data.get("findings", []))
    if not isinstance(data, list):
        return [], [f"{label}: verdicts block is not a JSON array"]

    def field_text(obj: dict, key: str) -> str:
        # Missing keys and JSON null both become "", never the text "None".
        value = obj.get(key)
        return "" if value is None else str(value).strip()

    verdicts: list[Verdict] = []
    warnings: list[str] = []
    for i, obj in enumerate(data):
        if not isinstance(obj, dict):
            warnings.append(
                f"{label}: verdict item {i} is {type(obj).__name__}, expected object"
            )
            continue
        raw_file = obj.get("file")
        try:
            line = _coerce_line(obj.get("line"))
        except OverflowError:
            # Defensive: int() raises OverflowError for an infinite float,
            # which json.loads yields for ``Infinity`` / ``1e999``.
            line = None
        verdicts.append(
            Verdict(
                # Falsy values mean "no file"; anything else is stored as text
                # so the field always matches its ``str | None`` annotation.
                file=str(raw_file) if raw_file else None,
                line=line,
                claim=field_text(obj, "claim"),
                status=_normalize_status(obj.get("status")),
                reasoning=field_text(obj, "reasoning"),
            )
        )
    return verdicts, warnings