Coverage for src/keel/workcreation.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Deterministic policy for signal-driven work creation.""" 

2 

3from __future__ import annotations 

4 

5import re 

6from dataclasses import dataclass 

7from typing import Any 

8 

# Identifier stamped into every payload this module produces.
SCHEMA_VERSION = "keel.work-creation.v1"

# Policy values applied when the caller does not supply a usable override.
DEFAULT_MIN_OCCURRENCES = 2
DEFAULT_MIN_CONFIDENCE = 0.6
DEFAULT_MAX_CREATIONS = 5
DEFAULT_NEAR_TEXT_SIMILARITY = 0.6

# Every outcome a candidate can be assigned.
DECISIONS = ("create", "suppress-transient", "suppress-duplicate", "limit-reached")

# Matches one run of lowercase ASCII letters and digits.
_TOKEN_RE = re.compile(r"[a-z0-9]+")

23 

24 

@dataclass(frozen=True)
class WorkDecision:
    """Immutable record of the outcome chosen for a single candidate."""

    candidate_id: str
    decision: str
    reason: str
    title: str
    duplicate_of: int | None = None

    def as_dict(self) -> dict[str, Any]:
        """Render the decision as a plain dictionary.

        ``creates_issue`` is true only for the ``"create"`` decision, and the
        ``duplicate_of`` key is emitted only when that field is set.
        """
        # Built separately so the optional key lands last, after the fixed ones.
        optional: dict[str, Any] = (
            {} if self.duplicate_of is None else {"duplicate_of": self.duplicate_of}
        )
        return {
            "candidate_id": self.candidate_id,
            "decision": self.decision,
            "reason": self.reason,
            "title": self.title,
            "creates_issue": self.decision == "create",
            **optional,
        }

46 

47 

def contract_as_dict() -> dict[str, Any]:
    """Describe the shared work-creation policy contract as a dictionary.

    A new dictionary, with new nested containers, is built on every call.
    """
    transient_filter = {
        "default_min_occurrences": DEFAULT_MIN_OCCURRENCES,
        "default_min_confidence": DEFAULT_MIN_CONFIDENCE,
        "transient_outcome": "suppress-transient",
    }
    dedupe = {
        "against": "open work",
        "keys": ["dedupe_key", "normalized_title", "near_text"],
        "near_text_similarity": DEFAULT_NEAR_TEXT_SIMILARITY,
        "duplicate_outcome": "suppress-duplicate",
    }
    cycle_limit = {
        "default_max_creations": DEFAULT_MAX_CREATIONS,
        "limit_outcome": "limit-reached",
    }
    consumers = [
        "regression",
        "review-all-day",
        "coverage",
        "deps-audit",
        "flake-audit",
    ]
    return {
        "schema_version": SCHEMA_VERSION,
        "consumer_neutral": True,
        "deterministic": True,
        "stdlib_only": True,
        "source": "signal-driven commands",
        # Copied into a list so the module-level tuple is never handed out.
        "decisions": [*DECISIONS],
        "transient_filter": transient_filter,
        "dedupe": dedupe,
        "cycle_limit": cycle_limit,
        "consumers": consumers,
    }

80 

81 

def evaluate_candidates(
    candidates: list[dict[str, Any]] | tuple[dict[str, Any], ...],
    existing_work: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (),
    *,
    min_occurrences: int = DEFAULT_MIN_OCCURRENCES,
    min_confidence: float = DEFAULT_MIN_CONFIDENCE,
    max_creations: int = DEFAULT_MAX_CREATIONS,
    near_text_similarity: float = DEFAULT_NEAR_TEXT_SIMILARITY,
) -> dict[str, Any]:
    """Evaluate candidate signals against transient, dedupe, and cycle-limit policy.

    Each dict candidate receives exactly one decision, checked in this order:
    transient filter, duplicate of open work, per-cycle creation limit, and
    otherwise ``"create"``.

    Args:
        candidates: Raw candidate mappings. Entries that are not dicts are
            skipped, but still occupy a position, so fallback ids such as
            ``candidate-3`` reflect the position in the input.
        existing_work: Known work items. Only dict entries that ``_is_open``
            accepts take part in duplicate detection.
        min_occurrences: Candidates seen fewer times than this are transient.
        min_confidence: Candidates below this confidence are transient.
        max_creations: Maximum number of ``"create"`` decisions per call.
        near_text_similarity: Token-overlap threshold for near-text duplicates.

        Any policy value that fails validation is replaced by its module
        default; the effective values are echoed back under ``"policy"``.

    Returns:
        A dictionary with ``schema_version``, ``status``, the effective
        ``policy``, a ``summary`` of counts per decision, and the ordered list
        of ``decisions``.
    """
    policy = {
        "min_occurrences": _positive_int(min_occurrences, DEFAULT_MIN_OCCURRENCES),
        "min_confidence": _confidence(min_confidence, DEFAULT_MIN_CONFIDENCE),
        "max_creations": _positive_int(max_creations, DEFAULT_MAX_CREATIONS),
        "near_text_similarity": _confidence(
            near_text_similarity,
            DEFAULT_NEAR_TEXT_SIMILARITY,
        ),
    }
    normalized_existing = [
        _normalize_existing(item)
        for item in existing_work
        if isinstance(item, dict) and _is_open(item)
    ]
    created = 0
    decisions: list[WorkDecision] = []
    for index, raw in enumerate(candidates, start=1):
        if not isinstance(raw, dict):
            continue
        candidate = _normalize_candidate(raw, index)

        if _is_transient(candidate, policy):
            decisions.append(WorkDecision(
                candidate["id"],
                "suppress-transient",
                "transient-signal",
                candidate["title"],
            ))
            continue

        # Scan for duplicates only after the transient check: the result was
        # never used for transient candidates, so the scan was wasted work.
        duplicate = _find_duplicate(
            candidate,
            normalized_existing,
            policy["near_text_similarity"],
        )
        if duplicate is not None:
            decisions.append(WorkDecision(
                candidate["id"],
                "suppress-duplicate",
                "open-work-duplicate",
                candidate["title"],
                duplicate_of=duplicate["number"],
            ))
            continue

        if created >= policy["max_creations"]:
            decisions.append(WorkDecision(
                candidate["id"],
                "limit-reached",
                "per-cycle-limit-reached",
                candidate["title"],
            ))
            continue

        created += 1
        decisions.append(WorkDecision(
            candidate["id"],
            "create",
            "eligible",
            candidate["title"],
        ))
        # Register the new item so later candidates in this run are deduped
        # against it as well.
        normalized_existing.append(_created_as_existing(candidate, created))

    decision_dicts = [decision.as_dict() for decision in decisions]
    return {
        "schema_version": SCHEMA_VERSION,
        "status": "pass",
        "policy": policy,
        "summary": {
            # Exactly one decision exists per dict candidate. Counting them
            # avoids a second pass over ``candidates``, which would report 0
            # for a one-shot iterable that the loop above already consumed.
            "candidates": len(decision_dicts),
            "create": _count(decision_dicts, "create"),
            "suppress_transient": _count(decision_dicts, "suppress-transient"),
            "suppress_duplicate": _count(decision_dicts, "suppress-duplicate"),
            "limit_reached": _count(decision_dicts, "limit-reached"),
        },
        "decisions": decision_dicts,
    }

158 

159 

def _normalize_candidate(raw: dict[str, Any], index: int) -> dict[str, Any]:
    """Turn a raw candidate mapping into the normalized form used for policy checks.

    A missing or blank ``id`` or ``title`` falls back to ``candidate-<index>``.
    An unusable ``occurrences`` becomes 1 and an unusable ``confidence``
    becomes 1.0.
    """
    placeholder = f"candidate-{index}"
    title = _string(raw.get("title")) or placeholder
    body = _string(raw.get("body"))
    identifier = _string(raw.get("id")) or placeholder
    dedupe_key = _string(raw.get("dedupe_key"))
    occurrences = _positive_int(raw.get("occurrences"), 1)
    confidence = _confidence(raw.get("confidence"), 1.0)
    return {
        "id": identifier,
        "title": title,
        "body": body,
        "dedupe_key": dedupe_key,
        "occurrences": occurrences,
        "confidence": confidence,
        "normalized_title": _normalize_text(title),
        # Near-text matching compares tokens drawn from title and body together.
        "tokens": _tokens(f"{title} {body}"),
    }

173 

174 

def _normalize_existing(raw: dict[str, Any]) -> dict[str, Any]:
    """Turn an existing work item into the normalized form used for dedupe.

    ``number`` is kept only when it is an ``int``; anything else becomes ``None``.
    """
    number = raw.get("number")
    if not isinstance(number, int):
        number = None
    title = _string(raw.get("title"))
    body = _string(raw.get("body"))
    dedupe_key = _string(raw.get("dedupe_key"))
    return {
        "number": number,
        "title": title,
        "body": body,
        "dedupe_key": dedupe_key,
        "normalized_title": _normalize_text(title),
        "tokens": _tokens(f"{title} {body}"),
    }

186 

187 

def _created_as_existing(candidate: dict[str, Any], created_index: int) -> dict[str, Any]:
    """Build an existing-work entry for a candidate that was just approved.

    The entry's ``number`` is the negated ``created_index``; the remaining
    fields are taken from the candidate unchanged.
    """
    carried_over = ("title", "body", "dedupe_key", "normalized_title", "tokens")
    entry: dict[str, Any] = {"number": -created_index}
    for field_name in carried_over:
        entry[field_name] = candidate[field_name]
    return entry

197 

198 

def _is_transient(candidate: dict[str, Any], policy: dict[str, Any]) -> bool:
    """Report whether a candidate falls below either policy threshold.

    A candidate is transient when it was seen fewer than ``min_occurrences``
    times or its confidence is below ``min_confidence``.
    """
    if candidate["occurrences"] < policy["min_occurrences"]:
        return True
    return candidate["confidence"] < policy["min_confidence"]

204 

205 

def _find_duplicate(
    candidate: dict[str, Any],
    existing: list[dict[str, Any]],
    threshold: float,
) -> dict[str, Any] | None:
    """Return the first existing entry the candidate duplicates, or ``None``.

    Entries are examined in list order. An entry matches when it shares the
    candidate's dedupe key, shares its normalized title, or has a token
    overlap of at least ``threshold``.
    """
    matches = (
        entry
        for entry in existing
        if _same_key(candidate, entry)
        or _same_title(candidate, entry)
        or _jaccard(candidate["tokens"], entry["tokens"]) >= threshold
    )
    return next(matches, None)

217 

218 

def _same_key(candidate: dict[str, Any], existing: dict[str, Any]) -> bool:
    """Report whether both entries carry the same non-empty dedupe key."""
    key = candidate["dedupe_key"]
    if not key:
        # An empty key never matches, even against another empty key.
        return False
    return bool(key == existing["dedupe_key"])

221 

222 

def _same_title(candidate: dict[str, Any], existing: dict[str, Any]) -> bool:
    """Report whether both entries share the same non-empty normalized title."""
    normalized = candidate["normalized_title"]
    if not normalized:
        # A title that normalizes to nothing cannot identify a duplicate.
        return False
    return bool(normalized == existing["normalized_title"])

228 

229 

def _is_open(raw: dict[str, Any]) -> bool:
    """Report whether an existing work item should be treated as open."""
    state = _string(raw.get("state")).lower()
    if not state:
        # Missing state is treated as open so incomplete GitHub/search fixtures
        # suppress duplicates conservatively instead of creating duplicate work.
        return True
    return state == "open"

235 

236 

def _tokens(text: str) -> set[str]:
    """Return the distinct lowercase alphanumeric tokens found in ``text``."""
    # Tokenizing the lowercased text directly gives the same set as
    # normalizing to a joined string and scanning that string again.
    return set(_TOKEN_RE.findall(text.lower()))

239 

240 

def _jaccard(left: set[str], right: set[str]) -> float:
    """Return the Jaccard similarity of two token sets.

    The result is 0.0 whenever either set is empty.
    """
    if not (left and right):
        return 0.0
    shared = len(left & right)
    # |A ∪ B| = |A| + |B| - |A ∩ B|
    combined = len(left) + len(right) - shared
    return shared / combined

245 

246 

def _normalize_text(text: str) -> str:
    """Lowercase ``text`` and join its alphanumeric tokens with single spaces."""
    lowered = text.lower()
    words = _TOKEN_RE.findall(lowered)
    return " ".join(words)

249 

250 

def _string(value: Any) -> str:
    """Return ``value`` stripped of surrounding whitespace, or ``""`` if it is not a string."""
    if not isinstance(value, str):
        return ""
    # A whitespace-only string strips down to "", the same result as the fallback.
    return value.strip()

253 

254 

def _positive_int(value: Any, default: int) -> int:
    """Return ``value`` when it is a strictly positive integer, else ``default``.

    ``bool`` is rejected explicitly. It is a subclass of ``int``, so ``True``
    would otherwise pass the check and be returned as-is in place of a count.
    """
    if isinstance(value, bool) or not isinstance(value, int):
        return default
    return value if value > 0 else default

257 

258 

def _confidence(value: Any, default: float) -> float:
    """Return ``value`` when it is a real number within ``[0, 1]``, else ``default``.

    ``bool`` is rejected explicitly. It is a subclass of ``int``, so ``True``
    and ``False`` would otherwise be accepted as confidences of 1 and 0.
    NaN fails the range comparison and therefore also yields ``default``.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return default
    return value if 0 <= value <= 1 else default

261 

262 

def _count(decisions: list[dict[str, Any]], decision: str) -> int:
    """Count the decision dictionaries whose ``"decision"`` equals ``decision``."""
    return sum(1 for entry in decisions if entry["decision"] == decision)