Coverage for src/keel/runtime.py: 100%
101 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
"""Runtime capability detection and requirement evaluation.

Capabilities describe what the current execution environment can do. They are runtime
facts, not project policy: whether local tools exist, whether GitHub access is available,
and whether live mutation classes are possible. The detector is injectable so tests stay
offline and deterministic.
"""
9from __future__ import annotations
11import json
12import os
13import shutil
14from collections.abc import Callable, Mapping
15from dataclasses import dataclass
16from pathlib import Path
18from . import capabilities
# Alias of ``capabilities.KNOWN_CAPABILITIES``, exposed under this module's namespace.
KNOWN_CAPABILITIES = capabilities.KNOWN_CAPABILITIES
@dataclass(frozen=True)
class Capability:
    """A single runtime capability as observed by the detector."""

    # Capability identifier, e.g. "git" or "gh-auth".
    name: str
    # Whether the capability is usable in this runtime.
    available: bool
    # Supporting evidence: a resolved path, an environment variable name, or a message.
    detail: str
    # Origin of the answer, e.g. "PATH", "environment", "filesystem" or "derived".
    source: str

    def as_dict(self) -> dict:
        """Return the four fields as a plain dict, in declaration order."""
        return {
            key: getattr(self, key)
            for key in ("name", "available", "detail", "source")
        }
@dataclass(frozen=True)
class CapabilityReport:
    """The full set of capabilities produced by one detection pass."""

    # Detected capabilities, kept in the order they were supplied.
    capabilities: tuple[Capability, ...]

    def get(self, name: str) -> Capability:
        """Return the first capability called *name*.

        A name that is not in the report yields an unavailable placeholder
        capability rather than raising.
        """
        found = next((entry for entry in self.capabilities if entry.name == name), None)
        if found is None:
            return Capability(name, False, "unknown capability", "unknown")
        return found

    def available(self, name: str) -> bool:
        """Return whether the capability called *name* is available."""
        return self.get(name).available

    def as_dict(self) -> dict:
        """Return the report as a dict with a single ``"capabilities"`` list."""
        entries = [entry.as_dict() for entry in self.capabilities]
        return {"capabilities": entries}

    def to_json(self) -> str:
        """Serialize ``as_dict()`` as indented JSON with sorted keys."""
        payload = self.as_dict()
        return json.dumps(payload, indent=2, sort_keys=True)

    def render(self) -> str:
        """Return a text table: a heading followed by one row per capability."""
        rows = [
            f" {entry.name:<18} {'yes' if entry.available else 'no':<3} {entry.detail}"
            for entry in self.capabilities
        ]
        return "\n".join(["keel capabilities", *rows])
@dataclass(frozen=True)
class CapabilityRequirement:
    """Capability names that a command or extension depends on."""

    # Names that must be available for the requirement to be satisfied.
    required: tuple[str, ...] = ()
    # Names that are checked too, but whose absence does not fail the requirement.
    optional: tuple[str, ...] = ()

    def merged(self, other: CapabilityRequirement) -> CapabilityRequirement:
        """Return a new requirement combining this one with *other*.

        Each list is concatenated (self first) and de-duplicated, keeping the
        first occurrence of every name.
        """
        all_required = (*self.required, *other.required)
        all_optional = (*self.optional, *other.optional)
        return CapabilityRequirement(
            required=_unique(all_required),
            optional=_unique(all_optional),
        )

    def as_dict(self) -> dict:
        """Return both name tuples as lists under ``"required"`` and ``"optional"``."""
        return dict(required=list(self.required), optional=list(self.optional))
@dataclass(frozen=True)
class CapabilityEvaluation:
    """Outcome of checking a requirement against a capability report."""

    # The requirement that was checked.
    requirement: CapabilityRequirement
    # Required names that were not available.
    missing_required: tuple[str, ...]
    # Optional names that were not available.
    missing_optional: tuple[str, ...]

    @property
    def ok(self) -> bool:
        """True when no required capability is missing."""
        if self.missing_required:
            return False
        return True

    def as_dict(self) -> dict:
        """Return the requirement, both missing lists and ``ok`` as a plain dict."""
        wanted = self.requirement
        return {
            "required": [*wanted.required],
            "optional": [*wanted.optional],
            "missing_required": [*self.missing_required],
            "missing_optional": [*self.missing_optional],
            "ok": self.ok,
        }

    def render(self) -> str:
        """Return a multi-line summary; the missing lines appear only when non-empty."""

        def listing(names: tuple[str, ...]) -> str:
            return ", ".join(names)

        out = ["runtime capabilities:"]
        # An empty requirement list is shown as "-" so the row is never blank.
        out.append(f" required: {listing(self.requirement.required) or '-'}")
        out.append(f" optional: {listing(self.requirement.optional) or '-'}")
        if self.missing_required:
            out.append(f" missing required: {listing(self.missing_required)}")
        if self.missing_optional:
            out.append(f" degraded optional: {listing(self.missing_optional)}")
        return "\n".join(out)
def detect(
    root: str | Path = ".",
    *,
    env: Mapping[str, str] | None = None,
    which: Callable[[str], str | None] = shutil.which,
    run: Callable[..., object] | None = None,
) -> CapabilityReport:
    """Probe the current runtime and return a report of its capabilities.

    Environment overrides intentionally use generic keel names so projects can
    surface host-agent capabilities without hardcoding one consumer's tooling
    into core.

    Args:
        root: Directory checked for writability and used as the working
            directory for the ``gh auth status`` probe.
        env: Variable mapping to consult; ``os.environ`` when omitted.
        which: Executable lookup returning a path or ``None``.
        run: Command runner called with an argv list plus ``cwd`` and
            ``timeout``; the project's ``run_argv`` when omitted. Only the
            ``ok`` attribute of its result is read.
    """
    if env is None:
        env = os.environ
    if run is None:
        # Imported lazily, only when the caller did not inject a runner.
        from .runner import run_argv

        run = run_argv

    def from_path(cap_name: str, tool: str, location: str | None) -> Capability:
        return Capability(cap_name, location is not None, location or f"{tool} not found", "PATH")

    def from_env(cap_name: str, variable: str) -> Capability:
        return Capability(cap_name, _truthy(env.get(variable)), variable, "environment")

    base = Path(root)
    sh_path = which("sh")
    git_path = which("git")
    gh_path = which("gh")
    adb = _tool_capability("adb", env_name="KEEL_ADB", env=env, which=which)
    firebase = _tool_capability("firebase", env_name="KEEL_FIREBASE", env=env, which=which)
    writable = _can_write(base)

    if gh_path:
        # Authentication is only probed when a gh executable was found.
        outcome = run(["gh", "auth", "status"], cwd=str(base), timeout=10)
        authed = bool(getattr(outcome, "ok", False))
        auth_detail = "authenticated" if authed else "gh auth status failed"
    else:
        authed = False
        auth_detail = "gh not available"

    return CapabilityReport(
        (
            from_path("shell", "sh", sh_path),
            from_path("git", "git", git_path),
            from_path("gh", "gh", gh_path),
            Capability("gh-auth", authed, auth_detail, "gh auth status"),
            from_env("github-mcp", "KEEL_GITHUB_MCP"),
            from_env("subagents", "KEEL_SUBAGENTS"),
            from_env("parallel-subagents", "KEEL_PARALLEL_SUBAGENTS"),
            from_env("browser", "KEEL_BROWSER"),
            adb,
            firebase,
            Capability(
                "filesystem-write",
                writable,
                "root writable" if writable else "root not writable",
                "filesystem",
            ),
            Capability(
                "worktree",
                git_path is not None and writable,
                "requires git and writable root",
                "derived",
            ),
            from_env("release-publish", "KEEL_RELEASE_PUBLISH"),
            from_env("secret-access", "KEEL_SECRET_ACCESS"),
            from_env("production-adjacent", "KEEL_PRODUCTION_ADJACENT"),
            from_env("private-setup", "KEEL_PRIVATE_SETUP"),
        )
    )
def evaluate(requirement: CapabilityRequirement, report: CapabilityReport) -> CapabilityEvaluation:
    """Compare *requirement* with *report* and record which names are unavailable.

    Required names are checked before optional ones, and each result keeps the
    order in which the requirement lists the names.
    """

    def unavailable(names: tuple[str, ...]) -> tuple[str, ...]:
        return tuple(candidate for candidate in names if not report.available(candidate))

    absent_required = unavailable(requirement.required)
    absent_optional = unavailable(requirement.optional)
    return CapabilityEvaluation(requirement, absent_required, absent_optional)
def validate_names(names: tuple[str, ...] | list[str], *, source: str) -> list[str]:
    """Return error messages for capability names that are not recognised.

    The check is delegated unchanged to ``capabilities.validate_names``.
    """
    errors = capabilities.validate_names(names, source=source)
    return errors
196def _truthy(value: str | None) -> bool:
197 return (value or "").strip().lower() in {"1", "true", "yes", "on"}
def _tool_capability(
    name: str,
    *,
    env_name: str,
    env: Mapping[str, str],
    which: Callable[[str], str | None],
) -> Capability:
    """Build the capability for an external tool.

    A truthy *env_name* variable marks the tool available without consulting
    *which*; otherwise availability depends on whether *which* returns a path
    for *name*.
    """
    forced = _truthy(env.get(env_name))
    if forced:
        return Capability(name, True, env_name, "environment")

    location = which(name)
    found = location is not None
    detail = location if location else f"{name} not found"
    return Capability(name, found, detail, "PATH")
213def _can_write(root: Path) -> bool:
214 if not root.exists():
215 return False
216 return os.access(root, os.W_OK)
219def _unique(values: tuple[str, ...]) -> tuple[str, ...]:
220 return tuple(dict.fromkeys(values))