Coverage for src/keel/runner.py: 100%
53 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Thin I/O: execute shell-command gates (build / lint / command extensions).
3This is the only place keel shells out for gates. It is deliberately thin and
4**fail-soft**: a timeout or a missing binary becomes a failed :class:`CommandResult`
5rather than an exception. The subprocess call is injectable (``_run``) so the gate
6runner is fully unit-testable offline; agentic gates are dispatched elsewhere.
7"""
9from __future__ import annotations
11import re
12import subprocess
13from collections.abc import Callable
14from dataclasses import dataclass
15from typing import TYPE_CHECKING
17from .findings import Finding
19if TYPE_CHECKING: # pragma: no cover
20 from .gates import GateSpec
22GateRunner = Callable[["GateSpec"], tuple[bool, list[Finding]]]
24_ON_FAIL_SEVERITY = {"block": "major", "suggest": "minor", "warn": "nit"}
26#: reviewdog-style errorformat: ``path:line[:col]: message`` (first hit wins).
27#: A single multiline ``search`` replaces a per-``splitlines`` loop: ``^`` is
28#: anchored to each line by ``re.MULTILINE``, the path classes exclude ``\n`` so a
29#: match can never span lines, and the trailing ``(?:[:\s]|$)`` accepts the line
30#: number at end-of-line.
31_LOCATION_RE = re.compile(
32 r"^[ \t]*(?P<path>[^\s\n:][^:\n]*?):(?P<line>\d+)(?::\d+)?(?:[:\s]|$)",
33 re.MULTILINE,
34)
37def first_location(text: str) -> tuple[str | None, int | None]:
38 """Extract the first ``path:line`` location from tool output (``(None, None)`` if none)."""
39 m = _LOCATION_RE.search(text)
40 return (m.group("path"), int(m.group("line"))) if m else (None, None)
43@dataclass(frozen=True)
44class CommandResult:
45 ok: bool
46 code: int
47 output: str
50def run_command(
51 cmd: str, *, cwd: str | None = None, timeout: int = 600, _run=subprocess.run
52) -> CommandResult:
53 """Run ``cmd`` in a shell, capturing output. Fail-soft on timeout/OS error."""
54 try:
55 # Intentional shell boundary: cmd must come only from operator-controlled
56 # project config or extension YAML, never from PR content or agent output.
57 proc = _run(cmd, shell=True, cwd=cwd, capture_output=True, text=True, timeout=timeout)
58 except subprocess.TimeoutExpired:
59 return CommandResult(False, 124, f"timed out after {timeout}s")
60 except OSError as exc:
61 return CommandResult(False, 127, str(exc))
62 output = (proc.stdout or "") + (proc.stderr or "")
63 return CommandResult(proc.returncode == 0, proc.returncode, output)
66def run_argv(
67 argv: list[str], *, cwd: str | None = None, timeout: int = 120, _run=subprocess.run
68) -> CommandResult:
69 """Run an argv list (no shell). Fail-soft on timeout/OS error. Used by git/gh wrappers."""
70 try:
71 proc = _run(argv, cwd=cwd, capture_output=True, text=True, timeout=timeout)
72 except subprocess.TimeoutExpired:
73 return CommandResult(False, 124, f"timed out after {timeout}s")
74 except OSError as exc:
75 return CommandResult(False, 127, str(exc))
76 output = (proc.stdout or "") + (proc.stderr or "")
77 return CommandResult(proc.returncode == 0, proc.returncode, output)
80def _tail(text: str, n: int = 20) -> str:
81 return "\n".join(text.strip().splitlines()[-n:])
84def command_gate_runner(
85 repo_root: str | None = None, *, timeout: int = 600, _run=subprocess.run
86) -> GateRunner:
87 """A :data:`keel.gates.GateRunner` that executes ``command`` gates via the shell.
89 Non-command gates (agentic / builtin like ``jury``) are not executed here — in
90 command-only mode they pass as no-ops; the agent-dispatch layer runs those.
91 """
93 def runner(spec: GateSpec) -> tuple[bool, list[Finding]]:
94 if spec.kind != "command" or not spec.run:
95 return True, []
96 result = run_command(spec.run, cwd=repo_root, timeout=timeout, _run=_run)
97 if result.ok:
98 return True, []
99 severity = _ON_FAIL_SEVERITY[spec.on_fail]
100 message = f"{spec.id} failed (exit {result.code})"
101 tail = _tail(result.output)
102 if tail:
103 message += f": {tail}"
104 path, line = first_location(result.output)
105 return False, [Finding(
106 severity, message, spec.id,
107 path=path, line=line, anchorable=path is not None and line is not None,
108 )]
110 return runner