Coverage for src/keel/status.py: 100%

98 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Operator-facing progress snapshots for active or recent keel runs.""" 

2 

3from __future__ import annotations 

4 

5from typing import Any 

6 

7from . import checkpoint, ledger 

8from . import config as cfg 

9 

# Version tag stamped on both the status contract and every snapshot.
STATUS_SCHEMA_VERSION = "keel.progress-status.v1"

# Wait reason reported for each step id listed here; a run whose current step
# is one of these keys (and that has no stop reason) is shown as "waiting".
_WAIT_REASONS: dict[str, str] = dict(
    s1="needs-input",
    s6="ci",
    s7="review",
    s8="test",
    s10="merge-window",
    s11="capture",
    s12="closeout",
)

21 

22 

def status_contract_as_dict(config: cfg.ProjectConfig) -> dict[str, Any]:
    """Describe the ``status`` command's output contract as a plain dict.

    The checkpoint and run-ledger contracts (both built from ``config``) are
    nested under ``source``, and the ledger's capture-health contract under
    ``capture_health``. Every other entry is a fixed value.
    """
    sources = {
        "checkpoint": checkpoint.checkpoint_contract_as_dict(config),
        "run_ledger": ledger.ledger_contract_as_dict(config),
    }
    capture_contract = ledger.capture_health_contract_as_dict()
    run_states = ["no-active-run", "active", "waiting", "interrupted", "completed"]
    item_states = ["shipped", "blocked", "deferred", "skipped"]
    return {
        "schema_version": STATUS_SCHEMA_VERSION,
        "command": "status",
        "source": sources,
        "capture_health": capture_contract,
        "realtime": False,
        "snapshot_semantics": "last-safe-boundary",
        "consumer_neutral": True,
        "states": run_states,
        "item_states": item_states,
    }

39 

40 

def build_status_snapshot(
    *,
    config: cfg.ProjectConfig,
    checkpoint_record: dict[str, Any] | None,
    ledger_records: list[dict[str, Any]],
    live_branches: list[str] | None = None,
    live_pull_requests: list[int] | None = None,
) -> dict[str, Any]:
    """Assemble the consumer-neutral progress snapshot for one project.

    The snapshot combines the checkpoint (the active run, if any) with the
    run ledger (history and capture health).

    ``live_branches`` and ``live_pull_requests`` are the git/transport-side
    live references handed in by the thin CLI layer. They are passed to
    ``checkpoint.find_orphans`` together with the checkpoint and ledger
    records, so that live branches/PRs with no covering record can be flagged
    advisorily (GAP-13, the git side of the missing-checkpoint hazard).
    """
    past_runs = _history(ledger_records)
    health = ledger.capture_health_summary(ledger_records)
    active_run = _current(checkpoint_record)
    overall = _state(checkpoint_record, past_runs)
    unclaimed = checkpoint.find_orphans(
        live_branches=live_branches,
        live_pull_requests=live_pull_requests,
        checkpoint_record=checkpoint_record,
        ledger_records=ledger_records,
    )
    project = {
        "repo": config.repo,
        "base_branch": config.base_branch,
        "timezone": config.timezone,
    }
    upcoming = _next_item(checkpoint_record)
    return {
        "schema_version": STATUS_SCHEMA_VERSION,
        "status": overall,
        "project": project,
        "current": active_run,
        "history": past_runs,
        "capture_health": health,
        "orphans": unclaimed,
        "next": upcoming,
    }

80 

81 

def render_status(snapshot: dict[str, Any]) -> str:
    """Format a status snapshot as a newline-joined, human-readable report.

    Rows appear in a fixed order: the headline, the active run (only when
    ``snapshot["current"]`` is truthy), the history counts, capture health,
    orphans, and finally the next queued issue. Falsy values in the active-run
    rows and the next row are printed as ``-``. The orphan branch / orphan PR
    rows are emitted only when their lists are non-empty.
    """
    active_run = snapshot["current"]
    past = snapshot["history"]
    rows = [f"keel status — {snapshot['status']}"]

    if active_run:
        rows.extend(
            [
                f" issue : {active_run['issue'] or '-'}",
                f" step : {active_run['step'] or '-'}",
                f" wait : {active_run['wait_reason'] or '-'}",
                f" pr : {active_run['pull_request'] or '-'}",
                f" branch : {active_run['branch'] or '-'}",
            ]
        )

    tally = past["counts"]
    rows.extend(
        [
            f" shipped : {tally['shipped']}",
            f" blocked : {tally['blocked']}",
            f" deferred : {tally['deferred']}",
            f" skipped : {tally['skipped']}",
        ]
    )

    health = snapshot["capture_health"]
    rows.extend(
        [
            f" capture : {health['status']}",
            f" capture gaps : {health['counts']['needs_reconcile']}",
        ]
    )

    strays = snapshot["orphans"]
    rows.append(f" orphans : {strays['orphan_count']}")
    if strays["branches"]:
        rows.append(f" orphan branch : {', '.join(strays['branches'])}")
    if strays["pull_requests"]:
        pr_refs = ", ".join(f"#{number}" for number in strays["pull_requests"])
        rows.append(f" orphan pr : {pr_refs}")

    rows.append(f" next : {snapshot['next']['issue'] or '-'}")
    return "\n".join(rows)

109 

110 

def _state(checkpoint_record: dict[str, Any] | None, history: dict[str, Any]) -> str:
    """Classify the overall run status from the checkpoint and ledger history.

    Without a checkpoint the result is ``"completed"`` when the history holds
    any records and ``"no-active-run"`` otherwise. With a checkpoint, the
    checks run in this order: fully merged/captured/closed → ``"completed"``;
    a stop reason → ``"interrupted"``; current step listed in
    ``_WAIT_REASONS`` → ``"waiting"``; anything else → ``"active"``.
    """
    if checkpoint_record is None:
        if history["total"]:
            return "completed"
        return "no-active-run"

    flags = checkpoint_record.get("state", {})
    # Capture counts as done unless it is explicitly not started or failed.
    finished = (
        flags.get("merge") == "merged"
        and flags.get("capture") not in {"not-started", "failed"}
        and flags.get("close") == "closed"
    )
    if finished:
        return "completed"

    if flags.get("stop_reason"):
        return "interrupted"

    current_step = checkpoint_record.get("position", {}).get("current_step")
    return "waiting" if current_step in _WAIT_REASONS else "active"

128 

129 

def _current(record: dict[str, Any] | None) -> dict[str, Any] | None:
    """Flatten a checkpoint record into the snapshot's ``current`` section.

    Returns ``None`` when there is no checkpoint. Missing sub-sections
    (``queue``, ``position``, ``identifiers``, ``state``) are treated as
    empty, so the corresponding fields come back as ``None``.
    ``wait_reason`` prefers the recorded stop reason and otherwise falls back
    to the ``_WAIT_REASONS`` entry for the current step.
    """
    if record is None:
        return None

    position = record.get("position", {})
    ids = record.get("identifiers", {})
    flags = record.get("state", {})
    current_step = position.get("current_step")

    # Copy the list so the snapshot does not alias the checkpoint's own data.
    finished_steps = list(position.get("completed_steps", []))
    waiting_on = flags.get("stop_reason") or _WAIT_REASONS.get(current_step)

    return {
        "run_id": record.get("run_id"),
        "command": record.get("command"),
        "target": record.get("target"),
        "issue": record.get("queue", {}).get("active_issue"),
        "step": current_step,
        "completed_steps": finished_steps,
        "pull_request": ids.get("pull_request"),
        "branch": ids.get("branch"),
        "worktree": ids.get("worktree"),
        "head_sha": ids.get("head_sha"),
        "wait_reason": waiting_on,
        "merge_state": flags.get("merge"),
        "capture_state": flags.get("capture"),
        "close_state": flags.get("close"),
    }

154 

155 

def _next_item(record: dict[str, Any] | None) -> dict[str, Any]:
    """Pick the issue that follows the active one in the checkpoint queue.

    With no checkpoint the source is ``"no-active-run"`` and the issue is
    ``None``. Otherwise the source is ``"checkpoint.queue"`` and the issue is:
    the entry after the active issue when the active issue is in the queue
    (``None`` if it is the last entry), or the first queued entry when the
    active issue is not in the queue (``None`` if the queue is empty).
    """
    if record is None:
        return {"issue": None, "source": "no-active-run"}

    queue = record.get("queue", {})
    queued = queue.get("issues", [])
    active = queue.get("active_issue")

    upcoming = None
    if active in queued:
        following = queued.index(active) + 1
        if following < len(queued):
            upcoming = queued[following]
    elif queued:
        upcoming = queued[0]
    return {"issue": upcoming, "source": "checkpoint.queue"}

169 

170 

def _history(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarize ledger records as per-item entries plus a count per state.

    Each record is converted with ``_history_item``. ``counts`` always
    carries the four item states, starting at zero, and ``total`` is the
    number of records.
    """
    entries = [_history_item(entry) for entry in records]
    tally = dict.fromkeys(("shipped", "blocked", "deferred", "skipped"), 0)
    for entry in entries:
        tally[entry["state"]] += 1
    return {"total": len(entries), "counts": tally, "items": entries}

181 

182 

def _history_item(record: dict[str, Any]) -> dict[str, Any]:
    """Reduce one ledger record to the fields shown in the status history.

    Sub-sections of the record (``assessment``, its ``merge`` entry,
    ``capture``, ``issue``, ``pull_request``) are treated as empty when they
    are missing *or* stored as null, so a sparse record yields ``None``
    fields instead of raising.

    ``reason`` prefers the merge assessment's reason over the capture reason.
    ``step`` is ``"s12"`` for shipped items and ``None`` for all others.
    """
    # `or {}` rather than a `.get` default: the default only applies when the
    # key is absent, not when it is present with a null value.
    merge = (record.get("assessment") or {}).get("merge") or {}
    capture = record.get("capture") or {}
    state = _item_state(record)
    return {
        "run_id": record.get("run_id"),
        "issue": (record.get("issue") or {}).get("number"),
        "pull_request": (record.get("pull_request") or {}).get("number"),
        "state": state,
        "reason": merge.get("reason") or capture.get("reason"),
        "step": "s12" if state == "shipped" else None,
        "capture": capture.get("status"),
    }


def _item_state(record: dict[str, Any]) -> str:
    """Classify a ledger record as skipped, deferred, blocked, or shipped.

    Checks run in that priority order: a ``skip`` merge action wins, then a
    halted assessment or ``defer`` action, then a blocked verdict or ``block``
    action. Anything else counts as shipped. Missing or null sub-sections are
    treated as empty.
    """
    assessment = record.get("assessment") or {}
    action = (assessment.get("merge") or {}).get("action")
    if action == "skip":
        return "skipped"
    if assessment.get("halted") or action == "defer":
        return "deferred"
    verdict = record.get("verdict") or {}
    if verdict.get("blocked") or action == "block":
        return "blocked"
    return "shipped"

208 return "shipped"