Coverage for src/keel/capture.py: 100%
253 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Consumer-neutral post-merge capture contract and verification helpers."""
3from __future__ import annotations
5import re
6from dataclasses import dataclass
7from typing import Any
9from . import config as cfg
# Schema identifiers embedded in the dict payloads this module builds.
CAPTURE_SCHEMA_VERSION = "keel.capture.v1"
RECONCILE_SCHEMA_VERSION = "keel.capture-reconcile.v1"
LEARNING_DECISION_SCHEMA_VERSION = "keel.capture-learning.v1"

# Literal text that starts every rendered marker line.
MARKER_PREFIX = "compound-learning"

# Statuses a marker may carry; "skipped" additionally needs one of SKIP_REASONS.
STATUSES = ("applied", "deferred", "skipped")
SKIP_REASONS = (
    "dry-run", "deferred", "merge-failed",
    "recursion-guard", "capability-unavailable", "no-policy",
)

# Decisions accepted by the learning-result builder.
LEARNING_DECISIONS = (
    "create-learning",
    "marker-only",
    "defer",
    "duplicate",
)

# Matches "compound-learning: pr=<positive int> status=<status[:reason]>".
# The PR number may not start with 0; only "skipped" may carry a ":reason" suffix.
_MARKER_RE = re.compile(
    r"^compound-learning:\s+pr=(?P<pr>[1-9][0-9]*)\s+status="
    r"(?P<status>applied|deferred|skipped(?::[a-z0-9-]+)?)$"
)
class CaptureError(ValueError):
    """Signal that a capture marker or capture record failed validation."""
@dataclass(frozen=True)
class CaptureMarker:
    """Immutable structured form of a single capture marker.

    ``reason`` defaults to ``None``; the field values are not validated here.
    """

    pr_number: int
    status: str
    reason: str | None = None

    def as_text(self) -> str:
        """Render the marker as its one-line text form via ``marker_text``."""
        fields = {
            "pr_number": self.pr_number,
            "status": self.status,
            "reason": self.reason,
        }
        return marker_text(**fields)

    def as_dict(self) -> dict[str, Any]:
        """Return the marker as a plain dict, including its rendered text."""
        payload: dict[str, Any] = {
            "schema_version": CAPTURE_SCHEMA_VERSION,
            "prefix": MARKER_PREFIX,
        }
        payload["pr"] = self.pr_number
        payload["status"] = self.status
        payload["reason"] = self.reason
        payload["text"] = self.as_text()
        return payload
def contract_as_dict(config: cfg.ProjectConfig | None = None) -> dict[str, Any]:
    """Describe the capture contract as a plain dict.

    Only ``policy_enabled``, ``policy_mode`` and the nested ``learning_quality``
    section depend on ``config``; every other entry is a fixed value.
    """
    policy = _capture_policy(config)

    marker_section = {
        "prefix": MARKER_PREFIX,
        "format": "compound-learning: pr=<N> status=<applied|deferred|skipped:reason>",
        "statuses": [*STATUSES],
        "skip_reasons": [*SKIP_REASONS],
        "required_after_merged_pr": True,
    }
    recursion_section = {
        "enabled": True,
        "reason": "recursion-guard",
        "never_capture_capture_work": True,
    }
    fail_soft_section = {
        "enabled": True,
        "merge_revert_on_capture_failure": False,
        "failure_marker": "skipped:capability-unavailable",
    }
    artifacts_section = {
        "requires_redaction": True,
        "redaction_contract": "run_ledger.capture_redaction",
        "core_destination": "run-ledger",
        "project_destination": "extension-owned",
    }
    verifier_section = {
        "primitive": "capture.verify_session",
        "cli": "keel capture-verify",
        "missing_marker_status": "missing",
        "invalid_marker_status": "invalid",
    }
    reconcile_section = {
        "schema_version": RECONCILE_SCHEMA_VERSION,
        "primitive": "capture.reconcile_session",
        "cli": "keel capture-reconcile",
        "idempotent": True,
        "never_reopens_implementation": True,
        "never_pushes_code": True,
        "never_merges_prs": True,
        "actions": [
            "emit-capture-marker",
            "run-capture-extension",
            "post-closure-summary",
            "close-linked-issue",
            "record-skip",
        ],
    }

    # Insertion order below is the key order of the returned dict.
    contract: dict[str, Any] = {"schema_version": CAPTURE_SCHEMA_VERSION}
    contract["marker"] = marker_section
    contract["extension_slots"] = ["capture", "post-merge"]
    contract["policy_source"] = "policy_pack.capture + capture/post-merge extensions"
    contract["policy_enabled"] = bool(policy.get("enabled", False))
    contract["policy_mode"] = policy.get("mode", "extension")
    contract["recursion_guard"] = recursion_section
    contract["fail_soft"] = fail_soft_section
    contract["durable_artifacts"] = artifacts_section
    contract["learning_quality"] = learning_quality_contract_as_dict(config)
    contract["session_end_verifier"] = verifier_section
    contract["reconcile"] = reconcile_section
    return contract
def learning_quality_contract_as_dict(config: cfg.ProjectConfig | None = None) -> dict[str, Any]:
    """Describe the durable-learning quality contract as a plain dict.

    ``policy_enabled``, ``policy_mode`` and ``dedupe.enabled`` are read from the
    learning policy resolved from ``config``; the remaining entries are fixed.
    """
    learning_policy = _learning_policy(config)

    dedupe_section = {
        # Dedupe counts as enabled unless a dict-shaped "dedupe" entry turns it off.
        "enabled": _learning_dedupe_enabled(learning_policy),
        "fingerprint": "sha256(normalized title + labels + changed files)",
        "matching": "stable fingerprint plus configured matching rules",
    }

    contract: dict[str, Any] = {
        "schema_version": LEARNING_DECISION_SCHEMA_VERSION,
        "decisions": [*LEARNING_DECISIONS],
        "policy_source": "policy_pack.capture.learning",
    }
    contract["policy_enabled"] = bool(learning_policy.get("enabled", False))
    contract["policy_mode"] = learning_policy.get("mode", "policy-unavailable")
    contract.update(
        {
            "default_decision": "marker-only",
            "default_reason": "policy-unavailable",
            "marker_required_for_every_merge": True,
            "durable_learning_optional": True,
            "dedupe": dedupe_section,
            "ledger_field": "capture.learning",
            "closure_summary_field": "Capture",
        }
    )
    return contract
def marker_text(*, pr_number: int, status: str, reason: str | None = None) -> str:
    """Validate the inputs with ``build_marker`` and render the marker line."""
    validated = build_marker(pr_number=pr_number, status=status, reason=reason)
    if validated.reason is None:
        status_field = validated.status
    else:
        # Only a marker that kept a reason gets the "status:reason" form.
        status_field = f"{validated.status}:{validated.reason}"
    return f"{MARKER_PREFIX}: pr={validated.pr_number} status={status_field}"
def build_marker(*, pr_number: int, status: str, reason: str | None = None) -> CaptureMarker:
    """Validate the inputs and build a ``CaptureMarker``.

    Raises:
        CaptureError: if ``pr_number`` is not a positive ``int`` (``bool`` is
            rejected too), or if ``normalize_status`` rejects the status/reason.
    """
    # bool is a subclass of int: True would pass a bare "<= 0" check and render
    # as "pr=True", which the marker regex cannot parse back. Other non-int
    # values would otherwise escape as TypeError instead of CaptureError.
    if isinstance(pr_number, bool) or not isinstance(pr_number, int) or pr_number <= 0:
        raise CaptureError("capture marker requires a positive PR number")
    status, reason = normalize_status(status, reason)
    return CaptureMarker(pr_number=pr_number, status=status, reason=reason)
def normalize_status(status: str | None, reason: str | None = None) -> tuple[str, str | None]:
    """Split a raw status into ``(status, reason)`` and validate both parts.

    A ``skipped:<reason>`` status overrides the ``reason`` argument with its
    embedded reason. Non-skipped statuses always return ``None`` as the reason.

    Raises:
        CaptureError: if the status is empty or unsupported, or if a skipped
            status has no reason listed in ``SKIP_REASONS``.
    """
    if not status:
        raise CaptureError("capture status is required")

    name = status.strip()
    if name.startswith("skipped:"):
        # Everything after the first colon is the embedded reason.
        name, _, reason = name.partition(":")

    if name not in STATUSES:
        raise CaptureError(f"unsupported capture status: {status}")
    if name != "skipped":
        return name, None

    trimmed = reason.strip() if isinstance(reason, str) else ""
    if trimmed not in SKIP_REASONS:
        raise CaptureError("skipped capture requires an allowed skip reason")
    return name, trimmed
def parse_marker(text: str) -> CaptureMarker:
    """Parse a marker line into a ``CaptureMarker``.

    Surrounding whitespace is ignored.

    Raises:
        CaptureError: if ``text`` is not a string, does not match the marker
            pattern, or carries a status/reason ``normalize_status`` rejects.
    """
    # Markers are read back from ledger records whose values are untyped; a
    # non-string marker must surface as an invalid marker, not AttributeError.
    if not isinstance(text, str):
        raise CaptureError("invalid capture marker")
    match = _MARKER_RE.match(text.strip())
    if not match:
        raise CaptureError("invalid capture marker")
    status, reason = normalize_status(match.group("status"))
    return CaptureMarker(
        pr_number=int(match.group("pr")),
        status=status,
        reason=reason,
    )
def record_marker(
    *,
    pr_number: int | None,
    status: str | None,
    reason: str | None = None,
    artifact: str | None = None,
    title: str | None = None,
    labels: list[str] | tuple[str, ...] = (),
    changed_files: list[str] | tuple[str, ...] = (),
    existing_records: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (),
    config: cfg.ProjectConfig | None = None,
) -> dict[str, Any]:
    """Build the capture block stored in a ship run ledger record.

    ``artifact`` is an optional reference (path or content hash) to the durable
    capture artifact; a blank or non-string value is stored as ``None``.

    Three shapes are produced:

    * ``status is None``: status, marker reason and marker are all ``None``.
    * ``pr_number is None``: status and marker reason are normalized, but no
      marker text is rendered.
    * otherwise: a validated marker supplies status, marker reason and text.

    The caller's ``reason`` is stored unchanged under ``"reason"`` in all three.
    """
    stored_artifact = None
    if isinstance(artifact, str):
        stored_artifact = artifact.strip() or None

    # The learning decision is computed the same way on every path.
    learning = learning_decision(
        title=title,
        labels=labels,
        changed_files=changed_files,
        capture_status=status,
        capture_reason=reason,
        existing_records=existing_records,
        config=config,
    )

    def capture_block(
        block_status: str | None,
        block_marker_reason: str | None,
        block_marker: str | None,
    ) -> dict[str, Any]:
        return {
            "schema_version": CAPTURE_SCHEMA_VERSION,
            "status": block_status,
            "reason": reason,
            "marker_reason": block_marker_reason,
            "marker": block_marker,
            "artifact": stored_artifact,
            "fail_soft": True,
            "learning": learning,
        }

    if status is None:
        return capture_block(None, None, None)

    marker_reason = _marker_reason(status, reason)
    if pr_number is None:
        # Without a PR number there is nothing to render; still validate status.
        clean_status, clean_marker_reason = normalize_status(status, marker_reason)
        return capture_block(clean_status, clean_marker_reason, None)

    marker = build_marker(pr_number=pr_number, status=status, reason=marker_reason)
    return capture_block(marker.status, marker.reason, marker.as_text())
def learning_decision(
    *,
    title: str | None = None,
    labels: list[str] | tuple[str, ...] = (),
    changed_files: list[str] | tuple[str, ...] = (),
    capture_status: str | None = None,
    capture_reason: str | None = None,
    existing_records: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (),
    config: cfg.ProjectConfig | None = None,
) -> dict[str, Any]:
    """Decide whether a merged PR gets a durable learning artifact.

    Checks run in this order:

    1. With dedupe enabled, a matching fingerprint in ``existing_records``
       yields ``duplicate``.
    2. A policy that is not enabled yields ``marker-only``.
    3. Policy mode ``defer`` yields ``defer``.
    4. Policy mode ``create-learning`` yields ``create-learning``, unless the
       capture status starts with ``skipped`` (then ``marker-only``).
    5. Any other mode yields ``marker-only``.

    ``capture_reason`` is accepted but not consulted by this function.
    """
    policy = _learning_policy(config)
    fingerprint = learning_fingerprint(
        title=title,
        labels=labels,
        changed_files=changed_files,
    )

    def decide(decision: str, reason: str, **extra: Any) -> dict[str, Any]:
        return _learning_result(
            decision,
            reason=reason,
            fingerprint=fingerprint,
            policy=policy,
            **extra,
        )

    if _learning_dedupe_enabled(policy):
        earlier = _duplicate_learning_fingerprint(fingerprint, existing_records)
        if earlier is not None:
            return decide("duplicate", "duplicate-learning", duplicate_of=earlier)

    if not policy.get("enabled"):
        return decide("marker-only", "policy-unavailable")

    mode = policy.get("mode", "marker-only")
    if mode == "defer":
        return decide("defer", _policy_reason(policy, "policy-deferred"))
    if mode != "create-learning":
        return decide("marker-only", _policy_reason(policy, "marker-only-policy"))

    # create-learning: a skipped capture never produces a durable artifact.
    if capture_status and capture_status.startswith("skipped"):
        return decide("marker-only", "capture-skipped")
    return decide("create-learning", _policy_reason(policy, "policy-requested-learning"))
def learning_fingerprint(
    *,
    title: str | None = None,
    labels: list[str] | tuple[str, ...] = (),
    changed_files: list[str] | tuple[str, ...] = (),
) -> str:
    """Return the SHA-256 hex digest used to dedupe learning candidates.

    The digest covers the normalized title, the sorted normalized labels and the
    sorted normalized changed-file paths, serialized as compact JSON with sorted
    keys. Non-string labels and paths are ignored.
    """
    import hashlib
    import json

    normalized_labels = sorted(map(_normalize_text, _strings(labels)))
    normalized_files = sorted(map(_normalize_path, _strings(changed_files)))
    canonical = json.dumps(
        {
            "title": _normalize_text(title),
            "labels": normalized_labels,
            "changed_files": normalized_files,
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return digest.hexdigest()
def verify_session(
    records: list[dict[str, Any]],
    merged_prs: list[int] | tuple[int, ...],
) -> dict[str, Any]:
    """Check every merged PR for a capture marker and summarize the outcome.

    The session is ``complete`` only when no PR verifies as ``missing`` or
    ``invalid``; otherwise it is ``incomplete``.
    """
    per_pr = []
    for pr in merged_prs:
        per_pr.append(_verify_pr(records, pr))

    missing_count = sum(1 for entry in per_pr if entry["status"] == "missing")
    invalid_count = sum(1 for entry in per_pr if entry["status"] == "invalid")
    ok_count = sum(1 for entry in per_pr if entry["ok"])

    overall = "incomplete" if (missing_count or invalid_count) else "complete"
    return {
        "schema_version": CAPTURE_SCHEMA_VERSION,
        "status": overall,
        "expected_prs": list(merged_prs),
        "results": per_pr,
        "summary": {
            "ok": ok_count,
            "missing": missing_count,
            "invalid": invalid_count,
        },
    }
def reconcile_session(
    records: list[dict[str, Any]],
    merged_prs: list[int | dict[str, Any]] | tuple[int | dict[str, Any], ...],
    *,
    config: cfg.ProjectConfig | None = None,
    capture_capability_available: bool = False,
) -> dict[str, Any]:
    """Build a reconciliation plan for the merged PRs of a session.

    The plan is returned as plain data; this function only reads ``records`` and
    ``merged_prs``. Overall status is ``blocked`` if any PR is invalid or
    ambiguous, else ``actionable`` if any PR has actions, else ``complete``.
    """
    # Normalize every entry first so a malformed one fails before any planning.
    normalized = [_merged_pr_info(entry) for entry in merged_prs]

    plans = []
    for entry in normalized:
        plans.append(
            _reconcile_pr(
                records,
                entry,
                config=config,
                capture_capability_available=capture_capability_available,
            )
        )

    actionable_count = sum(1 for plan in plans if plan["actions"])
    blocked_count = sum(1 for plan in plans if plan["status"] in {"invalid", "ambiguous"})
    complete_count = sum(1 for plan in plans if plan["status"] == "complete")

    if blocked_count:
        overall = "blocked"
    elif actionable_count:
        overall = "actionable"
    else:
        overall = "complete"

    return {
        "schema_version": RECONCILE_SCHEMA_VERSION,
        "status": overall,
        "dry_run_safe": True,
        "idempotent": True,
        "no_code_mutations": True,
        "expected_prs": [entry["number"] for entry in normalized],
        "results": plans,
        "summary": {
            "complete": complete_count,
            "actionable": actionable_count,
            "blocked": blocked_count,
        },
    }
def recursion_guard(
    *,
    title: str | None = None,
    labels: list[str] | tuple[str, ...] = (),
    changed_files: list[str] | tuple[str, ...] = (),
) -> bool:
    """Report whether a PR looks like capture work itself.

    Matching is case-insensitive. It is true when the title contains
    ``capture``, a label equals ``capture``, or a changed path contains
    ``/capture`` or ends with ``capture.py``.
    """
    keyword = "capture"

    title_hit = keyword in title.lower() if title else False

    label_hit = False
    for label in labels:
        if label.lower() == keyword:
            label_hit = True
            break

    path_hit = False
    for path in changed_files:
        lowered = path.lower()
        if "/capture" in lowered or lowered.endswith("capture.py"):
            path_hit = True
            break

    return title_hit or label_hit or path_hit
def _merged_pr_info(item: int | dict[str, Any]) -> dict[str, Any]:
    """Normalize one merged-PR entry into a dict with a fixed set of keys.

    ``item`` is either a bare PR number or a mapping that may carry ``number``,
    ``title``, ``labels``, ``changed_files`` and ``issue_numbers``. Missing or
    wrongly typed optional fields fall back to ``None`` / empty lists.

    Raises:
        CaptureError: if the PR number is not a positive ``int`` (``bool`` is
            rejected), or if ``item`` is neither an int nor a mapping.
    """
    if isinstance(item, int):
        number: Any = item
        details: Any = {}
    elif hasattr(item, "get"):
        details = item
        number = item.get("number")
    else:
        # Anything else used to fail with AttributeError on ``item.get``.
        raise CaptureError("merged PR entry requires a positive number")

    # Bare numbers get the same validation as dict entries. bool is excluded
    # because it is an int subclass and True would render as "pr=True".
    if isinstance(number, bool) or not isinstance(number, int) or number <= 0:
        raise CaptureError("merged PR entry requires a positive number")

    title = details.get("title")
    return {
        "number": number,
        "title": title if isinstance(title, str) else None,
        "labels": _strings(details.get("labels")),
        "changed_files": _strings(details.get("changed_files")),
        "issue_numbers": _positive_ints(details.get("issue_numbers")),
    }
def _reconcile_pr(
    records: list[dict[str, Any]],
    item: dict[str, Any],
    *,
    config: cfg.ProjectConfig | None,
    capture_capability_available: bool,
) -> dict[str, Any]:
    """Plan reconciliation for one merged PR.

    Outcomes, checked in this order:

    * more than one linked issue: ``ambiguous`` (blocked, no actions);
    * marker verifies: ``actionable`` with a single close-issue action when
      exactly one issue is linked, otherwise ``complete``;
    * marker is invalid: ``invalid`` (blocked, no actions);
    * marker is missing: ``actionable`` with the actions that emit it.
    """
    pr_number = item["number"]
    verification = _verify_pr(records, pr_number)
    issue_numbers = _linked_issue_numbers(records, item)

    def finish(status: str, reason: str, **extra: Any) -> dict[str, Any]:
        return _reconcile_result(
            pr_number,
            status=status,
            reason=reason,
            verification=verification,
            **extra,
        )

    if len(issue_numbers) > 1:
        return finish(
            "ambiguous",
            "multiple linked issues found for merged PR",
            issue_numbers=issue_numbers,
            blocked=True,
        )

    # Past the ambiguity check, a non-empty list means exactly one linked issue.
    close_issue = (
        [_action("close-linked-issue", pr_number=pr_number, issue_number=issue_numbers[0])]
        if issue_numbers
        else []
    )

    if verification["ok"]:
        if not close_issue:
            return finish("complete", "capture marker already present")
        return finish(
            "actionable",
            "capture marker already present; linked issue closeout can be reconciled",
            issue_numbers=issue_numbers,
            marker=verification["marker"],
            actions=close_issue,
        )

    if verification["status"] == "invalid":
        return finish(
            "invalid",
            verification["reason"],
            issue_numbers=issue_numbers,
            blocked=True,
        )

    marker_status, marker_reason, reason = _reconcile_marker_decision(
        item,
        config=config,
        capture_capability_available=capture_capability_available,
    )
    marker = marker_text(
        pr_number=pr_number,
        status=marker_status,
        reason=marker_reason,
    )

    actions: list[dict[str, Any]] = []
    if marker_status == "deferred":
        # The extension rerun is listed ahead of the marker emission.
        actions.append(_action("run-capture-extension", pr_number=pr_number))
    actions.append(
        _action(
            "emit-capture-marker",
            pr_number=pr_number,
            marker=marker,
            status=marker_status,
            reason=marker_reason,
        )
    )
    actions.append(_action("post-closure-summary", pr_number=pr_number))
    if marker_status == "skipped":
        actions.append(_action("record-skip", pr_number=pr_number, reason=marker_reason))
    actions.extend(close_issue)

    return finish(
        "actionable",
        reason,
        issue_numbers=issue_numbers,
        marker=marker,
        actions=actions,
    )
def _verify_pr(records: list[dict[str, Any]], pr_number: int) -> dict[str, Any]:
    """Verify the capture marker recorded for one PR.

    Only ``ship_run`` records whose pull request number equals ``pr_number`` are
    considered. The result status is ``missing`` (no marker), ``invalid`` (more
    than one marker, an unparseable marker, or a marker naming another PR), or
    the parsed marker status when the marker verifies.
    """
    markers = []
    for record in records:
        if record.get("record_type") != "ship_run":
            continue
        if (record.get("pull_request") or {}).get("number") != pr_number:
            continue
        capture_block = record.get("capture")
        if isinstance(capture_block, dict) and capture_block.get("marker"):
            markers.append(capture_block["marker"])

    def outcome(
        ok: bool,
        status: str,
        reason: str | None,
        marker: Any,
        **extra: Any,
    ) -> dict[str, Any]:
        return {
            "pr": pr_number,
            "ok": ok,
            "status": status,
            "reason": reason,
            "marker": marker,
            **extra,
        }

    if not markers:
        return outcome(False, "missing", "no capture marker found for merged PR", None)

    if len(markers) > 1:
        return outcome(
            False,
            "invalid",
            "multiple capture markers found for merged PR",
            markers[-1],
            marker_count=len(markers),
        )

    (marker,) = markers
    try:
        parsed = parse_marker(marker)
    except CaptureError as exc:
        return outcome(False, "invalid", str(exc), marker)

    if parsed.pr_number != pr_number:
        return outcome(False, "invalid", "marker PR does not match ledger PR", marker)
    return outcome(True, parsed.status, parsed.reason, marker)
def _reconcile_result(
    pr_number: int,
    *,
    status: str,
    reason: str,
    verification: dict[str, Any],
    issue_numbers: list[int] | None = None,
    marker: str | None = None,
    actions: list[dict[str, Any]] | None = None,
    blocked: bool = False,
) -> dict[str, Any]:
    """Assemble the per-PR reconcile entry.

    ``issue_numbers`` and ``actions`` are copied into fresh lists; a missing or
    empty value becomes ``[]``.
    """
    verification_status = verification["status"]
    linked = list(issue_numbers) if issue_numbers else []
    planned = list(actions) if actions else []

    entry: dict[str, Any] = {"pr": pr_number, "status": status, "reason": reason}
    entry["verification_status"] = verification_status
    entry["blocked"] = blocked
    entry["issue_numbers"] = linked
    entry["marker"] = marker
    entry["actions"] = planned
    return entry
def _reconcile_marker_decision(
    item: dict[str, Any],
    *,
    config: cfg.ProjectConfig | None,
    capture_capability_available: bool,
) -> tuple[str, str | None, str]:
    """Choose the marker to emit for a PR that has none.

    Returns ``(marker_status, marker_reason, explanation)``. The recursion guard
    is checked first; after that the result depends on whether the capture
    policy is enabled and on its mode.
    """
    guarded = recursion_guard(
        title=item["title"],
        labels=item["labels"],
        changed_files=item["changed_files"],
    )
    if guarded:
        return "skipped", "recursion-guard", "capture recursion guard matched"

    no_policy = ("skipped", "no-policy", "no capture policy configured")
    policy = _capture_policy(config)
    if not policy.get("enabled"):
        return no_policy

    mode = policy.get("mode", "extension")
    if mode == "marker-only":
        return "applied", None, "marker-only capture policy configured"
    if mode == "extension":
        if capture_capability_available:
            return "deferred", None, "capture extension can be rerun"
        return "skipped", "capability-unavailable", "capture extension capability unavailable"
    # An enabled policy with any other mode is treated like no policy.
    return no_policy
def _linked_issue_numbers(records: list[dict[str, Any]], item: dict[str, Any]) -> list[int]:
    """Return the sorted, de-duplicated issue numbers linked to a merged PR.

    Combines the entry's own ``issue_numbers`` with the positive integer issue
    numbers found on ``ship_run`` records for the same PR.
    """
    linked = set(item["issue_numbers"])
    target_pr = item["number"]

    from_ledger = (
        (record.get("issue") or {}).get("number")
        for record in records
        if record.get("record_type") == "ship_run"
        and (record.get("pull_request") or {}).get("number") == target_pr
    )
    linked.update(
        candidate
        for candidate in from_ledger
        if isinstance(candidate, int) and candidate > 0
    )
    return sorted(linked)
def _action(
    action_type: str,
    *,
    pr_number: int,
    marker: str | None = None,
    status: str | None = None,
    reason: str | None = None,
    issue_number: int | None = None,
) -> dict[str, Any]:
    """Build one reconcile action dict.

    The idempotency key is ``<type>:pr-<n>``, or ``<type>:issue-<i>:pr-<n>``
    when an issue number is given. Optional fields are included only when they
    are not ``None``.
    """
    if issue_number is None:
        key = f"{action_type}:pr-{pr_number}"
    else:
        key = f"{action_type}:issue-{issue_number}:pr-{pr_number}"

    action: dict[str, Any] = {
        "type": action_type,
        "pr": pr_number,
        "idempotency_key": key,
    }
    optional_fields = (
        ("marker", marker),
        ("status", status),
        ("reason", reason),
        ("issue", issue_number),
    )
    action.update((name, value) for name, value in optional_fields if value is not None)
    return action
def _capture_policy(config: cfg.ProjectConfig | None) -> dict[str, Any]:
    """Return ``config.policy_pack["capture"]`` when it is a dict, else ``{}``."""
    if config is None:
        return {}
    pack = config.policy_pack
    if not isinstance(pack, dict):
        return {}
    capture = pack.get("capture")
    if isinstance(capture, dict):
        return capture
    return {}
def _learning_policy(config: cfg.ProjectConfig | None) -> dict[str, Any]:
    """Return the ``learning`` entry of the capture policy when it is a dict, else ``{}``."""
    capture = _capture_policy(config)
    if not isinstance(capture, dict):
        return {}
    learning = capture.get("learning")
    if isinstance(learning, dict):
        return learning
    return {}
def _learning_dedupe_enabled(policy: dict[str, Any]) -> bool:
    """Report whether dedupe is on; it is unless a dict ``dedupe`` entry disables it."""
    settings = policy.get("dedupe")
    return bool(settings.get("enabled", True)) if isinstance(settings, dict) else True
def _marker_reason(status: str, reason: str | None) -> str | None:
    """Pick the skip reason to pass along with a raw status.

    Only a bare ``skipped`` status gets one: the caller's ``reason`` when it is
    listed in ``SKIP_REASONS``, otherwise ``"no-policy"``. Every other status,
    including ``skipped:<reason>``, returns ``None``.
    """
    if status.strip() != "skipped":
        return None
    return reason if reason in SKIP_REASONS else "no-policy"
def _strings(value: Any) -> list[str]:
    """Return the ``str`` items of a list or tuple; any other input gives ``[]``."""
    if isinstance(value, (list, tuple)):
        return [entry for entry in value if isinstance(entry, str)]
    return []
def _positive_ints(value: Any) -> list[int]:
    """Return the positive ``int`` items of a list or tuple; other input gives ``[]``.

    ``bool`` items are dropped: ``bool`` is a subclass of ``int``, so ``True``
    would otherwise be kept and treated as the number 1.
    """
    if not isinstance(value, (list, tuple)):
        return []
    return [
        item
        for item in value
        if isinstance(item, int) and not isinstance(item, bool) and item > 0
    ]
def _duplicate_learning_fingerprint(
    fingerprint: str,
    records: list[dict[str, Any]] | tuple[dict[str, Any], ...],
) -> str | None:
    """Find the first earlier record that already claimed this fingerprint.

    A record counts when its ``capture.learning`` block has the same fingerprint
    and a decision of ``create-learning`` or ``duplicate``. The return value is
    the string form of that record's ``run_id``, falling back to its pull
    request number; ``None`` means no such record exists.
    """
    for entry in records:
        learning = None
        if isinstance(entry, dict):
            capture_block = entry.get("capture")
            if isinstance(capture_block, dict):
                learning = capture_block.get("learning")
        if not isinstance(learning, dict):
            continue
        if (
            learning.get("fingerprint") == fingerprint
            and learning.get("decision") in {"create-learning", "duplicate"}
        ):
            origin = entry.get("run_id") or (entry.get("pull_request") or {}).get("number")
            return str(origin)
    return None
def _learning_result(
    decision: str,
    *,
    reason: str,
    fingerprint: str,
    policy: dict[str, Any],
    duplicate_of: str | None = None,
) -> dict[str, Any]:
    """Build the learning-decision dict.

    ``durable_artifact`` is true only for ``create-learning``. ``duplicate_of``
    is included only when it is not ``None``.

    Raises:
        CaptureError: if ``decision`` is not one of ``LEARNING_DECISIONS``.
    """
    if decision not in LEARNING_DECISIONS:
        raise CaptureError(f"unsupported learning decision: {decision}")

    outcome: dict[str, Any] = {
        "schema_version": LEARNING_DECISION_SCHEMA_VERSION,
        "decision": decision,
        "reason": reason,
        "fingerprint": fingerprint,
    }
    outcome["policy_source"] = "policy_pack.capture.learning"
    outcome["policy_mode"] = policy.get("mode", "policy-unavailable")
    outcome["durable_artifact"] = decision == "create-learning"
    if duplicate_of is None:
        return outcome
    outcome["duplicate_of"] = duplicate_of
    return outcome
def _policy_reason(policy: dict[str, Any], default: str) -> str:
    """Return the policy's stripped ``reason`` when it is a non-blank string, else ``default``."""
    candidate = policy.get("reason")
    if isinstance(candidate, str):
        trimmed = candidate.strip()
        if trimmed:
            return trimmed
    return default
def _normalize_text(value: str | None) -> str:
    """Lowercase ``value`` and collapse whitespace runs to single spaces; non-strings give ``""``."""
    if not isinstance(value, str):
        return ""
    words = value.lower().split()
    return " ".join(words)
def _normalize_path(value: str) -> str:
    """Strip, lowercase, and turn backslashes into forward slashes.

    Repeated or trailing separators are kept as they are; no segments are
    collapsed or removed.
    """
    trimmed = value.strip()
    lowered = trimmed.lower()
    return lowered.replace("\\", "/")