Coverage for src/ai_jury/classification.py: 100%
97 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
1"""Deterministic PR-level classification derived from structured findings.
3The jury report already lists individual findings and consensus groups, but
4maintainers also want a compact, at-a-glance signal: how much review effort a PR
5needs, how risky it is, whether it touches security-sensitive code, and whether
6it warrants human attention. This module derives those four classifications as a
7PURE, fully deterministic function of the structured findings, the consensus
8groups, and (optionally) the unified diff.
10Nothing here calls an LLM or the network: identical inputs always produce
11identical output, which is what makes the classification safe to snapshot-test
12and to render in the deterministic mock report.
14Classifications
15---------------
16``review_effort`` : int, 1-5
17``risk_level`` : str, one of ``low`` / ``medium`` / ``high``
18``security_sensitive`` : bool
19``needs_human_attention`` : bool
21See :func:`classify` for the exact, documented formulas.
22"""
23from __future__ import annotations
25import re
26from typing import Any
28from .findings import SEVERITY_ORDER
# Risk tiers, listed from least to most severe.
RISK_LOW, RISK_MEDIUM, RISK_HIGH = "low", "medium", "high"

# Consensus outcomes that still require a person to look at the group: either
# the group landed in a "disputed" bucket, or its status was flagged as needing
# a human decision.
_UNRESOLVED_BUCKETS = {"disputed"}
_UNRESOLVED_STATUSES = {"needs_human_decision"}
# Security keyword set. A finding is treated as security-sensitive if any of
# these tokens (or multi-word phrases) appears in the text assembled for keyword
# scanning: its claim, evidence, suggested fix, file path, or reviewer name.
# Kept deliberately small and high-signal so benign findings do not over-match.
# Matching is case-insensitive and word-boundary anchored (so "auth" does not
# fire inside "author").
SECURITY_KEYWORDS: tuple[str, ...] = (
    "injection",
    "sql injection",
    "xss",
    "csrf",
    "ssrf",
    "rce",
    "remote code execution",
    "traversal",
    "path traversal",
    "directory traversal",
    "secret",
    "credential",
    "password",
    "token",
    "api key",
    "private key",
    "auth",
    "authentication",
    "authorization",
    "deserialization",
    "sanitize",
    "sanitization",
    "escape",
    "vulnerab",
    "exploit",
    "privilege",
    "sandbox escape",
)

# Entries of SECURITY_KEYWORDS that are word *stems* rather than whole words.
# A stem is anchored only at its start: with a trailing ``\b`` the stem
# "vulnerab" could never match "vulnerable" or "vulnerability", because the
# stem on its own is not a word.
_KEYWORD_STEMS: frozenset[str] = frozenset({"vulnerab"})


def _compile_keyword(keyword: str) -> re.Pattern[str]:
    """Compile one security keyword into a case-insensitive matcher.

    Whole-word keywords and phrases are anchored with ``\\b`` on both sides.
    Stems listed in ``_KEYWORD_STEMS`` are anchored on the left only and may be
    followed by further word characters.
    """
    tail = r"\w*" if keyword in _KEYWORD_STEMS else r"\b"
    return re.compile(r"\b" + re.escape(keyword) + tail, re.IGNORECASE)


# Pre-compiled matchers, one per keyword and in the same order as
# SECURITY_KEYWORDS. Spaces inside multi-word phrases are matched literally.
_KEYWORD_RES: tuple[re.Pattern[str], ...] = tuple(
    _compile_keyword(kw) for kw in SECURITY_KEYWORDS
)
def _severity_rank(severity: str) -> int:
    """Return the ordering rank of ``severity`` (smaller means more severe).

    The rank is looked up in ``SEVERITY_ORDER``; a severity that is not a key
    there gets ``len(SEVERITY_ORDER)`` as its rank.
    """
    unknown_rank = len(SEVERITY_ORDER)
    return SEVERITY_ORDER.get(severity, unknown_rank)
def _resolved_findings(outcome: Any, findings: Any) -> list:
    """Choose which findings to classify and return them as a new list.

    An explicit ``findings`` argument wins. Otherwise ``outcome.findings`` is
    used when ``outcome`` is given and that attribute is present and not
    ``None``. With neither available the result is an empty list. The chosen
    iterable is copied with ``list()`` and not otherwise transformed.
    """
    chosen = findings
    if chosen is None and outcome is not None:
        chosen = getattr(outcome, "findings", None)
    return list(chosen) if chosen is not None else []
def _resolved_groups(outcome: Any, groups: Any) -> list:
    """Choose which consensus groups to classify and return them as a new list.

    An explicit ``groups`` argument wins. Otherwise ``outcome.groups`` is used
    when ``outcome`` is given and that attribute is present and not ``None``.
    With neither available the result is an empty list.
    """
    chosen = groups
    if chosen is None and outcome is not None:
        chosen = getattr(outcome, "groups", None)
    return list(chosen) if chosen is not None else []
108def diff_lines_changed(diff: str | None) -> int:
109 """Count added/removed lines in a unified diff (deterministic).
111 Counts lines beginning with a single ``+`` or ``-`` that are NOT part of the
112 file header (``+++`` / ``---``). Returns 0 for an empty or missing diff.
113 """
114 if not diff:
115 return 0
116 return sum(
117 1 for line in diff.splitlines()
118 if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
119 )
def _text_blob(finding: Any) -> str:
    """Build the single string that keyword scanning runs against.

    Reads ``claim``, ``evidence``, ``suggested_fix``, ``file`` and ``reviewer``
    from the finding, in that order. A missing or falsy attribute contributes
    an empty string. The pieces are joined with single spaces.
    """
    scanned_fields = ("claim", "evidence", "suggested_fix", "file", "reviewer")
    return " ".join(getattr(finding, name, "") or "" for name in scanned_fields)
def is_security_finding(finding: Any) -> bool:
    """Decide whether one finding looks security-related.

    The answer is ``True`` when the finding's ``severity`` is ``critical``, or
    when any pre-compiled :data:`SECURITY_KEYWORDS` matcher hits the text
    assembled by ``_text_blob``. A finding whose claim mentions "injection" is
    therefore caught through the keyword path regardless of its severity.
    """
    if getattr(finding, "severity", "") != "critical":
        haystack = _text_blob(finding)
        for pattern in _KEYWORD_RES:
            if pattern.search(haystack):
                return True
        return False
    # Critical findings count as security-sensitive without any text scan.
    return True
def _risk_level(findings: list, groups: list) -> str:
    """Derive the PR risk level from finding severities and consensus groups.

    Rules, checked in this order:

    * no findings at all -> ``low``
    * any ``critical`` finding -> ``high``
    * any ``major`` finding -> ``high`` if some group has severity ``major``,
      sits in the ``consensus`` or ``majority`` bucket, and has a status other
      than ``unsupported``; otherwise ``medium``
    * any ``minor`` finding -> ``medium``
    * anything else (for example only ``nit`` / ``info``) -> ``low``
    """

    def any_finding_is(level: str) -> bool:
        return any(f.severity == level for f in findings)

    if not findings:
        return RISK_LOW
    if any_finding_is("critical"):
        return RISK_HIGH
    if not any_finding_is("major"):
        return RISK_MEDIUM if any_finding_is("minor") else RISK_LOW

    # A major finding escalates to high only when a major group is backed by
    # consensus/majority and was not marked unsupported.
    confirmed_major = any(
        g.severity == "major"
        and g.bucket in ("consensus", "majority")
        and (getattr(g, "status", "") or "") != "unsupported"
        for g in groups
    )
    return RISK_HIGH if confirmed_major else RISK_MEDIUM
def _review_effort(findings: list, lines_changed: int) -> int:
    """Score review effort from 1 to 5 using findings and diff size.

    The score starts at 1 and adds three independent bonuses::

        finding count : >= 8 findings -> +2, >= 3 findings -> +1, else +0
        worst severity: major or worse -> +2, else minor or worse -> +1
        diff size     : > 400 changed lines -> +2, > 80 changed lines -> +1

    Severity is compared through ``_severity_rank``, where a lower rank is more
    severe. The total is clamped to the range 1..5. No bonus can be negative,
    so more findings, worse findings, or a larger diff never lower the score.
    """
    count = len(findings)
    if count >= 8:
        count_bonus = 2
    elif count >= 3:
        count_bonus = 1
    else:
        count_bonus = 0

    # With no findings the sentinel rank 99 is used for the worst severity.
    worst_rank = min((_severity_rank(f.severity) for f in findings), default=99)
    if worst_rank <= _severity_rank("major"):
        severity_bonus = 2
    elif worst_rank <= _severity_rank("minor"):
        severity_bonus = 1
    else:
        severity_bonus = 0

    if lines_changed > 400:
        size_bonus = 2
    elif lines_changed > 80:
        size_bonus = 1
    else:
        size_bonus = 0

    total = 1 + count_bonus + severity_bonus + size_bonus
    return max(1, min(5, total))
def _has_unresolved_groups(groups: list) -> bool:
    """Report whether any group is still unresolved.

    A group is unresolved when its ``bucket`` is in ``_UNRESOLVED_BUCKETS`` or
    its ``status`` is in ``_UNRESOLVED_STATUSES``. A missing attribute is read
    as an empty string.
    """
    return any(
        getattr(group, "bucket", "") in _UNRESOLVED_BUCKETS
        or getattr(group, "status", "") in _UNRESOLVED_STATUSES
        for group in groups
    )
def classify(
    outcome: Any = None,
    *,
    findings: Any = None,
    groups: Any = None,
    diff: str | None = None,
) -> dict:
    """Build the deterministic PR-level classification dict.

    Inputs can arrive as an outcome object passed positionally (its
    ``findings`` / ``groups`` attributes are read), as explicit ``findings`` /
    ``groups`` keyword arguments, or as a mix; explicit keywords take
    precedence. ``diff`` is optional, and its changed-line count feeds only
    the review-effort score.

    The returned dict has exactly these keys:

        ``review_effort``          int 1-5
        ``risk_level``             "low" | "medium" | "high"
        ``security_sensitive``     bool
        ``needs_human_attention``  bool

    ``needs_human_attention`` is true when the risk is high, when any finding
    is security-sensitive, or when any group is unresolved.
    """
    resolved_findings = _resolved_findings(outcome, findings)
    resolved_groups = _resolved_groups(outcome, groups)
    changed = diff_lines_changed(diff)

    risk_level = _risk_level(resolved_findings, resolved_groups)
    security_sensitive = any(is_security_finding(f) for f in resolved_findings)
    review_effort = _review_effort(resolved_findings, changed)

    # The unresolved-group scan runs only when neither cheaper signal fired.
    if risk_level == RISK_HIGH or security_sensitive:
        needs_human_attention = True
    else:
        needs_human_attention = bool(_has_unresolved_groups(resolved_groups))

    return {
        "review_effort": review_effort,
        "risk_level": risk_level,
        "security_sensitive": security_sensitive,
        "needs_human_attention": needs_human_attention,
    }
def label_strings(classification: dict) -> list[str]:
    """Turn a classification dict into GitHub label strings (deterministic).

    Follows the labels suggested in issue #7, for example::

        ["review effort: 3/5", "risk: high", "possible security issue",
         "needs human attention"]

    The effort and risk labels are always present. The security and
    human-attention labels are appended only when their flag is truthy. The
    order is fixed.
    """
    effort = classification["review_effort"]
    risk = classification["risk_level"]
    flag_labels = (
        ("security_sensitive", "possible security issue"),
        ("needs_human_attention", "needs human attention"),
    )
    return [
        f"review effort: {effort}/5",
        f"risk: {risk}",
        *(label for flag, label in flag_labels if classification.get(flag)),
    ]
def summary_line(classification: dict) -> str:
    """Format the classification as one compact, human-readable line.

    All four classification keys must be present. The two boolean flags are
    rendered as ``yes`` / ``no``.
    """

    def yes_no(flag: object) -> str:
        return "yes" if flag else "no"

    segments = [
        f"review effort: {classification['review_effort']}/5",
        f"risk: {classification['risk_level']}",
        f"security-sensitive: {yes_no(classification['security_sensitive'])}",
        f"needs human attention: {yes_no(classification['needs_human_attention'])}",
    ]
    return " · ".join(segments)