Coverage for src/keel/doctor.py: 100%

110 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Pure diagnostics for ``keel doctor`` — a read-only health pass. 

2 

3This module is **pure**: no network, no wall-clock, no randomness. The caller 

4(``keel.cli``) performs all I/O — fetching the latest version from PyPI, reading 

5adapter markers off disk, loading config, probing state-path existence — and 

6passes the already-gathered facts into :func:`run_doctor`, which classifies each 

7check as ``ok`` / ``warn`` / ``fail`` and returns a structured, JSON-stable 

8result. Every branch here is deterministic and unit-tested. 

9 

10Checks 

11------ 

12``cli_version`` installed ``keel.__version__`` vs latest on PyPI (the 

13 headline check — a silent downgrade is a ``fail``). 

14``adapter_version`` ``keel_version=`` markers on installed adapter surfaces vs 

15 the running CLI version. 

16``orphan_adapters`` surfaces whose ``command=`` is no longer in the installed 

17 keel (stale-marker orphans). 

18``core_version`` ``core_version`` constraint from project.yaml vs the 

19 installed CLI version. 

20``state_paths`` existence/validity of the configured ledger + checkpoint 

21 paths (advisory; missing == empty history, not a defect). 

22""" 

23 

24from __future__ import annotations 

25 

26import re 

27from dataclasses import dataclass, field 

28 

29SCHEMA_VERSION = "keel.doctor.v1" 

30 

31#: per-check status levels, ordered worst-last for summary roll-up. 

32_OK = "ok" 

33_WARN = "warn" 

34_FAIL = "fail" 

35_RANK = {_OK: 0, _WARN: 1, _FAIL: 2} 

36 

37#: a release version: ``MAJOR.MINOR.PATCH`` with optional further dotted parts. 

38_VERSION_RE = re.compile(r"^\d+(?:\.\d+)*$") 

39#: a ``core_version`` constraint: an optional operator (``^`` / ``~`` / ``>=`` / 

40#: ``==``) followed by a dotted version. A bare version means exact match. 

41_CONSTRAINT_RE = re.compile(r"^(?P<op>\^|~|>=|<=|>|<|==|=)?\s*(?P<version>\d+(?:\.\d+)*)$") 

42 

43 

44@dataclass(frozen=True) 

45class CheckResult: 

46 """One diagnostic check outcome (JSON-stable via :meth:`as_dict`).""" 

47 

48 name: str 

49 status: str 

50 summary: str 

51 detail: dict[str, object] = field(default_factory=dict) 

52 

53 def as_dict(self) -> dict[str, object]: 

54 return { 

55 "name": self.name, 

56 "status": self.status, 

57 "summary": self.summary, 

58 "detail": dict(self.detail), 

59 } 

60 

61 

62def _parse_version(text: str) -> tuple[int, ...] | None: 

63 """Parse a dotted release version into a comparable tuple (``None`` if unparseable).""" 

64 if not isinstance(text, str) or not _VERSION_RE.match(text.strip()): 

65 return None 

66 return tuple(int(part) for part in text.strip().split(".")) 

67 

68 

69def _pad(a: tuple[int, ...], b: tuple[int, ...]) -> tuple[tuple[int, ...], tuple[int, ...]]: 

70 """Right-pad the shorter tuple with zeros so the two compare component-wise.""" 

71 width = max(len(a), len(b)) 

72 return a + (0,) * (width - len(a)), b + (0,) * (width - len(b)) 

73 

74 

75def constraint_satisfied(installed: str, constraint: str) -> bool | None: 

76 """Does ``installed`` satisfy the ``core_version`` ``constraint``? 

77 

78 Supports ``^`` (caret: same leading non-zero, ``>=``), ``~`` (tilde: same 

79 major+minor, ``>=``), the comparison operators (``>=``, ``<=``, ``>``, 

80 ``<``, ``==``/``=``), and a bare version (exact match). Returns ``None`` when 

81 either side is unparseable so the caller can report ``unknown`` rather than 

82 guess. Pure and deterministic. 

83 """ 

84 inst = _parse_version(installed) 

85 match = _CONSTRAINT_RE.match(constraint.strip()) if isinstance(constraint, str) else None 

86 if inst is None or match is None: 

87 return None 

88 want = _parse_version(match.group("version")) 

89 if want is None: # pragma: no cover - regex already guarantees a parseable version 

90 return None 

91 op = match.group("op") or "==" 

92 pi, pw = _pad(inst, want) 

93 if op == "^": 

94 # caret: pin the most-significant non-zero component, then ``>=``. 

95 lead = next((i for i, part in enumerate(want) if part != 0), len(want) - 1) 

96 return pi[lead] == pw[lead] and pi[:lead] == pw[:lead] and pi >= pw 

97 if op == "~": 

98 # tilde: pin major+minor (or major when no minor given), then ``>=``. 

99 pin = min(2, len(want)) 

100 return pi[:pin] == pw[:pin] and pi >= pw 

101 if op in (">=",): 

102 return pi >= pw 

103 if op in (">",): 

104 return pi > pw 

105 if op in ("<=",): 

106 return pi <= pw 

107 if op in ("<",): 

108 return pi < pw 

109 return pi == pw # ``==`` / ``=`` / bare 

110 

111 

112def _check_cli_version(installed: str, latest: str | None) -> CheckResult: 

113 """Installed CLI vs latest on PyPI. Offline => ``warn`` (unknown); stale => ``fail``.""" 

114 if latest is None: 

115 return CheckResult( 

116 "cli_version", _WARN, 

117 f"installed {installed}; latest unknown (offline or PyPI unreachable)", 

118 {"installed": installed, "latest": "unknown"}, 

119 ) 

120 inst, lat = _parse_version(installed), _parse_version(latest) 

121 detail = {"installed": installed, "latest": latest} 

122 if inst is None or lat is None: 

123 return CheckResult( 

124 "cli_version", _WARN, 

125 f"installed {installed}; latest {latest}; cannot compare versions", 

126 detail, 

127 ) 

128 pi, pl = _pad(inst, lat) 

129 if pi < pl: 

130 return CheckResult( 

131 "cli_version", _FAIL, 

132 f"installed {installed} is behind latest {latest} — upgrade keel-workflow", 

133 detail, 

134 ) 

135 if pi > pl: 

136 return CheckResult( 

137 "cli_version", _WARN, 

138 f"installed {installed} is ahead of latest {latest} (pre-release or unpublished)", 

139 detail, 

140 ) 

141 return CheckResult( 

142 "cli_version", _OK, f"installed {installed} is up to date", detail, 

143 ) 

144 

145 

146def _check_adapter_version(installed: str, markers: list[dict[str, object]]) -> CheckResult: 

147 """Installed adapter ``keel_version`` markers vs the running CLI version.""" 

148 if not markers: 

149 return CheckResult( 

150 "adapter_version", _WARN, 

151 "no keel-generated adapter surfaces found under --root", 

152 {"installed": installed, "surfaces": 0, "drift": []}, 

153 ) 

154 drift = [] 

155 for marker in markers: 

156 marker_version = marker.get("keel_version") 

157 if marker_version != installed: 

158 drift.append({ 

159 "surface": marker.get("surface", ""), 

160 "name": marker.get("name", ""), 

161 "keel_version": marker_version, 

162 }) 

163 if drift: 

164 return CheckResult( 

165 "adapter_version", _WARN, 

166 f"{len(drift)} of {len(markers)} adapter surface(s) drift from CLI {installed} " 

167 "— run keel update-adapter", 

168 {"installed": installed, "surfaces": len(markers), "drift": drift}, 

169 ) 

170 return CheckResult( 

171 "adapter_version", _OK, 

172 f"all {len(markers)} adapter surface(s) match CLI {installed}", 

173 {"installed": installed, "surfaces": len(markers), "drift": []}, 

174 ) 

175 

176 

177def _check_orphan_adapters(orphans: list[dict[str, object]]) -> CheckResult: 

178 """Surfaces whose command is no longer in the installed keel (stale-marker orphans).""" 

179 if not orphans: 

180 return CheckResult( 

181 "orphan_adapters", _OK, "no orphan adapter surfaces", {"orphans": []}, 

182 ) 

183 return CheckResult( 

184 "orphan_adapters", _WARN, 

185 f"{len(orphans)} orphan adapter surface(s) — command(s) no longer in installed keel", 

186 {"orphans": list(orphans)}, 

187 ) 

188 

189 

190def _check_core_version(installed: str, core_version: str | None) -> CheckResult: 

191 """``core_version`` constraint from project.yaml vs the installed CLI version.""" 

192 if core_version is None: 

193 return CheckResult( 

194 "core_version", _OK, "no project config given — core_version check skipped", 

195 {"installed": installed, "core_version": None}, 

196 ) 

197 detail = {"installed": installed, "core_version": core_version} 

198 satisfied = constraint_satisfied(installed, core_version) 

199 if satisfied is None: 

200 return CheckResult( 

201 "core_version", _WARN, 

202 f"cannot evaluate core_version {core_version!r} against installed {installed}", 

203 detail, 

204 ) 

205 if not satisfied: 

206 return CheckResult( 

207 "core_version", _FAIL, 

208 f"installed {installed} does not satisfy core_version {core_version!r}", 

209 detail, 

210 ) 

211 return CheckResult( 

212 "core_version", _OK, 

213 f"installed {installed} satisfies core_version {core_version!r}", 

214 detail, 

215 ) 

216 

217 

218def _check_state_paths(state_paths: list[dict[str, object]]) -> CheckResult: 

219 """Advisory check on configured ledger/checkpoint paths — missing == empty history.""" 

220 if not state_paths: 

221 return CheckResult( 

222 "state_paths", _OK, "no state paths configured", {"paths": []}, 

223 ) 

224 for entry in state_paths: 

225 if entry.get("status") == "invalid": 

226 present = sum(1 for e in state_paths if e.get("status") == "present") 

227 return CheckResult( 

228 "state_paths", _WARN, 

229 "one or more configured state paths are invalid", 

230 {"paths": list(state_paths), "present": present}, 

231 ) 

232 present = sum(1 for e in state_paths if e.get("status") == "present") 

233 return CheckResult( 

234 "state_paths", _OK, 

235 f"{present} of {len(state_paths)} state path(s) present " 

236 "(missing paths report as empty history)", 

237 {"paths": list(state_paths), "present": present}, 

238 ) 

239 

240 

241def run_doctor( 

242 *, 

243 installed_version: str, 

244 latest_version: str | None, 

245 adapter_markers: list[dict[str, object]], 

246 orphans: list[dict[str, object]], 

247 core_version: str | None, 

248 state_paths: list[dict[str, object]], 

249) -> dict[str, object]: 

250 """Run all diagnostic checks over already-gathered facts (pure, deterministic). 

251 

252 Returns a JSON-stable dict: ``schema_version``, ``installed_version``, the 

253 ordered ``checks`` list, and a roll-up ``status`` (worst of all checks) plus 

254 counts. The caller maps ``status`` to an exit code (and ``--strict`` turns a 

255 ``fail`` roll-up into a non-zero exit). 

256 """ 

257 checks = [ 

258 _check_cli_version(installed_version, latest_version), 

259 _check_adapter_version(installed_version, adapter_markers), 

260 _check_orphan_adapters(orphans), 

261 _check_core_version(installed_version, core_version), 

262 _check_state_paths(state_paths), 

263 ] 

264 worst = max((c.status for c in checks), key=lambda s: _RANK[s]) 

265 counts = {_OK: 0, _WARN: 0, _FAIL: 0} 

266 for check in checks: 

267 counts[check.status] += 1 

268 return { 

269 "schema_version": SCHEMA_VERSION, 

270 "installed_version": installed_version, 

271 "status": worst, 

272 "counts": counts, 

273 "checks": [c.as_dict() for c in checks], 

274 } 

275 

276 

277def render_report(report: dict[str, object]) -> str: 

278 """Render a doctor report as aligned human-readable status lines.""" 

279 lines = [f"keel doctor — {report['status']} (keel {report['installed_version']})"] 

280 for check in report["checks"]: 

281 state = str(check["status"]).upper() 

282 lines.append(f" {state:>4} {check['name']:<16} {check['summary']}") 

283 counts = report["counts"] 

284 lines.append( 

285 f" summary : {counts[_OK]} ok, {counts[_WARN]} warn, {counts[_FAIL]} fail" 

286 ) 

287 return "\n".join(lines)