Coverage for src/ai_jury/orchestrator.py: 99%

424 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-05 20:29 +0000

1"""Jury orchestration: review -> debate -> synthesis. 

2 

3The orchestrator owns the round structure and prompt assembly; adapters only run 

4their CLI. Rounds run agents concurrently (thread pool) because each call is an 

5independent, IO-bound subprocess. 

6""" 

7from __future__ import annotations 

8 

9import random 

10import string 

11import time 

12from concurrent.futures import ThreadPoolExecutor 

13from dataclasses import dataclass, field, replace 

14 

15from . import convergence, injection, largediff, prompts 

16from .adapters import RETRYABLE_ERROR_CODES, Adapter, AgentResult, make_adapter 

17from .config import JuryConfig 

18from .consensus import FindingGroup, group_findings 

19from .findings import Finding, Verdict, parse_findings, parse_verdicts 

20from .policy import ReviewPolicy, render_policy_section 

21from .privilege import audit_privilege 

22from .redaction import redact 

23 

24 

class RunBudget:
    """Wall-clock budget shared by every agent call in one jury run (issue #30).

    The clock starts when the budget is constructed. Two optional caps feed the
    per-call timeout: ``total_timeout`` bounds the whole run and
    ``phase_timeout`` bounds any single call. ``None`` means "uncapped" for
    either one; with both unset ``call_timeout`` yields ``None`` and adapters
    keep using their own per-agent timeout, exactly as if no budget existed.
    """

    def __init__(self, total_timeout: int | None, phase_timeout: int | None):
        self.total = total_timeout
        self.phase = phase_timeout
        self._start = time.monotonic()

    def elapsed(self) -> float:
        """Seconds elapsed since this budget was created (monotonic clock)."""
        now = time.monotonic()
        return now - self._start

    def remaining(self) -> float | None:
        """Seconds left in the total budget, floored at 0.0; ``None`` if uncapped."""
        if self.total is None:
            return None
        left = self.total - self.elapsed()
        return max(0.0, left)

    def expired(self) -> bool:
        """True once a total budget is set and has been fully consumed."""
        if self.total is None:
            return False
        return self.elapsed() >= self.total

    def call_timeout(self) -> int | None:
        """Timeout (whole seconds, at least 1) to hand to a single agent call.

        This is the smaller of the phase cap and the time left in the total
        budget. The adapter itself takes the min with its configured
        per-agent timeout, so that value is deliberately not folded in here.
        Returns ``None`` when neither cap applies.
        """
        limits = [float(self.phase)] if self.phase is not None else []
        left = self.remaining()
        if left is not None:
            limits.append(left)
        if limits:
            # Never hand out 0: an exhausted budget still yields a 1s call.
            return max(1, int(min(limits)))
        return None

68 

69 

70def _run_with_retry( 

71 adapter: Adapter, 

72 prompt: str, 

73 phase: str, 

74 budget: RunBudget, 

75 retries: int, 

76 log, 

77) -> AgentResult: 

78 """Run one agent for one phase, retrying transient failures (issue #30). 

79 

80 Retries only failures whose typed error code is in 

81 ``RETRYABLE_ERROR_CODES`` (timeout/rate-limit/spawn), up to ``retries`` extra 

82 attempts. A deterministic failure (auth, missing CLI, empty output, generic 

83 nonzero exit) is returned immediately. The returned result's ``attempts`` 

84 records how many tries were made. Retrying stops early when the run budget is 

85 exhausted so a retry never overruns the total timeout. 

86 """ 

87 max_attempts = max(1, retries + 1) 

88 result = adapter.run(prompt, phase=phase, timeout=budget.call_timeout()) 

89 attempts = 1 

90 while ( 

91 not result.ok 

92 and result.error_code in RETRYABLE_ERROR_CODES 

93 and attempts < max_attempts 

94 and not budget.expired() 

95 ): 

96 log( 

97 f"{adapter.name}: {phase} attempt {attempts} failed " 

98 f"({result.error_code}); retrying" 

99 ) 

100 result = adapter.run( 

101 prompt, phase=phase, timeout=budget.call_timeout() 

102 ) 

103 attempts += 1 

104 result.attempts = attempts 

105 return result 

106 

107 

108def _order_by_agents(results: list[AgentResult], order: list[str]) -> list[AgentResult]: 

109 """Reorder phase results into the configured/enabled agent order. 

110 

111 Round phases run agents concurrently (ThreadPoolExecutor.map), so the order 

112 in which results arrive is not guaranteed across runs. The report and all 

113 downstream consumers must NOT depend on thread-completion order, so we sort 

114 every phase's results by each agent's index in ``order`` (the stable 

115 enabled-agent list). Agents not present in ``order`` (should not happen) 

116 sort to the end, preserving their relative arrival order as a stable 

117 tiebreak so the sort is total and deterministic. 

118 """ 

119 index = {name: i for i, name in enumerate(order)} 

120 fallback = len(order) 

121 return sorted(results, key=lambda r: index.get(r.agent, fallback)) 

122 

123 

@dataclass
class JuryOutcome:
    """Everything produced by one ``run_jury`` call.

    Per-agent result lists are in the stable enabled-agent order, not
    thread-completion order.
    """

    # Round-1 independent review results, including failed ones (``ok`` False).
    reviews: list[AgentResult]
    # Results of the LAST debate round executed; empty when no debate ran.
    debate: list[AgentResult]
    # Chair's consolidated result; None when synthesis was skipped (run budget
    # exhausted) or the chair was not among the usable agents.
    synthesis: AgentResult | None
    # Name of the chair, resolved once for the whole run.
    chair: str
    # All parsed findings, seeded with the synthetic prompt-injection finding
    # when the injection heuristic produced one.
    findings: list[Finding] = field(default_factory=list)
    # Injection, least-privilege, parse, verification and budget-skip warnings.
    warnings: list[str] = field(default_factory=list)
    # Consensus groups, annotated with verification status/bucket when verify ran.
    groups: list[FindingGroup] = field(default_factory=list)
    # Chair's verification result; None when verify was off, skipped, or the
    # chair was not usable.
    verify: AgentResult | None = None
    # Parsed verification verdicts (empty when verification did not succeed).
    verdicts: list[Verdict] = field(default_factory=list)
    # Context policy in effect, e.g. "diff-only" or "expanded".
    context_mode: str = "diff-only"
    # Whether secret redaction was applied to the diff/context.
    redact_secrets: bool = True
    # Number of secrets redacted from diff + context.
    redaction_count: int = 0
    # Raw hits from the prompt-injection heuristic (injection.scan_inputs).
    injection_hits: list = field(default_factory=list)
    # Execution/partial-result signals (issue #30): agents skipped because their
    # CLI was unavailable (name, reason), and whether the run budget was
    # exhausted before all phases completed.
    skipped: list[tuple[str, str]] = field(default_factory=list)
    budget_exhausted: bool = False
    # Adaptive-rounds signals (issue #40): rounds actually executed and a short
    # human-readable reason for why the debate ran / stopped.
    rounds_executed: int = 1
    stop_reason: str = ""
    # Set when this outcome was served from the local result cache (issue #33),
    # so the report/metadata can mark it as cached rather than freshly computed.
    from_cache: bool = False

151 

152 

153def _run_phase( 

154 adapters: list[Adapter], 

155 prompt_for: dict[str, str], 

156 phase: str, 

157 parallel: bool, 

158 *, 

159 budget: RunBudget, 

160 retries: int, 

161 log, 

162) -> list[AgentResult]: 

163 def task(a: Adapter) -> AgentResult: 

164 return _run_with_retry(a, prompt_for[a.name], phase, budget, retries, log) 

165 

166 if parallel and len(adapters) > 1: 

167 with ThreadPoolExecutor(max_workers=len(adapters)) as pool: 

168 return list(pool.map(task, adapters)) 

169 return [task(a) for a in adapters] 

170 

171 

172def _others(reviews: list[AgentResult], me: str) -> str: 

173 """Identity-labeled peer reviews (legacy path; ``anonymize_debate = false``). 

174 

175 Renders each *other* reviewer's round-1 output with its real agent/vendor 

176 identity in the stable enabled-agent order. This is the pre-#37 behaviour and 

177 leaks both identity and position; the anonymizing path below is the default. 

178 """ 

179 chunks = [ 

180 f"### {r.agent} ({r.vendor})\n{r.output}" 

181 for r in reviews 

182 if r.agent != me and r.ok and r.output 

183 ] 

184 return "\n\n".join(chunks) if chunks else "_(no other reviews available)_" 

185 

186 

187def _anon_label(i: int) -> str: 

188 """Stable anonymous reviewer label: 0->'A', 1->'B', ... 26->'AA'.""" 

189 letters = string.ascii_uppercase 

190 label = "" 

191 i += 1 

192 while i > 0: 

193 i, rem = divmod(i - 1, 26) 

194 label = letters[rem] + label 

195 return label 

196 

197 

198def _anonymize_peers( 

199 reviews: list[AgentResult], me: str, rng: random.Random 

200) -> tuple[str, dict[str, str]]: 

201 """Chatham House peer view for a debater (#37). 

202 

203 Returns ``(prompt_text, label_to_agent)`` where the prompt text renders each 

204 *other* successful reviewer's round-1 output under an anonymous 

205 ``### Reviewer A`` / ``### Reviewer B`` heading — NO vendor or agent name. 

206 The debater's OWN review is excluded (it is passed separately as 

207 ``own_review``). Presentation order is shuffled DETERMINISTICALLY using the 

208 shared run RNG so neither identity nor position is a stable signal; the same 

209 seed yields the same order, different seeds may differ. 

210 

211 ``label_to_agent`` keeps the anonymous-label -> real-agent mapping internal so 

212 callers can still recover authorship (the report attributes by real name). 

213 """ 

214 peers = [r for r in reviews if r.agent != me and r.ok and r.output] 

215 if not peers: 

216 return "_(no other reviews available)_", {} 

217 # Deterministic per-debater shuffle from the shared run RNG. We shuffle a 

218 # copy so the caller's review list (used elsewhere) is untouched. 

219 order = list(peers) 

220 rng.shuffle(order) 

221 chunks: list[str] = [] 

222 label_to_agent: dict[str, str] = {} 

223 for i, r in enumerate(order): 

224 label = f"Reviewer {_anon_label(i)}" 

225 label_to_agent[label] = r.agent 

226 chunks.append(f"### {label}\n{r.output}") 

227 return "\n\n".join(chunks), label_to_agent 

228 

229 

def _debate_round(
    debaters: list[Adapter],
    reviews: list[AgentResult],
    diff: str,
    config: JuryConfig,
    run_rng: random.Random,
    agent_order: list[str],
    prior: list[AgentResult],
    budget: RunBudget,
    retries: int,
    log,
    round_no: int,
    template: str = prompts.DEBATE,
) -> list[AgentResult]:
    """Run one debate round and return its results in stable agent order.

    Each debater's prompt contains its own round-1 review and its peers'
    reviews. The peers are anonymized and shuffled (#37) when
    ``config.anonymize_debate`` is on; otherwise they are identity-labeled.
    ``prior`` holds the previous round's debate outputs (empty for the first
    debate round). When any are present, the same "prior debate" addendum is
    appended to every prompt, so later adaptive rounds (issue #40) build on
    earlier cross-examination instead of repeating it.
    """
    log(f"round {round_no}: {len(debaters)} agents cross-examining")
    own_by_agent = {res.agent: res.output for res in reviews if res.ok}
    prior_blocks = [
        f"### {res.agent}\n{res.output}" for res in prior if res.ok and res.output
    ]
    prior_txt = "\n\n".join(prior_blocks)
    # The addendum does not depend on the debater, so build it once.
    addendum = ""
    if prior_txt:
        addendum = (
            "\n\n=== PRIOR DEBATE (earlier round) ===\n"
            "Build on this; do not just repeat it. Only keep a DISPUTE or "
            "MISSED item if it is still unresolved.\n\n" + prior_txt + "\n"
        )

    prompts_by_agent: dict[str, str] = {}
    for debater in debaters:
        if config.anonymize_debate:
            # One child RNG per debater, drawn from the shared run RNG in
            # debater order: independent but reproducible peer orderings.
            peer_rng = random.Random(run_rng.random())
            peer_view, _labels = _anonymize_peers(reviews, debater.name, peer_rng)
        else:
            peer_view = _others(reviews, debater.name)
        body = template.format(
            name=debater.name,
            diff=diff,
            own_review=own_by_agent.get(debater.name, "_(your review was unavailable)_"),
            other_reviews=peer_view,
            notice=prompts._UNTRUSTED_NOTICE,
        )
        prompts_by_agent[debater.name] = body + addendum

    results = _run_phase(
        debaters, prompts_by_agent, "debate", config.parallel,
        budget=budget, retries=retries, log=log,
    )
    # Same stable-ordering guarantee as round 1: independent of thread-pool
    # completion order.
    return _order_by_agents(results, agent_order)

288 

289 

def run_jury(
    config: JuryConfig,
    diff: str,
    *,
    context: str = "",
    mock: bool = False,
    strict: bool = False,
    seed: int | None = None,
    policy: ReviewPolicy | None = None,
    log=lambda _msg: None,
    budget: RunBudget | None = None,
    on_event=None,
    mode: str = "code",
) -> JuryOutcome:
    """Run one jury over ``diff``: review -> debate -> verify -> synthesis.

    Args:
        config: agent/runtime configuration (enabled agents, rounds,
            early_stop, verify, timeouts, retries, seed, anonymization, chair).
        diff: untrusted change under review. It is redacted before use when
            secret redaction is on (the default).
        context: optional untrusted extra context. It is discarded in
            ``diff-only`` context mode.
        mock: forwarded to ``make_adapter`` for every enabled agent.
        strict: make least-privilege warnings and missing agent CLIs fatal.
        seed: overrides ``config.seed`` for the shared run RNG.
        policy: optional trusted repository review policy, rendered into
            each review prompt.
        log: callable receiving human-readable progress strings.
        budget: optional shared ``RunBudget``. A fresh one is built from
            ``config.total_timeout`` / ``config.phase_timeout`` when omitted.
        on_event: optional live callback, invoked as ``on_event(kind, result)``
            or ``on_event("debate", result, round_no)``.
        mode: prompt-template family passed to ``prompts.for_mode``.

    Returns:
        A ``JuryOutcome``. ``budget_exhausted`` is True when any phase was
        skipped because the run budget ran out.

    Raises:
        RuntimeError: ``strict`` with least-privilege warnings, ``strict`` with
            an unavailable agent CLI, or when no enabled agent is usable.
    """
    # Jury mode (issue #221): "code" (default) reviews a diff with the code-review
    # rubric; "issue" reviews a GitHub issue's prose for completeness/clarity.
    # Only the prompt TEMPLATES differ — the round structure, consensus, voting,
    # verification, ordering, and determinism are identical. ``tmpl`` selects the
    # four phase templates; each is threaded into the phase that uses it so the
    # call sites are otherwise unchanged.
    tmpl = prompts.for_mode(mode)
    # Live play-by-play hook (issue #210): an optional callback fired after each
    # phase result is produced — ``on_event(kind, result, round_no=None)`` with
    # kind in {"review", "debate", "verify", "synthesis"}. It lets a caller stream
    # the deliberation as it happens (CLI ``--live``) without the orchestrator
    # doing any I/O itself. Fired in stable per-phase order (not thread-completion
    # order) so the event sequence is deterministic. Defaults to a no-op.
    emit = on_event or (lambda *_a, **_k: None)
    # Repository review policy (optional, #8): maintainer-authored, TRUSTED
    # content rendered into each REVIEW prompt in a clearly separated section.
    # When ``policy`` is None a sentinel placeholder is used, so the prompt is
    # unchanged except for that section. The policy is distinct from the
    # agent-runtime ``config`` and never enters the untrusted diff/context fences.
    policy_section = render_policy_section(policy)
    # Run reproducibility: a single shared RNG seeds every randomized
    # orchestration decision (anonymized peer order, rotating chair,
    # tie-breaks). The seed comes from the explicit ``seed`` argument if
    # given, else from ``config.seed``. We construct a dedicated
    # ``random.Random`` instance rather than touching the global ``random``
    # module so seeding a jury run never perturbs unrelated global state.
    # When the seed is None the RNG is unseeded, so any randomized choice is
    # not reproducible run-to-run.
    # LLM output itself is never made deterministic by this — only the
    # orchestration around it. ``run_rng`` is the shared run RNG: pass it to
    # any feature that needs reproducible randomness instead of using ``random``.
    # The ORDER in which phases draw from ``run_rng`` (chair, then debate, then
    # synthesis) is part of the reproducibility contract.
    run_seed = seed if seed is not None else config.seed
    run_rng = random.Random(run_seed)  # shared run RNG (see comment above)

    # Run budget (issue #30): a single wall-clock budget threaded through every
    # phase. Defaults (both None) leave behaviour identical to no budget, with
    # each agent bounded only by its own per-agent timeout. ``retries`` is the
    # number of extra attempts for transient (retryable) failures. A caller may
    # pass a SHARED budget so ``total_timeout`` spans a whole chunked review
    # rather than resetting per chunk (issue #31 / review finding).
    if budget is None:
        budget = RunBudget(config.total_timeout, config.phase_timeout)
    retries = config.retries

    # Context policy: diff-only sends only the diff; expanded includes context.
    ctx_cfg = getattr(config, "context", None)
    context_mode = getattr(ctx_cfg, "mode", "diff-only") if ctx_cfg else "diff-only"
    redact_on = getattr(ctx_cfg, "redact_secrets", True) if ctx_cfg else True
    if context_mode == "diff-only":
        context = ""
    redaction_count = 0
    if redact_on:
        diff, _n1 = redact(diff)
        context, _n2 = redact(context)
        redaction_count = _n1 + _n2
        if redaction_count:
            log(f"redacted {redaction_count} secret(s) before sending to agents")

    # Prompt-injection heuristic (OWASP LLM01): scan untrusted diff/context for
    # patterns that try to override instructions, then SURFACE them as a synthetic
    # finding/warning. We never act on them; the CI gate is derived from
    # structured consensus (see ci.evaluate_ci), so an injected "APPROVE"
    # cannot flip the verdict.
    injection_hits = injection.scan_inputs(diff, context)
    injection_findings: list[Finding] = []
    if injection_hits:
        log(f"prompt-injection heuristic: {len(injection_hits)} suspicious pattern(s) flagged")
        syn = injection.hits_to_finding(injection_hits)
        if syn is not None:
            injection_findings.append(syn)

    # Least-privilege audit: warn when a configured agent could perform
    # write/tool actions while reviewing attacker-controlled content.
    privilege_warnings = audit_privilege(config.enabled_agents)
    for w in privilege_warnings:
        log(f"least-privilege warning: {w}")
    if strict and privilege_warnings:
        raise RuntimeError(
            "least-privilege check failed (--strict): "
            + "; ".join(privilege_warnings)
        )

    specs = config.enabled_agents
    adapters = [make_adapter(s, mock=mock) for s in specs]

    # Filter to available agents (unless strict, where a missing CLI is fatal).
    # Skipped agents are recorded (name, reason) so the report can state exactly
    # which agents never ran — part of the partial-result policy (issue #30).
    usable: list[Adapter] = []
    skipped: list[tuple[str, str]] = []
    for a in adapters:
        if a.available():
            usable.append(a)
        elif strict:
            raise RuntimeError(f"agent '{a.name}' CLI not available: {a.spec.command}")
        else:
            reason = f"CLI not found ({a.spec.command})"
            log(f"skipping '{a.name}': {reason}")
            skipped.append((a.name, reason))
    if not usable:
        raise RuntimeError("no usable agents — install at least one agent CLI or use --mock")

    usable_names = [a.name for a in usable]

    # Round 1: independent reviews.
    log(f"round 1: {len(usable)} agents reviewing")
    review_prompt = {
        a.name: tmpl["review"].format(
            name=a.name,
            context=context or "_(none)_",
            diff=diff,
            policy=policy_section,
            notice=prompts._UNTRUSTED_NOTICE,
        )
        for a in usable
    }
    reviews = _run_phase(
        usable, review_prompt, "review", config.parallel,
        budget=budget, retries=retries, log=log,
    )
    # Stable ordering: the thread pool can return results in any completion
    # order. Reorder to the enabled-agent order so the report (and every
    # downstream consumer) is independent of which thread finished first.
    agent_order = [a.name for a in usable]
    reviews = _order_by_agents(reviews, agent_order)

    # Parse structured findings from each successful review and aggregate them.
    # Seed with the synthetic injection finding/warnings so they surface in the
    # report and outcome.warnings without ever influencing agent behaviour.
    all_findings: list[Finding] = list(injection_findings)
    all_warnings: list[str] = injection.hits_to_warnings(injection_hits)
    all_warnings.extend(privilege_warnings)
    for r in reviews:
        if not r.ok:
            continue
        found, warns = parse_findings(r.output, r.agent)
        r.findings = found
        r.warnings = warns
        all_findings.extend(found)
        all_warnings.extend(warns)

    # Stream round-1 reviews as they're now finalized (stable order).
    for r in reviews:
        emit("review", r)

    # Deterministic consensus grouping across reviewers.
    # NOTE(review): len(reviews) also counts FAILED reviews; confirm the
    # consensus denominator should not be the number of successful reviewers.
    groups = group_findings(all_findings, len(reviews))

    # Names of agents whose round-1 review succeeded — the chair resolver uses
    # this to (optionally) prefer a non-reviewer chair (#38).
    reviewer_names = [r.agent for r in reviews if r.ok]

    # Resolve the chair ONCE for the whole run so verify and synthesis use the
    # SAME chair. ``chair = "rotate"`` and prefer-non-reviewer both consume the
    # shared run RNG / reviewer info, so resolving once (rather than recomputing
    # per phase) is what keeps a rotating chair stable within a run (#38).
    chair_name = resolve_chair(config, usable_names, reviewer_names, run_rng)

    # Round 2+: debate. Only agents whose round-1 review succeeded participate.
    # Two modes (issue #40):
    # - fixed (early_stop = false): honour ``rounds`` exactly — run one debate
    #   round iff rounds >= 2. Reproducible fixed-N behaviour for benchmarking.
    # - adaptive (early_stop = true): skip the debate when round-1 reviewers
    #   already agree, otherwise run debate up to ``max_rounds`` rounds and stop
    #   as soon as a round resolves all disputes.
    debate: list[AgentResult] = []
    rounds_executed = 1
    stop_reason = ""
    budget_exhausted = False
    debaters = [a for a in usable if any(r.agent == a.name and r.ok for r in reviews)]
    can_debate = len(debaters) >= 2

    if config.early_stop:
        max_rounds = config.effective_max_rounds
        if not can_debate:
            stop_reason = "stopped after round 1: need >=2 successful reviews to debate"
            log(stop_reason)
        elif max_rounds < 2:
            stop_reason = "stopped after round 1: max_rounds < 2"
            log(stop_reason)
        else:
            converged, why = convergence.review_convergence(groups, len(reviews))
            if converged:
                stop_reason = f"early stop after round 1: {why}"
                log(stop_reason)
            else:
                log(f"early stop active: {why}; running debate up to {max_rounds} round(s)")
                prior: list[AgentResult] = []
                round_no = 1
                while round_no < max_rounds:
                    if budget.expired():
                        budget_exhausted = True
                        stop_reason = f"stopped at round {rounds_executed}: run budget exhausted"
                        log(stop_reason)
                        break
                    round_no += 1
                    # ``debate`` is REPLACED each round, so the outcome keeps
                    # only the final round's results.
                    debate = _debate_round(
                        debaters, reviews, diff, config, run_rng, agent_order,
                        prior, budget, retries, log, round_no,
                        template=tmpl["debate"],
                    )
                    rounds_executed = round_no
                    for r in debate:
                        emit("debate", r, round_no)
                    dconv, dwhy = convergence.debate_convergence(debate)
                    if dconv:
                        stop_reason = f"converged after round {round_no}: {dwhy}"
                        log(stop_reason)
                        break
                    prior = debate
                    stop_reason = f"ran {round_no} rounds: {dwhy}"
                else:
                    # Loop exhausted without ``break`` (no convergence, budget ok).
                    # NOTE(review): every non-converged iteration above already
                    # set stop_reason ("ran N rounds: ..."), so it is never empty
                    # here and this "reached max_rounds" fallback is unreachable;
                    # confirm which message is intended.
                    stop_reason = stop_reason or (
                        f"reached max_rounds ({max_rounds}) with disagreement remaining"
                    )
    else:
        # Fixed-N: exactly the historical behaviour.
        if config.rounds >= 2 and can_debate:
            if budget.expired():
                budget_exhausted = True
                stop_reason = "round 2 skipped: run budget exhausted"
                log(stop_reason)
            else:
                debate = _debate_round(
                    debaters, reviews, diff, config, run_rng, agent_order,
                    [], budget, retries, log, 2,
                    template=tmpl["debate"],
                )
                rounds_executed = 2
                for r in debate:
                    emit("debate", r, 2)
        elif config.rounds >= 2:
            stop_reason = "round 2 skipped: need >=2 successful reviews to debate"
            log(stop_reason)
        else:
            stop_reason = "single round (rounds = 1)"

    # Verification: the chair judges candidate findings to reduce false
    # positives. Skipped when the run budget is exhausted (issue #30) so a
    # partial run still returns what completed instead of overrunning.
    verify_result: AgentResult | None = None
    verdicts: list[Verdict] = []
    if config.verify:
        if budget.expired():
            budget_exhausted = True
            msg = "verification skipped: run budget exhausted"
            log(msg)
            all_warnings.append(msg)
        else:
            verify_result, verdicts, verify_warnings = _verify(
                chair_name, usable, all_findings, diff, context, budget, retries, log,
                template=tmpl["verify"],
            )
            all_warnings.extend(verify_warnings)
            # Mutates ``groups`` in place (status / bucket).
            _apply_verdicts(groups, verdicts)
            if verify_result is not None:
                emit("verify", verify_result)

    # Synthesis: the chair consolidates. When the resolved chair is ALSO a
    # round-1 reviewer, feed it an anonymized view of the reviews (#38 guardrail)
    # so it cannot preferentially weight its own findings; the report still
    # attributes by real name because it renders the real outcome data, not this
    # synthesis prompt.
    synthesis: AgentResult | None = None
    if budget.expired():
        budget_exhausted = True
        msg = "synthesis skipped: run budget exhausted"
        log(msg)
        if msg not in all_warnings:
            all_warnings.append(msg)
    else:
        chair_is_reviewer = chair_name in reviewer_names
        anonymize_synthesis = config.anonymize_debate and chair_is_reviewer
        synthesis = _synthesize(
            chair_name,
            usable,
            reviews,
            debate,
            diff,
            budget,
            retries,
            log,
            verdicts=verdicts,
            anonymize_reviews=anonymize_synthesis,
            rng=run_rng,
            template=tmpl["synthesis"],
        )
        if synthesis is not None:
            emit("synthesis", synthesis)

    return JuryOutcome(
        reviews=reviews,
        debate=debate,
        synthesis=synthesis,
        chair=chair_name,
        findings=all_findings,
        warnings=all_warnings,
        groups=groups,
        verify=verify_result,
        verdicts=verdicts,
        context_mode=context_mode,
        redact_secrets=redact_on,
        redaction_count=redaction_count,
        injection_hits=injection_hits,
        skipped=skipped,
        budget_exhausted=budget_exhausted,
        rounds_executed=rounds_executed,
        stop_reason=stop_reason,
    )

613 

614 

def resolve_chair(
    config: JuryConfig,
    usable: list[str],
    reviewers: list[str],
    rng: random.Random,
) -> str:
    """Resolve the chair for a run as a PURE function of its inputs (#38).

    Precedence:
      0. No usable agents: ``config.chair`` is returned unchanged. This may
         be the literal ``"rotate"``, not an agent name. ``run_jury`` raises
         before reaching this point when no agent is usable.
      1. ``chair = "rotate"``: pick deterministically from the usable agents
         using the shared run ``rng``. Same seed + same agent set -> same
         chair; different seeds may differ.
      2. An explicit ``config.chair`` that names a usable agent is honoured
         as-is (an operator-chosen chair always wins).
      3. ``prefer_non_reviewer_chair``: when set and some usable agent was NOT
         a successful round-1 reviewer, the first such agent (in ``usable``
         order) becomes the neutral chair. This only applies when the
         configured chair is not itself a usable agent.
      4. Fallback: the first usable agent (legacy behaviour).

    Keeping this pure (no Adapter objects, no I/O) makes it directly
    unit-testable. It also guarantees ``_verify`` and ``_synthesize`` agree,
    because the caller resolves it ONCE and threads the result through both.
    """
    if not usable:
        return config.chair
    names = set(usable)

    if config.chair == "rotate":
        # Sort for a candidate order independent of dict/thread/set ordering,
        # then index with the shared run RNG. The pick is a pure function of
        # (rng state, usable-name set).
        candidates = sorted(names)
        return candidates[rng.randrange(len(candidates))]

    if config.chair in names:
        return config.chair

    if config.prefer_non_reviewer_chair:
        reviewer_set = set(reviewers)
        neutral = next((n for n in usable if n not in reviewer_set), None)
        if neutral is not None:
            return neutral

    return usable[0]

662 

663 

664def _format_findings_for_verify(findings: list[Finding]) -> str: 

665 """Render candidate findings for the chair's verification prompt. 

666 

667 Reviewer identity is omitted (#250) so the chair can't favour its own 

668 findings while judging them — parity with the #37/#38 anonymization. 

669 Verdicts match back by file/line/claim, so dropping it is safe. 

670 """ 

671 if not findings: 

672 return "_(no candidate findings)_" 

673 lines = [] 

674 for f in findings: 

675 loc = f.file or "?" 

676 if f.line is not None: 

677 loc = f"{loc}:{f.line}" 

678 lines.append(f"- [{f.severity}] {loc}{f.claim}") 

679 return "\n".join(lines) 

680 

681 

682def _format_verdicts(verdicts: list[Verdict]) -> str: 

683 if not verdicts: 

684 return "_(no verification verdicts)_" 

685 lines = [] 

686 for v in verdicts: 

687 loc = v.file or "?" 

688 if v.line is not None: 

689 loc = f"{loc}:{v.line}" 

690 lines.append(f"- [{v.status}] {loc}{v.claim}: {v.reasoning}") 

691 return "\n".join(lines) 

692 

693 

def _verify(
    chair_name, usable, findings, diff, context, budget, retries, log,
    template=prompts.VERIFY,
) -> tuple[AgentResult | None, list[Verdict], list[str]]:
    """Have the chair judge candidate findings (false-positive reduction).

    Returns ``(result, verdicts, warnings)``:
      * ``(None, [], [])`` when ``chair_name`` is not among ``usable``;
      * ``(result, [], ["verification failed: ..."])`` when the chair's call
        did not succeed;
      * otherwise the chair's result plus whatever ``parse_verdicts``
        extracted from its output.
    """
    chair = None
    for candidate in usable:
        if candidate.name == chair_name:
            chair = candidate
            break
    if chair is None:
        return None, [], []

    log(f"verification: chair '{chair_name}' judging {len(findings)} candidate findings")
    rendered_findings = _format_findings_for_verify(findings)
    prompt = template.format(
        diff=diff,
        findings=rendered_findings,
        context=context or "_(none)_",
        notice=prompts._UNTRUSTED_NOTICE,
    )
    result = _run_with_retry(chair, prompt, "verify", budget, retries, log)
    if result.ok:
        verdicts, warnings = parse_verdicts(result.output, chair_name)
        return result, verdicts, warnings
    return result, [], [f"verification failed: {result.error}"]

713 

714 

def _verdict_matches_group(verdict: Verdict, group: FindingGroup) -> bool:
    """Decide whether a chair verdict refers to a consensus group's finding.

    Compared against the group's representative finding, a verdict matches when:
      * its normalized file path is equal;
      * when both sides carry a line, the lines are at most 3 apart;
      * its normalized claim is empty or identical, or the two claims'
        token sets have a Jaccard similarity of at least 0.5.
    """
    from .consensus import _normalize_claim, _normalize_path

    rep = group.representative
    same_file = _normalize_path(verdict.file) == _normalize_path(rep.file)
    if not same_file:
        return False
    if verdict.line is not None and rep.line is not None:
        if abs(verdict.line - rep.line) > 3:
            return False

    verdict_claim = _normalize_claim(verdict.claim)
    rep_claim = _normalize_claim(rep.claim)
    if not verdict_claim or verdict_claim == rep_claim:
        return True

    verdict_words = set(verdict_claim.split())
    rep_words = set(rep_claim.split())
    if not (verdict_words and rep_words):
        return False
    overlap = len(verdict_words & rep_words)
    combined = len(verdict_words | rep_words)
    similarity = overlap / combined if combined else 0.0
    return similarity >= 0.5

733 

734 

735def _apply_verdicts(groups: list[FindingGroup], verdicts: list[Verdict]) -> None: 

736 """Attach verification statuses to consensus groups. 

737 

738 unsupported -> bucket 'rejected'; needs_human_decision -> bucket 'disputed'; 

739 verified -> status recorded, bucket unchanged. 

740 """ 

741 for verdict in verdicts: 

742 # Apply to every matching group: when reviewers phrase the same issue 

743 # slightly differently it can land in more than one group, and all of 

744 # them should carry the verifier's judgement. 

745 for group in groups: 

746 if group.status: 

747 continue 

748 if _verdict_matches_group(verdict, group): 

749 group.status = verdict.status 

750 group.status_reasoning = verdict.reasoning 

751 if verdict.status == "unsupported": 

752 group.bucket = "rejected" 

753 elif verdict.status == "needs_human_decision": 

754 group.bucket = "disputed" 

755 

756 

def _synthesize(
    chair_name,
    usable,
    reviews,
    debate,
    diff,
    budget,
    retries,
    log,
    verdicts=None,
    anonymize_reviews=False,
    rng=None,
    template=prompts.SYNTHESIS,
) -> AgentResult | None:
    """Have the chair consolidate reviews, debate and verdicts into a verdict.

    Returns ``None`` when ``chair_name`` is not among ``usable``; otherwise
    returns the chair's (retried) result.

    Args:
        anonymize_reviews: present round-1 reviews under shuffled anonymous
            labels instead of ``agent (vendor)`` headings (#38 guardrail).
        rng: source for the anonymized order. When ``None``, a fresh unseeded
            RNG is used (not reproducible).
        verdicts: when non-empty, appended to the prompt as a
            VERIFICATION VERDICTS section.
    """
    chair = next((a for a in usable if a.name == chair_name), None)
    if chair is None:
        return None
    log(f"synthesis: chair '{chair_name}' consolidating verdict")
    if anonymize_reviews:
        # Chair self-preference guardrail (#38): present round-1 reviews to the
        # chair under anonymous labels (no agent/vendor identity, no stable
        # order) so it cannot tell which review is "its own". Uses the shared run
        # RNG for deterministic-but-unstable ordering. ``me=None`` keeps ALL
        # reviews (we are not excluding a debater here, only stripping identity).
        peer_rng = random.Random(rng.random()) if rng is not None else random.Random()
        reviews_txt, label_map = _anonymize_peers(reviews, None, peer_rng)
        if not label_map:
            # Nobody was excluded, so the helper's "no OTHER reviews" placeholder
            # would be misleading; use the same placeholder as the named path.
            reviews_txt = "_(no reviews)_"
    else:
        reviews_txt = "\n\n".join(
            f"### {r.agent} ({r.vendor})\n{r.output}" for r in reviews if r.ok and r.output
        ) or "_(no reviews)_"
    debate_txt = "\n\n".join(
        f"### {r.agent}\n{r.output}" for r in debate if r.ok and r.output
    ) or "_(no debate round)_"
    prompt = template.format(
        diff=diff,
        reviews=reviews_txt,
        debate=debate_txt,
        notice=prompts._UNTRUSTED_NOTICE,
    )
    if verdicts:
        prompt += f"\n\n=== VERIFICATION VERDICTS ===\n{_format_verdicts(verdicts)}\n"
    return _run_with_retry(chair, prompt, "synthesis", budget, retries, log)

799 

800 

def _merge_results_by_agent(phase_lists: list[list[AgentResult]]) -> list[AgentResult]:
    """Merge per-chunk results for the same agent into one result (issue #31).

    ``phase_lists[i]`` holds one phase's results (review or debate) for chunk
    ``i + 1``. For each agent, in order of first appearance:

    - successful, non-empty outputs are concatenated under ``#### chunk N``
      headers, where ``N`` is the real 1-based chunk position in
      ``phase_lists``. An agent missing from some chunk (e.g. a chunk whose
      debate round did not run) keeps correct labels on its later chunks;
    - ``duration_s`` is the sum over all of the agent's results, rounded to 3
      decimals;
    - ``ok`` is true if the agent succeeded on any chunk;
    - ``attempts`` keeps the max, so a retried chunk is still visible;
    - if every result failed, ``error``/``error_code`` come from the first
      failure; otherwise both are ``None``.

    ``vendor`` is taken from the agent's first result.
    """
    # agent name -> [(chunk number, result), ...] in chunk order. Dict insertion
    # order gives "order of first appearance" for free.
    by_agent: dict[str, list[tuple[int, AgentResult]]] = {}
    for chunk_no, lst in enumerate(phase_lists, 1):
        for r in lst:
            by_agent.setdefault(r.agent, []).append((chunk_no, r))

    merged: list[AgentResult] = []
    for name, numbered in by_agent.items():
        parts = [p for _, p in numbered]
        ok = any(p.ok for p in parts)
        body = "\n\n".join(
            f"#### chunk {chunk_no}\n{p.output}"
            for chunk_no, p in numbered
            if p.ok and p.output
        )
        first_err = next((p for p in parts if not p.ok), None)
        merged.append(
            AgentResult(
                name,
                parts[0].vendor,
                ok,
                body,
                round(sum(p.duration_s for p in parts), 3),
                error=None if ok else (first_err.error if first_err else None),
                error_code=None if ok else (first_err.error_code if first_err else None),
                attempts=max(p.attempts for p in parts),
            )
        )
    return merged

840 

841 

def _combine_chair_results(
    results: list[AgentResult | None], chair: str
) -> AgentResult | None:
    """Combine per-chunk chair results (verify/synthesis) into one labelled result.

    Each successful, non-empty output goes under a ``### chunk N`` header.
    ``N`` is the entry's 1-based position in ``results``, so a failed entry no
    longer shifts the labels of the entries after it. ``None`` entries are
    skipped but still occupy a position.

    Returns:
        ``None`` when ``results`` has no non-``None`` entry.

        The first non-``None`` entry, unchanged, when none succeeded with
        output.

        Otherwise a new successful ``AgentResult`` for ``chair``:

        - its vendor is taken from the first successful part;
        - its duration is the sum over the successful parts, rounded to
          3 decimals;
        - its ``attempts`` is the max over all entries, matching
          ``_merge_results_by_agent``.
    """
    present = [(pos, r) for pos, r in enumerate(results, 1) if r is not None]
    ok_parts = [(pos, r) for pos, r in present if r.ok and r.output]
    if not ok_parts:
        return present[0][1] if present else None
    body = "\n\n".join(f"### chunk {pos}\n{r.output}" for pos, r in ok_parts)
    return AgentResult(
        chair,
        ok_parts[0][1].vendor,
        True,
        body,
        round(sum(r.duration_s for _, r in ok_parts), 3),
        # Keep retry visibility across chunks, as the per-agent merge does.
        attempts=max(r.attempts for _, r in present),
    )

856 

857 

def _merge_chunk_outcomes(outcomes: list[JuryOutcome]) -> JuryOutcome:
    """Collapse the per-chunk outcomes of a chunked review (issue #31) into one.

    A single outcome is returned untouched. Otherwise:

    - findings from every chunk are pooled and regrouped, so consensus is
      computed over the whole diff rather than per chunk;
    - all chunks' verdicts are re-applied to those new groups;
    - reviews and debate are merged per agent with chunk labels;
    - chair synthesis/verify outputs are combined with chunk labels;
    - counters and flags are aggregated (sum, any, max);
    - ``chair``, ``context_mode``, ``redact_secrets`` and ``skipped`` are
      taken from the first chunk.
    """
    if len(outcomes) == 1:
        return outcomes[0]
    first = outcomes[0]

    def concat(pick):
        # Flatten one list-valued field across every chunk, in chunk order.
        return [item for outcome in outcomes for item in pick(outcome)]

    merged_reviews = _merge_results_by_agent([o.reviews for o in outcomes])
    debate_lists = [o.debate for o in outcomes]
    merged_debate = _merge_results_by_agent(debate_lists) if any(debate_lists) else []

    all_findings = concat(lambda o: o.findings)
    # The juror count for consensus is the number of distinct reviewing agents.
    merged_groups = group_findings(all_findings, len(merged_reviews))
    all_verdicts = concat(lambda o: o.verdicts)
    _apply_verdicts(merged_groups, all_verdicts)
    all_warnings = concat(lambda o: o.warnings)

    merged_synthesis = _combine_chair_results(
        [o.synthesis for o in outcomes if o.synthesis], first.chair
    )
    merged_verify = _combine_chair_results(
        [o.verify for o in outcomes if o.verify], first.chair
    )

    return JuryOutcome(
        reviews=merged_reviews,
        debate=merged_debate,
        synthesis=merged_synthesis,
        chair=first.chair,
        findings=all_findings,
        warnings=all_warnings,
        groups=merged_groups,
        verify=merged_verify,
        verdicts=all_verdicts,
        context_mode=first.context_mode,
        redact_secrets=first.redact_secrets,
        redaction_count=sum(o.redaction_count for o in outcomes),
        injection_hits=concat(lambda o: o.injection_hits),
        skipped=first.skipped,
        budget_exhausted=any(o.budget_exhausted for o in outcomes),
        rounds_executed=max(o.rounds_executed for o in outcomes),
        stop_reason=f"chunked review across {len(outcomes)} part(s)",
    )

903 

904 

def review_diff(
    config: JuryConfig,
    diff: str,
    *,
    context: str = "",
    mock: bool = False,
    strict: bool = False,
    seed: int | None = None,
    policy: ReviewPolicy | None = None,
    log=lambda _msg: None,
    on_event=None,
) -> tuple[JuryOutcome, largediff.DiffPlan]:
    """Filter, size and plan ``diff``, then run the jury over it (issue #31).

    This is the CLI's single entry point. It asks ``largediff.plan_diff`` to
    apply the ``[jury.diff]`` settings and pick a handling mode, logs the
    sizes and any exclusions, and then dispatches on that mode:

    - ``full``: one ``run_jury`` pass over the single planned chunk;
    - ``chunked``: one ``run_jury`` pass per chunk, merged with
      ``_merge_chunk_outcomes``;
    - ``too_large``: refuse with an actionable error.

    Existing callers of :func:`run_jury` are unaffected.

    Returns:
        ``(outcome, plan)``, so the caller can surface the plan.

    Raises:
        RuntimeError: the plan's mode is ``too_large``, or the filters
            excluded every file.
    """
    diff_cfg = config.diff
    plan = largediff.plan_diff(
        diff,
        max_bytes=diff_cfg.max_bytes,
        chunk=diff_cfg.chunk,
        chunk_max_bytes=diff_cfg.chunk_max_bytes,
        exclude_generated=diff_cfg.exclude_generated,
        exclude=diff_cfg.exclude,
        include=diff_cfg.include,
    )
    kept_files = len(plan.kept)
    excluded_files = len(plan.excluded)
    log(
        f"diff size: {plan.total_bytes} B total, {plan.kept_bytes} B after filters "
        f"({kept_files} file(s) kept, {excluded_files} excluded); "
        f"mode: {plan.mode}"
    )
    if plan.excluded:
        exclusion_notes = ", ".join(f"{path} [{why}]" for path, why in plan.excluded)
        log("excluded: " + exclusion_notes)
    log(plan.reason)

    if plan.mode == largediff.MODE_TOO_LARGE:
        raise RuntimeError(f"diff too large to review: {plan.reason}")
    if not plan.chunks:
        raise RuntimeError(
            "nothing to review after filters — all files were excluded "
            "(check [jury.diff] include/exclude patterns)"
        )

    # A single budget is shared by every chunk, so ``total_timeout`` caps the
    # whole review instead of restarting for each chunk. ``phase_timeout`` and
    # per-agent timeouts still apply per call through this same object.
    budget = RunBudget(config.total_timeout, config.phase_timeout)

    # Redact the context once, before fanning out (#249). Every chunk is
    # reviewed against the same context, so redacting it per chunk would count
    # its secrets once per chunk. Those counts would then be summed by
    # ``_merge_chunk_outcomes``, inflating ``redaction_count``.
    #
    # Once redacted, later per-chunk passes find only placeholders. The
    # one-time count is therefore added back to the final outcome. Each
    # chunk's own diff redactions are still counted and summed, which is
    # correct because the chunks are distinct.
    #
    # The ``[jury.context]`` sub-table lives at ``config.context`` (the
    # ``[jury]`` table is flattened onto JuryConfig), mirroring ``run_jury``.
    ctx_cfg = getattr(config, "context", None)
    if ctx_cfg:
        ctx_mode = getattr(ctx_cfg, "mode", "diff-only")
        redact_on = getattr(ctx_cfg, "redact_secrets", True)
    else:
        ctx_mode, redact_on = "diff-only", True
    context_redactions = 0
    if redact_on and ctx_mode != "diff-only" and context:
        context, context_redactions = redact(context)

    def review_chunk(chunk_text: str) -> JuryOutcome:
        # Reads ``context`` at call time, i.e. the pre-redacted value.
        return run_jury(
            config,
            chunk_text,
            context=context,
            mock=mock,
            strict=strict,
            seed=seed,
            policy=policy,
            log=log,
            budget=budget,
            on_event=on_event,
        )

    def add_context_redactions(outcome: JuryOutcome) -> JuryOutcome:
        # Per-chunk runs counted 0 for the already-redacted context; add the
        # one-time count back exactly once.
        if context_redactions:
            return replace(
                outcome, redaction_count=outcome.redaction_count + context_redactions
            )
        return outcome

    if plan.mode == largediff.MODE_FULL:
        return add_context_redactions(review_chunk(plan.chunks[0])), plan

    chunk_total = len(plan.chunks)
    chunk_outcomes = []
    for index, chunk_text in enumerate(plan.chunks, start=1):
        log(f"reviewing chunk {index}/{chunk_total}")
        chunk_outcomes.append(review_chunk(chunk_text))
    return add_context_redactions(_merge_chunk_outcomes(chunk_outcomes)), plan