Coverage for src/keel/guard.py: 100%

75 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1r"""Deterministic blocker ruleset — the pure core behind ``keel guard``.

2 

3Blocker promotion is what unlocks the night-window bypass at s10 (``keel 

4merge --hotfix``). Before this module, that promotion was pure agent judgment: 

5an agent could declare any issue a blocker and merge at 3am. This module makes 

6the decision a **deterministic, configurable function** of the issue's facts — 

7its title and labels — so a claimed blocker can be verified against the rule it 

8allegedly matched. 

9 

10The matching is pure (no network/subprocess/clock/random and no I/O): given an 

11issue title, its labels, and a resolved set of :class:`Rule` objects, it returns 

12the ids of the rules that fired. The CLI gathers the live issue facts and reads 

13the configured rules; this module only decides. 

14 

15Rules are resolved from ``policy_pack.blocker_rules`` when present, falling back 

16to built-in defaults (back-compatible: an absent config yields the defaults). 

17Each rule is one of two kinds: 

18 

19* ``label`` — fires when one of the rule's labels is present on the issue 

20 (case-insensitive exact match). 

21* ``title-regex`` — fires when the rule's regex matches the issue title. 

22 

23The built-in defaults cover the heuristics named in the audit (GAP-11): 

24word-boundary ``\bhotfix\b`` / ``\bsecurity\b`` / ``\bblocker\b`` title regexes 

25and exact-match ``blocker`` / ``hotfix`` / ``security`` label rules.

26""" 

27 

28from __future__ import annotations 

29 

30import re 

31from dataclasses import dataclass 

32from typing import Any 

33 

34from . import config as cfg 

35 

# Schema tag emitted under "schema_version" by GuardResult.as_dict().
GUARD_SCHEMA_VERSION = "keel.guard.v1"

#: Fallback ruleset, applied when ``policy_pack.blocker_rules`` is absent/empty.
#: Three exact-match label rules plus one word-boundary title regex.
DEFAULT_RULES: tuple[dict[str, Any], ...] = (
    {
        "id": "blocker-label",
        "kind": "label",
        "labels": ["blocker"],
    },
    {
        "id": "hotfix-label",
        "kind": "label",
        "labels": ["hotfix"],
    },
    {
        "id": "security-label",
        "kind": "label",
        "labels": ["security"],
    },
    {
        "id": "blocker-title-regex",
        "kind": "title-regex",
        "pattern": r"\b(?:hotfix|security|blocker)\b",
    },
)

46 

47 

class GuardError(ValueError):
    """Signals a malformed blocker rule in the configured ruleset.

    Subclasses :class:`ValueError`, so callers that already catch
    ``ValueError`` also catch this.
    """

50 

51 

@dataclass(frozen=True)
class Rule:
    """One resolved blocker rule; instances are immutable (frozen dataclass)."""

    # Unique identifier reported back when the rule fires.
    id: str
    # Discriminator: "label" | "title-regex".
    kind: str
    # Labels a "label" rule looks for; unused by "title-regex" rules.
    labels: tuple[str, ...] = ()
    # Regex source for a "title-regex" rule; unused by "label" rules.
    pattern: str | None = None

    def matches(self, title: str, labels: tuple[str, ...]) -> bool:
        """Report whether the rule fires for an issue's title and labels (pure).

        A ``label`` rule fires when any of its labels equals one of the
        issue's labels after stripping whitespace and case-folding. Any other
        kind is treated as ``title-regex`` and searched case-insensitively
        against ``title``.
        """
        if self.kind != "label":
            # title-regex: a missing pattern degrades to "", which matches
            # any title; rule construction is expected to prevent that.
            regex = self.pattern or ""
            return re.search(regex, title, re.IGNORECASE) is not None

        on_issue = {item.strip().casefold() for item in labels}
        for wanted in self.labels:
            if wanted.strip().casefold() in on_issue:
                return True
        return False

68 

69 

@dataclass(frozen=True)
class GuardResult:
    """Immutable record of one ruleset evaluation against an issue."""

    # Issue title that was evaluated.
    title: str
    # Issue labels that were evaluated.
    labels: tuple[str, ...]
    # Ids of the rules that fired.
    matched: tuple[str, ...]
    # Ids of every rule that was considered.
    rule_ids: tuple[str, ...]

    @property
    def is_blocker(self) -> bool:
        """Whether any rule fired, i.e. ``matched`` is non-empty."""
        return True if self.matched else False

    def as_dict(self) -> dict[str, Any]:
        """Return a plain-dict view with tuples converted to lists.

        The payload is tagged with the module's schema version.
        """
        payload: dict[str, Any] = {"schema_version": GUARD_SCHEMA_VERSION}
        payload["title"] = self.title
        payload["labels"] = list(self.labels)
        payload["is_blocker"] = self.is_blocker
        payload["matched"] = list(self.matched)
        payload["rule_ids"] = list(self.rule_ids)
        return payload

93 

94 

def resolve_rules(config: cfg.ProjectConfig | None) -> tuple[Rule, ...]:
    """Pick the active blocker rules: configured ones if usable, else defaults.

    ``policy_pack.blocker_rules`` is honoured only when ``config`` is given,
    its ``policy_pack`` is a dict, and the value is a non-empty list. In every
    other case the built-in :data:`DEFAULT_RULES` apply, so a project without
    blocker config keeps working unchanged.

    Raises:
        GuardError: when a configured rule is malformed (fail-closed, so a
            typo cannot quietly widen or narrow the bypass surface).
    """
    configured: Any = None
    if config is not None:
        pack = config.policy_pack
        if isinstance(pack, dict):
            configured = pack.get("blocker_rules")

    if isinstance(configured, list) and configured:
        return _build_rules(configured, source="policy_pack.blocker_rules")
    return _build_rules(DEFAULT_RULES, source="defaults")

110 

111 

def _build_rules(raw_rules: Any, *, source: str) -> tuple[Rule, ...]:
    """Validate raw rule mappings and convert them into :class:`Rule` objects.

    Args:
        raw_rules: Iterable of rule dicts (the built-in defaults or the
            configured ``blocker_rules`` list).
        source: Label for where the rules came from; prefixes every error
            message together with the offending index.

    Returns:
        The validated rules, in input order.

    Raises:
        GuardError: if an entry is not a dict, lacks a non-empty string
            ``id``, repeats an ``id``, has an unknown ``kind``, is a label
            rule without a non-empty list of non-blank string labels, or is a
            title-regex rule without a non-empty, compilable ``pattern``.
    """
    rules: list[Rule] = []
    seen: set[str] = set()
    for index, raw in enumerate(raw_rules):
        where = f"{source}[{index}]"
        if not isinstance(raw, dict):
            raise GuardError(f"{where}: expected an object")

        rule_id = raw.get("id")
        if not isinstance(rule_id, str) or not rule_id.strip():
            raise GuardError(f"{where}: missing non-empty 'id'")
        rule_id = rule_id.strip()
        if rule_id in seen:
            raise GuardError(f"{where}: duplicate rule id {rule_id!r}")
        seen.add(rule_id)

        kind = raw.get("kind")
        if kind == "label":
            labels = raw.get("labels")
            if not isinstance(labels, list) or not labels:
                raise GuardError(f"{where}: label rule needs a non-empty 'labels' list")
            # Fail closed per entry: coercing with str() would turn a stray
            # null/true into the matchable labels "none"/"true" (matching is
            # case-folded), and a blank entry would create a rule that can
            # never fire.
            for label in labels:
                if not isinstance(label, str) or not label.strip():
                    raise GuardError(
                        f"{where}: label rule 'labels' entries must be "
                        f"non-empty strings, got {label!r}"
                    )
            rules.append(Rule(id=rule_id, kind="label", labels=tuple(labels)))
        elif kind == "title-regex":
            pattern = raw.get("pattern")
            if not isinstance(pattern, str) or not pattern:
                raise GuardError(f"{where}: title-regex rule needs a non-empty 'pattern'")
            try:
                # Validate with the same flag Rule.matches() searches with.
                re.compile(pattern, re.IGNORECASE)
            except re.error as exc:
                raise GuardError(f"{where}: invalid regex {pattern!r}: {exc}") from exc
            rules.append(Rule(id=rule_id, kind="title-regex", pattern=pattern))
        else:
            raise GuardError(f"{where}: unknown rule kind {kind!r}")
    return tuple(rules)

145 

146 

def evaluate(title: str, labels: tuple[str, ...] | list[str], *,
             rules: tuple[Rule, ...]) -> GuardResult:
    """Run every rule in ``rules`` against the issue facts (pure).

    The returned :class:`GuardResult` lists the ids of the rules that fired,
    in rule order, alongside the ids of all rules that were considered. No
    dedup step is needed: :func:`resolve_rules` rejects duplicate ids, so a
    fired rule can appear only once.
    """
    frozen_labels = tuple(labels)

    fired: list[str] = []
    for rule in rules:
        if rule.matches(title, frozen_labels):
            fired.append(rule.id)

    considered = tuple(rule.id for rule in rules)
    return GuardResult(
        title=title,
        labels=frozen_labels,
        matched=tuple(fired),
        rule_ids=considered,
    )

164 

165 

def evaluate_config(title: str, labels: tuple[str, ...] | list[str], *,
                    config: cfg.ProjectConfig | None) -> GuardResult:
    """Shortcut that resolves the rules for ``config`` and evaluates with them."""
    active_rules = resolve_rules(config)
    return evaluate(title, labels, rules=active_rules)