Coverage for src/keel/config.py: 100%

105 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Load + validate a keel ``project.yaml`` into a typed, immutable config. 

2 

3Pure and deterministic: parsing the same YAML always yields the same 

4``ProjectConfig`` and the same :func:`config_hash`. The only I/O is reading the 

5file in :func:`load_config`; everything else operates on plain data so it is 

6trivially unit-testable. 

7""" 

8 

9from __future__ import annotations 

10 

11import hashlib 

12import json 

13from dataclasses import dataclass, field 

14from pathlib import Path 

15from typing import Any 

16 

17import yaml 

18 

19from . import jsonschema_min 

20from .capabilities import validate_names 

21from .model import SLOTS # single source of truth for the named slots (re-exported) 

22 

23SCHEMA_PATH = Path(__file__).parent / "schema" / "project.schema.json" 

24 

25DEFAULT_EXTENSIONS_DIR = ".keel/extensions" 

26 

27__all__ = ["SLOTS", "DEFAULT_EXTENSIONS_DIR", "Automation", "Knobs", "ProjectConfig", 

28 "ConfigError", "load_config", "parse_config", "validate_data", "load_schema", 

29 "config_hash"] 

30 

31 

32class ConfigError(ValueError): 

33 """Raised when a project config fails schema validation.""" 

34 

35 def __init__(self, source: str, errors: list[str]): 

36 self.source = source 

37 self.errors = list(errors) 

38 joined = "\n - ".join(self.errors) 

39 super().__init__(f"invalid keel config {source}:\n - {joined}") 

40 

41 

42def load_schema() -> dict: 

43 """Load the bundled JSON Schema for ``project.yaml``.""" 

44 return json.loads(SCHEMA_PATH.read_text(encoding="utf-8")) 

45 

46 

47def validate_data(data: Any, schema: dict | None = None) -> list[str]: 

48 """Return schema-validation errors for raw config data (empty == valid).""" 

49 return jsonschema_min.validate(data, schema if schema is not None else load_schema()) 

50 

51 

52@dataclass(frozen=True) 

53class Knobs: 

54 """Per-project values consumed by the (otherwise neutral) backbone steps.""" 

55 

56 build_gate_cmd: str 

57 lint_cmd: str | None = None 

58 implementer_agents: dict[str, str] = field(default_factory=dict) 

59 tier3_globs: tuple[str, ...] = () 

60 ci_workflows: dict[str, str] = field(default_factory=dict) 

61 docs_gate_paths: tuple[str, ...] = () 

62 docs_only_allowlist: tuple[str, ...] = () 

63 sot_doc: str | None = None 

64 required_capabilities: tuple[str, ...] = () 

65 optional_capabilities: tuple[str, ...] = () 

66 evidence_gate_label: str = "keel:ship" 

67 evidence_require_distinct_vendors: bool = False 

68 

69 

70@dataclass(frozen=True) 

71class Automation: 

72 """Trusted unattended-run consent defaults.""" 

73 

74 approved_scopes: tuple[str, ...] = () 

75 operator: str | None = None 

76 

77 

78@dataclass(frozen=True) 

79class ProjectConfig: 

80 """A resolved, immutable keel project config.""" 

81 

82 extends: str 

83 core_version: str 

84 base_branch: str 

85 knobs: Knobs 

86 owner: str | None = None 

87 repo: str | None = None 

88 platform: str | None = None 

89 timezone: str | None = None 

90 merge_window: str | None = None 

91 merge_window_mode: str = "freeze" 

92 consent_mode: str = "explicit" 

93 gates: tuple[str, ...] = () 

94 extensions: dict[str, tuple[str, ...]] = field(default_factory=dict) 

95 extensions_dir: str = DEFAULT_EXTENSIONS_DIR 

96 policy_pack: dict[str, Any] = field(default_factory=dict) 

97 automation: Automation = field(default_factory=Automation) 

98 

99 def slot(self, name: str) -> tuple[str, ...]: 

100 """Extension files registered for a named slot (``()`` if none).""" 

101 if name not in SLOTS: 

102 raise KeyError(f"unknown slot {name!r}; valid slots: {', '.join(SLOTS)}") 

103 return self.extensions.get(name, ()) 

104 

105 

106def _build(data: dict) -> ProjectConfig: 

107 k = data["knobs"] 

108 knobs = Knobs( 

109 build_gate_cmd=k["build_gate_cmd"], 

110 lint_cmd=k.get("lint_cmd"), 

111 implementer_agents=dict(k.get("implementer_agents", {})), 

112 tier3_globs=tuple(k.get("tier3_globs", [])), 

113 ci_workflows=dict(k.get("ci_workflows", {})), 

114 docs_gate_paths=tuple(k.get("docs_gate_paths", [])), 

115 docs_only_allowlist=tuple(k.get("docs_only_allowlist", [])), 

116 sot_doc=k.get("sot_doc"), 

117 required_capabilities=tuple(k.get("required_capabilities", [])), 

118 optional_capabilities=tuple(k.get("optional_capabilities", [])), 

119 evidence_gate_label=k.get("evidence_gate_label", "keel:ship"), 

120 evidence_require_distinct_vendors=bool(k.get("evidence_require_distinct_vendors", False)), 

121 ) 

122 extensions = {slot: tuple(files) for slot, files in data.get("extensions", {}).items()} 

123 automation_data = data.get("automation", {}) 

124 automation_scopes = tuple(dict.fromkeys(automation_data.get("approved_scopes", []))) 

125 return ProjectConfig( 

126 extends=data["extends"], 

127 core_version=data["core_version"], 

128 base_branch=data["base_branch"], 

129 knobs=knobs, 

130 owner=data.get("owner"), 

131 repo=data.get("repo"), 

132 platform=data.get("platform"), 

133 timezone=data.get("timezone"), 

134 merge_window=data.get("merge_window"), 

135 merge_window_mode=data.get("merge_window_mode", "freeze"), 

136 consent_mode=data.get("consent_mode", "explicit"), 

137 gates=tuple(data.get("gates", [])), 

138 extensions=extensions, 

139 extensions_dir=data.get("extensions_dir", DEFAULT_EXTENSIONS_DIR), 

140 policy_pack=json.loads(json.dumps(data.get("policy_pack", {}), sort_keys=True)), 

141 automation=Automation( 

142 approved_scopes=tuple(sorted(automation_scopes)), 

143 operator=automation_data.get("operator"), 

144 ), 

145 ) 

146 

147 

148def parse_config(data: Any, *, source: str = "<dict>", schema: dict | None = None) -> ProjectConfig: 

149 """Validate raw data and build a :class:`ProjectConfig` (raises on error).""" 

150 if not isinstance(data, dict): 

151 raise ConfigError(source, [f"$: expected an object (got {type(data).__name__})"]) 

152 errors = validate_data(data, schema) 

153 if isinstance(data, dict) and isinstance(data.get("knobs"), dict): 

154 knobs = data["knobs"] 

155 errors.extend(validate_names( 

156 tuple(knobs.get("required_capabilities", [])), 

157 source=f"{source}: knobs.required_capabilities", 

158 )) 

159 errors.extend(validate_names( 

160 tuple(knobs.get("optional_capabilities", [])), 

161 source=f"{source}: knobs.optional_capabilities", 

162 )) 

163 if isinstance(data, dict) and isinstance(data.get("policy_pack"), dict): 

164 for path, names in _policy_capability_fields(data["policy_pack"]): 

165 errors.extend(validate_names(tuple(names), source=f"{source}: {path}")) 

166 if errors: 

167 raise ConfigError(source, errors) 

168 return _build(data) 

169 

170 

171def load_config(path: str | Path) -> ProjectConfig: 

172 """Read + validate a ``project.yaml`` from disk.""" 

173 path = Path(path) 

174 data = yaml.safe_load(path.read_text(encoding="utf-8")) 

175 return parse_config(data, source=str(path)) 

176 

177 

178def config_hash(config: ProjectConfig) -> str: 

179 """Stable SHA-256 over the canonicalised config (cache key / determinism).""" 

180 payload = json.dumps(_canonical(config), sort_keys=True, separators=(",", ":")) 

181 return hashlib.sha256(payload.encode("utf-8")).hexdigest() 

182 

183 

184def _policy_capability_fields(value: Any, path: str = "policy_pack") -> list[tuple[str, list]]: 

185 fields: list[tuple[str, list]] = [] 

186 if isinstance(value, dict): 

187 for key, child in value.items(): 

188 child_path = f"{path}.{key}" 

189 if ( 

190 key in {"required_capabilities", "optional_capabilities"} 

191 and isinstance(child, list) 

192 ): 

193 fields.append((child_path, child)) 

194 else: 

195 fields.extend(_policy_capability_fields(child, child_path)) 

196 elif isinstance(value, list): 

197 for i, child in enumerate(value): 

198 fields.extend(_policy_capability_fields(child, f"{path}[{i}]")) 

199 return fields 

200 

201 

202def _canonical(config: ProjectConfig) -> dict: 

203 return { 

204 "extends": config.extends, 

205 "core_version": config.core_version, 

206 "base_branch": config.base_branch, 

207 "owner": config.owner, 

208 "repo": config.repo, 

209 "platform": config.platform, 

210 "timezone": config.timezone, 

211 "merge_window": config.merge_window, 

212 "merge_window_mode": config.merge_window_mode, 

213 "consent_mode": config.consent_mode, 

214 "gates": list(config.gates), 

215 "extensions_dir": config.extensions_dir, 

216 "extensions": {k: list(v) for k, v in sorted(config.extensions.items())}, 

217 "policy_pack": config.policy_pack, 

218 "automation": { 

219 "approved_scopes": list(config.automation.approved_scopes), 

220 "operator": config.automation.operator, 

221 }, 

222 "knobs": { 

223 "build_gate_cmd": config.knobs.build_gate_cmd, 

224 "lint_cmd": config.knobs.lint_cmd, 

225 "implementer_agents": dict(sorted(config.knobs.implementer_agents.items())), 

226 "tier3_globs": list(config.knobs.tier3_globs), 

227 "ci_workflows": dict(sorted(config.knobs.ci_workflows.items())), 

228 "docs_gate_paths": list(config.knobs.docs_gate_paths), 

229 "docs_only_allowlist": list(config.knobs.docs_only_allowlist), 

230 "sot_doc": config.knobs.sot_doc, 

231 "required_capabilities": list(config.knobs.required_capabilities), 

232 "optional_capabilities": list(config.knobs.optional_capabilities), 

233 "evidence_gate_label": config.knobs.evidence_gate_label, 

234 "evidence_require_distinct_vendors": config.knobs.evidence_require_distinct_vendors, 

235 }, 

236 }