Coverage for src/keel/provenance.py: 100%
27 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Agent-output provenance tags for cross-agent containment."""
3from __future__ import annotations
5from typing import Any
7from .capabilities import KNOWN_CAPABILITIES
# Schema identifier written into the "schema_version" field of the contract and of
# every tag; normalize_tag() rebuilds a fallback tag when an incoming value carries
# a different identifier.
9SCHEMA_VERSION = "keel.agent-output-provenance.v1"
# Role string placed on every tag built by source_tag() and reported as
# "default_role" by contract_as_dict().
10UNTRUSTED_ROLE = "untrusted-agent-output"
def contract_as_dict() -> dict[str, Any]:
    """Describe the provenance-tagging contract as a plain dictionary.

    The result is assembled only from literals and the module constants
    ``SCHEMA_VERSION`` and ``UNTRUSTED_ROLE``, so every call yields an equal,
    freshly allocated dictionary with the same key order.

    Returns:
        The contract, including the source field names, the default (empty)
        capability scope and the list of consumers.
    """
    source_fields = ["agent_id", "step_id", "vendor", "model"]
    capability_scope = {
        "default_allowed_capabilities": [],
        "rule": "tagged content cannot expand downstream capabilities",
    }
    consumers = [
        "findings",
        "step_handoff",
        "review_verdict",
        "jury_verdict",
        "feedback_workflow",
    ]
    # Keyword order below fixes the key order of the resulting dict.
    contract: dict[str, Any] = dict(
        schema_version=SCHEMA_VERSION,
        consumer_neutral=True,
        deterministic=True,
        stdlib_only=True,
        threat_model="prior-agent-output-is-data-not-instructions",
        trusted_as_instructions=False,
        default_role=UNTRUSTED_ROLE,
        source_fields=source_fields,
        capability_scope=capability_scope,
        consumers=consumers,
    )
    return contract
def source_tag(
    *,
    source_agent: str | None,
    step_id: str | None,
    vendor: str | None = None,
    model: str | None = None,
    allowed_capabilities: tuple[str, ...] | list[str] = (),
) -> dict[str, Any]:
    """Build a provenance tag that records only where a record came from.

    Args:
        source_agent: Producing agent id; blank or non-string values become
            ``"unknown-agent"``.
        step_id: Producing step id; blank or non-string values become
            ``"unknown-step"``.
        vendor: Optional vendor name; blank or non-string values become ``None``.
        model: Optional model name; blank or non-string values become ``None``.
        allowed_capabilities: Capability names to record; they are split into
            known and unknown names by ``_capabilities``.

    Returns:
        A tag dict whose role is ``UNTRUSTED_ROLE``, with
        ``trusted_as_instructions`` and ``can_expand_capabilities`` both False.
    """
    known_caps, unknown_caps = _capabilities(allowed_capabilities)
    origin = {
        "agent_id": _value(source_agent, "unknown-agent"),
        "step_id": _value(step_id, "unknown-step"),
        "vendor": _optional(vendor),
        "model": _optional(model),
    }
    scope = {
        "allowed_capabilities": known_caps,
        "unknown_capabilities": unknown_caps,
        "can_expand_capabilities": False,
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "role": UNTRUSTED_ROLE,
        "trusted_as_instructions": False,
        "source": origin,
        "capability_scope": scope,
    }
def tag_output(
    content: str,
    *,
    source_agent: str | None,
    step_id: str | None,
    vendor: str | None = None,
    model: str | None = None,
    allowed_capabilities: tuple[str, ...] | list[str] = (),
) -> dict[str, Any]:
    """Attach a provenance tag to agent-produced text, marking it untrusted.

    Args:
        content: The text to wrap; anything that is not a ``str`` is stored as
            the empty string.
        source_agent, step_id, vendor, model, allowed_capabilities: Forwarded
            unchanged to ``source_tag``.

    Returns:
        The dict produced by ``source_tag`` with a trailing ``"content"`` key.
    """
    tagged = source_tag(
        source_agent=source_agent,
        step_id=step_id,
        vendor=vendor,
        model=model,
        allowed_capabilities=allowed_capabilities,
    )
    # source_tag() returns a fresh dict, so extending it in place is safe.
    if isinstance(content, str):
        tagged["content"] = content
    else:
        tagged["content"] = ""
    return tagged
def normalize_tag(
    value: dict[str, Any] | None,
    *,
    fallback_agent: str,
    step_id: str,
) -> dict[str, Any]:
    """Rebuild *value* as a well-formed untrusted provenance tag.

    Args:
        value: A candidate tag. Anything that is not a dict, or whose
            ``schema_version`` differs from ``SCHEMA_VERSION``, is discarded.
        fallback_agent: Agent id used when the tag is discarded or lacks a
            usable ``agent_id``.
        step_id: Step id used when the tag is discarded or lacks a usable
            ``step_id``.

    Returns:
        A tag freshly built by ``source_tag``; usable source fields and a
        list-valued ``allowed_capabilities`` from *value* are carried over.
    """
    if not isinstance(value, dict):
        return source_tag(source_agent=fallback_agent, step_id=step_id)
    if value.get("schema_version") != SCHEMA_VERSION:
        return source_tag(source_agent=fallback_agent, step_id=step_id)

    raw_source = value.get("source")
    if not isinstance(raw_source, dict):
        raw_source = {}

    raw_scope = value.get("capability_scope")
    if not isinstance(raw_scope, dict):
        raw_scope = {}

    # Only a list is accepted for the stored capabilities; any other shape is
    # replaced by an empty list.
    declared = raw_scope.get("allowed_capabilities")
    if not isinstance(declared, list):
        declared = []

    return source_tag(
        source_agent=_value(raw_source.get("agent_id"), fallback_agent),
        step_id=_value(raw_source.get("step_id"), step_id),
        vendor=_optional(raw_source.get("vendor")),
        model=_optional(raw_source.get("model")),
        allowed_capabilities=declared,
    )
109def _value(value: Any, fallback: str) -> str:
110 return value.strip() if isinstance(value, str) and value.strip() else fallback
113def _optional(value: Any) -> str | None:
114 return value.strip() if isinstance(value, str) and value.strip() else None
def _capabilities(values: tuple[str, ...] | list[str]) -> tuple[list[str], list[str]]:
    """Split requested capability names into known and unknown lists.

    Args:
        values: Capability names requested for a tag. ``None`` is treated as
            "no capabilities" and a bare ``str`` as a single name. Items that
            are not strings, or are blank after stripping, are ignored.

    Returns:
        ``(known, unknown)``: two sorted, de-duplicated lists of stripped
        names, split by membership in ``KNOWN_CAPABILITIES``.
    """
    if values is None:
        # Tolerate a missing value instead of raising TypeError on iteration.
        values = ()
    elif isinstance(values, str):
        # A str is itself iterable: without this guard a single name would be
        # scanned character by character and reported as many unknown names.
        values = (values,)
    known = set(KNOWN_CAPABILITIES)
    clean = {
        item.strip()
        for item in values
        if isinstance(item, str) and item.strip()
    }
    return (
        sorted(clean & known),
        sorted(clean - known),
    )