Coverage for src/keel/redaction.py: 100%

98 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Capture-artifact redaction helpers. 

2 

3The defaults intentionally target common credential shapes only. Project-specific 

4terms stay in ``policy_pack.capture_redaction.deny_patterns``. 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10from dataclasses import dataclass 

11from typing import Any 

12 

13from . import config as cfg 

14 

# Schema identifier written into both the audit summary and the policy contract.
REDACTION_SCHEMA_VERSION = "keel.capture-redaction.v1"

# Regex fragment for the start of a sibling assignment: a ``,`` or ``;``
# delimiter, an optionally quoted key of 1-128 characters, then ``:`` or ``=``.
# The adjacent raw literals concatenate into a single pattern string.
_ASSIGNMENT_SEGMENT_BOUNDARY = (
    r"[,;]"
    r"[\"']?\b[A-Za-z0-9_-]{1,128}\b[\"']?"
    r"\s*[:=]"
)

17 

18 

class RedactionError(ValueError):
    """Signal that a redaction policy could not be compiled safely.

    Subclasses ``ValueError`` so callers that already guard against bad
    configuration values catch it as well.
    """

21 

22 

@dataclass(frozen=True)
class RedactionRule:
    """A single compiled redaction rule: what to match and what to put in its place."""

    # Identifier used as the key for per-rule hit counts in the audit.
    id: str
    # Compiled regular expression applied to each string being sanitized.
    pattern: re.Pattern[str]
    # Text substituted for every match of ``pattern``.
    replacement: str
    # Origin of the rule: "default" or a policy-pack path.
    source: str
    # When true, ``replacement`` is inserted verbatim instead of being
    # interpreted as a regex replacement template.
    literal_replacement: bool = False

30 

31 

@dataclass(frozen=True)
class RedactionPolicy:
    """Immutable, ordered collection of the rules to apply when sanitizing."""

    # Rules are applied to each string in tuple order.
    rules: tuple[RedactionRule, ...]

35 

36 

@dataclass(frozen=True)
class RedactionResult:
    """Outcome of a sanitize pass: the redacted value plus its audit summary."""

    # The sanitized copy of the input value.
    value: Any
    # Audit summary describing which rules fired and how often.
    audit: dict[str, Any]

41 

42 

# Built-in rules as ``(rule id, regex source, replacement template)`` triples.
# ``policy_from_config`` compiles them in this order, and rules are applied to
# each string in that same order, so earlier rules see the text first.
_DEFAULT_RULES: tuple[tuple[str, str, str], ...] = (
    (
        # A PEM-style private key block, from the BEGIN marker through the
        # nearest END marker (non-greedy; may span multiple lines).
        "private-key-block",
        r"-----BEGIN [A-Z ]*PRIVATE KEY-----[\s\S]*?-----END [A-Z ]*PRIVATE KEY-----",
        "[REDACTED:private-key]",
    ),
    (
        # ``Bearer`` followed by a token of 16+ characters; the scheme word is
        # kept in the replacement. The match is case-sensitive.
        "bearer-token",
        r"\bBearer\s+[A-Za-z0-9._~+/=-]{16,}",
        "Bearer [REDACTED:bearer-token]",
    ),
    (
        # ``ghp_`` / ``gho_`` / ``ghu_`` / ``ghs_`` / ``ghr_`` prefixed tokens
        # with 20+ trailing word characters.
        "github-token",
        r"\bgh[pousr]_[A-Za-z0-9_]{20,}\b",
        "[REDACTED:github-token]",
    ),
    (
        # ``scheme://user:password@``: the scheme (group 1) is kept, the
        # user and password are both replaced.
        "credential-url",
        r"([A-Za-z][A-Za-z0-9+.-]{0,64}://)([^/\s:@]+):([^/\s@]+)@",
        r"\1[REDACTED:credentials]@",
    ),
    (
        # Matches ``KEY=value`` / ``KEY: value`` / ``"key": "value"`` credential
        # assignments. The optional quote on each side of the key consumes a JSON
        # key's surrounding quotes (so ``{"api_key": …}`` redacts cleanly without an
        # orphaned quote); the key itself may carry an arbitrary prefix
        # (``ANTHROPIC_API_KEY``). The value matches, in order: a balanced double- or
        # single-quoted string of 8+ chars (spaces allowed), or an unquoted run of
        # 8+ chars whose leading ``["']?`` also catches an *unbalanced* opening quote
        # (``KEY="secret`` with no close), closing that leak. The 8-char floor on
        # every arm keeps short status strings (``token: "none"``, ``api_key=""``)
        # from being redacted. The value class excludes code brackets and stops before
        # comma/semicolon-delimited sibling assignments (``a=secret,b=value``), so
        # adjacent credential keys are redacted and audited independently. The
        # possessive run plus ``(?![(\[])`` tail rejects call / subscript expressions
        # (``token = get_token()``) instead of mangling them; a value cannot start
        # with ``$``, so ``${...}`` / ``$(...)`` references are left intact. The
        # replacement normalises the separator to ``=`` (a ``KEY: secret`` colon form
        # renders as ``KEY=[REDACTED:credential]``).
        "credential-assignment",
        r"(?i)[\"']?\b([A-Za-z0-9_-]*(?:api[_-]?key|access[_-]?token|refresh[_-]?token|client[_-]?secret|"
        r"secret[_-]?access[_-]?key|secret[_-]?key|token|secret|password|passwd|pwd)"
        r")\b[\"']?\s*[:=]\s*"
        r"(?:\"[^\"\n]{8,}\"|'[^'\n]{8,}'|[\"']?"
        # The last fragment is an f-string so the sibling-assignment boundary can
        # be interpolated; doubled braces are literal ``{`` / ``}`` in the regex.
        rf"(?:(?!{_ASSIGNMENT_SEGMENT_BOUNDARY})[^\s\"'(){{}}\[\]$]){{8,}}+(?![(\[]))",
        r"\1=[REDACTED:credential]",
    ),
    (
        # ``sk-`` prefixed keys with 20+ trailing characters.
        "llm-api-key",
        r"\bsk-[A-Za-z0-9_-]{20,}\b",
        "[REDACTED:llm-api-key]",
    ),
)

96 

97 

def policy_from_config(
    config: cfg.ProjectConfig | None = None,
    *,
    strict: bool = True,
) -> RedactionPolicy:
    """Build the effective policy: built-in rules first, then project deny patterns.

    Args:
        config: Project configuration to read deny patterns from. When ``None``
            only the built-in rules are used.
        strict: Forwarded to the deny-pattern loader; when true, a project
            pattern that fails to compile raises instead of being skipped.

    Returns:
        A ``RedactionPolicy`` holding the compiled rules in application order.

    Raises:
        RedactionError: If a pattern cannot be compiled (project patterns only
            when ``strict`` is true).
    """
    builtin: list[RedactionRule] = []
    for rule_id, pattern, replacement in _DEFAULT_RULES:
        regex = _compile(pattern, f"default.{rule_id}")
        builtin.append(RedactionRule(rule_id, regex, replacement, "default"))
    if config is None:
        return RedactionPolicy(tuple(builtin))
    project = _configured_rules(config, strict=strict)
    return RedactionPolicy((*builtin, *project))

111 

112 

def sanitize(value: Any, policy: RedactionPolicy) -> RedactionResult:
    """Recursively sanitize strings inside ``value`` and return an audit summary.

    Args:
        value: Arbitrary data; strings found inside it are redacted.
        policy: Compiled rules to apply.

    Returns:
        A ``RedactionResult`` whose ``value`` is the sanitized copy and whose
        ``audit`` lists every rule id that fired (with its source and hit
        count) together with the total number of redactions. The audit never
        contains the original matched text.
    """
    counts: dict[str, int] = {}
    sanitized = _sanitize_value(value, policy, counts)
    # Hit counts are keyed by rule id, so two rules sharing an id (for example a
    # project deny pattern that reuses a default id) share one counter. Report
    # each id once -- attributed to the first rule carrying it -- so the shared
    # count is not listed, and summed, once per duplicate.
    rules: list[dict[str, Any]] = []
    reported: set[str] = set()
    for rule in policy.rules:
        hits = counts.get(rule.id, 0)
        if not hits or rule.id in reported:
            continue
        reported.add(rule.id)
        rules.append({"id": rule.id, "source": rule.source, "count": hits})
    return RedactionResult(
        sanitized,
        {
            "schema_version": REDACTION_SCHEMA_VERSION,
            "status": "applied",
            "rules": rules,
            # Total straight from the counter map: one contribution per id.
            "redaction_count": sum(counts.values()),
        },
    )

131 

132 

def contract_as_dict(config: cfg.ProjectConfig) -> dict[str, Any]:
    """Describe the redaction policy contract without exposing sensitive values.

    Only rule ids and fixed descriptive fields are reported; patterns and
    replacements from the project configuration are not included.

    Args:
        config: Project configuration whose deny-pattern ids are listed.

    Returns:
        A dictionary describing the schema version, the configured rule ids
        and their count, and the fixed policy-handling statements.
    """
    rule_ids = _configured_rule_ids(config)
    return dict(
        schema_version=REDACTION_SCHEMA_VERSION,
        default_redaction=True,
        configured_rule_count=len(rule_ids),
        configured_rule_ids=rule_ids,
        policy_source="defaults + policy_pack.capture_redaction.deny_patterns",
        audit_includes_original_values=False,
        invalid_policy_handling="skip-artifact-with-reason",
    )

145 

146 

def _configured_rule_ids(config: cfg.ProjectConfig) -> list[str]:
    """List the ids of the project's deny-pattern entries, in declaration order.

    Every dict entry contributes an id: its own ``id`` when truthy, otherwise
    ``deny-pattern-<1-based position>``. Non-dict entries are ignored. An
    empty list is returned when the ``capture_redaction`` section or its
    ``deny_patterns`` list is missing or has the wrong type.
    """
    section = (config.policy_pack or {}).get("capture_redaction")
    entries = section.get("deny_patterns") if isinstance(section, dict) else None
    if not isinstance(entries, list):
        return []
    return [
        str(entry.get("id") or f"deny-pattern-{position}")
        for position, entry in enumerate(entries, start=1)
        if isinstance(entry, dict)
    ]

160 

161 

def _configured_rules(config: cfg.ProjectConfig, *, strict: bool = True) -> list[RedactionRule]:
    """Compile the project's deny patterns into redaction rules.

    Entries that are not dicts, or whose ``pattern`` is not a non-empty string,
    are skipped. A missing or empty ``replacement`` falls back to
    ``[REDACTED:<rule id>]``. Compiled rules are flagged as literal
    replacements, so the replacement text is inserted verbatim.

    Args:
        config: Project configuration holding ``policy_pack``.
        strict: When true, a pattern that fails to compile re-raises the
            ``RedactionError``; when false, that entry is skipped.

    Returns:
        The compiled rules in declaration order (possibly empty).

    Raises:
        RedactionError: For an uncompilable pattern while ``strict`` is true.
    """
    section = (config.policy_pack or {}).get("capture_redaction")
    entries = section.get("deny_patterns") if isinstance(section, dict) else None
    if not isinstance(entries, list):
        return []
    compiled_rules: list[RedactionRule] = []
    for position, entry in enumerate(entries, start=1):
        if not isinstance(entry, dict):
            continue
        rule_id = str(entry.get("id") or f"deny-pattern-{position}")
        pattern = entry.get("pattern")
        if not (isinstance(pattern, str) and pattern):
            continue
        replacement = entry.get("replacement")
        if not (isinstance(replacement, str) and replacement):
            replacement = f"[REDACTED:{rule_id}]"
        source = f"policy_pack.capture_redaction.deny_patterns.{rule_id}"
        try:
            regex = _compile(pattern, source)
        except RedactionError:
            # Lenient mode drops the broken entry and keeps going.
            if strict:
                raise
        else:
            compiled_rules.append(
                RedactionRule(rule_id, regex, replacement, source, literal_replacement=True)
            )
    return compiled_rules

191 

192 

def _compile(pattern: str, source: str) -> re.Pattern[str]:
    """Compile ``pattern``, reporting failures as ``RedactionError``.

    Args:
        pattern: Regular expression source.
        source: Human-readable origin of the pattern, included in the error.

    Returns:
        The compiled pattern.

    Raises:
        RedactionError: If ``pattern`` is not a valid regular expression; the
            original ``re.error`` is chained as the cause.
    """
    try:
        compiled = re.compile(pattern)
    except re.error as exc:
        message = f"invalid capture redaction pattern at {source}: {exc}"
        raise RedactionError(message) from exc
    return compiled

198 

199 

def _sanitize_value(value: Any, policy: RedactionPolicy, counts: dict[str, int]) -> Any:
    """Return a sanitized copy of ``value``, walking lists, tuples and dicts.

    Strings are passed through the policy. Lists, tuples and dicts are rebuilt
    as plain ``list`` / ``tuple`` / ``dict`` objects with their members
    sanitized recursively. Dict keys are kept as they are; only dict values
    are sanitized. Any other type is returned unchanged.

    Args:
        value: The value to sanitize.
        policy: Compiled rules to apply to each string.
        counts: Per-rule hit counter, updated in place.

    Returns:
        The sanitized value.
    """
    if isinstance(value, str):
        return _sanitize_string(value, policy, counts)
    if isinstance(value, list):
        return [_sanitize_value(item, policy, counts) for item in value]
    if isinstance(value, tuple):
        # Without this branch tuples fall through to the final ``return``, so
        # any string nested inside one would skip redaction entirely.
        return tuple(_sanitize_value(item, policy, counts) for item in value)
    if isinstance(value, dict):
        return {key: _sanitize_value(child, policy, counts) for key, child in value.items()}
    return value

208 

209 

def _sanitize_string(value: str, policy: RedactionPolicy, counts: dict[str, int]) -> str:
    """Apply every rule of ``policy`` to ``value`` in order and tally the hits.

    Args:
        value: Text to redact.
        policy: Compiled rules; each rule runs on the output of the previous one.
        counts: Per-rule hit counter, keyed by rule id and updated in place.

    Returns:
        The redacted text.
    """
    redacted = value
    for rule in policy.rules:
        if rule.literal_replacement:
            # A callable replacement is inserted as-is, so backslashes and
            # group references in the configured text are not expanded.
            literal = rule.replacement
            redacted, hits = rule.pattern.subn(lambda _match, text=literal: text, redacted)
        else:
            redacted, hits = rule.pattern.subn(rule.replacement, redacted)
        if hits:
            counts[rule.id] = counts.get(rule.id, 0) + hits
    return redacted