Coverage for src/ai_jury/findings.py: 100%

112 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-05 20:29 +0000

1"""Machine-readable finding schema and parser. 

2 

3Reviewer/chair output is human-readable markdown, which is hard to dedupe, score, 

4gate in CI, or turn into inline comments. This module defines a structured 

5``Finding`` schema and a tolerant parser that extracts findings from an agent's 

6raw output (a fenced ``json`` code block). 

7""" 

8from __future__ import annotations 

9 

10import json 

11import re 

12from dataclasses import dataclass 

13 

# Canonical vocabularies accepted by the finding schema.
SEVERITIES: tuple[str, ...] = ("critical", "major", "minor", "nit", "info")
CONFIDENCES: tuple[str, ...] = ("high", "medium", "low")

# Statuses a verifier may assign to a candidate finding (issue #3).
VERDICT_STATUSES: tuple[str, ...] = ("verified", "unsupported", "needs_human_decision")

# Rank of each severity, taken from its position in SEVERITIES: a lower rank
# means a more severe finding, which makes this handy as a sort key.
SEVERITY_ORDER: dict[str, int] = dict(zip(SEVERITIES, range(len(SEVERITIES))))

# Older severity spellings translated to their canonical equivalent.
_SEVERITY_ALIASES: dict[str, str] = {"blocker": "critical"}

# Fallbacks applied when a value is absent or not recognised.
_DEFAULT_SEVERITY = "info"
_DEFAULT_CONFIDENCE = "medium"

# Captures the body of a fenced ```json ... ``` block. The language tag is
# matched case-insensitively and DOTALL lets the body span several lines.
_JSON_BLOCK_RE = re.compile(r"```[ \t]*json[ \t]*\r?\n(.*?)```", re.DOTALL | re.IGNORECASE)

31 

32 

def _normalize_severity(value: object) -> str:
    """Return the canonical severity for ``value``.

    Strings are stripped and lower-cased, then legacy aliases are translated.
    Anything that is not a string, or does not name a known severity, falls
    back to the default severity.
    """
    if not isinstance(value, str):
        return _DEFAULT_SEVERITY
    cleaned = value.strip().lower()
    canonical = _SEVERITY_ALIASES.get(cleaned, cleaned)
    return canonical if canonical in SEVERITY_ORDER else _DEFAULT_SEVERITY

40 

41 

def _normalize_confidence(value: object) -> str:
    """Return the canonical confidence level for ``value``.

    Strings are stripped and lower-cased before being checked against the
    known confidence levels. Non-strings and unknown values fall back to the
    default confidence.
    """
    if not isinstance(value, str):
        return _DEFAULT_CONFIDENCE
    candidate = value.strip().lower()
    return candidate if candidate in CONFIDENCES else _DEFAULT_CONFIDENCE

48 

49 

@dataclass
class Finding:
    """A single structured review finding.

    ``__post_init__`` canonicalises the instance after construction:

    * ``severity`` is normalised via ``_normalize_severity``.
    * ``confidence`` is normalised via ``_normalize_confidence``.
    * ``line`` becomes an ``int`` or ``None``; booleans and values that
      cannot be converted to an integer are stored as ``None``.

    The remaining fields (``file``, ``claim``, ``evidence``,
    ``suggested_fix``, ``reviewer``) are stored as given.
    """

    severity: str
    file: str
    claim: str
    line: int | None = None
    evidence: str = ""
    suggested_fix: str = ""
    confidence: str = _DEFAULT_CONFIDENCE
    reviewer: str = ""

    def __post_init__(self) -> None:
        """Normalise ``severity``, ``confidence`` and ``line`` in place."""
        self.severity = _normalize_severity(self.severity)
        self.confidence = _normalize_confidence(self.confidence)
        # bool is a subclass of int; True/False are not meaningful line numbers.
        if self.line is None or isinstance(self.line, bool):
            self.line = None
            return
        try:
            self.line = int(self.line)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")). JSON such as ``1e999`` or
            # ``Infinity`` decodes to an infinite float, and an unparseable
            # line number must never make construction fail.
            self.line = None

    @classmethod
    def from_obj(cls, obj: dict, reviewer: str) -> Finding:
        """Build a Finding from a decoded JSON object, forcing ``reviewer``.

        Args:
            obj: Decoded JSON object. Missing keys fall back to the field
                defaults.
            reviewer: Identity stored on the finding; any ``reviewer`` key in
                ``obj`` is ignored.

        Returns:
            The normalised ``Finding``.
        """

        def text(key: str) -> str:
            # A JSON ``null`` means "no value"; str(None) would otherwise
            # store the literal text "None".
            value = obj.get(key)
            return "" if value is None else str(value)

        return cls(
            severity=str(obj.get("severity", _DEFAULT_SEVERITY)),
            file=text("file"),
            claim=text("claim"),
            line=obj.get("line"),
            evidence=text("evidence"),
            suggested_fix=text("suggested_fix"),
            confidence=str(obj.get("confidence", _DEFAULT_CONFIDENCE)),
            reviewer=reviewer,
        )

87 

88 

def parse_findings(text: str, reviewer: str) -> tuple[list[Finding], list[str]]:
    """Extract structured findings from an agent's raw output.

    The agent is asked to emit a fenced ```json block holding a JSON array of
    finding objects. We locate the *last* such block, decode it, and build
    Finding objects (forcing ``reviewer`` to preserve identity).

    Args:
        text: Raw agent output; may be empty.
        reviewer: Identity stamped on every finding and used as the prefix of
            every warning message.

    Returns:
        ``(findings, warnings)``. A legitimately missing block (or empty
        ``text``) yields ``([], [])``. A block that is not valid JSON, or is
        not a JSON array, yields ``([], [warning])``. Array items that are not
        objects are skipped with one warning each.

    Decoding errors are reported as warnings rather than raised.
    """
    if not text:
        return [], []

    blocks = _JSON_BLOCK_RE.findall(text)
    if not blocks:
        return [], []

    try:
        data = json.loads(blocks[-1].strip())
    except (ValueError, TypeError, RecursionError) as exc:
        # RecursionError: pathologically nested JSON exhausts the decoder's
        # recursion budget and is not a ValueError subclass.
        return [], [f"{reviewer}: malformed or missing structured findings ({exc})"]

    if not isinstance(data, list):
        return [], [
            f"{reviewer}: malformed or missing structured findings "
            f"(expected a JSON array, got {type(data).__name__})"
        ]

    findings: list[Finding] = []
    warnings: list[str] = []
    for i, obj in enumerate(data):
        if not isinstance(obj, dict):
            warnings.append(
                f"{reviewer}: malformed or missing structured findings "
                f"(item {i} is {type(obj).__name__}, expected object)"
            )
            continue
        findings.append(Finding.from_obj(obj, reviewer))
    return findings, warnings

129 

130 

def _coerce_line(value: object) -> int | None:
    """Convert ``value`` to an integer line number, or ``None``.

    ``None`` and booleans map to ``None`` (bool is a subclass of int, but
    True/False are not line numbers). Everything else goes through ``int()``;
    values it rejects also map to ``None``.
    """
    if value is None or isinstance(value, bool):
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")). JSON such as ``1e999`` or
        # ``Infinity`` decodes to an infinite float.
        return None

138 

139 

def _normalize_status(value: object) -> str:
    """Return the canonical verdict status for ``value``.

    Strings are stripped and lower-cased, with hyphens and spaces turned into
    underscores, then checked against the known verdict statuses. Non-strings
    and unknown values fall back to ``"needs_human_decision"``.
    """
    fallback = "needs_human_decision"
    if not isinstance(value, str):
        return fallback
    token = value.strip().lower()
    token = token.replace("-", "_").replace(" ", "_")
    return token if token in VERDICT_STATUSES else fallback

146 

147 

@dataclass
class Verdict:
    """A verifier's judgement on a candidate finding.

    Every field has a default, so ``Verdict()`` is valid. Values are stored
    exactly as given; this class performs no normalisation of its own.
    """

    # ``file`` and ``line`` may be None when the verdict carries no value.
    file: str | None = None
    line: int | None = None
    claim: str = ""
    # Defaults to the status that defers the decision to a human.
    status: str = "needs_human_decision"
    reasoning: str = ""

157 

158 

def parse_verdicts(text: str, verifier: str = "") -> tuple[list[Verdict], list[str]]:
    """Extract verification verdicts from a verifier's raw output.

    The verifier is asked to emit a fenced ```json block holding a JSON array of
    verdict objects. We locate the *last* such block and decode it. A JSON
    object is also accepted when it wraps the array under a ``"verdicts"`` (or,
    failing that, ``"findings"``) key.

    Args:
        text: Raw verifier output; may be empty.
        verifier: Label used as the prefix of warning messages; defaults to
            ``"verifier"`` when empty.

    Returns:
        ``(verdicts, warnings)``. Empty output, a missing block, invalid JSON
        or a non-array payload yield ``([], [warning])``. Array items that are
        not objects are skipped with one warning each.

    Decoding errors are reported as warnings rather than raised.
    """
    label = verifier or "verifier"
    if not text:
        return [], [f"{label}: no verdicts (empty output)"]

    blocks = _JSON_BLOCK_RE.findall(text)
    if not blocks:
        return [], [f"{label}: no JSON verdicts block found"]

    try:
        data = json.loads(blocks[-1].strip())
    except (ValueError, TypeError, RecursionError) as exc:
        # RecursionError: pathologically nested JSON exhausts the decoder's
        # recursion budget and is not a ValueError subclass.
        return [], [f"{label}: malformed verdicts JSON ({exc})"]

    if isinstance(data, dict):
        data = data.get("verdicts", data.get("findings", []))
    if not isinstance(data, list):
        return [], [f"{label}: verdicts block is not a JSON array"]

    def as_text(value: object) -> str:
        # A JSON ``null`` means "no value"; str(None) would otherwise store
        # the literal text "None".
        return "" if value is None else str(value).strip()

    verdicts: list[Verdict] = []
    warnings: list[str] = []
    for i, obj in enumerate(data):
        if not isinstance(obj, dict):
            warnings.append(
                f"{label}: verdict item {i} is {type(obj).__name__}, expected object"
            )
            continue
        # Falsy values (missing, null, "") mean "no file"; anything else is
        # stored as text so the field honours its ``str | None`` annotation.
        raw_file = obj.get("file") or None
        verdicts.append(
            Verdict(
                file=None if raw_file is None else str(raw_file),
                line=_coerce_line(obj.get("line")),
                claim=as_text(obj.get("claim")),
                status=_normalize_status(obj.get("status")),
                reasoning=as_text(obj.get("reasoning")),
            )
        )
    return verdicts, warnings