Coverage for src/ai_jury/github.py: 100%
223 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
1"""Thin GitHub helpers built on the `gh` CLI.
3Used to pull a PR diff in and to post the jury verdict back as a comment.
4Kept dependency-free; if `gh` is unavailable these raise a clear error.
5"""
6from __future__ import annotations
8import hashlib
9import json
10import re
11import shutil
12import subprocess
14# Every `gh` invocation is bounded (#246): a stalled network call or an
15# interactive auth/2FA prompt would otherwise block `subprocess.run` forever and
16# hang the whole jury run with no per-call ceiling. On timeout we fail soft with
17# a clear, actionable error like any other gh failure.
18_GH_TIMEOUT_S = 90
21def _gh(*args: str) -> str:
22 if shutil.which("gh") is None:
23 raise RuntimeError("the GitHub CLI `gh` is not installed or not on PATH")
24 try:
25 proc = subprocess.run(
26 ["gh", *args], capture_output=True, text=True, timeout=_GH_TIMEOUT_S
27 )
28 except subprocess.TimeoutExpired:
29 raise RuntimeError(
30 f"gh {' '.join(args)} timed out after {_GH_TIMEOUT_S}s"
31 ) from None
32 if proc.returncode != 0:
33 raise RuntimeError(f"gh {' '.join(args)} failed: {proc.stderr.strip()}")
34 return proc.stdout
37def pr_diff(pr: str, repo: str | None = None) -> str:
38 args = ["pr", "diff"]
39 if repo:
40 args += ["--repo", repo]
41 args += ["--", str(pr)]
42 return _gh(*args)
45def pr_context(pr: str, repo: str | None = None) -> str:
46 """Return 'title\\n\\nbody' for a PR, best-effort."""
47 args = ["pr", "view", "--json", "title,body",
48 "--jq", '.title + "\\n\\n" + (.body // "")']
49 if repo:
50 args += ["--repo", repo]
51 args += ["--", str(pr)]
52 try:
53 return _gh(*args).strip()
54 except RuntimeError:
55 return ""
58def post_pr_comment(pr: str, body: str, repo: str | None = None) -> None:
59 args = ["pr", "comment", "--body", body]
60 if repo:
61 args += ["--repo", repo]
62 args += ["--", str(pr)]
63 _gh(*args)
66def issue_body(number: str, repo: str | None = None) -> str:
67 """Return a reviewable text rendering of a GitHub issue, best-effort.
69 Formats the issue as ``"# <title>\\n\\n_labels: a, b_\\n\\n<body>"`` so the
70 reviewer sees the title, labels, and description as one prose block. Mirrors
71 :func:`pr_context`'s error handling: any ``gh`` failure degrades to a minimal
72 string (the bare number) rather than crashing the run.
73 """
74 args = ["issue", "view", "--json", "title,body,labels",
75 "--jq",
76 '"# " + .title + "\\n\\n_labels: " '
77 '+ ((.labels | map(.name)) | join(", ")) + "_\\n\\n" + (.body // "")']
78 if repo:
79 args += ["--repo", repo]
80 args += ["--", str(number)]
81 try:
82 return _gh(*args).strip()
83 except RuntimeError:
84 return f"# issue #{number}"
87def post_issue_comment(number: str, body: str, repo: str | None = None) -> None:
88 """Post a comment on a plain GitHub issue.
90 A separate function from :func:`post_pr_comment` because ``gh pr comment``
91 only works for pull requests; ``gh issue comment`` is the issue-side command.
92 """
93 args = ["issue", "comment", "--body", body]
94 if repo:
95 args += ["--repo", repo]
96 args += ["--", str(number)]
97 _gh(*args)
100def pr_head_sha(pr: str, repo: str | None = None) -> str:
101 """Return the current head commit SHA of a PR (best-effort, '' on failure)."""
102 args = ["pr", "view", "--json", "headRefOid", "--jq", ".headRefOid"]
103 if repo:
104 args += ["--repo", repo]
105 args += ["--", str(pr)]
106 try:
107 return _gh(*args).strip()
108 except RuntimeError:
109 return ""
112def pr_comment_bodies(pr: str, repo: str | None = None) -> list[str]:
113 """Return the bodies of a PR's issue comments (best-effort, [] on failure).
115 Used by incremental mode (issue #9) to find the jury's prior
116 reviewed-SHA marker. Network errors degrade to an empty list so the caller
117 safely falls back to a full review.
118 """
119 args = ["pr", "view", "--json", "comments", "--jq", ".comments[].body"]
120 if repo:
121 args += ["--repo", repo]
122 args += ["--", str(pr)]
123 try:
124 out = _gh(*args)
125 except RuntimeError:
126 return []
127 return out.splitlines()
130def compare_diff(base: str, head: str, repo: str | None = None) -> str:
131 """Return the unified diff between two SHAs via the compare API (issue #9).
133 Uses the ``application/vnd.github.v3.diff`` media type so the response is a
134 ready-to-review unified diff. Returns '' on failure so callers can fall back.
135 """
136 resolved = _resolve_repo(repo)
137 if not resolved:
138 return ""
139 try:
140 return _gh(
141 "api",
142 "-H", "Accept: application/vnd.github.v3.diff",
143 "--", f"repos/{resolved}/compare/{base}...{head}",
144 )
145 except RuntimeError:
146 return ""
149def build_label_args(pr: str, labels, repo: str | None = None) -> list[str]:
150 """Build the ``gh pr edit`` arg vector for applying labels (pure).
152 Returns ``[]`` when there are no labels (nothing to do). Kept pure and
153 network-free so the arg construction can be unit-tested without invoking
154 ``gh`` or hitting GitHub.
155 """
156 clean = [str(label) for label in (labels or []) if str(label).strip()]
157 if not clean:
158 return []
159 args = ["pr", "edit"]
160 for label in clean:
161 args += ["--add-label", label]
162 if repo:
163 args += ["--repo", repo]
164 args += ["--", str(pr)]
165 return args
168def apply_labels(pr: str, labels, repo: str | None = None) -> list[str]:
169 """Best-effort: apply ``labels`` to ``pr`` via ``gh pr edit --add-label``.
171 Only called when labeling is explicitly enabled (CLI ``--label``); it never
172 runs by default. No-op (returns ``[]``) when there are no labels. Returns the
173 ``gh`` arg vector that was invoked so callers can log it.
174 """
175 args = build_label_args(pr, labels, repo)
176 if not args:
177 return args
178 _gh(*args)
179 return args
182# Marker prefix identifying comments authored by the jury (enables dedup).
183INLINE_MARKER = "<!-- arc-inline -->"
185# Hidden per-finding signature marker, embedded in the comment body so that
186# re-runs can match an existing comment back to the finding that produced it.
187# Two distinct findings on the same (path, line) get distinct signatures and
188# therefore do NOT collapse into one another during dedup.
189_SIG_MARKER_RE = re.compile(r"<!-- arc-sig:([0-9a-f]+) -->")
192def _finding_signature(finding) -> str:
193 """Return a short, stable hash identifying a finding.
195 Derived from the normalized ``severity`` and ``claim`` (lowercased and
196 stripped) so the same finding yields the same signature across runs, while
197 a different severity or claim yields a different one.
198 """
199 sev = (getattr(finding, "severity", "") or "").strip().lower()
200 claim = (getattr(finding, "claim", "") or "").strip().lower()
201 raw = f"{sev}|{claim}"
202 return hashlib.sha1(raw.encode("utf-8")).hexdigest()[:12]
205def _sig_marker(signature: str) -> str:
206 return f"<!-- arc-sig:{signature} -->"
209def _sig_from_body(body: str | None) -> str:
210 """Extract the embedded finding signature from a comment body ('' if none)."""
211 if not body:
212 return ""
213 match = _SIG_MARKER_RE.search(body)
214 return match.group(1) if match else ""
217def _comment_body(finding) -> str:
218 sev = getattr(finding, "severity", "info")
219 claim = getattr(finding, "claim", "") or ""
220 fix = getattr(finding, "suggested_fix", "") or ""
221 text = f"[{sev}] {claim}"
222 if fix:
223 text += f" — {fix}"
224 # The signature marker is hidden (HTML comment); the visible body for humans
225 # is unchanged.
226 sig = _sig_marker(_finding_signature(finding))
227 return f"{INLINE_MARKER}{sig}\n{text}"
230def _review_body(n: int) -> str:
231 """Top-level review body. GitHub's create-review API requires a non-empty
232 ``body`` when ``event`` is COMMENT (omitting it can 422) — issue #122."""
233 return f"{INLINE_MARKER}\n🏛️ AI Jury — {n} inline finding(s)."
236def build_inline_payload(findings) -> list[dict]:
237 """Build the inline review-comment array for the GitHub reviews API.
239 Pure: one comment per finding that has BOTH a file and a line. Findings
240 without a file or line are skipped (they cannot be anchored inline).
241 """
242 payload: list[dict] = []
243 for f in findings or []:
244 path = getattr(f, "file", None)
245 line = getattr(f, "line", None)
246 if not path or line is None:
247 continue
248 payload.append(
249 {
250 "path": str(path),
251 "line": int(line),
252 "side": "RIGHT",
253 "body": _comment_body(f),
254 }
255 )
256 return payload
259def _resolve_repo(repo: str | None) -> str:
260 if repo:
261 return repo
262 try:
263 out = _gh("repo", "view", "--json", "nameWithOwner")
264 return json.loads(out).get("nameWithOwner", "")
265 except (RuntimeError, json.JSONDecodeError):
266 return ""
269def _existing_inline_keys(pr: str, repo: str) -> set:
270 """Return ``(path, line, signature)`` keys for existing jury comments.
272 Best-effort. ``line`` falls back to ``original_line`` when GitHub reports a
273 null ``line`` (e.g. for outdated comments). ``signature`` is parsed back out
274 of the comment body so distinct findings on the same line are tracked
275 independently.
276 """
277 keys: set = set()
278 try:
279 out = _gh("api", "--paginate", "--", f"repos/{repo}/pulls/{pr}/comments")
280 data = json.loads(out)
281 except (RuntimeError, json.JSONDecodeError):
282 return keys
283 if not isinstance(data, list):
284 return keys
285 for c in data:
286 if not isinstance(c, dict):
287 continue
288 body = c.get("body", "") or ""
289 if INLINE_MARKER not in body:
290 continue
291 line = c.get("line")
292 if line is None:
293 line = c.get("original_line")
294 keys.add((c.get("path"), line, _sig_from_body(body)))
295 return keys
298def _gh_with_input(args: list[str], stdin_data: str) -> str:
299 if shutil.which("gh") is None:
300 raise RuntimeError("the GitHub CLI `gh` is not installed or not on PATH")
301 try:
302 proc = subprocess.run(
303 ["gh", *args], input=stdin_data, capture_output=True, text=True,
304 timeout=_GH_TIMEOUT_S,
305 )
306 except subprocess.TimeoutExpired:
307 raise RuntimeError(
308 f"gh {' '.join(args)} timed out after {_GH_TIMEOUT_S}s"
309 ) from None
310 if proc.returncode != 0:
311 raise RuntimeError(f"gh {' '.join(args)} failed: {proc.stderr.strip()}")
312 return proc.stdout
315def post_inline_comments(
316 pr: str,
317 findings,
318 repo: str | None = None,
319 dry_run: bool = False,
320) -> dict:
321 """Post inline review comments as a single PR review.
323 Best-effort dedup: skips comments whose ``(path, line, finding-signature)``
324 already has a jury inline comment. Keying on the signature means two
325 distinct findings on the same line are both posted. When ``dry_run`` is True
326 the payload is printed and returned without any network call. Returns the
327 review payload (would-be) posted.
328 """
329 comments = build_inline_payload(findings)
331 if dry_run:
332 payload = {"event": "COMMENT", "body": _review_body(len(comments)), "comments": comments}
333 print(json.dumps(payload, indent=2))
334 return payload
336 resolved = _resolve_repo(repo)
337 existing = _existing_inline_keys(pr, resolved) if resolved else set()
338 deduped = [
339 c
340 for c in comments
341 if (c["path"], c["line"], _sig_from_body(c["body"])) not in existing
342 ]
344 payload = {"event": "COMMENT", "body": _review_body(len(deduped)), "comments": deduped}
345 if not deduped:
346 return payload
348 _gh_with_input(
349 ["api", "--method", "POST", "--input", "-", "--", f"repos/{resolved}/pulls/{pr}/reviews"],
350 json.dumps(payload),
351 )
352 return payload
355# Hidden marker identifying the jury's single sticky progress comment (issue #125).
356PROGRESS_MARKER = "<!-- arc-progress -->"
359def render_progress_body(stages: list[str], *, done: bool = False, final: str | None = None) -> str:
360 """Render the sticky progress-comment body (pure, issue #125).
362 ``stages`` is the ordered list of milestones reached. When ``done`` and a
363 ``final`` report is given, the comment becomes the verdict (with the marker
364 kept so the same comment is reused on a re-run).
365 """
366 if done and final is not None:
367 return f"{PROGRESS_MARKER}\n{final}"
368 header = "🏛️ **AI Jury** — review complete." if done else "🏛️ **AI Jury** — review in progress…"
369 lines = [PROGRESS_MARKER, header, ""]
370 for s in stages:
371 lines.append(f"- {s}")
372 if not done:
373 lines.append("\n_Updating live; the verdict will replace this when done._")
374 return "\n".join(lines)
377def _create_issue_comment(pr: str, body: str, repo: str) -> int | None:
378 """Create a PR/issue comment, returning its numeric id (or None)."""
379 try:
380 out = _gh_with_input(
381 ["api", "--method", "POST", "--input", "-", "--", f"repos/{repo}/issues/{pr}/comments"],
382 json.dumps({"body": body}),
383 )
384 return json.loads(out).get("id")
385 except (RuntimeError, json.JSONDecodeError):
386 return None
389def _edit_issue_comment(comment_id: int, body: str, repo: str) -> bool:
390 try:
391 _gh_with_input(
392 ["api", "--method", "PATCH", "--input", "-", "--", f"repos/{repo}/issues/comments/{comment_id}"],
393 json.dumps({"body": body}),
394 )
395 return True
396 except RuntimeError:
397 return False
400class ProgressReporter:
401 """Maintains ONE sticky PR comment, updated as the run advances (issue #125).
403 Best-effort and resilient: a resolve/create/edit failure is swallowed so a
404 GitHub hiccup never crashes the review. The first ``update`` creates the
405 comment; subsequent updates edit it in place; ``finish`` turns it into the
406 final verdict.
407 """
409 def __init__(self, pr: str, repo: str | None = None):
410 self.pr = str(pr)
411 self.repo = _resolve_repo(repo)
412 self.comment_id: int | None = None
413 self.stages: list[str] = []
415 def _push(self, body: str) -> None:
416 if not self.repo:
417 return
418 if self.comment_id is None:
419 self.comment_id = _create_issue_comment(self.pr, body, self.repo)
420 else:
421 _edit_issue_comment(self.comment_id, body, self.repo)
423 def update(self, milestone: str) -> None:
424 self.stages.append(milestone)
425 self._push(render_progress_body(self.stages, done=False))
427 def finish(self, final_report: str) -> None:
428 self._push(render_progress_body(self.stages, done=True, final=final_report))