Coverage for src/keel/capture.py: 100%

253 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Consumer-neutral post-merge capture contract and verification helpers.""" 

2 

3from __future__ import annotations 

4 

5import re 

6from dataclasses import dataclass 

7from typing import Any 

8 

9from . import config as cfg 

10 

# Schema identifiers embedded in the capture, reconcile, and learning payloads.
CAPTURE_SCHEMA_VERSION = "keel.capture.v1"
RECONCILE_SCHEMA_VERSION = "keel.capture-reconcile.v1"
LEARNING_DECISION_SCHEMA_VERSION = "keel.capture-learning.v1"
# Literal prefix that starts every rendered marker line.
MARKER_PREFIX = "compound-learning"
# Marker statuses accepted by ``normalize_status``.
STATUSES = ("applied", "deferred", "skipped")
# Reasons that may accompany a ``skipped`` status.
SKIP_REASONS = (
    "dry-run",
    "deferred",
    "merge-failed",
    "recursion-guard",
    "capability-unavailable",
    "no-policy",
)
# Decisions accepted by ``_learning_result``.
LEARNING_DECISIONS = ("create-learning", "marker-only", "defer", "duplicate")

# Matches ``compound-learning: pr=<N> status=<status>``: N is a positive integer
# without leading zeros, and only ``skipped`` may carry a ``:reason`` suffix.
_MARKER_RE = re.compile(
    r"^compound-learning:\s+pr=(?P<pr>[1-9][0-9]*)\s+status="
    r"(?P<status>applied|deferred|skipped(?::[a-z0-9-]+)?)$"
)

30 

31 

class CaptureError(ValueError):
    """Signal an invalid capture marker or capture record.

    Subclasses ``ValueError`` so callers that already handle value errors
    also catch capture validation failures.
    """

34 

35 

@dataclass(frozen=True)
class CaptureMarker:
    """Immutable value object describing the capture marker of one merged PR."""

    pr_number: int
    status: str
    reason: str | None = None

    def as_text(self) -> str:
        """Render the marker line by delegating to ``marker_text``."""
        return marker_text(pr_number=self.pr_number, status=self.status, reason=self.reason)

    def as_dict(self) -> dict[str, Any]:
        """Return the marker as a plain mapping that includes its rendered text."""
        rendered = self.as_text()
        return {
            "schema_version": CAPTURE_SCHEMA_VERSION,
            "prefix": MARKER_PREFIX,
            "pr": self.pr_number,
            "status": self.status,
            "reason": self.reason,
            "text": rendered,
        }

60 

61 

def contract_as_dict(config: cfg.ProjectConfig | None = None) -> dict[str, Any]:
    """Describe the capture contract as a freshly built plain dictionary.

    Only ``policy_enabled``, ``policy_mode`` and the nested ``learning_quality``
    section depend on ``config``; every other value is a fixed literal.
    """
    policy = _capture_policy(config)

    marker_section = {
        "prefix": MARKER_PREFIX,
        "format": "compound-learning: pr=<N> status=<applied|deferred|skipped:reason>",
        "statuses": list(STATUSES),
        "skip_reasons": list(SKIP_REASONS),
        "required_after_merged_pr": True,
    }
    guard_section = {
        "enabled": True,
        "reason": "recursion-guard",
        "never_capture_capture_work": True,
    }
    fail_soft_section = {
        "enabled": True,
        "merge_revert_on_capture_failure": False,
        "failure_marker": "skipped:capability-unavailable",
    }
    artifacts_section = {
        "requires_redaction": True,
        "redaction_contract": "run_ledger.capture_redaction",
        "core_destination": "run-ledger",
        "project_destination": "extension-owned",
    }
    verifier_section = {
        "primitive": "capture.verify_session",
        "cli": "keel capture-verify",
        "missing_marker_status": "missing",
        "invalid_marker_status": "invalid",
    }
    reconcile_section = {
        "schema_version": RECONCILE_SCHEMA_VERSION,
        "primitive": "capture.reconcile_session",
        "cli": "keel capture-reconcile",
        "idempotent": True,
        "never_reopens_implementation": True,
        "never_pushes_code": True,
        "never_merges_prs": True,
        "actions": [
            "emit-capture-marker",
            "run-capture-extension",
            "post-closure-summary",
            "close-linked-issue",
            "record-skip",
        ],
    }

    return {
        "schema_version": CAPTURE_SCHEMA_VERSION,
        "marker": marker_section,
        "extension_slots": ["capture", "post-merge"],
        "policy_source": "policy_pack.capture + capture/post-merge extensions",
        "policy_enabled": bool(policy.get("enabled", False)),
        "policy_mode": policy.get("mode", "extension"),
        "recursion_guard": guard_section,
        "fail_soft": fail_soft_section,
        "durable_artifacts": artifacts_section,
        "learning_quality": learning_quality_contract_as_dict(config),
        "session_end_verifier": verifier_section,
        "reconcile": reconcile_section,
    }

118 

119 

def learning_quality_contract_as_dict(config: cfg.ProjectConfig | None = None) -> dict[str, Any]:
    """Describe the durable-learning quality contract as a plain dictionary.

    ``policy_enabled``, ``policy_mode`` and ``dedupe.enabled`` are read from the
    learning policy resolved from ``config``; the remaining values are literals.
    """
    policy = _learning_policy(config)
    dedupe = policy.get("dedupe")
    if not isinstance(dedupe, dict):
        # A missing or malformed dedupe section falls back to the defaults below.
        dedupe = {}

    dedupe_section = {
        "enabled": bool(dedupe.get("enabled", True)),
        "fingerprint": "sha256(normalized title + labels + changed files)",
        "matching": "stable fingerprint plus configured matching rules",
    }
    return {
        "schema_version": LEARNING_DECISION_SCHEMA_VERSION,
        "decisions": list(LEARNING_DECISIONS),
        "policy_source": "policy_pack.capture.learning",
        "policy_enabled": bool(policy.get("enabled", False)),
        "policy_mode": policy.get("mode", "policy-unavailable"),
        "default_decision": "marker-only",
        "default_reason": "policy-unavailable",
        "marker_required_for_every_merge": True,
        "durable_learning_optional": True,
        "dedupe": dedupe_section,
        "ledger_field": "capture.learning",
        "closure_summary_field": "Capture",
    }

142 

143 

def marker_text(*, pr_number: int, status: str, reason: str | None = None) -> str:
    """Validate the inputs via ``build_marker`` and render the marker line.

    The status field is ``<status>`` when the validated marker has no reason
    and ``<status>:<reason>`` otherwise.
    """
    marker = build_marker(pr_number=pr_number, status=status, reason=reason)
    if marker.reason is None:
        status_field = marker.status
    else:
        status_field = f"{marker.status}:{marker.reason}"
    return f"{MARKER_PREFIX}: pr={marker.pr_number} status={status_field}"

149 

150 

def build_marker(*, pr_number: int, status: str, reason: str | None = None) -> CaptureMarker:
    """Return a ``CaptureMarker`` built from validated, normalized inputs.

    Raises:
        CaptureError: if ``pr_number`` is not positive, or if
            ``normalize_status`` rejects the status/reason pair.
    """
    if pr_number <= 0:
        raise CaptureError("capture marker requires a positive PR number")
    clean_status, clean_reason = normalize_status(status, reason)
    return CaptureMarker(pr_number=pr_number, status=clean_status, reason=clean_reason)

157 

158 

def normalize_status(status: str | None, reason: str | None = None) -> tuple[str, str | None]:
    """Split a raw status into a ``(status, reason)`` pair.

    A ``skipped:<reason>`` status is split at the first colon and the embedded
    reason replaces the ``reason`` argument. Only ``skipped`` keeps a reason;
    for any other status the returned reason is ``None``.

    Raises:
        CaptureError: if the status is empty or unsupported, or if a skipped
            status has no reason listed in ``SKIP_REASONS``.
    """
    if not status:
        raise CaptureError("capture status is required")
    name = status.strip()
    if name.startswith("skipped:"):
        name, _, reason = name.partition(":")
    if name not in STATUSES:
        raise CaptureError(f"unsupported capture status: {status}")
    if name != "skipped":
        return name, None
    # An empty string is never an allowed reason, so blank/non-string reasons fail here.
    skip_reason = reason.strip() if isinstance(reason, str) else ""
    if skip_reason not in SKIP_REASONS:
        raise CaptureError("skipped capture requires an allowed skip reason")
    return name, skip_reason

176 

177 

def parse_marker(text: str) -> CaptureMarker:
    """Parse a rendered marker line into a ``CaptureMarker``.

    Surrounding whitespace is ignored. The line must match ``_MARKER_RE`` and
    its status must pass ``normalize_status``.

    Raises:
        CaptureError: if ``text`` is not a string, does not match the marker
            format, or carries a status/reason that ``normalize_status`` rejects.
    """
    # Markers may come from ledger data; report a non-string value as an
    # invalid marker rather than failing with AttributeError on ``.strip()``.
    if not isinstance(text, str):
        raise CaptureError("invalid capture marker")
    match = _MARKER_RE.match(text.strip())
    if match is None:
        raise CaptureError("invalid capture marker")
    status, reason = normalize_status(match.group("status"))
    return CaptureMarker(
        pr_number=int(match.group("pr")),
        status=status,
        reason=reason,
    )

190 

191 

def record_marker(
    *,
    pr_number: int | None,
    status: str | None,
    reason: str | None = None,
    artifact: str | None = None,
    title: str | None = None,
    labels: list[str] | tuple[str, ...] = (),
    changed_files: list[str] | tuple[str, ...] = (),
    existing_records: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (),
    config: cfg.ProjectConfig | None = None,
) -> dict[str, Any]:
    """Assemble the capture block for a ship run ledger record.

    ``artifact`` is an optional reference (path or content hash) to the durable
    capture artifact; blank or non-string values are stored as ``None``. It is
    the evidence that an ``applied`` capture produced something, while
    ``deferred``/``skipped`` need none. (Reconcile is documented to flag
    ``applied`` without an artifact; that check is not in this function.)

    The block always carries the caller's ``reason`` unchanged. ``marker_reason``
    and ``marker`` are filled only as far as the inputs allow: no status gives
    neither, a status without a PR number gives a normalized status and reason
    but no marker text, and both together give the full rendered marker.

    Raises:
        CaptureError: if a status is given but fails validation.
    """
    clean_artifact = None
    if isinstance(artifact, str):
        clean_artifact = artifact.strip() or None

    # The learning decision is computed first on every path, before any
    # status validation can raise.
    learning = learning_decision(
        title=title,
        labels=labels,
        changed_files=changed_files,
        capture_status=status,
        capture_reason=reason,
        existing_records=existing_records,
        config=config,
    )

    def capture_block(
        block_status: str | None,
        marker_reason: str | None,
        marker: str | None,
    ) -> dict[str, Any]:
        # Single place that fixes the key order of the stored block.
        return {
            "schema_version": CAPTURE_SCHEMA_VERSION,
            "status": block_status,
            "reason": reason,
            "marker_reason": marker_reason,
            "marker": marker,
            "artifact": clean_artifact,
            "fail_soft": True,
            "learning": learning,
        }

    if status is None:
        return capture_block(None, None, None)

    requested_reason = _marker_reason(status, reason)
    if pr_number is None:
        clean_status, clean_marker_reason = normalize_status(status, requested_reason)
        return capture_block(clean_status, clean_marker_reason, None)

    built = build_marker(pr_number=pr_number, status=status, reason=requested_reason)
    return capture_block(built.status, built.reason, built.as_text())

264 

265 

def learning_decision(
    *,
    title: str | None = None,
    labels: list[str] | tuple[str, ...] = (),
    changed_files: list[str] | tuple[str, ...] = (),
    capture_status: str | None = None,
    capture_reason: str | None = None,
    existing_records: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (),
    config: cfg.ProjectConfig | None = None,
) -> dict[str, Any]:
    """Decide whether a merged PR should produce a durable learning artifact.

    The capture marker is mandatory and independent of this decision. Checks
    run in this order:

    1. with dedupe enabled, a matching fingerprint in ``existing_records``
       yields ``duplicate``;
    2. a policy that is not enabled yields ``marker-only``;
    3. otherwise the policy ``mode`` selects ``create-learning`` (downgraded to
       ``marker-only`` when the capture status starts with ``skipped``),
       ``defer``, or ``marker-only`` for any other mode.

    ``capture_reason`` is accepted but not consulted by this function.
    """
    policy = _learning_policy(config)
    fingerprint = learning_fingerprint(
        title=title,
        labels=labels,
        changed_files=changed_files,
    )

    def decided(decision: str, why: str, duplicate_of: str | None = None) -> dict[str, Any]:
        return _learning_result(
            decision,
            reason=why,
            fingerprint=fingerprint,
            policy=policy,
            duplicate_of=duplicate_of,
        )

    if _learning_dedupe_enabled(policy):
        earlier = _duplicate_learning_fingerprint(fingerprint, existing_records)
        if earlier is not None:
            return decided("duplicate", "duplicate-learning", earlier)

    if not policy.get("enabled"):
        return decided("marker-only", "policy-unavailable")

    mode = policy.get("mode", "marker-only")
    if mode == "defer":
        return decided("defer", _policy_reason(policy, "policy-deferred"))
    if mode != "create-learning":
        return decided("marker-only", _policy_reason(policy, "marker-only-policy"))

    # A skipped capture never produces a durable learning artifact.
    if capture_status and capture_status.startswith("skipped"):
        return decided("marker-only", "capture-skipped")
    return decided("create-learning", _policy_reason(policy, "policy-requested-learning"))

333 

334 

def learning_fingerprint(
    *,
    title: str | None = None,
    labels: list[str] | tuple[str, ...] = (),
    changed_files: list[str] | tuple[str, ...] = (),
) -> str:
    """Compute the SHA-256 hex digest used to dedupe learning candidates.

    The title, labels and changed files are normalized, the two lists are
    sorted, and the result is serialized as compact JSON with sorted keys
    before hashing. Input order therefore does not affect the fingerprint.
    Non-string labels and paths are dropped by ``_strings``.
    """
    import hashlib
    import json

    normalized_labels = sorted(map(_normalize_text, _strings(labels)))
    normalized_files = sorted(map(_normalize_path, _strings(changed_files)))
    canonical = json.dumps(
        {
            "title": _normalize_text(title),
            "labels": normalized_labels,
            "changed_files": normalized_files,
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return digest.hexdigest()

352 

353 

def verify_session(
    records: list[dict[str, Any]],
    merged_prs: list[int] | tuple[int, ...],
) -> dict[str, Any]:
    """Check every merged PR for a valid capture marker in ``records``.

    The session is ``complete`` only when no PR is ``missing`` or ``invalid``;
    otherwise it is ``incomplete``. Per-PR verdicts are returned in input order.
    """
    results = []
    ok_count = 0
    missing_count = 0
    invalid_count = 0
    for pr in merged_prs:
        verdict = _verify_pr(records, pr)
        results.append(verdict)
        if verdict["ok"]:
            ok_count += 1
        if verdict["status"] == "missing":
            missing_count += 1
        if verdict["status"] == "invalid":
            invalid_count += 1

    if missing_count or invalid_count:
        overall = "incomplete"
    else:
        overall = "complete"
    return {
        "schema_version": CAPTURE_SCHEMA_VERSION,
        "status": overall,
        "expected_prs": list(merged_prs),
        "results": results,
        "summary": {
            "ok": ok_count,
            "missing": missing_count,
            "invalid": invalid_count,
        },
    }

374 

375 

def reconcile_session(
    records: list[dict[str, Any]],
    merged_prs: list[int | dict[str, Any]] | tuple[int | dict[str, Any], ...],
    *,
    config: cfg.ProjectConfig | None = None,
    capture_capability_available: bool = False,
) -> dict[str, Any]:
    """Build a reconciliation plan for capture gaps across merged PRs.

    This function only assembles and returns dictionaries; applying the listed
    actions is left to the caller. Every entry of ``merged_prs`` is normalized
    before any PR is reconciled, so a malformed entry raises ``CaptureError``
    before any per-PR result is produced.

    The overall status is ``blocked`` if any PR is ``invalid`` or ``ambiguous``,
    else ``actionable`` if any PR has actions, else ``complete``.
    """
    items = [_merged_pr_info(entry) for entry in merged_prs]
    results = []
    for info in items:
        results.append(
            _reconcile_pr(
                records,
                info,
                config=config,
                capture_capability_available=capture_capability_available,
            )
        )

    actionable_count = sum(1 for outcome in results if outcome["actions"])
    blocked_count = sum(1 for outcome in results if outcome["status"] in {"invalid", "ambiguous"})
    complete_count = sum(1 for outcome in results if outcome["status"] == "complete")

    if blocked_count:
        overall = "blocked"
    elif actionable_count:
        overall = "actionable"
    else:
        overall = "complete"

    return {
        "schema_version": RECONCILE_SCHEMA_VERSION,
        "status": overall,
        "dry_run_safe": True,
        "idempotent": True,
        "no_code_mutations": True,
        "expected_prs": [info["number"] for info in items],
        "results": results,
        "summary": {
            "complete": complete_count,
            "actionable": actionable_count,
            "blocked": blocked_count,
        },
    }

417 

418 

def recursion_guard(
    *,
    title: str | None = None,
    labels: list[str] | tuple[str, ...] = (),
    changed_files: list[str] | tuple[str, ...] = (),
) -> bool:
    """Report whether a PR looks like capture work itself.

    Matching is case-insensitive. A PR matches when its title contains
    ``capture``, any label equals ``capture``, or any changed path contains
    ``/capture`` or ends with ``capture.py``.
    """
    title_hit = bool(title) and "capture" in title.lower()

    label_hit = False
    for label in labels:
        if label.lower() == "capture":
            label_hit = True
            break

    path_hit = False
    for path in changed_files:
        lowered = path.lower()
        if "/capture" in lowered or lowered.endswith("capture.py"):
            path_hit = True
            break

    # All three signals are evaluated before combining, as in the one-liner form.
    return title_hit or label_hit or path_hit

431 

432 

def _merged_pr_info(item: int | dict[str, Any]) -> dict[str, Any]:
    """Normalize one merged-PR entry into a uniform info dictionary.

    A bare integer becomes an entry with no title, labels, changed files or
    issue numbers. A dictionary is read through its ``number``, ``title``,
    ``labels``, ``changed_files`` and ``issue_numbers`` keys, with malformed
    optional fields replaced by ``None`` or an empty list.

    Raises:
        CaptureError: if the PR number is not a positive integer. The same
            rule applies to bare integers and to dictionary entries; booleans
            and entries of any other type are rejected as well.
    """
    is_mapping = isinstance(item, dict)
    number = item.get("number") if is_mapping else item
    # bool is a subclass of int, so exclude it explicitly: True is not PR #1.
    if isinstance(number, bool) or not isinstance(number, int) or number <= 0:
        raise CaptureError("merged PR entry requires a positive number")
    if not is_mapping:
        return {
            "number": number,
            "title": None,
            "labels": [],
            "changed_files": [],
            "issue_numbers": [],
        }
    title = item.get("title")
    return {
        "number": number,
        "title": title if isinstance(title, str) else None,
        "labels": _strings(item.get("labels")),
        "changed_files": _strings(item.get("changed_files")),
        "issue_numbers": _positive_ints(item.get("issue_numbers")),
    }

452 

453 

def _reconcile_pr(
    records: list[dict[str, Any]],
    item: dict[str, Any],
    *,
    config: cfg.ProjectConfig | None,
    capture_capability_available: bool,
) -> dict[str, Any]:
    """Plan the reconciliation outcome for a single merged PR.

    Outcomes, in order of precedence:

    * more than one linked issue: ``ambiguous`` (blocked);
    * a valid marker exists: ``actionable`` with a close-out action when
      exactly one issue is linked, otherwise ``complete``;
    * the existing marker is invalid: ``invalid`` (blocked);
    * no marker: ``actionable`` with actions to emit one.
    """
    pr_number = item["number"]
    verification = _verify_pr(records, pr_number)
    issue_numbers = _linked_issue_numbers(records, item)

    if len(issue_numbers) > 1:
        return _reconcile_result(
            pr_number,
            status="ambiguous",
            reason="multiple linked issues found for merged PR",
            verification=verification,
            issue_numbers=issue_numbers,
            blocked=True,
        )

    # At most one issue remains, so this holds zero or one close-out action.
    closeout = [
        _action("close-linked-issue", pr_number=pr_number, issue_number=number)
        for number in issue_numbers
    ]

    if verification["ok"]:
        if not closeout:
            return _reconcile_result(
                pr_number,
                status="complete",
                reason="capture marker already present",
                verification=verification,
            )
        return _reconcile_result(
            pr_number,
            status="actionable",
            reason="capture marker already present; linked issue closeout can be reconciled",
            verification=verification,
            issue_numbers=issue_numbers,
            marker=verification["marker"],
            actions=closeout,
        )

    if verification["status"] == "invalid":
        return _reconcile_result(
            pr_number,
            status="invalid",
            reason=verification["reason"],
            verification=verification,
            issue_numbers=issue_numbers,
            blocked=True,
        )

    # The marker is missing: decide which marker to emit and plan around it.
    marker_status, marker_reason, reason = _reconcile_marker_decision(
        item,
        config=config,
        capture_capability_available=capture_capability_available,
    )
    marker = marker_text(
        pr_number=pr_number,
        status=marker_status,
        reason=marker_reason,
    )

    actions = []
    if marker_status == "deferred":
        # The extension rerun is listed ahead of the marker emission.
        actions.append(_action("run-capture-extension", pr_number=pr_number))
    actions.append(
        _action(
            "emit-capture-marker",
            pr_number=pr_number,
            marker=marker,
            status=marker_status,
            reason=marker_reason,
        )
    )
    actions.append(_action("post-closure-summary", pr_number=pr_number))
    if marker_status == "skipped":
        actions.append(_action("record-skip", pr_number=pr_number, reason=marker_reason))
    actions.extend(closeout)

    return _reconcile_result(
        pr_number,
        status="actionable",
        reason=reason,
        verification=verification,
        issue_numbers=issue_numbers,
        marker=marker,
        actions=actions,
    )

538 

539 

def _verify_pr(records: list[dict[str, Any]], pr_number: int) -> dict[str, Any]:
    """Verify the capture marker recorded for one merged PR.

    Only ``ship_run`` records whose ``pull_request.number`` equals
    ``pr_number`` are considered. Records that are not dictionaries, or whose
    ``pull_request`` is not a dictionary, are skipped rather than crashing.

    Returns a verdict with ``pr``, ``ok``, ``status``, ``reason`` and
    ``marker``:

    * no marker: ``missing``;
    * more than one marker: ``invalid``, with the last marker and a
      ``marker_count``;
    * a marker that is not a string, fails to parse, or names another PR:
      ``invalid``;
    * otherwise ``ok`` with the parsed status and reason.
    """

    def verdict(ok: bool, status: str, reason: str | None, marker: Any) -> dict[str, Any]:
        return {
            "pr": pr_number,
            "ok": ok,
            "status": status,
            "reason": reason,
            "marker": marker,
        }

    markers = []
    for record in records:
        if not isinstance(record, dict) or record.get("record_type") != "ship_run":
            continue
        pull_request = record.get("pull_request")
        if not isinstance(pull_request, dict) or pull_request.get("number") != pr_number:
            continue
        capture_block = record.get("capture")
        if isinstance(capture_block, dict) and capture_block.get("marker"):
            markers.append(capture_block["marker"])

    if not markers:
        return verdict(False, "missing", "no capture marker found for merged PR", None)
    if len(markers) > 1:
        result = verdict(
            False, "invalid", "multiple capture markers found for merged PR", markers[-1]
        )
        result["marker_count"] = len(markers)
        return result

    marker = markers[0]
    # Ledger data is untrusted: a non-string marker is invalid, not a crash.
    if not isinstance(marker, str):
        return verdict(False, "invalid", "invalid capture marker", marker)
    try:
        parsed = parse_marker(marker)
    except CaptureError as exc:
        return verdict(False, "invalid", str(exc), marker)
    if parsed.pr_number != pr_number:
        return verdict(False, "invalid", "marker PR does not match ledger PR", marker)
    return verdict(True, parsed.status, parsed.reason, marker)

594 

595 

def _reconcile_result(
    pr_number: int,
    *,
    status: str,
    reason: str,
    verification: dict[str, Any],
    issue_numbers: list[int] | None = None,
    marker: str | None = None,
    actions: list[dict[str, Any]] | None = None,
    blocked: bool = False,
) -> dict[str, Any]:
    """Assemble the per-PR entry of a reconcile plan.

    ``issue_numbers`` and ``actions`` are copied into new lists; a missing or
    empty value becomes an empty list. Only the ``status`` key of
    ``verification`` is read.
    """
    issues_copy = list(issue_numbers) if issue_numbers else []
    actions_copy = list(actions) if actions else []
    entry: dict[str, Any] = {"pr": pr_number, "status": status, "reason": reason}
    entry["verification_status"] = verification["status"]
    entry["blocked"] = blocked
    entry["issue_numbers"] = issues_copy
    entry["marker"] = marker
    entry["actions"] = actions_copy
    return entry

617 

618 

def _reconcile_marker_decision(
    item: dict[str, Any],
    *,
    config: cfg.ProjectConfig | None,
    capture_capability_available: bool,
) -> tuple[str, str | None, str]:
    """Choose the marker to emit for a merged PR that has none.

    Returns ``(marker_status, marker_reason, explanation)``. The recursion
    guard is checked first. After that, an enabled capture policy in
    ``marker-only`` mode yields ``applied``, and one in ``extension`` mode
    yields ``deferred`` when the capability is available and a
    ``capability-unavailable`` skip otherwise. Anything else is a
    ``no-policy`` skip.
    """
    guarded = recursion_guard(
        title=item["title"],
        labels=item["labels"],
        changed_files=item["changed_files"],
    )
    if guarded:
        return "skipped", "recursion-guard", "capture recursion guard matched"

    no_policy = ("skipped", "no-policy", "no capture policy configured")
    policy = _capture_policy(config)
    if not policy.get("enabled"):
        return no_policy

    mode = policy.get("mode", "extension")
    if mode == "marker-only":
        return "applied", None, "marker-only capture policy configured"
    if mode != "extension":
        # An enabled policy with an unrecognized mode is treated like no policy.
        return no_policy
    if capture_capability_available:
        return "deferred", None, "capture extension can be rerun"
    return "skipped", "capability-unavailable", "capture extension capability unavailable"

639 

640 

def _linked_issue_numbers(records: list[dict[str, Any]], item: dict[str, Any]) -> list[int]:
    """Collect the issue numbers linked to a merged PR, sorted and de-duplicated.

    Starts from ``item["issue_numbers"]`` and adds ``issue.number`` from every
    ``ship_run`` record whose ``pull_request.number`` equals ``item["number"]``.
    Records that are not dictionaries, and ``pull_request``/``issue`` values
    that are not dictionaries, are skipped. Only positive integers are
    accepted as issue numbers; booleans are excluded.
    """
    numbers = set(item["issue_numbers"])
    pr_number = item["number"]
    for record in records:
        if not isinstance(record, dict) or record.get("record_type") != "ship_run":
            continue
        pull_request = record.get("pull_request")
        if not isinstance(pull_request, dict) or pull_request.get("number") != pr_number:
            continue
        issue = record.get("issue")
        issue_number = issue.get("number") if isinstance(issue, dict) else None
        # bool is a subclass of int; without this check True would count as issue #1.
        if isinstance(issue_number, bool) or not isinstance(issue_number, int):
            continue
        if issue_number > 0:
            numbers.add(issue_number)
    return sorted(numbers)

653 

654 

def _action(
    action_type: str,
    *,
    pr_number: int,
    marker: str | None = None,
    status: str | None = None,
    reason: str | None = None,
    issue_number: int | None = None,
) -> dict[str, Any]:
    """Build one reconcile action entry.

    The idempotency key is ``<type>:pr-<N>``, or ``<type>:issue-<I>:pr-<N>``
    when an issue number is given. Optional fields are included only when
    they are not ``None``; the issue number is stored under ``issue``.
    """
    if issue_number is None:
        key = f"{action_type}:pr-{pr_number}"
    else:
        key = f"{action_type}:issue-{issue_number}:pr-{pr_number}"

    action: dict[str, Any] = {
        "type": action_type,
        "pr": pr_number,
        "idempotency_key": key,
    }
    optional_fields = (
        ("marker", marker),
        ("status", status),
        ("reason", reason),
        ("issue", issue_number),
    )
    action.update((name, value) for name, value in optional_fields if value is not None)
    return action

679 

680 

def _capture_policy(config: cfg.ProjectConfig | None) -> dict[str, Any]:
    """Return the ``capture`` section of ``config.policy_pack``.

    An empty dict is returned when there is no config, when the policy pack
    is not a dict, or when its ``capture`` entry is not a dict.
    """
    if config is None:
        return {}
    pack = config.policy_pack
    if not isinstance(pack, dict):
        return {}
    capture_section = pack.get("capture")
    if isinstance(capture_section, dict):
        return capture_section
    return {}

686 

687 

def _learning_policy(config: cfg.ProjectConfig | None) -> dict[str, Any]:
    """Return the ``learning`` section of the capture policy, or an empty dict.

    Any ``learning`` value that is not a dict is treated as absent.
    """
    capture_section = _capture_policy(config)
    learning_section = None
    if isinstance(capture_section, dict):
        learning_section = capture_section.get("learning")
    if isinstance(learning_section, dict):
        return learning_section
    return {}

692 

693 

def _learning_dedupe_enabled(policy: dict[str, Any]) -> bool:
    """Report whether learning dedupe is enabled for ``policy``.

    Dedupe is enabled by default: it is off only when ``policy["dedupe"]`` is
    a dict whose ``enabled`` value is falsy.
    """
    dedupe = policy.get("dedupe")
    if isinstance(dedupe, dict):
        return bool(dedupe.get("enabled", True))
    return True

699 

700 

def _marker_reason(status: str, reason: str | None) -> str | None:
    """Pick the reason to hand to marker validation for a raw status.

    Only a bare ``skipped`` status (after stripping whitespace) gets a reason:
    the caller's ``reason`` when it is listed in ``SKIP_REASONS``, otherwise
    ``no-policy``. Every other status, including ``skipped:<reason>`` which
    already embeds its reason, yields ``None``.
    """
    if status.strip() != "skipped":
        return None
    return reason if reason in SKIP_REASONS else "no-policy"

710 

711 

def _strings(value: Any) -> list[str]:
    """Return the string members of a list or tuple, in order.

    Any other kind of ``value`` yields an empty list, and non-string members
    are dropped.
    """
    if not isinstance(value, (list, tuple)):
        return []
    kept = []
    for member in value:
        if isinstance(member, str):
            kept.append(member)
    return kept

716 

717 

def _positive_ints(value: Any) -> list[int]:
    """Return the positive integers contained in a list or tuple, in order.

    Any other kind of ``value`` yields an empty list. Non-integers, zero and
    negative numbers are dropped. Booleans are dropped as well.
    """
    if not isinstance(value, (list, tuple)):
        return []
    # bool is a subclass of int; without this check True would pass as 1.
    return [
        member
        for member in value
        if isinstance(member, int) and not isinstance(member, bool) and member > 0
    ]

722 

723 

def _duplicate_learning_fingerprint(
    fingerprint: str,
    records: list[dict[str, Any]] | tuple[dict[str, Any], ...],
) -> str | None:
    """Find the first earlier record that already covers ``fingerprint``.

    A record matches when ``capture.learning.fingerprint`` equals
    ``fingerprint`` and its decision is ``create-learning`` or ``duplicate``.
    The match is identified by the record's ``run_id``, falling back to its
    pull request number, converted with ``str``. Returns ``None`` when no
    record matches.
    """
    for record in records:
        if not isinstance(record, dict):
            continue
        capture_block = record.get("capture")
        if not isinstance(capture_block, dict):
            continue
        learning = capture_block.get("learning")
        if not isinstance(learning, dict):
            continue
        if learning.get("fingerprint") != fingerprint:
            continue
        if learning.get("decision") in {"create-learning", "duplicate"}:
            origin = record.get("run_id")
            if not origin:
                origin = (record.get("pull_request") or {}).get("number")
            return str(origin)
    return None

741 

742 

def _learning_result(
    decision: str,
    *,
    reason: str,
    fingerprint: str,
    policy: dict[str, Any],
    duplicate_of: str | None = None,
) -> dict[str, Any]:
    """Build the learning-decision payload stored in the capture block.

    ``durable_artifact`` is true only for the ``create-learning`` decision,
    and ``duplicate_of`` is included only when it is not ``None``.

    Raises:
        CaptureError: if ``decision`` is not one of ``LEARNING_DECISIONS``.
    """
    if decision not in LEARNING_DECISIONS:
        raise CaptureError(f"unsupported learning decision: {decision}")
    payload: dict[str, Any] = {
        "schema_version": LEARNING_DECISION_SCHEMA_VERSION,
        "decision": decision,
        "reason": reason,
        "fingerprint": fingerprint,
        "policy_source": "policy_pack.capture.learning",
        "policy_mode": policy.get("mode", "policy-unavailable"),
        "durable_artifact": decision == "create-learning",
    }
    if duplicate_of is None:
        return payload
    payload["duplicate_of"] = duplicate_of
    return payload

765 

766 

def _policy_reason(policy: dict[str, Any], default: str) -> str:
    """Return the policy's stripped ``reason``, or ``default``.

    ``default`` is used when the reason is missing, blank, or not a string.
    """
    configured = policy.get("reason")
    if isinstance(configured, str):
        trimmed = configured.strip()
        if trimmed:
            return trimmed
    return default

770 

771 

def _normalize_text(value: str | None) -> str:
    """Lower-case ``value`` and collapse whitespace runs to single spaces.

    Leading and trailing whitespace is removed. Non-string input yields an
    empty string.
    """
    if not isinstance(value, str):
        return ""
    words = value.lower().split()
    return " ".join(words)

774 

775 

def _normalize_path(value: str) -> str:
    """Strip, lower-case, and convert backslashes to forward slashes.

    Empty path segments such as ``a//b`` are kept as they are: splitting on
    ``/`` and re-joining with ``/`` returns the same string, so that step is
    left out here.
    """
    trimmed = value.strip()
    return trimmed.lower().replace("\\", "/")