Coverage for src/keel/intake.py: 100%
143 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Pure issue intake and readiness classification for work-owning commands."""
3from __future__ import annotations
5import re
6from dataclasses import dataclass
7from typing import Any
9READY = "ready"
10NEEDS_INPUT = "needs-input"
11BLOCKED = "blocked"
12OUT_OF_SCOPE = "out-of-scope"
13READINESS_STATUSES = (READY, NEEDS_INPUT, BLOCKED, OUT_OF_SCOPE)
15_SECTION_RE = re.compile(r"^#{1,6}\s+(?P<title>.+?)\s*$", re.MULTILINE)
16_BULLET_RE = re.compile(r"^\s*(?:[-*+]|\d+[.)])\s+(?P<text>.+?)\s*$")
17_BLOCKED_RE = re.compile(
18 r"\b(blocked by|depends on|dependency|waiting on|needs dependency|blocked until)\b",
19 re.IGNORECASE,
20)
21_NON_BLOCKING_DEPENDENCY_RE = re.compile(
22 r"\b(no dependenc(?:y|ies)|dependenc(?:y|ies):\s*(?:none|no|n/a)|"
23 r"depends on:\s*(?:none|no|n/a)|blocked by:\s*(?:none|no|n/a)|"
24 r"waiting on\s+(?:none|no one|nobody|no-one))\b",
25 re.IGNORECASE,
26)
27_AMBIGUOUS_RE = re.compile(
28 r"\b(tbd|todo|unclear|ambiguous|maybe|not sure|needs clarification|decide later)\b",
29 re.IGNORECASE,
30)
31_OUT_OF_SCOPE_LABELS = frozenset({"out-of-scope", "wontfix", "not-planned"})
32_OUT_OF_SCOPE_RE = re.compile(
33 r"\b(out[- ]of[- ]scope|not planned|wontfix|won't fix|non-goal|not in scope)\b",
34 re.IGNORECASE,
35)
36_DOCS_RE = re.compile(r"\b(doc|docs|documentation|readme|changelog)\b", re.IGNORECASE)
37_TESTS_RE = re.compile(r"\b(test|tests|coverage|ci|lint)\b", re.IGNORECASE)
38_BLOCKED_LABELS = frozenset({"blocked", "status:blocked", "needs-dependency"})
39_RISK_RE = re.compile(
40 r"\b(security|release|migration|schema|api|breaking|billing|secret|credential|ci|"
41 r"production|compatibility)\b",
42 re.IGNORECASE,
43)
46@dataclass(frozen=True)
47class IssueContext:
48 """Issue text supplied by an adapter before code mutation starts."""
50 title: str | None = None
51 body: str | None = None
52 labels: tuple[str, ...] = ()
54 @property
55 def provided(self) -> bool:
56 return bool((self.title or "").strip() or (self.body or "").strip() or self.labels)
def assess_issue(
    *,
    title: str | None = None,
    body: str | None = None,
    labels: tuple[str, ...] = (),
) -> dict[str, Any]:
    """Classify an issue-like work item and return its readiness record.

    The record reports one of the readiness statuses, the extracted objective,
    deliverable and acceptance criteria, any missing fields, blockers and up to
    three follow-up questions.  Code mutation is allowed only when the status
    is ``READY``.
    """
    context = IssueContext(title=title, body=body, labels=tuple(labels))
    sections = _sections(context.body or "")
    acceptance = _acceptance_criteria(sections)
    objective = _objective(context.title, context.body, sections)
    deliverable = _deliverable(sections, acceptance)

    # One searchable string made of every non-empty text source.
    text_parts = (context.title, context.body, " ".join(context.labels))
    combined = " ".join([part for part in text_parts if part])
    normalized_labels = tuple(label.strip().lower() for label in context.labels)

    # (field name, extracted value, question asked when the value is absent)
    required_fields = (
        ("objective", objective, "What objective should this issue accomplish?"),
        ("deliverable", deliverable, "What concrete deliverable should be produced?"),
        (
            "acceptance_criteria",
            acceptance,
            "What acceptance criteria define done for this issue?",
        ),
    )
    missing: list[str] = [name for name, value, _ in required_fields if not value]
    questions: list[str] = [prompt for _, value, prompt in required_fields if not value]
    blockers: list[str] = []

    # Precedence: out-of-scope > blocked > missing fields > ambiguous wording > ready.
    if _is_out_of_scope(combined, normalized_labels):
        status = OUT_OF_SCOPE
        reason = "Issue is marked out of scope or not planned."
        questions = []
    elif _is_blocked(combined, normalized_labels):
        status = BLOCKED
        reason = "Issue declares a dependency or waiting condition."
        blockers.append(_blocked_summary(combined))
        questions.append("What dependency must clear before this issue can start?")
    elif missing:
        status = NEEDS_INPUT
        reason = f"Missing required intake field(s): {', '.join(missing)}."
    elif _AMBIGUOUS_RE.search(combined):
        status = NEEDS_INPUT
        reason = "Issue scope contains ambiguous or deferred wording."
        missing.append("scope_clarity")
        questions.append("Which exact scope should be implemented now?")
    else:
        status = READY
        reason = "Issue has an objective, deliverable, and acceptance criteria."

    is_ready = status == READY
    # De-duplicated and capped at three questions.
    asked = _unique(questions)[:3]

    return {
        "schema_version": "keel.issue-intake.v1",
        "provided": context.provided,
        "status": status,
        "can_mutate_code": is_ready,
        "work_block_policy": {
            "skip_when_not_ready": not is_ready,
            "continue_with_next_ready_issue": not is_ready,
            "non_ready_statuses": [NEEDS_INPUT, BLOCKED, OUT_OF_SCOPE],
        },
        "reason": reason,
        "objective": objective,
        "deliverable": deliverable,
        "acceptance_criteria": acceptance,
        "risk_tier_inputs": _risk_inputs(combined, normalized_labels),
        "required_docs_tests": _required_docs_tests(combined, acceptance),
        "missing_info": missing,
        "blockers": blockers,
        "questions": asked,
        "ledger_record": {
            "readiness": status,
            "mutation_allowed": is_ready,
            "skip_reason": None if is_ready else reason,
            "question_count": len(asked),
        },
    }
def _sections(body: str) -> dict[str, str]:
    """Split a markdown body into ``{normalized heading: stripped section text}``.

    A section runs from the end of its heading line to the start of the next
    heading (or the end of the body).  When two headings normalize to the same
    key, the later section's text replaces the earlier one.
    """
    headings = list(_SECTION_RE.finditer(body))
    # Each section stops where the following heading starts; the last one
    # stops at the end of the body.
    stops = [heading.start() for heading in headings[1:]] + [len(body)]
    return {
        _normalize_heading(heading.group("title")): body[heading.end():stop].strip()
        for heading, stop in zip(headings, stops)
    }
149def _normalize_heading(value: str) -> str:
150 return re.sub(r"[^a-z0-9]+", " ", value.lower()).strip()
def _objective(title: str | None, body: str | None, sections: dict[str, str]) -> str | None:
    """Return the issue's objective, or ``None`` when nothing usable exists.

    Lookup order:
      1. the first non-empty section among objective / problem / summary / context;
      2. the first sentence or bullet of a body that has no headings at all;
      3. the stripped title.
    """
    for key in ("objective", "problem", "summary", "context"):
        if value := _first_sentence_or_bullet(sections.get(key, "")):
            return value
    if body and not sections:
        # A whitespace-only body yields nothing here; fall through to the
        # title instead of reporting the objective as missing.
        if value := _first_sentence_or_bullet(body):
            return value
    cleaned_title = (title or "").strip()
    return cleaned_title or None
def _deliverable(sections: dict[str, str], acceptance: list[str]) -> str | None:
    """Return the first sentence or bullet of the first non-empty deliverable-like section.

    ``acceptance`` is part of the signature but is discarded immediately and
    never consulted.
    """
    del acceptance
    headings = ("deliverable", "proposed direction", "proposal", "scope", "implementation")
    # Lazily evaluated so lookup stops at the first heading that yields text.
    candidates = (_first_sentence_or_bullet(sections.get(heading, "")) for heading in headings)
    return next((candidate for candidate in candidates if candidate), None)
def _acceptance_criteria(sections: dict[str, str]) -> list[str]:
    """Return the acceptance criteria listed in the issue, or ``[]`` if none exist.

    Heading aliases are tried in priority order.  A section's bullet items are
    preferred; a section without bullets contributes its sentences instead.
    A section that is present but empty is skipped so that a later alias can
    still supply the criteria.
    """
    for key in (
        "acceptance criteria",
        "acceptance",
        "definition of done",
        "done when",
        "dod",
    ):
        content = sections.get(key, "")
        if criteria := (_bullets(content) or _sentences(content)):
            return criteria
    return []
def _first_sentence_or_bullet(text: str) -> str | None:
    """Return the first bullet item in ``text``, else its first sentence, else ``None``."""
    # Sentences are only computed when the text contains no bullet items.
    candidates = _bullets(text) or _sentences(text)
    if not candidates:
        return None
    return candidates[0]
def _bullets(text: str) -> list[str]:
    """Collect the stripped text of every bullet or numbered list line in ``text``."""
    return [
        item
        for line in text.splitlines()
        # Keep only lines that look like list items and still have text after stripping.
        if (found := _BULLET_RE.match(line)) and (item := found.group("text").strip())
    ]
202def _sentences(text: str) -> list[str]:
203 compact = " ".join(line.strip() for line in text.splitlines() if line.strip())
204 if not compact:
205 return []
206 parts = [part.strip() for part in re.split(r"(?<=[.!?])\s+", compact) if part.strip()]
207 return parts or [compact]
def _is_out_of_scope(combined: str, labels: tuple[str, ...]) -> bool:
    """Return True when a label or the combined text marks the issue out of scope."""
    if any(label in _OUT_OF_SCOPE_LABELS for label in labels):
        return True
    return _OUT_OF_SCOPE_RE.search(combined) is not None
def _is_blocked(combined: str, labels: tuple[str, ...]) -> bool:
    """Return True when a blocking label is set or any sentence declares a real blocker."""
    if any(label in _BLOCKED_LABELS for label in labels):
        return True
    # Checked sentence by sentence, so a "none" disclaimer only cancels the
    # sentence it appears in.
    for sentence in _sentences(combined):
        if _is_actionable_blocker(sentence):
            return True
    return False
def _blocked_summary(combined: str) -> str:
    """Return the first sentence that declares a blocker, or a generic fallback message."""
    blocking_sentences = (
        sentence for sentence in _sentences(combined) if _is_actionable_blocker(sentence)
    )
    return next(blocking_sentences, "Declared blocked dependency.")
def _is_actionable_blocker(text: str) -> bool:
    """Return True when ``text`` mentions a dependency without disclaiming it as none."""
    if _BLOCKED_RE.search(text) is None:
        return False
    # Wording such as "depends on: none" cancels the blocker.
    return _NON_BLOCKING_DEPENDENCY_RE.search(text) is None
def _risk_inputs(combined: str, labels: tuple[str, ...]) -> dict[str, Any]:
    """Gather risk keywords from the text and risk/tier labels into one record.

    ``has_high_risk_signal`` is True when either list is non-empty.
    """
    lowered_hits = (found.group(0).lower() for found in _RISK_RE.finditer(combined))
    keywords = _unique(lowered_hits)
    risk_labels = []
    for label in labels:
        if label.startswith("tier") or "risk" in label:
            risk_labels.append(label)
    has_signal = bool(keywords) or bool(risk_labels)
    return {
        "keywords": keywords,
        "labels": risk_labels,
        "has_high_risk_signal": has_signal,
    }
def _required_docs_tests(combined: str, acceptance: list[str]) -> dict[str, Any]:
    """Report whether docs and tests are mentioned in the issue text or its criteria."""
    haystack = " ".join((combined, *acceptance))

    def verdict(pattern: re.Pattern[str]) -> str:
        # "required" as soon as any keyword of the pattern appears.
        return "required" if pattern.search(haystack) else "unspecified"

    return {"docs": verdict(_DOCS_RE), "tests": verdict(_TESTS_RE)}
252def _unique(items) -> list[str]:
253 return list(dict.fromkeys(v for item in items if (v := str(item).strip())))