Coverage for src/ai_jury/classification.py: 100%

97 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-05 20:29 +0000

1"""Deterministic PR-level classification derived from structured findings. 

2 

3The jury report already lists individual findings and consensus groups, but 

4maintainers also want a compact, at-a-glance signal: how much review effort a PR 

5needs, how risky it is, whether it touches security-sensitive code, and whether 

6it warrants human attention. This module derives those four classifications as a 

7PURE, fully deterministic function of the structured findings, the consensus 

8groups, and (optionally) the unified diff. 

9 

10Nothing here calls an LLM or the network: identical inputs always produce 

11identical output, which is what makes the classification safe to snapshot-test 

12and to render in the deterministic mock report. 

13 

14Classifications 

15--------------- 

16``review_effort`` : int, 1-5 

17``risk_level`` : str, one of ``low`` / ``medium`` / ``high`` 

18``security_sensitive`` : bool 

19``needs_human_attention`` : bool 

20 

21See :func:`classify` for the exact, documented formulas. 

22""" 

23from __future__ import annotations 

24 

25import re 

26from typing import Any 

27 

28from .findings import SEVERITY_ORDER 

29 

30# Risk levels, ordered least to most severe. 

31RISK_LOW = "low" 

32RISK_MEDIUM = "medium" 

33RISK_HIGH = "high" 

34 

35# Consensus buckets that mean "a human still needs to look at this": the verifier 

36# could not confirm the finding, or flagged it as needing a human decision. 

37_UNRESOLVED_BUCKETS = {"disputed"} 

38_UNRESOLVED_STATUSES = {"needs_human_decision"} 

39 

40# Security keyword set. A finding is treated as security-sensitive if any of 

41# these whole-word tokens (or multi-word phrases) appears in its claim, evidence, 

42# suggested fix, or file path. Kept deliberately small and high-signal so benign 

43# findings do not over-match. Matching is case-insensitive and word-boundary 

44# anchored for single tokens (so "auth" does not fire inside "author"). 

45SECURITY_KEYWORDS: tuple[str, ...] = ( 

46 "injection", 

47 "sql injection", 

48 "xss", 

49 "csrf", 

50 "ssrf", 

51 "rce", 

52 "remote code execution", 

53 "traversal", 

54 "path traversal", 

55 "directory traversal", 

56 "secret", 

57 "credential", 

58 "password", 

59 "token", 

60 "api key", 

61 "private key", 

62 "auth", 

63 "authentication", 

64 "authorization", 

65 "deserialization", 

66 "sanitize", 

67 "sanitization", 

68 "escape", 

69 "vulnerab", 

70 "exploit", 

71 "privilege", 

72 "sandbox escape", 

73) 

74 

75# Pre-compiled, word-boundary anchored matchers for each keyword. Multi-word 

76# phrases match on a relaxed boundary (spaces inside the phrase are literal). 

77_KEYWORD_RES: tuple[re.Pattern[str], ...] = tuple( 

78 re.compile(r"\b" + re.escape(kw) + r"\b", re.IGNORECASE) for kw in SECURITY_KEYWORDS 

79) 

80 

81 

def _severity_rank(severity: str) -> int:
    """Return the numeric rank of ``severity``; a lower rank is more severe.

    Ranks are looked up in ``SEVERITY_ORDER``. A severity that is not a key of
    that mapping gets ``len(SEVERITY_ORDER)`` as its rank.
    """
    unknown_rank = len(SEVERITY_ORDER)
    return SEVERITY_ORDER.get(severity, unknown_rank)

85 

86 

87def _resolved_findings(outcome: Any, findings: Any) -> list: 

88 """Pick the finding list to classify on. 

89 

90 Prefers an explicit ``findings`` argument, then ``outcome.findings``. The 

91 list is returned as-is (callers pass already-aggregated findings). 

92 """ 

93 if findings is not None: 

94 return list(findings) 

95 if outcome is not None and getattr(outcome, "findings", None) is not None: 

96 return list(outcome.findings) 

97 return [] 

98 

99 

100def _resolved_groups(outcome: Any, groups: Any) -> list: 

101 if groups is not None: 

102 return list(groups) 

103 if outcome is not None and getattr(outcome, "groups", None) is not None: 

104 return list(outcome.groups) 

105 return [] 

106 

107 

# Unified-diff hunk header, e.g. ``@@ -12,7 +12,9 @@``. The two captured groups
# are the old/new line counts; an omitted count means 1.
_HUNK_HEADER_RE = re.compile(r"^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@")


def diff_lines_changed(diff: str | None) -> int:
    """Count added/removed lines in a unified diff (deterministic).

    Inside a hunk (after an ``@@ -a,b +c,d @@`` header, for as many lines as the
    header announces) every line starting with ``+`` or ``-`` is counted. This
    includes changed lines whose own content starts with ``++`` or ``--`` (for
    example a removed SQL comment shows up as ``--- comment``), which would
    otherwise be mistaken for file headers.

    Outside of hunks, and for header-less input, lines starting with ``+`` or
    ``-`` are counted unless they look like a file header (``+++`` / ``---``).

    Args:
        diff: Unified diff text, or ``None``.

    Returns:
        The number of added plus removed lines; 0 for an empty or missing diff.
    """
    if not diff:
        return 0

    changed = 0
    old_left = 0  # removed/context lines still expected in the current hunk
    new_left = 0  # added/context lines still expected in the current hunk

    for line in diff.splitlines():
        if old_left > 0 or new_left > 0:
            if line.startswith("+") and new_left > 0:
                changed += 1
                new_left -= 1
                continue
            if line.startswith("-") and old_left > 0:
                changed += 1
                old_left -= 1
                continue
            if line.startswith("\\"):
                # "\ No newline at end of file" marker: not a content line.
                continue
            is_context = not line or line.startswith(" ")
            if is_context and old_left > 0 and new_left > 0:
                old_left -= 1
                new_left -= 1
                continue
            # Anything else means the hunk was truncated or malformed: leave
            # hunk mode and let the line be handled by the rules below.
            old_left = new_left = 0

        header = _HUNK_HEADER_RE.match(line)
        if header:
            old_count, new_count = header.groups()
            old_left = 1 if old_count is None else int(old_count)
            new_left = 1 if new_count is None else int(new_count)
            continue

        if line.startswith(("+", "-")) and not line.startswith(("+++", "---")):
            changed += 1

    return changed

120 

121 

122def _text_blob(finding: Any) -> str: 

123 """Concatenate the human-text fields of a finding for keyword scanning.""" 

124 parts = [ 

125 getattr(finding, "claim", "") or "", 

126 getattr(finding, "evidence", "") or "", 

127 getattr(finding, "suggested_fix", "") or "", 

128 getattr(finding, "file", "") or "", 

129 getattr(finding, "reviewer", "") or "", 

130 ] 

131 return " ".join(parts) 

132 

133 

def is_security_finding(finding: Any) -> bool:
    """Report whether a single finding looks security-related.

    Two independent signals are checked, in this order:

    * the finding's ``severity`` equals ``critical``;
    * at least one pre-compiled :data:`SECURITY_KEYWORDS` matcher hits the text
      assembled by ``_text_blob`` for the finding.

    A finding with no ``severity`` attribute simply skips the first signal.
    """
    severity = getattr(finding, "severity", "")
    if severity == "critical":
        return True

    text = _text_blob(finding)
    for pattern in _KEYWORD_RES:
        if pattern.search(text):
            return True
    return False

146 

147 

def _risk_level(findings: list, groups: list) -> str:
    """Derive the risk level from the most severe finding.

    Thresholds (deterministic, first match wins):

    * ``low`` when there are no findings at all.
    * ``high`` when any finding is ``critical``.
    * ``high`` when any finding is ``major`` AND some group has severity
      ``major``, a ``consensus`` or ``majority`` bucket, and a status other
      than ``unsupported``.
    * ``medium`` when any finding is ``major`` without such a group, or when
      any finding is ``minor``.
    * ``low`` otherwise.

    Attributes are read with ``getattr`` and an empty-string default, matching
    the other helpers in this module, so an item lacking ``severity`` /
    ``bucket`` / ``status`` is treated as not matching instead of raising
    ``AttributeError``.
    """
    if not findings:
        return RISK_LOW

    severities = [getattr(f, "severity", "") for f in findings]

    if "critical" in severities:
        return RISK_HIGH

    if "major" in severities:
        # A major finding is only escalated to high when a consensus/majority
        # group of major severity backs it and that group is not unsupported.
        confirmed = any(
            getattr(g, "severity", "") == "major"
            and getattr(g, "bucket", "") in ("consensus", "majority")
            and (getattr(g, "status", "") or "") != "unsupported"
            for g in groups
        )
        return RISK_HIGH if confirmed else RISK_MEDIUM

    if "minor" in severities:
        return RISK_MEDIUM

    return RISK_LOW

184 

185 

def _review_effort(findings: list, lines_changed: int) -> int:
    """Map findings + diff size onto a 1-5 review-effort score (deterministic).

    Formula (result clamped to 1..5)::

        base = 1
        + finding-count contribution:
            >= 8 findings -> +2, >= 3 findings -> +1, otherwise +0
        + severity contribution (most severe finding, by ``_severity_rank``):
            at least as severe as "major" -> +2,
            else at least as severe as "minor" -> +1, otherwise +0
        + diff-size contribution:
            > 400 changed lines -> +2, > 80 changed lines -> +1

    Note that one or two findings that are all less severe than "minor" add
    nothing: the score stays at the base unless the diff is large.

    Each term only ever raises the score. A finding without a ``severity``
    attribute is ranked like an unknown severity instead of raising
    ``AttributeError``, consistent with the other helpers in this module.
    """
    score = 1

    count = len(findings)
    if count >= 8:
        score += 2
    elif count >= 3:
        score += 1

    # Lowest rank == most severe finding; None when there are no findings.
    most_severe = min(
        (_severity_rank(getattr(f, "severity", "")) for f in findings),
        default=None,
    )
    if most_severe is not None:
        if most_severe <= _severity_rank("major"):
            score += 2
        elif most_severe <= _severity_rank("minor"):
            score += 1

    if lines_changed > 400:
        score += 2
    elif lines_changed > 80:
        score += 1

    return max(1, min(5, score))

226 

227 

def _has_unresolved_groups(groups: list) -> bool:
    """Report whether any group still awaits a human verdict.

    A group counts as unresolved when its ``bucket`` is one of
    ``_UNRESOLVED_BUCKETS`` or its ``status`` is one of
    ``_UNRESOLVED_STATUSES``. A missing attribute is read as an empty string.
    """
    return any(
        getattr(group, "bucket", "") in _UNRESOLVED_BUCKETS
        or getattr(group, "status", "") in _UNRESOLVED_STATUSES
        for group in groups
    )

236 

237 

def classify(
    outcome: Any = None,
    *,
    findings: Any = None,
    groups: Any = None,
    diff: str | None = None,
) -> dict:
    """Build the deterministic PR-level classification dict.

    Inputs may come from an outcome object passed positionally (its
    ``findings`` / ``groups`` attributes are used) or from the explicit
    ``findings`` / ``groups`` keyword arguments, which take precedence when
    given. ``diff`` is optional; its changed-line count feeds the
    review-effort score.

    The returned dict has exactly these keys:

        ``review_effort``          int 1-5
        ``risk_level``             "low" | "medium" | "high"
        ``security_sensitive``     bool
        ``needs_human_attention``  bool

    ``needs_human_attention`` is true when the risk level is high, when any
    finding is security-sensitive, or when any group is unresolved. No state
    outside the arguments is consulted here.
    """
    resolved_findings = _resolved_findings(outcome, findings)
    resolved_groups = _resolved_groups(outcome, groups)
    changed_line_count = diff_lines_changed(diff)

    risk_level = _risk_level(resolved_findings, resolved_groups)
    security_sensitive = any(map(is_security_finding, resolved_findings))
    review_effort = _review_effort(resolved_findings, changed_line_count)

    # Groups are only inspected when neither cheaper signal already fired.
    if risk_level == RISK_HIGH or security_sensitive:
        needs_attention = True
    else:
        needs_attention = _has_unresolved_groups(resolved_groups)

    result = {}
    result["review_effort"] = review_effort
    result["risk_level"] = risk_level
    result["security_sensitive"] = bool(security_sensitive)
    result["needs_human_attention"] = bool(needs_attention)
    return result

278 

279 

def label_strings(classification: dict) -> list[str]:
    """Turn a classification dict into GitHub label strings (deterministic).

    Example output::

        ["review effort: 3/5", "risk: high", "possible security issue",
         "needs human attention"]

    The effort and risk labels are always present (their keys are required).
    The security and human-attention labels are appended only when the
    corresponding flag is present and truthy. The order is fixed.
    """
    effort = classification["review_effort"]
    risk = classification["risk_level"]

    # (flag key, label text) pairs, in the order the labels must appear.
    optional_labels = (
        ("security_sensitive", "possible security issue"),
        ("needs_human_attention", "needs human attention"),
    )

    labels = [f"review effort: {effort}/5", f"risk: {risk}"]
    labels.extend(
        text for flag, text in optional_labels if classification.get(flag)
    )
    return labels

300 

301 

def summary_line(classification: dict) -> str:
    """Format the classification as one compact, human-readable line.

    All four classification keys are required; the two boolean flags are
    rendered as ``yes`` / ``no``.
    """
    effort = classification["review_effort"]
    risk = classification["risk_level"]
    security_word = "yes" if classification["security_sensitive"] else "no"
    attention_word = "yes" if classification["needs_human_attention"] else "no"
    return (
        f"review effort: {effort}/5 · risk: {risk}"
        f" · security-sensitive: {security_word}"
        f" · needs human attention: {attention_word}"
    )