Coverage for src/keel/runner.py: 100%

53 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Thin I/O: execute shell-command gates (build / lint / command extensions). 

2 

3This is the only place keel shells out for gates. It is deliberately thin and 

4**fail-soft**: a timeout or a missing binary becomes a failed :class:`CommandResult` 

5rather than an exception. The subprocess call is injectable (``_run``) so the gate 

6runner is fully unit-testable offline; agentic gates are dispatched elsewhere. 

7""" 

8 

9from __future__ import annotations 

10 

11import re 

12import subprocess 

13from collections.abc import Callable 

14from dataclasses import dataclass 

15from typing import TYPE_CHECKING 

16 

17from .findings import Finding 

18 

19if TYPE_CHECKING: # pragma: no cover 

20 from .gates import GateSpec 

21 

#: Callable shape shared by gate executors: it receives one gate spec and
#: returns ``(passed, findings)``.
GateRunner = Callable[
    ["GateSpec"],
    tuple[bool, list[Finding]],
]

23 

#: Maps a gate's ``on_fail`` policy to the severity given to the finding that
#: is emitted when the gate's command fails.
_ON_FAIL_SEVERITY = {
    "block": "major",
    "suggest": "minor",
    "warn": "nit",
}

25 

#: Matches a reviewdog-style errorformat location, ``path:line[:col]: message``.
#: The pattern is meant for a single ``search`` over the whole tool output (so
#: the earliest hit wins) rather than a loop over ``splitlines()``:
#:
#: * ``re.MULTILINE`` makes ``^`` anchor at the start of every line;
#: * neither path character class admits ``\n``, so the captured path and line
#:   number always come from one and the same line;
#: * the closing ``(?:[:\s]|$)`` lets the line number sit at end-of-line.
_LOCATION_RE = re.compile(
    r"^[ \t]*(?P<path>[^\s\n:][^:\n]*?):(?P<line>\d+)(?::\d+)?(?:[:\s]|$)",
    flags=re.MULTILINE,
)

35 

36 

def first_location(text: str) -> tuple[str | None, int | None]:
    """Return the earliest ``(path, line)`` location found in tool output.

    ``text`` is searched once with ``_LOCATION_RE``; the line number is
    converted to ``int``. When nothing matches, ``(None, None)`` is returned.
    """
    hit = _LOCATION_RE.search(text)
    if hit is None:
        return None, None
    return hit.group("path"), int(hit.group("line"))

41 

42 

@dataclass(frozen=True)
class CommandResult:
    """Immutable outcome of one subprocess invocation.

    Instances are built positionally as ``CommandResult(ok, code, output)`` by
    the runners in this module.
    """

    # Whether the invocation counts as a success (the runners set this to
    # ``returncode == 0``, and to ``False`` on timeout / OS error).
    ok: bool
    # Exit status; the runners use 124 for a timeout and 127 for an OS error.
    code: int
    # Captured stdout followed by stderr, or a short failure description.
    output: str

48 

49 

def run_command(
    cmd: str, *, cwd: str | None = None, timeout: int = 600, _run=subprocess.run
) -> CommandResult:
    """Run ``cmd`` in a shell, capturing output. Fail-soft on timeout/OS error.

    :param cmd: shell command line to execute.
    :param cwd: working directory for the command (``None`` keeps the current one).
    :param timeout: seconds to wait before the run is reported as timed out.
    :param _run: injectable stand-in for :func:`subprocess.run` (test seam).
    :returns: a :class:`CommandResult` whose ``output`` is stdout followed by
        stderr. A timeout yields code ``124``; an :class:`OSError` yields code
        ``127`` with the error text as output.
    """
    try:
        # Intentional shell boundary: cmd must come only from operator-controlled
        # project config or extension YAML, never from PR content or agent output.
        proc = _run(
            cmd,
            shell=True,
            cwd=cwd,
            capture_output=True,
            text=True,
            # Tool output is not guaranteed to be valid in the locale encoding.
            # With strict decoding a single stray byte raises UnicodeDecodeError
            # (not an OSError), which would escape this fail-soft wrapper.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return CommandResult(False, 124, f"timed out after {timeout}s")
    except OSError as exc:
        return CommandResult(False, 127, str(exc))
    # Either stream may be None/empty (e.g. from an injected fake); normalise to "".
    output = (proc.stdout or "") + (proc.stderr or "")
    return CommandResult(proc.returncode == 0, proc.returncode, output)

64 

65 

def run_argv(
    argv: list[str], *, cwd: str | None = None, timeout: int = 120, _run=subprocess.run
) -> CommandResult:
    """Run an argv list (no shell). Fail-soft on timeout/OS error. Used by git/gh wrappers.

    :param argv: program and arguments, passed to the process without a shell.
    :param cwd: working directory for the process (``None`` keeps the current one).
    :param timeout: seconds to wait before the run is reported as timed out.
    :param _run: injectable stand-in for :func:`subprocess.run` (test seam).
    :returns: a :class:`CommandResult` whose ``output`` is stdout followed by
        stderr. A timeout yields code ``124``; an :class:`OSError` (such as a
        missing binary) yields code ``127`` with the error text as output.
    """
    try:
        proc = _run(
            argv,
            cwd=cwd,
            capture_output=True,
            text=True,
            # Replace undecodable bytes instead of letting strict decoding raise
            # UnicodeDecodeError, which is not an OSError and would escape the
            # fail-soft handling below.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return CommandResult(False, 124, f"timed out after {timeout}s")
    except OSError as exc:
        return CommandResult(False, 127, str(exc))
    # Either stream may be None/empty (e.g. from an injected fake); normalise to "".
    output = (proc.stdout or "") + (proc.stderr or "")
    return CommandResult(proc.returncode == 0, proc.returncode, output)

78 

79 

def _tail(text: str, n: int = 20) -> str:
    """Return the last ``n`` lines of ``text`` joined with ``\\n``.

    Leading and trailing whitespace is stripped from ``text`` before it is
    split into lines. A non-positive ``n`` yields ``""``.
    """
    if n <= 0:
        # Without this guard ``[-0:]`` is ``[0:]`` and would return every line,
        # and a negative ``n`` would drop lines from the front instead.
        return ""
    return "\n".join(text.strip().splitlines()[-n:])

82 

83 

def command_gate_runner(
    repo_root: str | None = None, *, timeout: int = 600, _run=subprocess.run
) -> GateRunner:
    """Build a :data:`keel.gates.GateRunner` that runs ``command`` gates in a shell.

    Gates of any other kind (agentic / builtin like ``jury``), and command gates
    with nothing to run, are not executed here: they pass as no-ops, and the
    agent-dispatch layer is responsible for running those.

    :param repo_root: working directory handed to each command.
    :param timeout: per-command timeout in seconds.
    :param _run: injectable stand-in for :func:`subprocess.run` (test seam).
    """

    def runner(spec: GateSpec) -> tuple[bool, list[Finding]]:
        # Anything that is not a runnable shell command passes trivially.
        if not (spec.kind == "command" and spec.run):
            return True, []

        outcome = run_command(spec.run, cwd=repo_root, timeout=timeout, _run=_run)
        if outcome.ok:
            return True, []

        # The gate failed: report a single finding whose severity follows the
        # gate's ``on_fail`` policy.
        severity = _ON_FAIL_SEVERITY[spec.on_fail]
        headline = f"{spec.id} failed (exit {outcome.code})"
        excerpt = _tail(outcome.output)
        message = f"{headline}: {excerpt}" if excerpt else headline

        # Anchor the finding to the first ``path:line`` in the output, if any.
        path, line = first_location(outcome.output)
        has_anchor = not (path is None or line is None)
        finding = Finding(
            severity,
            message,
            spec.id,
            path=path,
            line=line,
            anchorable=has_anchor,
        )
        return False, [finding]

    return runner