Coverage for src/keel/intake.py: 100%

143 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Pure issue intake and readiness classification for work-owning commands.""" 

2 

3from __future__ import annotations 

4 

5import re 

6from dataclasses import dataclass 

7from typing import Any 

8 

9READY = "ready" 

10NEEDS_INPUT = "needs-input" 

11BLOCKED = "blocked" 

12OUT_OF_SCOPE = "out-of-scope" 

13READINESS_STATUSES = (READY, NEEDS_INPUT, BLOCKED, OUT_OF_SCOPE) 

14 

15_SECTION_RE = re.compile(r"^#{1,6}\s+(?P<title>.+?)\s*$", re.MULTILINE) 

16_BULLET_RE = re.compile(r"^\s*(?:[-*+]|\d+[.)])\s+(?P<text>.+?)\s*$") 

17_BLOCKED_RE = re.compile( 

18 r"\b(blocked by|depends on|dependency|waiting on|needs dependency|blocked until)\b", 

19 re.IGNORECASE, 

20) 

21_NON_BLOCKING_DEPENDENCY_RE = re.compile( 

22 r"\b(no dependenc(?:y|ies)|dependenc(?:y|ies):\s*(?:none|no|n/a)|" 

23 r"depends on:\s*(?:none|no|n/a)|blocked by:\s*(?:none|no|n/a)|" 

24 r"waiting on\s+(?:none|no one|nobody|no-one))\b", 

25 re.IGNORECASE, 

26) 

27_AMBIGUOUS_RE = re.compile( 

28 r"\b(tbd|todo|unclear|ambiguous|maybe|not sure|needs clarification|decide later)\b", 

29 re.IGNORECASE, 

30) 

31_OUT_OF_SCOPE_LABELS = frozenset({"out-of-scope", "wontfix", "not-planned"}) 

32_OUT_OF_SCOPE_RE = re.compile( 

33 r"\b(out[- ]of[- ]scope|not planned|wontfix|won't fix|non-goal|not in scope)\b", 

34 re.IGNORECASE, 

35) 

36_DOCS_RE = re.compile(r"\b(doc|docs|documentation|readme|changelog)\b", re.IGNORECASE) 

37_TESTS_RE = re.compile(r"\b(test|tests|coverage|ci|lint)\b", re.IGNORECASE) 

38_BLOCKED_LABELS = frozenset({"blocked", "status:blocked", "needs-dependency"}) 

39_RISK_RE = re.compile( 

40 r"\b(security|release|migration|schema|api|breaking|billing|secret|credential|ci|" 

41 r"production|compatibility)\b", 

42 re.IGNORECASE, 

43) 

44 

45 

46@dataclass(frozen=True) 

47class IssueContext: 

48 """Issue text supplied by an adapter before code mutation starts.""" 

49 

50 title: str | None = None 

51 body: str | None = None 

52 labels: tuple[str, ...] = () 

53 

54 @property 

55 def provided(self) -> bool: 

56 return bool((self.title or "").strip() or (self.body or "").strip() or self.labels) 

57 

58 

def assess_issue(
    *,
    title: str | None = None,
    body: str | None = None,
    labels: tuple[str, ...] = (),
) -> dict[str, Any]:
    """Classify an issue-like work item and return its readiness record.

    The record is a plain dict with schema version ``keel.issue-intake.v1``.
    Status is decided in priority order: out-of-scope, blocked, missing
    required fields, ambiguous wording, and otherwise ready. Only a ready
    issue has ``can_mutate_code`` set.

    Args:
        title: Issue title, if any.
        body: Issue body; markdown headings are parsed into sections.
        labels: Issue labels; compared after stripping and lowercasing.

    Returns:
        The readiness record, including the extracted objective,
        deliverable, acceptance criteria, missing fields, blockers, and at
        most three follow-up questions.
    """
    context = IssueContext(title=title, body=body, labels=tuple(labels))
    sections = _sections(context.body or "")
    acceptance = _acceptance_criteria(sections)
    objective = _objective(context.title, context.body, sections)
    deliverable = _deliverable(sections, acceptance)

    # One searchable string made of title, body, and labels (empty parts dropped).
    label_text = " ".join(context.labels)
    combined = " ".join(part for part in (context.title, context.body, label_text) if part)
    normalized_labels = tuple(label.strip().lower() for label in context.labels)

    # (field name, extracted value, question asked when the value is absent)
    required_fields = (
        ("objective", objective, "What objective should this issue accomplish?"),
        ("deliverable", deliverable, "What concrete deliverable should be produced?"),
        (
            "acceptance_criteria",
            acceptance,
            "What acceptance criteria define done for this issue?",
        ),
    )
    missing: list[str] = [name for name, value, _ in required_fields if not value]
    questions: list[str] = [ask for _, value, ask in required_fields if not value]
    blockers: list[str] = []

    if _is_out_of_scope(combined, normalized_labels):
        status = OUT_OF_SCOPE
        reason = "Issue is marked out of scope or not planned."
        # Nothing to ask about work that will not be done.
        questions = []
    elif _is_blocked(combined, normalized_labels):
        status = BLOCKED
        reason = "Issue declares a dependency or waiting condition."
        blockers.append(_blocked_summary(combined))
        questions.append("What dependency must clear before this issue can start?")
    elif missing:
        status = NEEDS_INPUT
        reason = f"Missing required intake field(s): {', '.join(missing)}."
    elif _AMBIGUOUS_RE.search(combined):
        status = NEEDS_INPUT
        reason = "Issue scope contains ambiguous or deferred wording."
        missing.append("scope_clarity")
        questions.append("Which exact scope should be implemented now?")
    else:
        status = READY
        reason = "Issue has an objective, deliverable, and acceptance criteria."

    is_ready = status == READY
    # De-duplicated and capped at three questions.
    top_questions = _unique(questions)[:3]

    return {
        "schema_version": "keel.issue-intake.v1",
        "provided": context.provided,
        "status": status,
        "can_mutate_code": is_ready,
        "work_block_policy": {
            "skip_when_not_ready": not is_ready,
            "continue_with_next_ready_issue": not is_ready,
            "non_ready_statuses": [NEEDS_INPUT, BLOCKED, OUT_OF_SCOPE],
        },
        "reason": reason,
        "objective": objective,
        "deliverable": deliverable,
        "acceptance_criteria": acceptance,
        "risk_tier_inputs": _risk_inputs(combined, normalized_labels),
        "required_docs_tests": _required_docs_tests(combined, acceptance),
        "missing_info": missing,
        "blockers": blockers,
        "questions": top_questions,
        "ledger_record": {
            "readiness": status,
            "mutation_allowed": is_ready,
            "skip_reason": None if is_ready else reason,
            "question_count": len(top_questions),
        },
    }

134 

135 

def _sections(body: str) -> dict[str, str]:
    """Split a markdown body into ``{normalized heading: section text}``.

    Section text runs from the end of its heading line to the start of the
    next heading (or the end of the body) and is stripped. A body without
    headings yields an empty dict; a repeated heading keeps the last text.
    """
    headings = list(_SECTION_RE.finditer(body))
    # Each section stops where the next heading begins; the last one runs to the end.
    stops = [heading.start() for heading in headings[1:]] + [len(body)]
    parsed: dict[str, str] = {}
    for heading, stop in zip(headings, stops):
        key = _normalize_heading(heading.group("title"))
        parsed[key] = body[heading.end():stop].strip()
    return parsed

147 

148 

149def _normalize_heading(value: str) -> str: 

150 return re.sub(r"[^a-z0-9]+", " ", value.lower()).strip() 

151 

152 

def _objective(title: str | None, body: str | None, sections: dict[str, str]) -> str | None:
    """Pick the issue's objective from its sections, body, or title.

    Lookup order:
      1. The first sentence or bullet of the first non-empty section among
         ``objective``, ``problem``, ``summary``, ``context``.
      2. When the body has no headings at all, its first sentence or bullet.
      3. The stripped title.

    Args:
        title: Issue title, if any.
        body: Raw issue body, if any.
        sections: Body sections keyed by normalized heading.

    Returns:
        The objective text, or None when nothing usable is found.
    """
    for key in ("objective", "problem", "summary", "context"):
        if value := _first_sentence_or_bullet(sections.get(key, "")):
            return value
    if body and not sections:
        # A whitespace-only body is truthy but yields no text; fall through to
        # the title instead of reporting the objective as missing.
        if value := _first_sentence_or_bullet(body):
            return value
    return title.strip() if title and title.strip() else None

160 

161 

def _deliverable(sections: dict[str, str], acceptance: list[str]) -> str | None:
    """Return the first sentence or bullet of the first non-empty deliverable section.

    Sections are tried in the order ``deliverable``, ``proposed direction``,
    ``proposal``, ``scope``, ``implementation``. Returns None when none of
    them has text. ``acceptance`` is part of the signature but is not used.
    """
    del acceptance  # intentionally unused
    headings = ("deliverable", "proposed direction", "proposal", "scope", "implementation")
    # Lazy, so lookup stops at the first section that yields text.
    candidates = (_first_sentence_or_bullet(sections.get(heading, "")) for heading in headings)
    return next((found for found in candidates if found), None)

168 

169 

170def _acceptance_criteria(sections: dict[str, str]) -> list[str]: 

171 for key in ( 

172 "acceptance criteria", 

173 "acceptance", 

174 "definition of done", 

175 "done when", 

176 "dod", 

177 ): 

178 if key in sections: 

179 bullets = _bullets(sections[key]) 

180 return bullets if bullets else _sentences(sections[key]) 

181 return [] 

182 

183 

def _first_sentence_or_bullet(text: str) -> str | None:
    """Return the first bullet item in ``text``, else its first sentence, else None."""
    # Sentences are only computed when the text contains no bullet items.
    candidates = _bullets(text) or _sentences(text)
    return candidates[0] if candidates else None

190 

191 

192def _bullets(text: str) -> list[str]: 

193 items: list[str] = [] 

194 for line in text.splitlines(): 

195 if match := _BULLET_RE.match(line): 

196 item = match.group("text").strip() 

197 if item: 

198 items.append(item) 

199 return items 

200 

201 

202def _sentences(text: str) -> list[str]: 

203 compact = " ".join(line.strip() for line in text.splitlines() if line.strip()) 

204 if not compact: 

205 return [] 

206 parts = [part.strip() for part in re.split(r"(?<=[.!?])\s+", compact) if part.strip()] 

207 return parts or [compact] 

208 

209 

def _is_out_of_scope(combined: str, labels: tuple[str, ...]) -> bool:
    """Return True when a label or the combined text marks the issue as not planned."""
    if any(label in _OUT_OF_SCOPE_LABELS for label in labels):
        return True
    return _OUT_OF_SCOPE_RE.search(combined) is not None

214 

215 

def _is_blocked(combined: str, labels: tuple[str, ...]) -> bool:
    """Return True when a blocking label is set or any sentence declares a real blocker."""
    if any(label in _BLOCKED_LABELS for label in labels):
        return True
    for sentence in _sentences(combined):
        if _is_actionable_blocker(sentence):
            return True
    return False

220 

221 

def _blocked_summary(combined: str) -> str:
    """Return the first sentence that declares a blocker, or a generic fallback."""
    actionable = (
        sentence for sentence in _sentences(combined) if _is_actionable_blocker(sentence)
    )
    # The fallback is used when no sentence in the text names the blocker.
    return next(actionable, "Declared blocked dependency.")

228 

229 

def _is_actionable_blocker(text: str) -> bool:
    """Return True when ``text`` declares a dependency and does not say there is none."""
    if _BLOCKED_RE.search(text) is None:
        return False
    return _NON_BLOCKING_DEPENDENCY_RE.search(text) is None

232 

233 

def _risk_inputs(combined: str, labels: tuple[str, ...]) -> dict[str, Any]:
    """Collect the risk signals found in the issue text and labels.

    Returns a dict with the distinct lowercased risk keywords found in
    ``combined``, the labels that contain ``risk`` or start with ``tier``,
    and a flag that is set when either list is non-empty.
    """
    hits = [hit.group(0).lower() for hit in _RISK_RE.finditer(combined)]
    keywords = _unique(hits)
    risk_labels = []
    for label in labels:
        if label.startswith("tier") or "risk" in label:
            risk_labels.append(label)
    flagged = bool(keywords) or bool(risk_labels)
    return {
        "keywords": keywords,
        "labels": risk_labels,
        "has_high_risk_signal": flagged,
    }

242 

243 

def _required_docs_tests(combined: str, acceptance: list[str]) -> dict[str, Any]:
    """Report whether the issue text or acceptance criteria mention docs or tests.

    Each of ``docs`` and ``tests`` is ``"required"`` when a matching keyword
    appears and ``"unspecified"`` otherwise.
    """
    haystack = " ".join((combined, *acceptance))
    docs = "required" if _DOCS_RE.search(haystack) else "unspecified"
    tests = "required" if _TESTS_RE.search(haystack) else "unspecified"
    return {"docs": docs, "tests": tests}

250 

251 

252def _unique(items) -> list[str]: 

253 return list(dict.fromkeys(v for item in items if (v := str(item).strip())))