Coverage for src/ai_jury/injection.py: 100%
58 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
1"""Prompt-injection heuristics for untrusted review input (OWASP LLM01).
3The jury feeds attacker-controlled content (the PR diff, and via ``--pr`` the
4PR title/body) into reviewer prompts. This module scans that content for common
5prompt-injection patterns and surfaces hits as *synthetic findings/warnings* —
6it never alters agent behaviour or the CI gate. Surfacing-not-obeying is the
7whole point: a human (and the structured consensus pipeline) stays in control.
9The detector is intentionally conservative and dependency-free (stdlib only).
10False positives are acceptable here because a hit only adds an advisory finding;
11it cannot flip a verdict.
12"""
13from __future__ import annotations
15import re
16from dataclasses import dataclass
# Zero-width / bidi control characters often used to smuggle hidden text.
#
# The characters are spelled as ``\u`` escapes on purpose: written literally
# they are invisible in editors, diffs and rendered reports, and are easily
# stripped by tooling, which would silently empty the character class below.
# NOTE(review): code points follow the per-line descriptions; confirm the bidi
# set is exactly U+202A..U+202E (embedding/override, no isolates).
_ZERO_WIDTH = (
    "\u200b"  # zero-width space
    "\u200c"  # zero-width non-joiner
    "\u200d"  # zero-width joiner
    "\u2060"  # word joiner
    "\ufeff"  # zero-width no-break space / BOM
    "\u202a\u202b\u202c\u202d\u202e"  # bidi embedding/override controls
)
# Matches any single character listed in _ZERO_WIDTH.
_ZERO_WIDTH_RE = re.compile("[" + _ZERO_WIDTH + "]")
# Imperative phrases that try to override the system/developer instructions.
# Each entry is ``(kind, compiled pattern)``; all patterns are case-insensitive.
# The bounded ``[^.\n]{0,N}`` gaps keep a match inside one sentence/line.
_PHRASE_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
    # "ignore ... previous ... instructions" and close variants.
    ("override-instructions", re.compile(
        r"(?i)\b(ignore|disregard|forget|override)\b[^.\n]{0,40}"
        r"\b(previous|prior|above|earlier|all|any|the)\b[^.\n]{0,20}"
        r"\b(instruction|prompt|message|context|rule|direction)s?\b"
    )),
    # "you are now ..." / "new instructions|persona|role|system prompt".
    ("role-reassignment", re.compile(
        r"(?i)\byou\s+are\s+now\b|\bnew\s+(instructions?|persona|role|system\s+prompt)\b"
    )),
    # A line that opens like a chat turn ("system:", "assistant:", ...).
    # The prefix allows one unified-diff marker (+/-) so added/removed lines
    # are covered, then *horizontal* whitespace only: ``\s*`` here could begin
    # the match on an earlier blank line and misreport the hit's line number.
    ("fake-system-turn", re.compile(
        r"(?im)^[+\-]?[^\S\n]*(system|assistant|developer)\s*:",
    )),
    # "approve/lgtm/pass/merge ... no findings / without review".
    ("verdict-coercion", re.compile(
        r"(?i)\b(approve|lgtm|pass|merge)\b[^.\n]{0,40}"
        r"\b(no\s+findings?|no\s+issues?|without\s+(any\s+)?(review|findings?|comment))\b"
    )),
    # Pseudo-markup such as <system>, </instructions>, < prompt >.
    ("instruction-tag", re.compile(
        r"(?i)<\s*/?\s*(system|instructions?|prompt)\s*>"
    )),
)
# An unbroken run of 120+ characters from the standard base64 alphabet,
# optionally followed by up to two "=" padding characters. Runs that long are
# flagged because they can carry an encoded payload.
_BASE64_RE: re.Pattern[str] = re.compile(
    r"[A-Za-z0-9+/]{120,}={0,2}"
)
@dataclass
class InjectionHit:
    """A single suspicious match found while scanning untrusted content."""

    kind: str  # detector label, e.g. "base64-blob" or "zero-width-char"
    source: str  # "diff" or "context"
    line: int | None  # line number within the scanned text, when known
    snippet: str  # short excerpt or description shown to the reader

    def location(self) -> str:
        """Return ``source`` alone, or ``source:line`` when a line is set."""
        if self.line is None:
            return self.source
        return f"{self.source}:{self.line}"
def _snippet(text: str, start: int, end: int, width: int = 60) -> str:
    """Return a short, single-line, printable excerpt of ``text[start:end]``.

    Newlines are shown as a literal ``\\n``, carriage returns and every other
    non-printable character are dropped, and anything beyond *width*
    characters is replaced by ``...``. Surrounding whitespace is stripped last.
    """
    raw = text[start:end].replace("\n", "\\n").replace("\r", "")
    visible = "".join(filter(str.isprintable, raw))
    clipped = visible if len(visible) <= width else visible[:width] + "..."
    return clipped.strip()
def _line_of(text: str, index: int, start: int = 0, base: int = 1) -> int:
    """Return the 1-based line number of offset *index* in *text*.

    With the defaults every newline before *index* is counted. To resume from
    an offset whose line number is already known, pass that offset as *start*
    and its line number as *base*; only ``text[start:index]`` is scanned.
    """
    return text.count("\n", start, index) + base


def scan(text: str, source: str = "diff") -> list[InjectionHit]:
    """Scan *text* for prompt-injection patterns.

    Runs each phrase pattern, then the base64-blob detector, then the
    zero-width/bidi character detector. Hits are returned in that order, i.e.
    grouped by detector rather than sorted by position.

    Returns a (possibly empty) list of :class:`InjectionHit`; empty or falsy
    *text* yields ``[]``. Never raises for ``str`` input.
    *source* labels where the text came from ("diff" or "context").
    """
    if not text:
        return []

    def with_lines(matches):
        # finditer yields matches left to right, so the newline count can
        # resume from the previous match. Recounting from offset 0 for every
        # hit costs O(len(text) * hits) on hit-heavy, attacker-supplied input.
        pos, line = 0, 1
        for m in matches:
            line = _line_of(text, m.start(), start=pos, base=line)
            pos = m.start()
            yield line, m

    hits: list[InjectionHit] = []

    # Phrase heuristics: the snippet is the matched text itself.
    for kind, pat in _PHRASE_PATTERNS:
        hits.extend(
            InjectionHit(
                kind=kind,
                source=source,
                line=line,
                snippet=_snippet(text, m.start(), m.end()),
            )
            for line, m in with_lines(pat.finditer(text))
        )

    # Encoded-payload heuristic: report only the blob length, not its content.
    hits.extend(
        InjectionHit(
            kind="base64-blob",
            source=source,
            line=line,
            snippet=f"{len(m.group(0))}-char base64-like blob",
        )
        for line, m in with_lines(_BASE64_RE.finditer(text))
    )

    # Hidden-character heuristic: name the code point, since the character
    # itself would be invisible in a report.
    hits.extend(
        InjectionHit(
            kind="zero-width-char",
            source=source,
            line=line,
            snippet=f"hidden control char U+{ord(m.group(0)):04X}",
        )
        for line, m in with_lines(_ZERO_WIDTH_RE.finditer(text))
    )

    return hits
def scan_inputs(diff: str, context: str = "") -> list[InjectionHit]:
    """Scan the diff and, when given, the PR context.

    Diff hits come first and carry ``source="diff"``; context hits follow with
    ``source="context"``. An empty *context* is not scanned.
    """
    found = scan(diff, source="diff")
    if not context:
        return found
    found.extend(scan(context, source="context"))
    return found
def hits_to_warnings(hits: list[InjectionHit]) -> list[str]:
    """Format each hit as a one-line warning for ``outcome.warnings``.

    The result has one string per hit, in the same order as *hits*.
    """
    return [
        f"possible prompt-injection ({hit.kind}) in {hit.location()}: {hit.snippet}"
        for hit in hits
    ]
def hits_to_finding(hits: list[InjectionHit]):
    """Collapse all hits into one synthetic ``[major]`` Finding.

    Returns ``None`` when *hits* is empty. The finding's file/line come from
    the first hit; its claim gives the hit count and the sorted set of kinds;
    its evidence lists the de-duplicated locations of the first five hits.

    The finding is advisory only: it informs the human and the report, while
    the CI gate is derived from structured *consensus* (see
    ``ci.evaluate_ci``), so an injected "APPROVE" cannot flip the gate.
    """
    if not hits:
        return None

    # Function-scope import keeps this module importable without a circular
    # import (findings has no dependency on this module).
    from .findings import Finding

    lead = hits[0]
    kind_list = ", ".join(sorted({hit.kind for hit in hits}))
    # dict.fromkeys drops repeated locations while keeping first-seen order.
    unique_locations = dict.fromkeys(hit.location() for hit in hits[:5])
    where = ", ".join(unique_locations)

    summary = (
        f"possible prompt-injection in untrusted input "
        f"({len(hits)} hit(s): {kind_list})"
    )
    evidence_text = (
        "Untrusted diff/PR content contains text resembling instructions to "
        f"the model at {where}. Treated as data, not obeyed."
    )
    remedy = (
        "Review the flagged locations manually; do not act on any instructions "
        "embedded in the diff or PR description."
    )
    return Finding(
        severity="major",
        file=lead.source,
        line=lead.line,
        claim=summary,
        evidence=evidence_text,
        suggested_fix=remedy,
        confidence="medium",
        reviewer="injection-scanner",
    )