Coverage for src/keel/status.py: 100%
98 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Operator-facing progress snapshots for active or recent keel runs."""
3from __future__ import annotations
5from typing import Any
7from . import checkpoint, ledger
8from . import config as cfg
# Schema identifier stamped into both the status contract and every snapshot.
STATUS_SCHEMA_VERSION = "keel.progress-status.v1"

# Steps at which a run is reported as "waiting" rather than "active", mapped
# to the label used as ``wait_reason`` when the checkpoint has no stop reason.
_WAIT_REASONS = {
    "s1": "needs-input",
    "s6": "ci",
    "s7": "review",
    "s8": "test",
    "s10": "merge-window",
    "s11": "capture",
    "s12": "closeout",
}
def status_contract_as_dict(config: cfg.ProjectConfig) -> dict[str, Any]:
    """Describe the ``status`` command's progress contract as a plain dict.

    The checkpoint and run-ledger source contracts are derived from
    ``config``; the capture-health contract takes no configuration. All other
    fields are fixed literals, including the lists of run-level ``states`` and
    per-item ``item_states``.
    """
    sources = {
        "checkpoint": checkpoint.checkpoint_contract_as_dict(config),
        "run_ledger": ledger.ledger_contract_as_dict(config),
    }
    capture_contract = ledger.capture_health_contract_as_dict()
    run_states = ["no-active-run", "active", "waiting", "interrupted", "completed"]
    item_states = ["shipped", "blocked", "deferred", "skipped"]

    contract: dict[str, Any] = {
        "schema_version": STATUS_SCHEMA_VERSION,
        "command": "status",
        "source": sources,
        "capture_health": capture_contract,
        "realtime": False,
        "snapshot_semantics": "last-safe-boundary",
        "consumer_neutral": True,
        "states": run_states,
        "item_states": item_states,
    }
    return contract
def build_status_snapshot(
    *,
    config: cfg.ProjectConfig,
    checkpoint_record: dict[str, Any] | None,
    ledger_records: list[dict[str, Any]],
    live_branches: list[str] | None = None,
    live_pull_requests: list[int] | None = None,
) -> dict[str, Any]:
    """Assemble the progress snapshot from a checkpoint and ledger records.

    ``checkpoint_record`` may be ``None`` when there is no checkpoint.
    ``live_branches`` and ``live_pull_requests`` are the live
    git/transport-side references supplied by the thin CLI layer; they are
    forwarded to ``checkpoint.find_orphans`` so that live branches/PRs with no
    covering checkpoint or ledger record can be flagged advisorily (GAP-13,
    the git side of the missing-checkpoint hazard).

    The result carries the schema version, the overall ``status``, project
    identity taken from ``config``, the ``current`` item, ledger ``history``,
    ``capture_health``, ``orphans`` and the ``next`` queued item.
    """
    past_items = _history(ledger_records)
    health = ledger.capture_health_summary(ledger_records)
    active_item = _current(checkpoint_record)
    overall = _state(checkpoint_record, past_items)
    unclaimed = checkpoint.find_orphans(
        live_branches=live_branches,
        live_pull_requests=live_pull_requests,
        checkpoint_record=checkpoint_record,
        ledger_records=ledger_records,
    )

    project = {
        "repo": config.repo,
        "base_branch": config.base_branch,
        "timezone": config.timezone,
    }
    snapshot: dict[str, Any] = {
        "schema_version": STATUS_SCHEMA_VERSION,
        "status": overall,
        "project": project,
        "current": active_item,
        "history": past_items,
        "capture_health": health,
        "orphans": unclaimed,
        "next": _next_item(checkpoint_record),
    }
    return snapshot
def render_status(snapshot: dict[str, Any]) -> str:
    """Format a snapshot as a newline-joined, human-readable report.

    Rows come out in a fixed order: the headline, the active item (omitted
    entirely when ``snapshot["current"]`` is falsy), the history tallies,
    capture health, orphans, and finally the next queued issue. Falsy values
    for the active item's fields and for the next issue are shown as ``-``.
    Orphan branch / PR rows are only emitted when the respective list is
    non-empty.
    """
    active = snapshot["current"]
    past = snapshot["history"]

    report = [f"keel status — {snapshot['status']}"]
    if active:
        report.extend(
            [
                f" issue : {active['issue'] or '-'}",
                f" step : {active['step'] or '-'}",
                f" wait : {active['wait_reason'] or '-'}",
                f" pr : {active['pull_request'] or '-'}",
                f" branch : {active['branch'] or '-'}",
            ]
        )

    report.extend(
        [
            f" shipped : {past['counts']['shipped']}",
            f" blocked : {past['counts']['blocked']}",
            f" deferred : {past['counts']['deferred']}",
            f" skipped : {past['counts']['skipped']}",
        ]
    )

    health = snapshot["capture_health"]
    report.extend(
        [
            f" capture : {health['status']}",
            f" capture gaps : {health['counts']['needs_reconcile']}",
        ]
    )

    stray = snapshot["orphans"]
    report.append(f" orphans : {stray['orphan_count']}")
    if stray["branches"]:
        report.append(f" orphan branch : {', '.join(stray['branches'])}")
    if stray["pull_requests"]:
        # Pull requests are numbers; prefix each with '#' for display.
        numbered = ", ".join(f"#{number}" for number in stray["pull_requests"])
        report.append(f" orphan pr : {numbered}")

    report.append(f" next : {snapshot['next']['issue'] or '-'}")
    return "\n".join(report)
111def _state(checkpoint_record: dict[str, Any] | None, history: dict[str, Any]) -> str:
112 if checkpoint_record is None:
113 return "completed" if history["total"] else "no-active-run"
114 run_state = checkpoint_record.get("state", {})
115 if (
116 run_state.get("merge") == "merged"
117 and run_state.get("capture") not in {"not-started", "failed"}
118 and run_state.get("close") == "closed"
119 ):
120 return "completed"
121 stop_reason = checkpoint_record.get("state", {}).get("stop_reason")
122 if stop_reason:
123 return "interrupted"
124 step = checkpoint_record.get("position", {}).get("current_step")
125 if step in _WAIT_REASONS:
126 return "waiting"
127 return "active"
130def _current(record: dict[str, Any] | None) -> dict[str, Any] | None:
131 if record is None:
132 return None
133 queue = record.get("queue", {})
134 position = record.get("position", {})
135 identifiers = record.get("identifiers", {})
136 state = record.get("state", {})
137 step = position.get("current_step")
138 return {
139 "run_id": record.get("run_id"),
140 "command": record.get("command"),
141 "target": record.get("target"),
142 "issue": queue.get("active_issue"),
143 "step": step,
144 "completed_steps": list(position.get("completed_steps", [])),
145 "pull_request": identifiers.get("pull_request"),
146 "branch": identifiers.get("branch"),
147 "worktree": identifiers.get("worktree"),
148 "head_sha": identifiers.get("head_sha"),
149 "wait_reason": state.get("stop_reason") or _WAIT_REASONS.get(step),
150 "merge_state": state.get("merge"),
151 "capture_state": state.get("capture"),
152 "close_state": state.get("close"),
153 }
156def _next_item(record: dict[str, Any] | None) -> dict[str, Any]:
157 if record is None:
158 return {"issue": None, "source": "no-active-run"}
159 queue = record.get("queue", {})
160 active = queue.get("active_issue")
161 issues = queue.get("issues", [])
162 if active in issues:
163 active_index = issues.index(active)
164 if active_index + 1 < len(issues):
165 return {"issue": issues[active_index + 1], "source": "checkpoint.queue"}
166 elif issues:
167 return {"issue": issues[0], "source": "checkpoint.queue"}
168 return {"issue": None, "source": "checkpoint.queue"}
171def _history(records: list[dict[str, Any]]) -> dict[str, Any]:
172 items = [_history_item(record) for record in records]
173 counts = {"shipped": 0, "blocked": 0, "deferred": 0, "skipped": 0}
174 for item in items:
175 counts[item["state"]] += 1
176 return {
177 "total": len(items),
178 "counts": counts,
179 "items": items,
180 }
183def _history_item(record: dict[str, Any]) -> dict[str, Any]:
184 assessment = record.get("assessment", {})
185 merge = assessment.get("merge", {})
186 capture = record.get("capture", {})
187 state = _item_state(record)
188 return {
189 "run_id": record.get("run_id"),
190 "issue": (record.get("issue") or {}).get("number"),
191 "pull_request": (record.get("pull_request") or {}).get("number"),
192 "state": state,
193 "reason": merge.get("reason") or capture.get("reason"),
194 "step": "s12" if state == "shipped" else None,
195 "capture": capture.get("status"),
196 }
199def _item_state(record: dict[str, Any]) -> str:
200 assessment = record.get("assessment", {})
201 merge = assessment.get("merge", {})
202 if merge.get("action") == "skip":
203 return "skipped"
204 if assessment.get("halted") or merge.get("action") == "defer":
205 return "deferred"
206 if record.get("verdict", {}).get("blocked") or merge.get("action") == "block":
207 return "blocked"
208 return "shipped"