Coverage for src/keel/jury.py: 100%

50 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""The ``jury`` built-in gate — run the ai-jury CLI on the diff (optional, fail-soft). 

2 

3keel does **not** depend on ai-jury. If the ``jury`` CLI is on PATH, this gate runs it on 

4the change's diff and maps its findings into keel :class:`~keel.findings.Finding`s; if it is 

5absent, the gate is a fail-soft no-op (the flow runs with or without jury). Parsing is pure 

6and unit-tested; the subprocess is behind the injectable ``_run`` seam. 

7""" 

8 

9from __future__ import annotations 

10 

11import json 

12import os 

13import tempfile 

14 

15from .findings import Finding 

16from .runner import run_argv 

17 

18#: ai-jury severities → keel severities (unknown ⇒ ``minor``). 

19_SEVERITY = { 

20 "critical": "critical", "blocker": "critical", 

21 "major": "major", 

22 "minor": "minor", 

23 "nit": "nit", "info": "nit", "note": "nit", 

24} 

25 

26MAX_DIFF_BYTES = 1_000_000 

27 

28 

29def map_severity(severity: str) -> str: 

30 """Map an ai-jury severity onto a keel severity (default ``minor``).""" 

31 return _SEVERITY.get((severity or "").strip().lower(), "minor") 

32 

33 

34def parse_findings(data: dict | str) -> list[Finding]: 

35 """Map an ai-jury JSON report (dict or raw string) into keel Findings.""" 

36 if isinstance(data, str): 

37 try: 

38 data = json.loads(data) 

39 except json.JSONDecodeError: 

40 return [] 

41 if not isinstance(data, dict): 

42 return [] 

43 out: list[Finding] = [] 

44 for f in data.get("findings") or []: 

45 path = f.get("file") 

46 line = f.get("line") 

47 line = line if isinstance(line, int) else None 

48 out.append(Finding( 

49 severity=map_severity(f.get("severity", "")), 

50 message=f.get("claim") or "(jury finding)", 

51 source=f"jury:{f.get('reviewer') or 'consensus'}", 

52 path=path, 

53 line=line, 

54 anchorable=bool(path) and line is not None, 

55 )) 

56 return out 

57 

58 

59def _kw(_run): 

60 return {"_run": _run} if _run is not None else {} 

61 

62 

63def available(*, cwd: str | None = None, _run=None) -> bool: 

64 """True if the ``jury`` CLI is callable.""" 

65 return run_argv(["jury", "--version"], cwd=cwd, timeout=30, **_kw(_run)).ok 

66 

67 

68def _oversize_finding(size: int, *, severity: str = "nit") -> Finding: 

69 """Record that the jury gate skipped an oversize diff. 

70 

71 Advisory jury mode keeps the finding non-blocking (``nit``). Gating jury mode 

72 escalates it to ``major`` so an oversize diff cannot bypass the blocking 

73 cross-vendor review gate. 

74 """ 

75 return Finding( 

76 severity=severity, 

77 message=(f"jury skipped: diff is {size} bytes, over the {MAX_DIFF_BYTES}-byte " 

78 "limit (ai-jury large-diff chunking not applied)"), 

79 source="jury:skipped-oversize", 

80 path=None, 

81 line=None, 

82 anchorable=False, 

83 ) 

84 

85 

86def run_gate( 

87 diff_text: str, 

88 *, 

89 cwd: str | None = None, 

90 mode: str = "advisory", 

91 _run=None, 

92) -> tuple[bool, list[Finding]]: 

93 """Run ``jury`` on ``diff_text`` and map its findings. 

94 

95 Returns ``(ok, findings)``; ``ok`` is False only when a finding blocks 

96 (critical/major). Fail-soft no-op (``(True, [])``) when there is no diff or the 

97 ``jury`` CLI is not installed. In advisory mode, an oversize diff passes but 

98 emits a non-blocking advisory finding. In gating mode, an oversize diff fails 

99 closed with a blocking finding. 

100 """ 

101 if not diff_text: 

102 return True, [] 

103 size = len(diff_text.encode("utf-8")) 

104 if size > MAX_DIFF_BYTES: 

105 if mode == "gating": 

106 return False, [_oversize_finding(size, severity="major")] 

107 return True, [_oversize_finding(size)] 

108 if not available(cwd=cwd, _run=_run): 

109 return True, [] 

110 fd, path = tempfile.mkstemp(suffix=".diff") 

111 try: 

112 with os.fdopen(fd, "w", encoding="utf-8") as fh: 

113 fh.write(diff_text) 

114 result = run_argv(["jury", "--format", "json", "--diff-file", path], 

115 cwd=cwd, timeout=600, **_kw(_run)) 

116 finally: 

117 os.unlink(path) 

118 findings = parse_findings(result.output) 

119 blocked = any(f.severity in ("critical", "major") for f in findings) 

120 return (not blocked), findings