Coverage for src/keel/project_commands.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Project-provided command declarations. 

2 

3Project commands are durable policy data: keel can discover, list, plan, and capability-check 

4them, but their implementation remains in the consumer repository. 

5""" 

6 

7from __future__ import annotations 

8 

9from dataclasses import dataclass 

10from typing import Any 

11 

12from .config import ProjectConfig 

13 

14 

15@dataclass(frozen=True) 

16class ProjectCommand: 

17 """One project-owned command exposed through ``policy_pack``.""" 

18 

19 name: str 

20 command: str | None = None 

21 description: str | None = None 

22 agent_role: str | None = None 

23 paths: tuple[str, ...] = () 

24 required_capabilities: tuple[str, ...] = () 

25 optional_capabilities: tuple[str, ...] = () 

26 side_effects: tuple[str, ...] = () 

27 dry_run_safe: bool = False 

28 source: str = "policy_pack.project_commands" 

29 

30 def as_dict(self) -> dict[str, Any]: 

31 """Render as JSON-compatible contract data.""" 

32 return { 

33 "name": self.name, 

34 "command": self.command, 

35 "description": self.description, 

36 "agent_role": self.agent_role, 

37 "paths": list(self.paths), 

38 "required_capabilities": list(self.required_capabilities), 

39 "optional_capabilities": list(self.optional_capabilities), 

40 "side_effects": list(self.side_effects), 

41 "dry_run_safe": self.dry_run_safe, 

42 "source": self.source, 

43 } 

44 

45 

46def list_project_commands(config: ProjectConfig) -> tuple[ProjectCommand, ...]: 

47 """Return project-owned commands declared by the policy pack.""" 

48 pack = config.policy_pack 

49 commands: list[ProjectCommand] = [] 

50 commands.extend(_commands_from_map(pack.get("project_commands", {}), 

51 source="policy_pack.project_commands")) 

52 legacy_names = {cmd.name for cmd in commands} 

53 for command in _commands_from_map(pack.get("command_routing", {}), 

54 source="policy_pack.command_routing"): 

55 if command.name not in legacy_names: 

56 commands.append(command) 

57 return tuple(sorted(commands, key=lambda cmd: cmd.name)) 

58 

59 

60def get_project_command(config: ProjectConfig, name: str) -> ProjectCommand | None: 

61 """Return one project command by name, or ``None`` when absent.""" 

62 for command in list_project_commands(config): 

63 if command.name == name: 

64 return command 

65 return None 

66 

67 

68def _commands_from_map(value: Any, *, source: str) -> list[ProjectCommand]: 

69 if not isinstance(value, dict): 

70 return [] 

71 commands: list[ProjectCommand] = [] 

72 for name, raw in value.items(): 

73 if not isinstance(raw, dict): 

74 continue 

75 commands.append(ProjectCommand( 

76 name=name, 

77 command=raw.get("command"), 

78 description=raw.get("description"), 

79 agent_role=raw.get("agent_role"), 

80 paths=tuple(raw.get("paths", [])), 

81 required_capabilities=tuple(raw.get("required_capabilities", [])), 

82 optional_capabilities=tuple(raw.get("optional_capabilities", [])), 

83 side_effects=tuple(raw.get("side_effects", [])), 

84 dry_run_safe=bool(raw.get("dry_run_safe", False)), 

85 source=source, 

86 )) 

87 return commands