Coverage for src/keel/guard.py: 100%
75 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
r"""Deterministic blocker ruleset — the pure core behind ``keel guard``.

Blocker promotion is what unlocks the night-window bypass at s10 (``keel
merge --hotfix``). Before this module, that promotion was pure agent judgment:
an agent could declare any issue a blocker and merge at 3am. This module makes
the decision a **deterministic, configurable function** of the issue's facts —
its title and labels — so a claimed blocker can be verified against the rule it
allegedly matched.

The matching is pure (no network/subprocess/clock/random and no I/O): given an
issue title, its labels, and a resolved set of :class:`Rule` objects, it returns
the ids of the rules that fired. The CLI gathers the live issue facts and reads
the configured rules; this module only decides.

Rules are resolved from ``policy_pack.blocker_rules`` when present, falling back
to built-in defaults (back-compatible: an absent config yields the defaults).
Each rule is one of two kinds:

* ``label`` — fires when one of the rule's labels is present on the issue
  (case-insensitive exact match).
* ``title-regex`` — fires when the rule's regex matches the issue title.

The built-in defaults cover the heuristics named in the audit (GAP-11):
word-boundary ``\bhotfix\b`` / ``\bsecurity\b`` / ``\bblocker\b`` title regexes
and a configurable blocker label.
"""
28from __future__ import annotations
30import re
31from dataclasses import dataclass
32from typing import Any
34from . import config as cfg
#: Schema identifier emitted under ``schema_version`` by ``GuardResult.as_dict``.
GUARD_SCHEMA_VERSION = "keel.guard.v1"

#: Built-in defaults, used when ``policy_pack.blocker_rules`` is absent/empty.
#: Three label rules plus one word-boundary title regex.
DEFAULT_RULES: tuple[dict[str, Any], ...] = (
    {
        "id": "blocker-label",
        "kind": "label",
        "labels": ["blocker"],
    },
    {
        "id": "hotfix-label",
        "kind": "label",
        "labels": ["hotfix"],
    },
    {
        "id": "security-label",
        "kind": "label",
        "labels": ["security"],
    },
    {
        "id": "blocker-title-regex",
        "kind": "title-regex",
        "pattern": r"\b(?:hotfix|security|blocker)\b",
    },
)
class GuardError(ValueError):
    """Signals that a configured blocker rule is malformed.

    It subclasses :class:`ValueError`, so a handler for ``ValueError`` also
    catches it.
    """
@dataclass(frozen=True)
class Rule:
    """A single resolved, immutable blocker rule.

    Instances are normally produced by :func:`resolve_rules`, which validates
    the kind and guarantees a non-empty pattern for ``title-regex`` rules.
    Because the class is public and can also be constructed directly,
    :meth:`matches` does not rely on that guarantee and fails closed.
    """

    # Unique identifier reported back when the rule fires.
    id: str
    # Rule kind: "label" | "title-regex".
    kind: str
    # Labels that make a "label" rule fire; not consulted for "title-regex".
    labels: tuple[str, ...] = ()
    # Regex searched in the issue title by a "title-regex" rule.
    pattern: str | None = None

    def matches(self, title: str, labels: tuple[str, ...]) -> bool:
        """Return True if this rule fires for the given issue facts (pure).

        Args:
            title: The issue title; only consulted by ``title-regex`` rules.
            labels: The issue's labels; only consulted by ``label`` rules.

        Returns:
            For a ``label`` rule, True when any of the rule's labels equals one
            of the issue's labels after stripping whitespace and case-folding.
            For a ``title-regex`` rule with a non-empty pattern, True when the
            pattern is found anywhere in ``title`` (case-insensitive).
            False in every other case.
        """
        if self.kind == "label":
            present = {label.strip().casefold() for label in labels}
            return any(want.strip().casefold() in present for want in self.labels)
        if self.kind == "title-regex" and self.pattern:
            return re.search(self.pattern, title, re.IGNORECASE) is not None
        # Fail closed: an unknown kind or a missing/empty pattern must never
        # fire. Searching with an empty pattern would match every title and so
        # mark every issue as a blocker.
        return False
@dataclass(frozen=True)
class GuardResult:
    """Immutable record of one issue's evaluation against the ruleset."""

    # Issue title that was evaluated.
    title: str
    # Issue labels that were evaluated.
    labels: tuple[str, ...]
    # Ids of the rules that fired.
    matched: tuple[str, ...]
    # Ids of every rule that was considered.
    rule_ids: tuple[str, ...]

    @property
    def is_blocker(self) -> bool:
        """Whether any rule fired, i.e. ``matched`` is non-empty."""
        return bool(self.matched)

    def as_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of the result, tagged with the schema version.

        Tuple fields are copied into lists; ``is_blocker`` is included as a
        computed entry.
        """
        return dict(
            schema_version=GUARD_SCHEMA_VERSION,
            title=self.title,
            labels=list(self.labels),
            is_blocker=self.is_blocker,
            matched=list(self.matched),
            rule_ids=list(self.rule_ids),
        )
def resolve_rules(config: cfg.ProjectConfig | None) -> tuple[Rule, ...]:
    """Return the active blocker rules for ``config``.

    The rules come from ``policy_pack.blocker_rules`` when that entry is a
    non-empty list. In every other case (no config, a ``policy_pack`` that is
    not a dict, a missing entry, a non-list value, or an empty list) the
    built-in :data:`DEFAULT_RULES` are used, so projects without blocker
    config keep working unchanged.

    Raises:
        GuardError: If a rule is malformed. This is deliberately fail-closed:
            a typo must not silently widen or narrow the bypass surface.
    """
    configured: Any = None
    if config is not None and isinstance(config.policy_pack, dict):
        configured = config.policy_pack.get("blocker_rules")

    has_configured_rules = isinstance(configured, list) and bool(configured)
    if has_configured_rules:
        return _build_rules(configured, source="policy_pack.blocker_rules")
    return _build_rules(DEFAULT_RULES, source="defaults")
def _build_rules(raw_rules: Any, *, source: str) -> tuple[Rule, ...]:
    """Validate raw rule objects and turn them into :class:`Rule` instances.

    Args:
        raw_rules: Iterable of raw rule objects; each is expected to be a dict
            with an ``id``, a ``kind`` and the field that kind requires.
        source: Name of where the rules came from; used as the prefix of every
            error message, together with the entry's index.

    Returns:
        The rules in input order.

    Raises:
        GuardError: For an entry that is not a dict, a missing/blank or
            duplicate id, an unknown kind, an empty ``labels`` list, an empty
            ``pattern``, or a pattern that does not compile.
    """
    built: list[Rule] = []
    used_ids: set[str] = set()

    for position, entry in enumerate(raw_rules):
        where = f"{source}[{position}]"
        if not isinstance(entry, dict):
            raise GuardError(f"{where}: expected an object")

        # Ids are compared after stripping, so " x " and "x" count as duplicates.
        raw_id = entry.get("id")
        if not (isinstance(raw_id, str) and raw_id.strip()):
            raise GuardError(f"{where}: missing non-empty 'id'")
        rule_id = raw_id.strip()
        if rule_id in used_ids:
            raise GuardError(f"{where}: duplicate rule id {rule_id!r}")
        used_ids.add(rule_id)

        kind = entry.get("kind")

        if kind == "label":
            labels = entry.get("labels")
            if not (isinstance(labels, list) and labels):
                raise GuardError(f"{where}: label rule needs a non-empty 'labels' list")
            # Entries are coerced with str(), so non-string labels are accepted.
            built.append(Rule(id=rule_id, kind="label", labels=tuple(map(str, labels))))
            continue

        if kind == "title-regex":
            pattern = entry.get("pattern")
            if not (isinstance(pattern, str) and pattern):
                raise GuardError(f"{where}: title-regex rule needs a non-empty 'pattern'")
            # Compile only to validate; the Rule stores the pattern text.
            try:
                re.compile(pattern)
            except re.error as exc:
                raise GuardError(f"{where}: invalid regex {pattern!r}: {exc}") from exc
            built.append(Rule(id=rule_id, kind="title-regex", pattern=pattern))
            continue

        raise GuardError(f"{where}: unknown rule kind {kind!r}")

    return tuple(built)
def evaluate(
    title: str,
    labels: tuple[str, ...] | list[str],
    *,
    rules: tuple[Rule, ...],
) -> GuardResult:
    """Check the issue facts against ``rules`` and report which fired (pure).

    The result lists the ids of the rules that matched, in rule order, and the
    ids of all rules that were considered. No dedup step is applied here:
    :func:`resolve_rules` rejects duplicate ids, so with rules from that
    function each fired id appears at most once.
    """
    frozen_labels = tuple(labels)

    fired: list[str] = []
    for rule in rules:
        if rule.matches(title, frozen_labels):
            fired.append(rule.id)

    considered = [rule.id for rule in rules]

    return GuardResult(
        title=title,
        labels=frozen_labels,
        matched=tuple(fired),
        rule_ids=tuple(considered),
    )
def evaluate_config(
    title: str,
    labels: tuple[str, ...] | list[str],
    *,
    config: cfg.ProjectConfig | None,
) -> GuardResult:
    """Shortcut: resolve the rules for ``config``, then run :func:`evaluate`."""
    active_rules = resolve_rules(config)
    return evaluate(title, labels, rules=active_rules)