Coverage for src/keel/redaction.py: 100%
98 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Capture-artifact redaction helpers.
3The defaults intentionally target common credential shapes only. Project-specific
4terms stay in ``policy_pack.capture_redaction.deny_patterns``.
5"""
7from __future__ import annotations
9import re
10from dataclasses import dataclass
11from typing import Any
13from . import config as cfg
# Schema identifier embedded in the audit summary and the policy contract
# produced by this module.
REDACTION_SCHEMA_VERSION = "keel.capture-redaction.v1"
# Regex fragment for the start of a sibling assignment: a ``,`` or ``;``
# delimiter, an optionally quoted key of 1-128 word/hyphen characters, then
# ``:`` or ``=``. It is interpolated as a negative lookahead into the
# credential-assignment rule so an unquoted value stops before the next key.
_ASSIGNMENT_SEGMENT_BOUNDARY = r"[,;][\"']?\b[A-Za-z0-9_-]{1,128}\b[\"']?\s*[:=]"
class RedactionError(ValueError):
    """Raised when a redaction policy cannot be compiled safely.

    In this module it wraps the ``re.error`` produced by an invalid pattern and
    names the rule the pattern came from.
    """
@dataclass(frozen=True)
class RedactionRule:
    """One compiled redaction rule: a pattern plus the text that replaces its matches."""

    # Identifier used as the key for per-rule match counts and in the audit summary.
    id: str
    # Compiled regular expression applied to each string being sanitized.
    pattern: re.Pattern[str]
    # Replacement text; treated as a regex template unless ``literal_replacement`` is set.
    replacement: str
    # Origin of the rule: ``"default"`` or a ``policy_pack.capture_redaction...`` path.
    source: str
    # When true the replacement is inserted verbatim, so backslash escapes and
    # group references such as ``\1`` are not expanded.
    literal_replacement: bool = False
@dataclass(frozen=True)
class RedactionPolicy:
    """Immutable, ordered set of redaction rules; rules are applied in tuple order."""

    rules: tuple[RedactionRule, ...]
@dataclass(frozen=True)
class RedactionResult:
    """Outcome of ``sanitize``: the sanitized value together with its audit summary."""

    # The input with matching substrings replaced inside nested strings.
    value: Any
    # Audit dict carrying ``schema_version``, ``status``, ``rules`` and ``redaction_count``.
    audit: dict[str, Any]
# Built-in rules as ``(rule id, regex source, replacement)`` triples, compiled
# by ``policy_from_config``. Rules run in the order listed here. Default
# replacements are regex templates (``literal_replacement`` is left False), so
# ``\1`` below refers to the first capture group.
_DEFAULT_RULES: tuple[tuple[str, str, str], ...] = (
    (
        "private-key-block",
        r"-----BEGIN [A-Z ]*PRIVATE KEY-----[\s\S]*?-----END [A-Z ]*PRIVATE KEY-----",
        "[REDACTED:private-key]",
    ),
    (
        "bearer-token",
        r"\bBearer\s+[A-Za-z0-9._~+/=-]{16,}",
        "Bearer [REDACTED:bearer-token]",
    ),
    (
        "github-token",
        r"\bgh[pousr]_[A-Za-z0-9_]{20,}\b",
        "[REDACTED:github-token]",
    ),
    (
        "credential-url",
        r"([A-Za-z][A-Za-z0-9+.-]{0,64}://)([^/\s:@]+):([^/\s@]+)@",
        r"\1[REDACTED:credentials]@",
    ),
    (
        # Matches ``KEY=value`` / ``KEY: value`` / ``"key": "value"`` credential
        # assignments. The optional quote on each side of the key consumes a JSON
        # key's surrounding quotes (so ``{"api_key": …}`` redacts cleanly without an
        # orphaned quote); the key itself may carry an arbitrary prefix
        # (``ANTHROPIC_API_KEY``). The value matches, in order: a balanced double- or
        # single-quoted string of 8+ chars (spaces allowed), or an unquoted run of
        # 8+ chars whose leading ``["']?`` also catches an *unbalanced* opening quote
        # (``KEY="secret`` with no close), closing that leak. The 8-char floor on
        # every arm keeps short status strings (``token: "none"``, ``api_key=""``)
        # from being redacted. The value class excludes code brackets and stops before
        # comma/semicolon-delimited sibling assignments (``a=secret,b=value``), so
        # adjacent credential keys are redacted and audited independently. The
        # possessive run plus ``(?![(\[])`` tail rejects call / subscript expressions
        # (``token = get_token()``) instead of mangling them; a value cannot start
        # with ``$``, so ``${...}`` / ``$(...)`` references are left intact. The
        # replacement normalises the separator to ``=`` (a ``KEY: secret`` colon form
        # renders as ``KEY=[REDACTED:credential]``).
        "credential-assignment",
        r"(?i)[\"']?\b([A-Za-z0-9_-]*(?:api[_-]?key|access[_-]?token|refresh[_-]?token|client[_-]?secret|"
        r"secret[_-]?access[_-]?key|secret[_-]?key|token|secret|password|passwd|pwd)"
        r")\b[\"']?\s*[:=]\s*"
        r"(?:\"[^\"\n]{8,}\"|'[^'\n]{8,}'|[\"']?"
        rf"(?:(?!{_ASSIGNMENT_SEGMENT_BOUNDARY})[^\s\"'(){{}}\[\]$]){{8,}}+(?![(\[]))",
        r"\1=[REDACTED:credential]",
    ),
    (
        "llm-api-key",
        r"\bsk-[A-Za-z0-9_-]{20,}\b",
        "[REDACTED:llm-api-key]",
    ),
)
def policy_from_config(
    config: cfg.ProjectConfig | None = None,
    *,
    strict: bool = True,
) -> RedactionPolicy:
    """Build the redaction policy: built-in rules first, then project deny patterns.

    Args:
        config: Project configuration; when ``None`` only the defaults are used.
        strict: Forwarded to the deny-pattern loader. When true, a project
            pattern that fails to compile raises ``RedactionError``; otherwise
            that pattern is skipped.

    Returns:
        A ``RedactionPolicy`` whose rules keep the default-then-configured order.
    """
    compiled: list[RedactionRule] = []
    for rule_id, regex, template in _DEFAULT_RULES:
        matcher = _compile(regex, f"default.{rule_id}")
        compiled.append(RedactionRule(rule_id, matcher, template, "default"))
    if config is None:
        return RedactionPolicy(tuple(compiled))
    compiled += _configured_rules(config, strict=strict)
    return RedactionPolicy(tuple(compiled))
def sanitize(value: Any, policy: RedactionPolicy) -> RedactionResult:
    """Recursively sanitize strings inside ``value`` and return an audit summary.

    The audit lists, in policy order, every rule id that matched at least once
    with its match count; it holds ids, sources and counts only, never the
    original text.

    Match counts are keyed by rule id. When several rules share an id (for
    example a project deny pattern that reuses a default id) the id is reported
    once, attributed to the first such rule's source. Emitting the combined
    count once per rule would inflate ``redaction_count``.

    Args:
        value: Arbitrary nested data whose strings should be redacted.
        policy: Compiled rules to apply.

    Returns:
        A ``RedactionResult`` with the sanitized value and the audit dict.
    """
    counts: dict[str, int] = {}
    sanitized = _sanitize_value(value, policy, counts)
    rules: list[dict[str, Any]] = []
    reported: set[str] = set()
    for rule in policy.rules:
        hits = counts.get(rule.id, 0)
        # Skip rules that never matched and ids already listed (shared-id case).
        if not hits or rule.id in reported:
            continue
        reported.add(rule.id)
        rules.append({"id": rule.id, "source": rule.source, "count": hits})
    return RedactionResult(
        sanitized,
        {
            "schema_version": REDACTION_SCHEMA_VERSION,
            "status": "applied",
            "rules": rules,
            "redaction_count": sum(item["count"] for item in rules),
        },
    )
def contract_as_dict(config: cfg.ProjectConfig) -> dict[str, Any]:
    """Describe the redaction contract for ``config`` using rule ids only.

    Patterns and replacements are never included; the result carries the schema
    version, the configured rule ids and their count, and fixed policy flags.
    """
    rule_ids = _configured_rule_ids(config)
    contract: dict[str, Any] = {
        "schema_version": REDACTION_SCHEMA_VERSION,
        "default_redaction": True,
    }
    contract["configured_rule_count"] = len(rule_ids)
    contract["configured_rule_ids"] = rule_ids
    contract.update(
        policy_source="defaults + policy_pack.capture_redaction.deny_patterns",
        audit_includes_original_values=False,
        invalid_policy_handling="skip-artifact-with-reason",
    )
    return contract
def _configured_rule_ids(config: cfg.ProjectConfig) -> list[str]:
    """List the ids of the project deny patterns, in configuration order.

    Uses the same selection as ``_configured_rules``: an entry is listed only
    when it is a mapping with a non-empty string ``pattern``. Entries that the
    rule loader drops are therefore not advertised as configured rules. Whether
    a pattern compiles is not checked here.

    Args:
        config: Project configuration; ``policy_pack`` may be ``None``.

    Returns:
        Rule ids as strings. An entry without an ``id`` gets
        ``deny-pattern-<n>``, where ``n`` is its 1-based position in
        ``deny_patterns`` (skipped entries still consume a position).
    """
    pack = config.policy_pack or {}
    section = pack.get("capture_redaction")
    if not isinstance(section, dict):
        return []
    raw_rules = section.get("deny_patterns")
    if not isinstance(raw_rules, list):
        return []
    ids: list[str] = []
    for position, raw in enumerate(raw_rules, start=1):
        if not isinstance(raw, dict):
            continue
        pattern = raw.get("pattern")
        # An entry without a usable pattern never becomes a rule, so omit it.
        if not isinstance(pattern, str) or not pattern:
            continue
        ids.append(str(raw.get("id") or f"deny-pattern-{position}"))
    return ids
def _configured_rules(config: cfg.ProjectConfig, *, strict: bool = True) -> list[RedactionRule]:
    """Compile ``policy_pack.capture_redaction.deny_patterns`` into redaction rules.

    Entries that are not mappings, or whose ``pattern`` is missing, empty or
    not a string, are ignored. A missing or empty ``replacement`` falls back to
    ``[REDACTED:<rule id>]``. Configured replacements are always literal.

    Args:
        config: Project configuration; ``policy_pack`` may be ``None``.
        strict: When true, a pattern that fails to compile re-raises
            ``RedactionError``; when false, that entry is skipped.

    Returns:
        The compiled rules in configuration order.
    """
    section = (config.policy_pack or {}).get("capture_redaction")
    entries = section.get("deny_patterns") if isinstance(section, dict) else None
    if not isinstance(entries, list):
        return []
    compiled_rules: list[RedactionRule] = []
    for position, entry in enumerate(entries, start=1):
        if not isinstance(entry, dict):
            continue
        rule_id = str(entry.get("id") or f"deny-pattern-{position}")
        regex = entry.get("pattern")
        if not (isinstance(regex, str) and regex):
            continue
        substitute = entry.get("replacement")
        if not (isinstance(substitute, str) and substitute):
            substitute = f"[REDACTED:{rule_id}]"
        origin = f"policy_pack.capture_redaction.deny_patterns.{rule_id}"
        try:
            matcher = _compile(regex, origin)
        except RedactionError:
            if strict:
                raise
            continue
        compiled_rules.append(
            RedactionRule(
                id=rule_id,
                pattern=matcher,
                replacement=substitute,
                source=origin,
                literal_replacement=True,
            )
        )
    return compiled_rules
def _compile(pattern: str, source: str) -> re.Pattern[str]:
    """Compile ``pattern``, translating ``re.error`` into ``RedactionError``.

    ``source`` names where the pattern came from and is used only in the error
    message.
    """
    try:
        compiled = re.compile(pattern)
    except re.error as exc:
        message = f"invalid capture redaction pattern at {source}: {exc}"
        raise RedactionError(message) from exc
    return compiled
def _sanitize_value(value: Any, policy: RedactionPolicy, counts: dict[str, int]) -> Any:
    """Return ``value`` with every nested string passed through ``policy``.

    Strings are sanitized; lists, tuples and dict values are rebuilt
    recursively. Dict keys and values of any other type are returned unchanged.

    Args:
        value: Arbitrary nested data.
        policy: Compiled rules to apply to each string.
        counts: Per-rule-id match counter, updated in place.

    Returns:
        The sanitized counterpart of ``value``.
    """
    if isinstance(value, str):
        return _sanitize_string(value, policy, counts)
    if isinstance(value, list):
        return [_sanitize_value(item, policy, counts) for item in value]
    if isinstance(value, tuple):
        # Tuples can carry strings too; leaving them untouched would let
        # credentials through unredacted.
        items = [_sanitize_value(item, policy, counts) for item in value]
        # Named tuples are constructed positionally; plain tuples from an iterable.
        return type(value)(*items) if hasattr(value, "_fields") else tuple(items)
    if isinstance(value, dict):
        return {key: _sanitize_value(child, policy, counts) for key, child in value.items()}
    return value
def _sanitize_string(value: str, policy: RedactionPolicy, counts: dict[str, int]) -> str:
    """Apply every rule of ``policy`` to ``value`` in order and tally the matches.

    Each rule runs on the output of the previous one. Rules flagged
    ``literal_replacement`` substitute their replacement text verbatim; the
    rest use it as a regex template. ``counts`` gains the number of
    substitutions per rule id and is only touched for rules that matched.
    """
    redacted = value
    for rule in policy.rules:
        if rule.literal_replacement:
            # A callable replacement bypasses template expansion (``\1``, ``\g<0>``).
            def substitute(_match, text=rule.replacement):
                return text
        else:
            substitute = rule.replacement
        redacted, hits = rule.pattern.subn(substitute, redacted)
        if hits:
            counts[rule.id] = counts.get(rule.id, 0) + hits
    return redacted