Coverage for src/keel/runtime.py: 100%

101 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Runtime capability detection and requirement evaluation. 

2 

3Capabilities describe what the current execution environment can do. They are runtime 

4facts, not project policy: whether local tools exist, whether GitHub access is available, 

5and whether live mutation classes are possible. The detector is injectable so tests stay 

6offline and deterministic. 

7""" 

8 

9from __future__ import annotations 

10 

11import json 

12import os 

13import shutil 

14from collections.abc import Callable, Mapping 

15from dataclasses import dataclass 

16from pathlib import Path 

17 

18from . import capabilities 

19 

20KNOWN_CAPABILITIES = capabilities.KNOWN_CAPABILITIES 

21 

22@dataclass(frozen=True) 

23class Capability: 

24 """One detected runtime capability.""" 

25 

26 name: str 

27 available: bool 

28 detail: str 

29 source: str 

30 

31 def as_dict(self) -> dict: 

32 return { 

33 "name": self.name, 

34 "available": self.available, 

35 "detail": self.detail, 

36 "source": self.source, 

37 } 

38 

39 

40@dataclass(frozen=True) 

41class CapabilityReport: 

42 """All capabilities detected for a run.""" 

43 

44 capabilities: tuple[Capability, ...] 

45 

46 def get(self, name: str) -> Capability: 

47 for cap in self.capabilities: 

48 if cap.name == name: 

49 return cap 

50 return Capability(name, False, "unknown capability", "unknown") 

51 

52 def available(self, name: str) -> bool: 

53 return self.get(name).available 

54 

55 def as_dict(self) -> dict: 

56 return {"capabilities": [cap.as_dict() for cap in self.capabilities]} 

57 

58 def to_json(self) -> str: 

59 return json.dumps(self.as_dict(), indent=2, sort_keys=True) 

60 

61 def render(self) -> str: 

62 lines = ["keel capabilities"] 

63 for cap in self.capabilities: 

64 status = "yes" if cap.available else "no" 

65 lines.append(f" {cap.name:<18} {status:<3} {cap.detail}") 

66 return "\n".join(lines) 

67 

68 

69@dataclass(frozen=True) 

70class CapabilityRequirement: 

71 """Capabilities needed by a command or extension.""" 

72 

73 required: tuple[str, ...] = () 

74 optional: tuple[str, ...] = () 

75 

76 def merged(self, other: CapabilityRequirement) -> CapabilityRequirement: 

77 return CapabilityRequirement( 

78 required=_unique((*self.required, *other.required)), 

79 optional=_unique((*self.optional, *other.optional)), 

80 ) 

81 

82 def as_dict(self) -> dict: 

83 return {"required": list(self.required), "optional": list(self.optional)} 

84 

85 

86@dataclass(frozen=True) 

87class CapabilityEvaluation: 

88 """A requirement checked against a capability report.""" 

89 

90 requirement: CapabilityRequirement 

91 missing_required: tuple[str, ...] 

92 missing_optional: tuple[str, ...] 

93 

94 @property 

95 def ok(self) -> bool: 

96 return not self.missing_required 

97 

98 def as_dict(self) -> dict: 

99 return { 

100 "required": list(self.requirement.required), 

101 "optional": list(self.requirement.optional), 

102 "missing_required": list(self.missing_required), 

103 "missing_optional": list(self.missing_optional), 

104 "ok": self.ok, 

105 } 

106 

107 def render(self) -> str: 

108 lines = [ 

109 "runtime capabilities:", 

110 f" required: {', '.join(self.requirement.required) or '-'}", 

111 f" optional: {', '.join(self.requirement.optional) or '-'}", 

112 ] 

113 if self.missing_required: 

114 lines.append(f" missing required: {', '.join(self.missing_required)}") 

115 if self.missing_optional: 

116 lines.append(f" degraded optional: {', '.join(self.missing_optional)}") 

117 return "\n".join(lines) 

118 

119 

120def detect( 

121 root: str | Path = ".", 

122 *, 

123 env: Mapping[str, str] | None = None, 

124 which: Callable[[str], str | None] = shutil.which, 

125 run: Callable[..., object] | None = None, 

126) -> CapabilityReport: 

127 """Detect capabilities for the current runtime. 

128 

129 Environment overrides intentionally use generic keel names so projects can surface 

130 host-agent capabilities without hardcoding one consumer's tooling into core. 

131 """ 

132 

133 env = os.environ if env is None else env 

134 if run is None: 

135 from .runner import run_argv 

136 run = run_argv 

137 root_path = Path(root) 

138 sh = which("sh") 

139 git = which("git") 

140 gh = which("gh") 

141 adb = _tool_capability("adb", env_name="KEEL_ADB", env=env, which=which) 

142 firebase = _tool_capability("firebase", env_name="KEEL_FIREBASE", env=env, which=which) 

143 filesystem_write = _can_write(root_path) 

144 gh_auth = False 

145 gh_auth_detail = "gh not available" 

146 if gh: 

147 result = run(["gh", "auth", "status"], cwd=str(root_path), timeout=10) 

148 gh_auth = bool(getattr(result, "ok", False)) 

149 gh_auth_detail = "authenticated" if gh_auth else "gh auth status failed" 

150 

151 caps = ( 

152 Capability("shell", sh is not None, sh or "sh not found", "PATH"), 

153 Capability("git", git is not None, git or "git not found", "PATH"), 

154 Capability("gh", gh is not None, gh or "gh not found", "PATH"), 

155 Capability("gh-auth", gh_auth, gh_auth_detail, "gh auth status"), 

156 Capability("github-mcp", _truthy(env.get("KEEL_GITHUB_MCP")), 

157 "KEEL_GITHUB_MCP", "environment"), 

158 Capability("subagents", _truthy(env.get("KEEL_SUBAGENTS")), 

159 "KEEL_SUBAGENTS", "environment"), 

160 Capability("parallel-subagents", _truthy(env.get("KEEL_PARALLEL_SUBAGENTS")), 

161 "KEEL_PARALLEL_SUBAGENTS", "environment"), 

162 Capability("browser", _truthy(env.get("KEEL_BROWSER")), 

163 "KEEL_BROWSER", "environment"), 

164 adb, 

165 firebase, 

166 Capability("filesystem-write", filesystem_write, 

167 "root writable" if filesystem_write else "root not writable", "filesystem"), 

168 Capability("worktree", git is not None and filesystem_write, 

169 "requires git and writable root", "derived"), 

170 Capability("release-publish", _truthy(env.get("KEEL_RELEASE_PUBLISH")), 

171 "KEEL_RELEASE_PUBLISH", "environment"), 

172 Capability("secret-access", _truthy(env.get("KEEL_SECRET_ACCESS")), 

173 "KEEL_SECRET_ACCESS", "environment"), 

174 Capability("production-adjacent", _truthy(env.get("KEEL_PRODUCTION_ADJACENT")), 

175 "KEEL_PRODUCTION_ADJACENT", "environment"), 

176 Capability("private-setup", _truthy(env.get("KEEL_PRIVATE_SETUP")), 

177 "KEEL_PRIVATE_SETUP", "environment"), 

178 ) 

179 return CapabilityReport(caps) 

180 

181 

182def evaluate(requirement: CapabilityRequirement, report: CapabilityReport) -> CapabilityEvaluation: 

183 """Check required and optional capabilities against a report.""" 

184 

185 missing_required = tuple(name for name in requirement.required if not report.available(name)) 

186 missing_optional = tuple(name for name in requirement.optional if not report.available(name)) 

187 return CapabilityEvaluation(requirement, missing_required, missing_optional) 

188 

189 

190def validate_names(names: tuple[str, ...] | list[str], *, source: str) -> list[str]: 

191 """Return errors for unknown capability names.""" 

192 

193 return capabilities.validate_names(names, source=source) 

194 

195 

196def _truthy(value: str | None) -> bool: 

197 return (value or "").strip().lower() in {"1", "true", "yes", "on"} 

198 

199 

200def _tool_capability( 

201 name: str, 

202 *, 

203 env_name: str, 

204 env: Mapping[str, str], 

205 which: Callable[[str], str | None], 

206) -> Capability: 

207 if _truthy(env.get(env_name)): 

208 return Capability(name, True, env_name, "environment") 

209 path = which(name) 

210 return Capability(name, path is not None, path or f"{name} not found", "PATH") 

211 

212 

213def _can_write(root: Path) -> bool: 

214 if not root.exists(): 

215 return False 

216 return os.access(root, os.W_OK) 

217 

218 

219def _unique(values: tuple[str, ...]) -> tuple[str, ...]: 

220 return tuple(dict.fromkeys(values))