Coverage for src/keel/config.py: 100%
105 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Load + validate a keel ``project.yaml`` into a typed, immutable config.
3Pure and deterministic: parsing the same YAML always yields the same
4``ProjectConfig`` and the same :func:`config_hash`. The only I/O is reading the
5file in :func:`load_config`; everything else operates on plain data so it is
6trivially unit-testable.
7"""
9from __future__ import annotations
11import hashlib
12import json
13from dataclasses import dataclass, field
14from pathlib import Path
15from typing import Any
17import yaml
19from . import jsonschema_min
20from .capabilities import validate_names
21from .model import SLOTS # single source of truth for the named slots (re-exported)
23SCHEMA_PATH = Path(__file__).parent / "schema" / "project.schema.json"
25DEFAULT_EXTENSIONS_DIR = ".keel/extensions"
27__all__ = ["SLOTS", "DEFAULT_EXTENSIONS_DIR", "Automation", "Knobs", "ProjectConfig",
28 "ConfigError", "load_config", "parse_config", "validate_data", "load_schema",
29 "config_hash"]
32class ConfigError(ValueError):
33 """Raised when a project config fails schema validation."""
35 def __init__(self, source: str, errors: list[str]):
36 self.source = source
37 self.errors = list(errors)
38 joined = "\n - ".join(self.errors)
39 super().__init__(f"invalid keel config {source}:\n - {joined}")
42def load_schema() -> dict:
43 """Load the bundled JSON Schema for ``project.yaml``."""
44 return json.loads(SCHEMA_PATH.read_text(encoding="utf-8"))
47def validate_data(data: Any, schema: dict | None = None) -> list[str]:
48 """Return schema-validation errors for raw config data (empty == valid)."""
49 return jsonschema_min.validate(data, schema if schema is not None else load_schema())
52@dataclass(frozen=True)
53class Knobs:
54 """Per-project values consumed by the (otherwise neutral) backbone steps."""
56 build_gate_cmd: str
57 lint_cmd: str | None = None
58 implementer_agents: dict[str, str] = field(default_factory=dict)
59 tier3_globs: tuple[str, ...] = ()
60 ci_workflows: dict[str, str] = field(default_factory=dict)
61 docs_gate_paths: tuple[str, ...] = ()
62 docs_only_allowlist: tuple[str, ...] = ()
63 sot_doc: str | None = None
64 required_capabilities: tuple[str, ...] = ()
65 optional_capabilities: tuple[str, ...] = ()
66 evidence_gate_label: str = "keel:ship"
67 evidence_require_distinct_vendors: bool = False
70@dataclass(frozen=True)
71class Automation:
72 """Trusted unattended-run consent defaults."""
74 approved_scopes: tuple[str, ...] = ()
75 operator: str | None = None
78@dataclass(frozen=True)
79class ProjectConfig:
80 """A resolved, immutable keel project config."""
82 extends: str
83 core_version: str
84 base_branch: str
85 knobs: Knobs
86 owner: str | None = None
87 repo: str | None = None
88 platform: str | None = None
89 timezone: str | None = None
90 merge_window: str | None = None
91 merge_window_mode: str = "freeze"
92 consent_mode: str = "explicit"
93 gates: tuple[str, ...] = ()
94 extensions: dict[str, tuple[str, ...]] = field(default_factory=dict)
95 extensions_dir: str = DEFAULT_EXTENSIONS_DIR
96 policy_pack: dict[str, Any] = field(default_factory=dict)
97 automation: Automation = field(default_factory=Automation)
99 def slot(self, name: str) -> tuple[str, ...]:
100 """Extension files registered for a named slot (``()`` if none)."""
101 if name not in SLOTS:
102 raise KeyError(f"unknown slot {name!r}; valid slots: {', '.join(SLOTS)}")
103 return self.extensions.get(name, ())
106def _build(data: dict) -> ProjectConfig:
107 k = data["knobs"]
108 knobs = Knobs(
109 build_gate_cmd=k["build_gate_cmd"],
110 lint_cmd=k.get("lint_cmd"),
111 implementer_agents=dict(k.get("implementer_agents", {})),
112 tier3_globs=tuple(k.get("tier3_globs", [])),
113 ci_workflows=dict(k.get("ci_workflows", {})),
114 docs_gate_paths=tuple(k.get("docs_gate_paths", [])),
115 docs_only_allowlist=tuple(k.get("docs_only_allowlist", [])),
116 sot_doc=k.get("sot_doc"),
117 required_capabilities=tuple(k.get("required_capabilities", [])),
118 optional_capabilities=tuple(k.get("optional_capabilities", [])),
119 evidence_gate_label=k.get("evidence_gate_label", "keel:ship"),
120 evidence_require_distinct_vendors=bool(k.get("evidence_require_distinct_vendors", False)),
121 )
122 extensions = {slot: tuple(files) for slot, files in data.get("extensions", {}).items()}
123 automation_data = data.get("automation", {})
124 automation_scopes = tuple(dict.fromkeys(automation_data.get("approved_scopes", [])))
125 return ProjectConfig(
126 extends=data["extends"],
127 core_version=data["core_version"],
128 base_branch=data["base_branch"],
129 knobs=knobs,
130 owner=data.get("owner"),
131 repo=data.get("repo"),
132 platform=data.get("platform"),
133 timezone=data.get("timezone"),
134 merge_window=data.get("merge_window"),
135 merge_window_mode=data.get("merge_window_mode", "freeze"),
136 consent_mode=data.get("consent_mode", "explicit"),
137 gates=tuple(data.get("gates", [])),
138 extensions=extensions,
139 extensions_dir=data.get("extensions_dir", DEFAULT_EXTENSIONS_DIR),
140 policy_pack=json.loads(json.dumps(data.get("policy_pack", {}), sort_keys=True)),
141 automation=Automation(
142 approved_scopes=tuple(sorted(automation_scopes)),
143 operator=automation_data.get("operator"),
144 ),
145 )
148def parse_config(data: Any, *, source: str = "<dict>", schema: dict | None = None) -> ProjectConfig:
149 """Validate raw data and build a :class:`ProjectConfig` (raises on error)."""
150 if not isinstance(data, dict):
151 raise ConfigError(source, [f"$: expected an object (got {type(data).__name__})"])
152 errors = validate_data(data, schema)
153 if isinstance(data, dict) and isinstance(data.get("knobs"), dict):
154 knobs = data["knobs"]
155 errors.extend(validate_names(
156 tuple(knobs.get("required_capabilities", [])),
157 source=f"{source}: knobs.required_capabilities",
158 ))
159 errors.extend(validate_names(
160 tuple(knobs.get("optional_capabilities", [])),
161 source=f"{source}: knobs.optional_capabilities",
162 ))
163 if isinstance(data, dict) and isinstance(data.get("policy_pack"), dict):
164 for path, names in _policy_capability_fields(data["policy_pack"]):
165 errors.extend(validate_names(tuple(names), source=f"{source}: {path}"))
166 if errors:
167 raise ConfigError(source, errors)
168 return _build(data)
171def load_config(path: str | Path) -> ProjectConfig:
172 """Read + validate a ``project.yaml`` from disk."""
173 path = Path(path)
174 data = yaml.safe_load(path.read_text(encoding="utf-8"))
175 return parse_config(data, source=str(path))
178def config_hash(config: ProjectConfig) -> str:
179 """Stable SHA-256 over the canonicalised config (cache key / determinism)."""
180 payload = json.dumps(_canonical(config), sort_keys=True, separators=(",", ":"))
181 return hashlib.sha256(payload.encode("utf-8")).hexdigest()
184def _policy_capability_fields(value: Any, path: str = "policy_pack") -> list[tuple[str, list]]:
185 fields: list[tuple[str, list]] = []
186 if isinstance(value, dict):
187 for key, child in value.items():
188 child_path = f"{path}.{key}"
189 if (
190 key in {"required_capabilities", "optional_capabilities"}
191 and isinstance(child, list)
192 ):
193 fields.append((child_path, child))
194 else:
195 fields.extend(_policy_capability_fields(child, child_path))
196 elif isinstance(value, list):
197 for i, child in enumerate(value):
198 fields.extend(_policy_capability_fields(child, f"{path}[{i}]"))
199 return fields
202def _canonical(config: ProjectConfig) -> dict:
203 return {
204 "extends": config.extends,
205 "core_version": config.core_version,
206 "base_branch": config.base_branch,
207 "owner": config.owner,
208 "repo": config.repo,
209 "platform": config.platform,
210 "timezone": config.timezone,
211 "merge_window": config.merge_window,
212 "merge_window_mode": config.merge_window_mode,
213 "consent_mode": config.consent_mode,
214 "gates": list(config.gates),
215 "extensions_dir": config.extensions_dir,
216 "extensions": {k: list(v) for k, v in sorted(config.extensions.items())},
217 "policy_pack": config.policy_pack,
218 "automation": {
219 "approved_scopes": list(config.automation.approved_scopes),
220 "operator": config.automation.operator,
221 },
222 "knobs": {
223 "build_gate_cmd": config.knobs.build_gate_cmd,
224 "lint_cmd": config.knobs.lint_cmd,
225 "implementer_agents": dict(sorted(config.knobs.implementer_agents.items())),
226 "tier3_globs": list(config.knobs.tier3_globs),
227 "ci_workflows": dict(sorted(config.knobs.ci_workflows.items())),
228 "docs_gate_paths": list(config.knobs.docs_gate_paths),
229 "docs_only_allowlist": list(config.knobs.docs_only_allowlist),
230 "sot_doc": config.knobs.sot_doc,
231 "required_capabilities": list(config.knobs.required_capabilities),
232 "optional_capabilities": list(config.knobs.optional_capabilities),
233 "evidence_gate_label": config.knobs.evidence_gate_label,
234 "evidence_require_distinct_vendors": config.knobs.evidence_require_distinct_vendors,
235 },
236 }