Coverage for src/keel/workcreation.py: 100%
88 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Deterministic policy for signal-driven work creation."""
3from __future__ import annotations
5import re
6from dataclasses import dataclass
7from typing import Any
# Identifier reported as "schema_version" in both the contract and every
# evaluation result produced by this module.
SCHEMA_VERSION = "keel.work-creation.v1"

# Policy defaults, used when the caller supplies no (or an invalid) override.
# A candidate seen fewer times than this is treated as transient.
DEFAULT_MIN_OCCURRENCES = 2
# A candidate with lower confidence than this is treated as transient.
DEFAULT_MIN_CONFIDENCE = 0.6
# Maximum number of "create" decisions handed out per evaluation call.
DEFAULT_MAX_CREATIONS = 5
# Token-set Jaccard similarity at or above which two texts are duplicates.
DEFAULT_NEAR_TEXT_SIMILARITY = 0.6

# Every outcome a candidate can receive.
DECISIONS = ("create", "suppress-transient", "suppress-duplicate", "limit-reached")

# A token is a maximal run of lowercase ASCII letters and digits.
_TOKEN_RE = re.compile(r"[a-z0-9]+")
25@dataclass(frozen=True)
26class WorkDecision:
27 """One deterministic work-creation decision."""
29 candidate_id: str
30 decision: str
31 reason: str
32 title: str
33 duplicate_of: int | None = None
35 def as_dict(self) -> dict[str, Any]:
36 result: dict[str, Any] = {
37 "candidate_id": self.candidate_id,
38 "decision": self.decision,
39 "reason": self.reason,
40 "title": self.title,
41 "creates_issue": self.decision == "create",
42 }
43 if self.duplicate_of is not None:
44 result["duplicate_of"] = self.duplicate_of
45 return result
def contract_as_dict() -> dict[str, Any]:
    """Describe the work-creation policy contract as a plain dict.

    The payload is assembled from the module-level defaults and the
    ``DECISIONS`` tuple; a fresh dict (with fresh nested containers) is built
    on every call.
    """
    transient_filter = {
        "default_min_occurrences": DEFAULT_MIN_OCCURRENCES,
        "default_min_confidence": DEFAULT_MIN_CONFIDENCE,
        "transient_outcome": "suppress-transient",
    }
    dedupe = {
        "against": "open work",
        "keys": ["dedupe_key", "normalized_title", "near_text"],
        "near_text_similarity": DEFAULT_NEAR_TEXT_SIMILARITY,
        "duplicate_outcome": "suppress-duplicate",
    }
    cycle_limit = {
        "default_max_creations": DEFAULT_MAX_CREATIONS,
        "limit_outcome": "limit-reached",
    }
    consumers = [
        "regression",
        "review-all-day",
        "coverage",
        "deps-audit",
        "flake-audit",
    ]
    contract: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "consumer_neutral": True,
        "deterministic": True,
        "stdlib_only": True,
        "source": "signal-driven commands",
        "decisions": list(DECISIONS),
    }
    contract["transient_filter"] = transient_filter
    contract["dedupe"] = dedupe
    contract["cycle_limit"] = cycle_limit
    contract["consumers"] = consumers
    return contract
def evaluate_candidates(
    candidates: list[dict[str, Any]] | tuple[dict[str, Any], ...],
    existing_work: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (),
    *,
    min_occurrences: int = DEFAULT_MIN_OCCURRENCES,
    min_confidence: float = DEFAULT_MIN_CONFIDENCE,
    max_creations: int = DEFAULT_MAX_CREATIONS,
    near_text_similarity: float = DEFAULT_NEAR_TEXT_SIMILARITY,
) -> dict[str, Any]:
    """Evaluate candidate signals against transient, dedupe, and cycle-limit policy.

    Every dict entry in ``candidates`` receives exactly one decision, checked
    in this order: ``suppress-transient`` (too few occurrences or too little
    confidence), ``suppress-duplicate`` (matches open existing work or a
    candidate already accepted earlier in this call), ``limit-reached`` (the
    creation budget for this call is spent), otherwise ``create``.  Entries
    that are not dicts are skipped and produce no decision.

    Args:
        candidates: Raw candidate signals, evaluated in the given order.
        existing_work: Raw work items; only dict entries accepted by
            ``_is_open`` take part in duplicate detection.
        min_occurrences: Minimum occurrences for a non-transient candidate.
        min_confidence: Minimum confidence for a non-transient candidate.
        max_creations: Maximum number of ``create`` decisions in this call.
        near_text_similarity: Token-set similarity at or above which two
            items are considered duplicates.

        Overrides rejected by ``_positive_int`` / ``_confidence`` fall back to
        the module defaults; the effective values are echoed in the result.

    Returns:
        A dict holding ``schema_version``, ``status`` (always ``"pass"``), the
        effective ``policy``, a ``summary`` with one count per outcome, and
        the ordered list of ``decisions`` as plain dicts.
    """
    policy = {
        "min_occurrences": _positive_int(min_occurrences, DEFAULT_MIN_OCCURRENCES),
        "min_confidence": _confidence(min_confidence, DEFAULT_MIN_CONFIDENCE),
        "max_creations": _positive_int(max_creations, DEFAULT_MAX_CREATIONS),
        "near_text_similarity": _confidence(
            near_text_similarity,
            DEFAULT_NEAR_TEXT_SIMILARITY,
        ),
    }
    normalized_existing = [
        _normalize_existing(item)
        for item in existing_work
        if isinstance(item, dict) and _is_open(item)
    ]
    created = 0
    decisions: list[WorkDecision] = []
    # The 1-based index counts every input entry, including skipped non-dict
    # ones, so fallback ids/titles reflect the position in the raw input.
    for index, raw in enumerate(candidates, start=1):
        if not isinstance(raw, dict):
            continue
        candidate = _normalize_candidate(raw, index)
        if _is_transient(candidate, policy):
            decisions.append(WorkDecision(
                candidate["id"],
                "suppress-transient",
                "transient-signal",
                candidate["title"],
            ))
            continue
        # Only non-transient candidates need the duplicate scan; a transient
        # candidate is suppressed regardless of what the scan would find.
        duplicate = _find_duplicate(
            candidate,
            normalized_existing,
            policy["near_text_similarity"],
        )
        if duplicate is not None:
            decisions.append(WorkDecision(
                candidate["id"],
                "suppress-duplicate",
                "open-work-duplicate",
                candidate["title"],
                duplicate_of=duplicate["number"],
            ))
            continue
        if created >= policy["max_creations"]:
            decisions.append(WorkDecision(
                candidate["id"],
                "limit-reached",
                "per-cycle-limit-reached",
                candidate["title"],
            ))
            continue
        created += 1
        decisions.append(WorkDecision(
            candidate["id"],
            "create",
            "eligible",
            candidate["title"],
        ))
        # Register the accepted candidate so later candidates in the same
        # call are deduplicated against it as well.
        normalized_existing.append(_created_as_existing(candidate, created))
    decision_dicts = [decision.as_dict() for decision in decisions]
    return {
        "schema_version": SCHEMA_VERSION,
        "status": "pass",
        "policy": policy,
        "summary": {
            # Each dict candidate yields exactly one decision, so this equals
            # the number of evaluated candidates without re-reading the input.
            "candidates": len(decision_dicts),
            "create": _count(decision_dicts, "create"),
            "suppress_transient": _count(decision_dicts, "suppress-transient"),
            "suppress_duplicate": _count(decision_dicts, "suppress-duplicate"),
            "limit_reached": _count(decision_dicts, "limit-reached"),
        },
        "decisions": decision_dicts,
    }
def _normalize_candidate(raw: dict[str, Any], index: int) -> dict[str, Any]:
    """Turn a raw candidate dict into the canonical shape used for evaluation.

    A missing or blank ``id`` or ``title`` falls back to ``candidate-<index>``.
    ``occurrences`` defaults to 1 and ``confidence`` to 1.0 when the raw value
    is missing or rejected by the validators.  The title and body together
    provide the token set used for near-text comparison.
    """
    fallback = f"candidate-{index}"
    title = _string(raw.get("title")) or fallback
    body = _string(raw.get("body"))
    identifier = _string(raw.get("id")) or fallback
    normalized: dict[str, Any] = {
        "id": identifier,
        "title": title,
        "body": body,
    }
    normalized["dedupe_key"] = _string(raw.get("dedupe_key"))
    normalized["occurrences"] = _positive_int(raw.get("occurrences"), 1)
    normalized["confidence"] = _confidence(raw.get("confidence"), 1.0)
    normalized["normalized_title"] = _normalize_text(title)
    normalized["tokens"] = _tokens(f"{title} {body}")
    return normalized
def _normalize_existing(raw: dict[str, Any]) -> dict[str, Any]:
    """Turn a raw existing-work dict into the canonical shape used for dedupe.

    ``number`` is kept only when it is a genuine integer; anything else,
    including booleans, becomes ``None``.  The title and body together
    provide the token set used for near-text comparison.
    """
    title = _string(raw.get("title"))
    body = _string(raw.get("body"))
    number = raw.get("number")
    # bool is a subclass of int, so it must be excluded explicitly; otherwise
    # True/False would be reported as the number of the duplicated item.
    if isinstance(number, bool) or not isinstance(number, int):
        number = None
    return {
        "number": number,
        "title": title,
        "body": body,
        "dedupe_key": _string(raw.get("dedupe_key")),
        "normalized_title": _normalize_text(title),
        "tokens": _tokens(f"{title} {body}"),
    }
def _created_as_existing(candidate: dict[str, Any], created_index: int) -> dict[str, Any]:
    """Recast an accepted candidate as an existing-work entry.

    The entry's ``number`` is the negated ``created_index``; all other fields
    are copied from the normalised candidate (container values are shared,
    not copied).
    """
    copied_fields = ("title", "body", "dedupe_key", "normalized_title", "tokens")
    entry: dict[str, Any] = {"number": -created_index}
    for field_name in copied_fields:
        entry[field_name] = candidate[field_name]
    return entry
def _is_transient(candidate: dict[str, Any], policy: dict[str, Any]) -> bool:
    """Tell whether a candidate falls below either transient threshold.

    A candidate is transient when its occurrence count is below
    ``policy["min_occurrences"]`` or its confidence is below
    ``policy["min_confidence"]``.
    """
    if candidate["occurrences"] < policy["min_occurrences"]:
        return True
    return candidate["confidence"] < policy["min_confidence"]
def _find_duplicate(
    candidate: dict[str, Any],
    existing: list[dict[str, Any]],
    threshold: float,
) -> dict[str, Any] | None:
    """Return the first existing item that duplicates ``candidate``, if any.

    Items are scanned in list order.  An item matches when it shares the
    candidate's dedupe key, shares its normalised title, or has a token-set
    Jaccard similarity of at least ``threshold``.  ``None`` means no match.
    """

    def _matches(item: dict[str, Any]) -> bool:
        # Cheap exact comparisons first; the set arithmetic runs only when
        # neither of them hits.
        if _same_key(candidate, item) or _same_title(candidate, item):
            return True
        return _jaccard(candidate["tokens"], item["tokens"]) >= threshold

    return next((item for item in existing if _matches(item)), None)
def _same_key(candidate: dict[str, Any], existing: dict[str, Any]) -> bool:
    """Tell whether both items carry the same non-empty dedupe key."""
    key = candidate["dedupe_key"]
    if not key:
        # An empty key never matches, not even another empty key.
        return False
    return bool(key == existing["dedupe_key"])
def _same_title(candidate: dict[str, Any], existing: dict[str, Any]) -> bool:
    """Tell whether both items carry the same non-empty normalised title."""
    title = candidate["normalized_title"]
    if not title:
        # An empty title never matches, not even another empty title.
        return False
    return bool(title == existing["normalized_title"])
def _is_open(raw: dict[str, Any]) -> bool:
    """Tell whether a raw work item should be treated as open.

    The ``state`` value is compared case-insensitively; only ``"open"`` or a
    missing/blank/non-string state qualifies.
    """
    # A missing state counts as open so that incomplete GitHub/search
    # fixtures suppress duplicates conservatively rather than letting
    # duplicate work be created.
    state = _string(raw.get("state"))
    return state.lower() in ("", "open")
def _tokens(text: str) -> set[str]:
    """Return the set of distinct normalised tokens found in ``text``."""
    # The normalised form is exactly the tokens joined by single spaces, so
    # splitting on whitespace recovers them.
    normalized = _normalize_text(text)
    return set(normalized.split())
def _jaccard(left: set[str], right: set[str]) -> float:
    """Return the Jaccard similarity of two token sets.

    The result is the size of the intersection divided by the size of the
    union, or ``0.0`` when either set is empty.
    """
    if left and right:
        shared = left & right
        combined = left | right
        return len(shared) / len(combined)
    return 0.0
def _normalize_text(text: str) -> str:
    """Lowercase ``text`` and reduce it to its tokens joined by single spaces."""
    lowered = text.lower()
    words = _TOKEN_RE.findall(lowered)
    return " ".join(words)
def _string(value: Any) -> str:
    """Return ``value`` stripped of surrounding whitespace, or ``""``.

    Anything that is not a string yields the empty string; a string made up
    only of whitespace also strips down to the empty string.
    """
    if not isinstance(value, str):
        return ""
    return value.strip()
def _positive_int(value: Any, default: int) -> int:
    """Return ``value`` when it is a strictly positive integer, else ``default``.

    Booleans are rejected even though ``bool`` is a subclass of ``int``:
    without that check ``True`` would pass as the count 1 and be returned as
    a boolean instead of an integer.
    """
    if isinstance(value, bool) or not isinstance(value, int):
        return default
    return value if value > 0 else default
259def _confidence(value: Any, default: float) -> float:
260 return value if isinstance(value, int | float) and 0 <= value <= 1 else default
def _count(decisions: list[dict[str, Any]], decision: str) -> int:
    """Count the decision dicts whose ``"decision"`` value equals ``decision``."""
    return sum(1 for entry in decisions if entry["decision"] == decision)