Coverage for src/keel/provenance.py: 100%

27 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Agent-output provenance tags for cross-agent containment.""" 

2 

3from __future__ import annotations 

4 

5from typing import Any 

6 

7from .capabilities import KNOWN_CAPABILITIES 

8 

# Schema identifier written into the contract and into every tag built by this
# module; normalize_tag() only preserves fields from tags carrying this value.
SCHEMA_VERSION = "keel.agent-output-provenance.v1"
# Role label used as the contract's "default_role" and as the "role" of every
# tag built by source_tag().
UNTRUSTED_ROLE = "untrusted-agent-output"

11 

12 

def contract_as_dict() -> dict[str, Any]:
    """Describe the provenance-tagging contract as a plain dictionary.

    The mapping is assembled from literals plus the module constants
    ``SCHEMA_VERSION`` and ``UNTRUSTED_ROLE``. A new dictionary, with new
    nested containers, is created on every call, so callers may mutate the
    result freely.
    """
    # Nested pieces are named first so the top-level layout stays readable.
    scope_rules: dict[str, Any] = {
        "default_allowed_capabilities": [],
        "rule": "tagged content cannot expand downstream capabilities",
    }
    consumer_names = [
        "findings",
        "step_handoff",
        "review_verdict",
        "jury_verdict",
        "feedback_workflow",
    ]

    # Keys are inserted in a fixed order so serialised output is stable.
    contract: dict[str, Any] = {}
    contract["schema_version"] = SCHEMA_VERSION
    contract["consumer_neutral"] = True
    contract["deterministic"] = True
    contract["stdlib_only"] = True
    contract["threat_model"] = "prior-agent-output-is-data-not-instructions"
    contract["trusted_as_instructions"] = False
    contract["default_role"] = UNTRUSTED_ROLE
    contract["source_fields"] = ["agent_id", "step_id", "vendor", "model"]
    contract["capability_scope"] = scope_rules
    contract["consumers"] = consumer_names
    return contract

36 

37 

def source_tag(
    *,
    source_agent: str | None,
    step_id: str | None,
    vendor: str | None = None,
    model: str | None = None,
    allowed_capabilities: tuple[str, ...] | list[str] = (),
) -> dict[str, Any]:
    """Create a provenance tag that records where a structured record came from.

    Args:
        source_agent: Producing agent's identifier; blank or non-string values
            are replaced with ``"unknown-agent"``.
        step_id: Producing step's identifier; blank or non-string values are
            replaced with ``"unknown-step"``.
        vendor: Optional vendor name; blank or non-string values become ``None``.
        model: Optional model name; blank or non-string values become ``None``.
        allowed_capabilities: Capability names to record. They are split into
            known and unknown names by ``_capabilities``.

    Returns:
        A dictionary with the schema version, the untrusted role,
        ``trusted_as_instructions`` set to ``False``, a ``source`` mapping and
        a ``capability_scope`` mapping whose ``can_expand_capabilities`` is
        always ``False``.
    """
    recognised, unrecognised = _capabilities(allowed_capabilities)

    origin = {
        "agent_id": _value(source_agent, "unknown-agent"),
        "step_id": _value(step_id, "unknown-step"),
        "vendor": _optional(vendor),
        "model": _optional(model),
    }
    scope = {
        "allowed_capabilities": recognised,
        "unknown_capabilities": unrecognised,
        "can_expand_capabilities": False,
    }

    tag: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "role": UNTRUSTED_ROLE,
        "trusted_as_instructions": False,
    }
    tag["source"] = origin
    tag["capability_scope"] = scope
    return tag

64 

65 

def tag_output(
    content: str,
    *,
    source_agent: str | None,
    step_id: str | None,
    vendor: str | None = None,
    model: str | None = None,
    allowed_capabilities: tuple[str, ...] | list[str] = (),
) -> dict[str, Any]:
    """Attach a provenance tag to agent-produced text.

    Args:
        content: The text to wrap. Anything that is not a ``str`` is stored as
            an empty string.
        source_agent: Forwarded to ``source_tag``.
        step_id: Forwarded to ``source_tag``.
        vendor: Forwarded to ``source_tag``.
        model: Forwarded to ``source_tag``.
        allowed_capabilities: Forwarded to ``source_tag``.

    Returns:
        The dictionary produced by ``source_tag`` with a trailing ``"content"``
        entry holding the text.
    """
    if isinstance(content, str):
        text = content
    else:
        text = ""

    tagged = dict(
        source_tag(
            source_agent=source_agent,
            step_id=step_id,
            vendor=vendor,
            model=model,
            allowed_capabilities=allowed_capabilities,
        )
    )
    # Assigned last so "content" follows the tag fields and wins any key clash.
    tagged["content"] = text
    return tagged

86 

87 

def normalize_tag(
    value: dict[str, Any] | None,
    *,
    fallback_agent: str,
    step_id: str,
) -> dict[str, Any]:
    """Rebuild a provenance tag from possibly malformed input.

    Args:
        value: Candidate tag. If it is not a dictionary, or its
            ``"schema_version"`` differs from ``SCHEMA_VERSION``, it is ignored
            entirely.
        fallback_agent: Agent identifier used when the candidate supplies no
            usable ``agent_id``.
        step_id: Step identifier used when the candidate supplies no usable
            ``step_id``.

    Returns:
        A tag freshly built by ``source_tag``. Source fields are taken from the
        candidate where usable; ``allowed_capabilities`` is taken from the
        candidate only when it is a list.
    """
    if not isinstance(value, dict):
        return source_tag(source_agent=fallback_agent, step_id=step_id)
    if value.get("schema_version") != SCHEMA_VERSION:
        return source_tag(source_agent=fallback_agent, step_id=step_id)

    origin = value.get("source")
    if not isinstance(origin, dict):
        origin = {}

    capability_scope = value.get("capability_scope")
    if not isinstance(capability_scope, dict):
        capability_scope = {}

    # Only a list is accepted; any other shape yields no capabilities.
    requested = capability_scope.get("allowed_capabilities")
    if not isinstance(requested, list):
        requested = []

    return source_tag(
        source_agent=_value(origin.get("agent_id"), fallback_agent),
        step_id=_value(origin.get("step_id"), step_id),
        vendor=_optional(origin.get("vendor")),
        model=_optional(origin.get("model")),
        allowed_capabilities=requested,
    )

107 

108 

def _value(value: Any, fallback: str) -> str:
    """Return *value* stripped of surrounding whitespace, or *fallback*.

    *fallback* is returned when *value* is not a string or is empty after
    stripping.
    """
    if not isinstance(value, str):
        return fallback
    trimmed = value.strip()
    return trimmed or fallback

111 

112 

113def _optional(value: Any) -> str | None: 

114 return value.strip() if isinstance(value, str) and value.strip() else None 

115 

116 

def _capabilities(values: tuple[str, ...] | list[str]) -> tuple[list[str], list[str]]:
    """Split capability names into known and unknown groups.

    Non-string and blank entries are skipped, the remaining names are stripped
    and de-duplicated, and each name is classified by membership in
    ``KNOWN_CAPABILITIES``.

    Returns:
        A ``(known, unknown)`` pair of sorted lists.
    """
    recognised_names = set(KNOWN_CAPABILITIES)
    accepted: set[str] = set()
    rejected: set[str] = set()

    for entry in values:
        if not isinstance(entry, str):
            continue
        name = entry.strip()
        if not name:
            continue
        # Sets give de-duplication for free; ordering is applied on return.
        if name in recognised_names:
            accepted.add(name)
        else:
            rejected.add(name)

    return sorted(accepted), sorted(rejected)