Coverage for src/ai_jury/github.py: 100%

223 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-05 20:29 +0000

1"""Thin GitHub helpers built on the `gh` CLI. 

2 

3Used to pull a PR diff in and to post the jury verdict back as a comment. 

4Kept dependency-free; if `gh` is unavailable these raise a clear error. 

5""" 

6from __future__ import annotations 

7 

8import hashlib 

9import json 

10import re 

11import shutil 

12import subprocess 

13 

# Every `gh` invocation is bounded (#246): a stalled network call or an
# interactive auth/2FA prompt would otherwise block `subprocess.run` forever and
# hang the whole jury run with no per-call ceiling. On timeout we fail soft with
# a clear, actionable error like any other gh failure.
# The value is in seconds and is passed as `timeout=` to `subprocess.run` by
# both `_gh` and `_gh_with_input`.
_GH_TIMEOUT_S = 90

19 

20 

21def _gh(*args: str) -> str: 

22 if shutil.which("gh") is None: 

23 raise RuntimeError("the GitHub CLI `gh` is not installed or not on PATH") 

24 try: 

25 proc = subprocess.run( 

26 ["gh", *args], capture_output=True, text=True, timeout=_GH_TIMEOUT_S 

27 ) 

28 except subprocess.TimeoutExpired: 

29 raise RuntimeError( 

30 f"gh {' '.join(args)} timed out after {_GH_TIMEOUT_S}s" 

31 ) from None 

32 if proc.returncode != 0: 

33 raise RuntimeError(f"gh {' '.join(args)} failed: {proc.stderr.strip()}") 

34 return proc.stdout 

35 

36 

def pr_diff(pr: str, repo: str | None = None) -> str:
    """Return the output of ``gh pr diff`` for ``pr``.

    ``repo`` (when truthy) is forwarded as ``--repo``. Failures from ``_gh``
    propagate as ``RuntimeError``.
    """
    repo_flag = ["--repo", repo] if repo else []
    # `--` ends option parsing so the PR reference is always a positional.
    return _gh("pr", "diff", *repo_flag, "--", str(pr))

43 

44 

def pr_context(pr: str, repo: str | None = None) -> str:
    """Fetch a PR's title and body joined by a blank line (best-effort).

    Returns the stripped ``gh pr view`` output, or ``""`` when ``gh`` fails.
    """
    # jq expression: title, blank line, then the body (empty if null).
    jq_expr = '.title + "\\n\\n" + (.body // "")'
    command = ["pr", "view", "--json", "title,body", "--jq", jq_expr]
    if repo:
        command.extend(["--repo", repo])
    command.extend(["--", str(pr)])
    try:
        output = _gh(*command)
    except RuntimeError:
        return ""
    return output.strip()

56 

57 

def post_pr_comment(pr: str, body: str, repo: str | None = None) -> None:
    """Post ``body`` as a comment on pull request ``pr`` via ``gh pr comment``.

    ``repo`` (when truthy) is forwarded as ``--repo``. Any ``gh`` failure
    propagates as ``RuntimeError``.
    """
    repo_flag = ["--repo", repo] if repo else []
    _gh("pr", "comment", "--body", body, *repo_flag, "--", str(pr))

64 

65 

def issue_body(number: str, repo: str | None = None) -> str:
    """Render a GitHub issue as one reviewable text block (best-effort).

    The ``gh`` query formats the issue as a ``# <title>`` heading, an
    italic ``_labels: a, b_`` line, and the body (empty when null). If ``gh``
    fails, the result degrades to ``"# issue #<number>"`` instead of raising.
    """
    jq_expr = (
        '"# " + .title + "\\n\\n_labels: " '
        '+ ((.labels | map(.name)) | join(", ")) + "_\\n\\n" + (.body // "")'
    )
    command = ["issue", "view", "--json", "title,body,labels", "--jq", jq_expr]
    if repo:
        command.extend(["--repo", repo])
    command.extend(["--", str(number)])
    try:
        rendered = _gh(*command)
    except RuntimeError:
        return f"# issue #{number}"
    return rendered.strip()

85 

86 

def post_issue_comment(number: str, body: str, repo: str | None = None) -> None:
    """Post ``body`` as a comment on issue ``number`` via ``gh issue comment``.

    This is the issue-side counterpart of :func:`post_pr_comment`, which uses
    ``gh pr comment`` instead. Any ``gh`` failure propagates as
    ``RuntimeError``.
    """
    repo_flag = ["--repo", repo] if repo else []
    _gh("issue", "comment", "--body", body, *repo_flag, "--", str(number))

98 

99 

def pr_head_sha(pr: str, repo: str | None = None) -> str:
    """Look up the PR's ``headRefOid`` via ``gh pr view``.

    Best-effort: returns the stripped output, or ``""`` if ``gh`` fails.
    """
    command = ["pr", "view", "--json", "headRefOid", "--jq", ".headRefOid"]
    if repo:
        command.extend(["--repo", repo])
    command.extend(["--", str(pr)])
    try:
        sha = _gh(*command)
    except RuntimeError:
        return ""
    return sha.strip()

110 

111 

def pr_comment_bodies(pr: str, repo: str | None = None) -> list[str]:
    """Return the text of a PR's comments, split into lines (best-effort).

    Incremental mode (issue #9) scans the result for the jury's prior
    reviewed-SHA marker. The ``gh`` output is split with ``splitlines()``, so
    a comment whose body spans several lines contributes one entry per line.
    Any ``gh`` failure yields ``[]`` so the caller can fall back to a full
    review.
    """
    repo_flag = ["--repo", repo] if repo else []
    command = [
        "pr", "view", "--json", "comments", "--jq", ".comments[].body",
        *repo_flag,
        "--", str(pr),
    ]
    try:
        return _gh(*command).splitlines()
    except RuntimeError:
        return []

128 

129 

def compare_diff(base: str, head: str, repo: str | None = None) -> str:
    """Fetch the unified diff between two SHAs from the compare API (issue #9).

    The request sends ``Accept: application/vnd.github.v3.diff`` so the
    response body is a unified diff. Returns ``""`` when the repository cannot
    be resolved or the ``gh`` call fails, letting callers fall back.
    """
    slug = _resolve_repo(repo)
    if slug:
        endpoint = f"repos/{slug}/compare/{base}...{head}"
        try:
            return _gh(
                "api", "-H", "Accept: application/vnd.github.v3.diff", "--", endpoint
            )
        except RuntimeError:
            pass
    return ""

147 

148 

def build_label_args(pr: str, labels, repo: str | None = None) -> list[str]:
    """Build the ``gh pr edit`` arg vector for applying labels (pure).

    Kept pure and network-free so the arg construction can be unit-tested
    without invoking ``gh`` or hitting GitHub.

    Args:
        pr: PR reference; converted with ``str()`` and placed after ``--``.
        labels: An iterable of labels, a single label string, or ``None``.
            Each label is converted with ``str()`` and stripped of surrounding
            whitespace; labels that are empty after stripping are dropped.
        repo: Optional repository, forwarded as ``--repo`` when truthy.

    Returns:
        The argument list, or ``[]`` when no usable label remains.
    """
    if isinstance(labels, str):
        # A bare string is one label, not an iterable of one-character labels.
        labels = [labels]
    stripped = (str(label).strip() for label in (labels or []))
    clean = [label for label in stripped if label]
    if not clean:
        return []
    args = ["pr", "edit"]
    for label in clean:
        args += ["--add-label", label]
    if repo:
        args += ["--repo", repo]
    args += ["--", str(pr)]
    return args

166 

167 

def apply_labels(pr: str, labels, repo: str | None = None) -> list[str]:
    """Apply ``labels`` to ``pr`` through ``gh pr edit --add-label``.

    The arg vector comes from :func:`build_label_args`. When it is empty
    (no labels) nothing is invoked. The vector is returned either way so the
    caller can log what ran. A ``gh`` failure propagates as ``RuntimeError``.
    """
    command = build_label_args(pr, labels, repo)
    if command:
        _gh(*command)
    return command

180 

181 

# Marker prefix identifying comments authored by the jury (enables dedup).
# It is an HTML comment; `_existing_inline_keys` only considers comments whose
# body contains it.
INLINE_MARKER = "<!-- arc-inline -->"

184 

# Hidden per-finding signature marker, embedded in the comment body so that
# re-runs can match an existing comment back to the finding that produced it.
# Two distinct findings on the same (path, line) get distinct signatures and
# therefore do NOT collapse into one another during dedup.
# Group 1 captures the lowercase-hex signature written by `_sig_marker`.
_SIG_MARKER_RE = re.compile(r"<!-- arc-sig:([0-9a-f]+) -->")

190 

191 

192def _finding_signature(finding) -> str: 

193 """Return a short, stable hash identifying a finding. 

194 

195 Derived from the normalized ``severity`` and ``claim`` (lowercased and 

196 stripped) so the same finding yields the same signature across runs, while 

197 a different severity or claim yields a different one. 

198 """ 

199 sev = (getattr(finding, "severity", "") or "").strip().lower() 

200 claim = (getattr(finding, "claim", "") or "").strip().lower() 

201 raw = f"{sev}|{claim}" 

202 return hashlib.sha1(raw.encode("utf-8")).hexdigest()[:12] 

203 

204 

205def _sig_marker(signature: str) -> str: 

206 return f"<!-- arc-sig:{signature} -->" 

207 

208 

209def _sig_from_body(body: str | None) -> str: 

210 """Extract the embedded finding signature from a comment body ('' if none).""" 

211 if not body: 

212 return "" 

213 match = _SIG_MARKER_RE.search(body) 

214 return match.group(1) if match else "" 

215 

216 

def _comment_body(finding) -> str:
    """Render the body of one inline review comment for ``finding``.

    Layout: the jury marker and the hidden signature marker on the first line,
    then ``[<severity>] <claim>``. When the finding has a suggested fix, it
    follows as its own paragraph.

    Args:
        finding: Object read via ``getattr`` for ``severity`` (default
            ``"info"``), ``claim`` and ``suggested_fix`` (falsy values are
            treated as empty).

    Returns:
        The comment body text.
    """
    sev = getattr(finding, "severity", "info")
    claim = getattr(finding, "claim", "") or ""
    fix = getattr(finding, "suggested_fix", "") or ""
    text = f"[{sev}] {claim}"
    if fix:
        # Put the suggestion in its own paragraph; appending it directly
        # would run it into the last word of the claim.
        text += f"\n\n**Suggested fix:** {fix}"
    # The signature marker is hidden (HTML comment) and is derived only from
    # severity and claim, so the visible text does not affect dedup.
    sig = _sig_marker(_finding_signature(finding))
    return f"{INLINE_MARKER}{sig}\n{text}"

228 

229 

def _review_body(n: int) -> str:
    """Build the top-level review body announcing ``n`` inline findings.

    GitHub's create-review API requires a non-empty ``body`` when ``event``
    is COMMENT (omitting it can 422) — issue #122.
    """
    summary = f"🏛️ AI Jury — {n} inline finding(s)."
    return "\n".join([INLINE_MARKER, summary])

234 

235 

def build_inline_payload(findings) -> list[dict]:
    """Build the inline review-comment array for the GitHub reviews API.

    Pure: one comment per finding that can be anchored inline. A finding is
    skipped when it has no ``file``, no ``line``, or a ``line`` that cannot be
    converted with ``int()`` (for example a range such as ``"10-12"``), so one
    malformed finding cannot abort the whole payload.

    Args:
        findings: Iterable of finding objects (or ``None``), read via
            ``getattr`` for ``file`` and ``line``.

    Returns:
        A list of dicts with ``path``, ``line``, ``side`` (always ``"RIGHT"``)
        and ``body`` keys.
    """
    payload: list[dict] = []
    for f in findings or []:
        path = getattr(f, "file", None)
        line = getattr(f, "line", None)
        if not path or line is None:
            continue
        try:
            line_no = int(line)
        except (TypeError, ValueError):
            # Not a single line number: it cannot be anchored inline.
            continue
        payload.append(
            {
                "path": str(path),
                "line": line_no,
                "side": "RIGHT",
                "body": _comment_body(f),
            }
        )
    return payload

257 

258 

259def _resolve_repo(repo: str | None) -> str: 

260 if repo: 

261 return repo 

262 try: 

263 out = _gh("repo", "view", "--json", "nameWithOwner") 

264 return json.loads(out).get("nameWithOwner", "") 

265 except (RuntimeError, json.JSONDecodeError): 

266 return "" 

267 

268 

def _existing_inline_keys(pr: str, repo: str) -> set:
    """Collect ``(path, line, signature)`` keys of existing jury comments.

    Best-effort: a ``gh`` failure, unparsable JSON, or a non-list response
    yields an empty set. Only comments whose body contains ``INLINE_MARKER``
    are considered. ``line`` falls back to ``original_line`` when the comment's
    ``line`` is null, and ``signature`` is parsed back out of the body so
    distinct findings on the same line are tracked independently.
    """
    endpoint = f"repos/{repo}/pulls/{pr}/comments"
    try:
        listing = json.loads(_gh("api", "--paginate", "--", endpoint))
    except (RuntimeError, json.JSONDecodeError):
        return set()
    if not isinstance(listing, list):
        return set()

    found: set = set()
    for comment in listing:
        if not isinstance(comment, dict):
            continue
        text = comment.get("body", "") or ""
        if INLINE_MARKER in text:
            anchor = comment.get("line")
            if anchor is None:
                anchor = comment.get("original_line")
            found.add((comment.get("path"), anchor, _sig_from_body(text)))
    return found

296 

297 

298def _gh_with_input(args: list[str], stdin_data: str) -> str: 

299 if shutil.which("gh") is None: 

300 raise RuntimeError("the GitHub CLI `gh` is not installed or not on PATH") 

301 try: 

302 proc = subprocess.run( 

303 ["gh", *args], input=stdin_data, capture_output=True, text=True, 

304 timeout=_GH_TIMEOUT_S, 

305 ) 

306 except subprocess.TimeoutExpired: 

307 raise RuntimeError( 

308 f"gh {' '.join(args)} timed out after {_GH_TIMEOUT_S}s" 

309 ) from None 

310 if proc.returncode != 0: 

311 raise RuntimeError(f"gh {' '.join(args)} failed: {proc.stderr.strip()}") 

312 return proc.stdout 

313 

314 

def post_inline_comments(
    pr: str,
    findings,
    repo: str | None = None,
    dry_run: bool = False,
) -> dict:
    """Submit the findings as inline comments in one PR review.

    With ``dry_run`` the would-be payload is printed as indented JSON and
    returned without any network call. Otherwise comments whose
    ``(path, line, finding-signature)`` key already exists among the jury's
    inline comments are dropped (best-effort dedup; distinct findings on the
    same line have different signatures and are both kept). The review is
    posted only if something remains. The payload is returned in all cases.
    """

    def _as_review(items: list) -> dict:
        # Shape expected by the create-review endpoint.
        return {"event": "COMMENT", "body": _review_body(len(items)), "comments": items}

    candidates = build_inline_payload(findings)

    if dry_run:
        preview = _as_review(candidates)
        print(json.dumps(preview, indent=2))
        return preview

    slug = _resolve_repo(repo)
    already_posted = _existing_inline_keys(pr, slug) if slug else set()

    fresh = []
    for item in candidates:
        key = (item["path"], item["line"], _sig_from_body(item["body"]))
        if key not in already_posted:
            fresh.append(item)

    review = _as_review(fresh)
    if fresh:
        endpoint = f"repos/{slug}/pulls/{pr}/reviews"
        _gh_with_input(
            ["api", "--method", "POST", "--input", "-", "--", endpoint],
            json.dumps(review),
        )
    return review

353 

354 

# Hidden marker identifying the jury's single sticky progress comment (issue #125).
# `render_progress_body` places it on the first line of every body it renders.
PROGRESS_MARKER = "<!-- arc-progress -->"

357 

358 

def render_progress_body(stages: list[str], *, done: bool = False, final: str | None = None) -> str:
    """Render the text of the sticky progress comment (pure, issue #125).

    ``stages`` lists the milestones reached, in order. If ``done`` is set and
    a ``final`` report is supplied, the body is just the marker followed by
    that report. Otherwise the body is the marker, a status header, and one
    bullet per stage, plus a "live update" footer while not ``done``.
    """
    if done and final is not None:
        return f"{PROGRESS_MARKER}\n{final}"

    if done:
        header = "🏛️ **AI Jury** — review complete."
    else:
        header = "🏛️ **AI Jury** — review in progress…"

    parts = [PROGRESS_MARKER, header, ""]
    parts.extend(f"- {stage}" for stage in stages)
    if not done:
        parts.append("\n_Updating live; the verdict will replace this when done._")
    return "\n".join(parts)

375 

376 

def _create_issue_comment(pr: str, body: str, repo: str) -> int | None:
    """POST a new comment on issue/PR ``pr`` and return its ``id``.

    Returns ``None`` when the ``gh`` call fails or its output is not valid
    JSON.
    """
    endpoint = f"repos/{repo}/issues/{pr}/comments"
    request_json = json.dumps({"body": body})
    try:
        response = _gh_with_input(
            ["api", "--method", "POST", "--input", "-", "--", endpoint],
            request_json,
        )
        created = json.loads(response)
        return created.get("id")
    except (RuntimeError, json.JSONDecodeError):
        return None

387 

388 

def _edit_issue_comment(comment_id: int, body: str, repo: str) -> bool:
    """PATCH an existing issue comment's body; report success as a bool.

    Returns ``False`` (instead of raising) when the ``gh`` call fails.
    """
    endpoint = f"repos/{repo}/issues/comments/{comment_id}"
    request_json = json.dumps({"body": body})
    try:
        _gh_with_input(
            ["api", "--method", "PATCH", "--input", "-", "--", endpoint],
            request_json,
        )
    except RuntimeError:
        return False
    return True

398 

399 

class ProgressReporter:
    """Keeps a single sticky PR comment in sync with the run (issue #125).

    Best-effort: the helpers used to resolve the repo and to create or edit
    the comment swallow ``gh`` failures, so a GitHub hiccup does not crash the
    review. The first push creates the comment, later pushes edit it in place,
    and ``finish`` replaces its content with the final verdict.
    """

    def __init__(self, pr: str, repo: str | None = None):
        self.pr = str(pr)
        # Empty string when the repository cannot be resolved; pushes then no-op.
        self.repo = _resolve_repo(repo)
        # Id of the sticky comment once it has been created.
        self.comment_id: int | None = None
        # Milestones reported so far, in order.
        self.stages: list[str] = []

    def _push(self, body: str) -> None:
        """Create the sticky comment, or edit it if it already exists."""
        if not self.repo:
            return
        if self.comment_id is not None:
            _edit_issue_comment(self.comment_id, body, self.repo)
            return
        # A failed create leaves comment_id as None, so the next push retries.
        self.comment_id = _create_issue_comment(self.pr, body, self.repo)

    def update(self, milestone: str) -> None:
        """Record ``milestone`` and push the refreshed in-progress body."""
        self.stages.append(milestone)
        rendered = render_progress_body(self.stages, done=False)
        self._push(rendered)

    def finish(self, final_report: str) -> None:
        """Push the final body built from ``final_report``."""
        rendered = render_progress_body(self.stages, done=True, final=final_report)
        self._push(rendered)