Coverage for src/keel/doctor.py: 100%
110 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
"""Pure diagnostics for ``keel doctor`` — a read-only health pass.

This module is **pure**: no network, no wall-clock, no randomness. The caller
(``keel.cli``) performs all I/O — fetching the latest version from PyPI, reading
adapter markers off disk, loading config, probing state-path existence — and
passes the already-gathered facts into :func:`run_doctor`, which classifies each
check as ``ok`` / ``warn`` / ``fail`` and returns a structured, JSON-stable
result. Every branch here is deterministic and unit-tested.

Checks
------
``cli_version``      installed ``keel.__version__`` vs latest on PyPI (the
                     headline check — a silent downgrade is a ``fail``).
``adapter_version``  ``keel_version=`` markers on installed adapter surfaces vs
                     the running CLI version.
``orphan_adapters``  surfaces whose ``command=`` is no longer in the installed
                     keel (stale-marker orphans).
``core_version``     ``core_version`` constraint from project.yaml vs the
                     installed CLI version.
``state_paths``      existence/validity of the configured ledger + checkpoint
                     paths (advisory; missing == empty history, not a defect).
"""
24from __future__ import annotations
26import re
27from dataclasses import dataclass, field
#: Identifier emitted under the ``schema_version`` key of the doctor report.
SCHEMA_VERSION = "keel.doctor.v1"

#: Per-check status levels. ``_RANK`` maps each level to its severity
#: (``ok`` lowest, ``fail`` highest) so the roll-up can pick the worst one.
_OK = "ok"
_WARN = "warn"
_FAIL = "fail"
_RANK = {level: severity for severity, level in enumerate((_OK, _WARN, _FAIL))}

#: A release version: one or more dot-separated integer components
#: (``1``, ``1.2``, ``1.2.3``, ``1.2.3.4``, ...).
_VERSION_RE = re.compile(r"^\d+(?:\.\d+)*$")

#: A ``core_version`` constraint: an optional operator (``^``, ``~``, ``>=``,
#: ``<=``, ``>``, ``<``, ``==`` or ``=``), optional whitespace, then a dotted
#: version. A bare version (no operator) means exact match.
_CONSTRAINT_RE = re.compile(r"^(?P<op>\^|~|>=|<=|>|<|==|=)?\s*(?P<version>\d+(?:\.\d+)*)$")
@dataclass(frozen=True)
class CheckResult:
    """Immutable outcome of a single diagnostic check.

    ``name`` identifies the check, ``status`` is its level, ``summary`` is a
    one-line human-readable message and ``detail`` carries the structured
    facts behind the verdict. :meth:`as_dict` gives the JSON-stable form.
    """

    name: str
    status: str
    summary: str
    detail: dict[str, object] = field(default_factory=dict)

    def as_dict(self) -> dict[str, object]:
        """Return the result as a plain dict with a fixed key order.

        ``detail`` is shallow-copied, so adding or removing keys on the
        returned mapping does not touch this instance's own dict.
        """
        payload: dict[str, object] = {
            key: getattr(self, key) for key in ("name", "status", "summary")
        }
        payload["detail"] = dict(self.detail)
        return payload
def _parse_version(text: str) -> tuple[int, ...] | None:
    """Turn a dotted release version into a tuple of ints for comparison.

    Surrounding whitespace is ignored. Returns ``None`` when ``text`` is not
    a string or does not match ``_VERSION_RE``.
    """
    if not isinstance(text, str):
        return None
    candidate = text.strip()
    if _VERSION_RE.match(candidate) is None:
        return None
    return tuple(map(int, candidate.split(".")))
def _pad(a: tuple[int, ...], b: tuple[int, ...]) -> tuple[tuple[int, ...], tuple[int, ...]]:
    """Return ``a`` and ``b`` extended with trailing zeros to a common length.

    The longer tuple comes back unchanged; the shorter one is zero-filled on
    the right so the pair compares component by component.
    """
    target = max(len(a), len(b))

    def widen(parts: tuple[int, ...]) -> tuple[int, ...]:
        return parts + (0,) * (target - len(parts))

    return widen(a), widen(b)
def constraint_satisfied(installed: str, constraint: str) -> bool | None:
    """Report whether ``installed`` meets the ``core_version`` ``constraint``.

    Recognised forms:

    * ``^X.Y.Z`` — caret: everything up to and including the first non-zero
      component of the wanted version must match, and ``installed`` must be
      ``>=`` the wanted version.
    * ``~X.Y.Z`` — tilde: major and minor must match (major only when no
      minor was given), and ``installed`` must be ``>=`` the wanted version.
    * ``>=``, ``<=``, ``>``, ``<`` — plain ordering comparisons.
    * ``==``, ``=`` or a bare version — exact match.

    Versions are zero-padded to equal length before comparing, so ``1.2``
    and ``1.2.0`` are equal. Returns ``None`` when either side cannot be
    parsed, letting the caller report "unknown" instead of guessing. Pure
    and deterministic.
    """
    have = _parse_version(installed)
    if have is None or not isinstance(constraint, str):
        return None
    parsed = _CONSTRAINT_RE.match(constraint.strip())
    if parsed is None:
        return None
    want = _parse_version(parsed.group("version"))
    if want is None:  # pragma: no cover - regex already guarantees a parseable version
        return None

    operator = parsed.group("op") or "=="
    got, need = _pad(have, want)

    if operator == "^":
        # Pin through the first non-zero wanted component (the last component
        # when the wanted version is all zeros), then require ``>=``.
        pivot = next((idx for idx, part in enumerate(want) if part != 0), len(want) - 1)
        return got[: pivot + 1] == need[: pivot + 1] and got >= need

    if operator == "~":
        # Pin major+minor (just major if that is all that was given), then ``>=``.
        pinned = min(2, len(want))
        return got[:pinned] == need[:pinned] and got >= need

    # Three-way comparison: -1 when got < need, 0 when equal, 1 when got > need.
    order = (got > need) - (got < need)
    accepted = {
        ">=": (0, 1),
        ">": (1,),
        "<=": (-1, 0),
        "<": (-1,),
    }.get(operator, (0,))  # ``==`` / ``=`` / bare version: equality only
    return order in accepted
def _check_cli_version(installed: str, latest: str | None) -> CheckResult:
    """Compare the installed CLI version with the latest published one.

    * ``latest is None`` (caller could not determine it) -> ``warn``.
    * either version unparseable -> ``warn``.
    * installed behind latest -> ``fail``.
    * installed ahead of latest -> ``warn``.
    * equal (after zero-padding) -> ``ok``.
    """
    check = "cli_version"
    if latest is None:
        return CheckResult(
            check,
            _WARN,
            f"installed {installed}; latest unknown (offline or PyPI unreachable)",
            {"installed": installed, "latest": "unknown"},
        )

    have = _parse_version(installed)
    newest = _parse_version(latest)
    facts = {"installed": installed, "latest": latest}

    if have is None or newest is None:
        level = _WARN
        message = f"installed {installed}; latest {latest}; cannot compare versions"
    else:
        have, newest = _pad(have, newest)
        if have < newest:
            level = _FAIL
            message = f"installed {installed} is behind latest {latest} — upgrade keel-workflow"
        elif have > newest:
            level = _WARN
            message = (
                f"installed {installed} is ahead of latest {latest} (pre-release or unpublished)"
            )
        else:
            level = _OK
            message = f"installed {installed} is up to date"
    return CheckResult(check, level, message, facts)
def _check_adapter_version(installed: str, markers: list[dict[str, object]]) -> CheckResult:
    """Compare each adapter marker's ``keel_version`` with the running CLI version.

    No markers at all is a ``warn``. Any marker whose ``keel_version`` is not
    equal to ``installed`` (a plain ``!=`` comparison, not a parsed-version
    one) is listed under ``drift`` and makes the check a ``warn``; otherwise
    the check is ``ok``.
    """
    if not markers:
        return CheckResult(
            "adapter_version",
            _WARN,
            "no keel-generated adapter surfaces found under --root",
            {"installed": installed, "surfaces": 0, "drift": []},
        )

    total = len(markers)
    stale = []
    for entry in markers:
        found = entry.get("keel_version")
        if found == installed:
            continue
        stale.append(
            {
                "surface": entry.get("surface", ""),
                "name": entry.get("name", ""),
                "keel_version": found,
            }
        )

    facts = {"installed": installed, "surfaces": total, "drift": stale}
    if stale:
        level = _WARN
        message = (
            f"{len(stale)} of {total} adapter surface(s) drift from CLI {installed} "
            "— run keel update-adapter"
        )
    else:
        level = _OK
        message = f"all {total} adapter surface(s) match CLI {installed}"
    return CheckResult("adapter_version", level, message, facts)
def _check_orphan_adapters(orphans: list[dict[str, object]]) -> CheckResult:
    """Report orphan adapter surfaces supplied by the caller.

    Any orphan makes the check a ``warn`` and the entries are copied into
    ``detail["orphans"]``; an empty (or falsy) input is ``ok``.
    """
    if orphans:
        return CheckResult(
            "orphan_adapters",
            _WARN,
            f"{len(orphans)} orphan adapter surface(s) — command(s) no longer in installed keel",
            {"orphans": list(orphans)},
        )
    return CheckResult(
        "orphan_adapters",
        _OK,
        "no orphan adapter surfaces",
        {"orphans": []},
    )
def _check_core_version(installed: str, core_version: str | None) -> CheckResult:
    """Evaluate the project's ``core_version`` constraint against ``installed``.

    ``core_version is None`` means the check is skipped and reported ``ok``.
    Otherwise the verdict follows :func:`constraint_satisfied`: ``None``
    (cannot evaluate) -> ``warn``, ``False`` -> ``fail``, ``True`` -> ``ok``.
    """
    if core_version is None:
        return CheckResult(
            "core_version",
            _OK,
            "no project config given — core_version check skipped",
            {"installed": installed, "core_version": None},
        )

    facts = {"installed": installed, "core_version": core_version}
    verdict = constraint_satisfied(installed, core_version)
    if verdict is None:
        level = _WARN
        message = f"cannot evaluate core_version {core_version!r} against installed {installed}"
    elif verdict:
        level = _OK
        message = f"installed {installed} satisfies core_version {core_version!r}"
    else:
        level = _FAIL
        message = f"installed {installed} does not satisfy core_version {core_version!r}"
    return CheckResult("core_version", level, message, facts)
def _check_state_paths(state_paths: list[dict[str, object]]) -> CheckResult:
    """Advisory check on the configured state paths (ledger/checkpoint).

    Each entry is read only through its ``"status"`` key:

    * no entries -> ``ok`` ("no state paths configured").
    * any entry with status ``"invalid"`` -> ``warn``.
    * otherwise -> ``ok``, reporting how many entries are ``"present"``;
      entries with any other status are treated as empty history, not as a
      defect.

    ``detail`` carries a copy of the entries plus the ``present`` count
    (the count is omitted only in the no-entries case).
    """
    if not state_paths:
        return CheckResult(
            "state_paths", _OK, "no state paths configured", {"paths": []},
        )

    # Existence test first (short-circuits on the first invalid entry), then
    # one tally shared by both outcomes instead of recomputing it per branch.
    has_invalid = any(entry.get("status") == "invalid" for entry in state_paths)
    present = sum(1 for entry in state_paths if entry.get("status") == "present")
    detail = {"paths": list(state_paths), "present": present}

    if has_invalid:
        return CheckResult(
            "state_paths", _WARN,
            "one or more configured state paths are invalid",
            detail,
        )
    return CheckResult(
        "state_paths", _OK,
        f"{present} of {len(state_paths)} state path(s) present "
        "(missing paths report as empty history)",
        detail,
    )
def run_doctor(
    *,
    installed_version: str,
    latest_version: str | None,
    adapter_markers: list[dict[str, object]],
    orphans: list[dict[str, object]],
    core_version: str | None,
    state_paths: list[dict[str, object]],
) -> dict[str, object]:
    """Evaluate every diagnostic check against facts the caller already gathered.

    Performs no I/O of its own. The returned dict has these keys, in order:

    * ``schema_version`` — ``SCHEMA_VERSION``.
    * ``installed_version`` — echoed from the argument.
    * ``status`` — the most severe status among all checks.
    * ``counts`` — how many checks ended ``ok`` / ``warn`` / ``fail``.
    * ``checks`` — each check's :meth:`CheckResult.as_dict`, in a fixed order
      (cli_version, adapter_version, orphan_adapters, core_version,
      state_paths).

    Mapping ``status`` to an exit code (including ``--strict`` handling) is
    left to the caller.
    """
    results = [
        _check_cli_version(installed_version, latest_version),
        _check_adapter_version(installed_version, adapter_markers),
        _check_orphan_adapters(orphans),
        _check_core_version(installed_version, core_version),
        _check_state_paths(state_paths),
    ]

    # Roll-up: the status with the highest severity rank wins.
    rollup = max((result.status for result in results), key=_RANK.__getitem__)

    tally = {_OK: 0, _WARN: 0, _FAIL: 0}
    for result in results:
        tally[result.status] += 1

    report: dict[str, object] = {
        "schema_version": SCHEMA_VERSION,
        "installed_version": installed_version,
        "status": rollup,
        "counts": tally,
    }
    report["checks"] = [result.as_dict() for result in results]
    return report
def render_report(report: dict[str, object]) -> str:
    """Format a doctor report (as built by :func:`run_doctor`) for humans.

    Output is a header line with the roll-up status and installed version,
    one line per check (upper-cased status right-aligned to 4 columns, check
    name left-aligned to 16, then the summary), and a closing counts line,
    joined with newlines and no trailing newline.
    """
    header = f"keel doctor — {report['status']} (keel {report['installed_version']})"
    rows = [
        f" {str(check['status']).upper():>4} {check['name']:<16} {check['summary']}"
        for check in report["checks"]
    ]
    tally = report["counts"]
    footer = f" summary : {tally[_OK]} ok, {tally[_WARN]} warn, {tally[_FAIL]} fail"
    return "\n".join([header, *rows, footer])
287 return "\n".join(lines)