Coverage for src/ai_jury/orchestrator.py: 99%
424 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-05 20:29 +0000
1"""Jury orchestration: review -> debate -> synthesis.
3The orchestrator owns the round structure and prompt assembly; adapters only run
4their CLI. Rounds run agents concurrently (thread pool) because each call is an
5independent, IO-bound subprocess.
6"""
7from __future__ import annotations
9import random
10import string
11import time
12from concurrent.futures import ThreadPoolExecutor
13from dataclasses import dataclass, field, replace
15from . import convergence, injection, largediff, prompts
16from .adapters import RETRYABLE_ERROR_CODES, Adapter, AgentResult, make_adapter
17from .config import JuryConfig
18from .consensus import FindingGroup, group_findings
19from .findings import Finding, Verdict, parse_findings, parse_verdicts
20from .policy import ReviewPolicy, render_policy_section
21from .privilege import audit_privilege
22from .redaction import redact
class RunBudget:
    """Wall-clock accounting for a single jury run (issue #30).

    The clock starts when the instance is created. From the optional
    whole-run budget and the optional per-phase budget it derives the timeout
    to hand to one agent call. Either budget may be ``None`` (no cap); with
    both unset ``call_timeout`` yields ``None`` so adapters keep using their
    own per-agent timeout, exactly as if no budget existed.
    """

    def __init__(self, total_timeout: int | None, phase_timeout: int | None):
        self.total = total_timeout
        self.phase = phase_timeout
        # Monotonic clock: immune to wall-clock adjustments during the run.
        self._start = time.monotonic()

    def elapsed(self) -> float:
        """Return the seconds that have passed since construction."""
        now = time.monotonic()
        return now - self._start

    def remaining(self) -> float | None:
        """Return seconds left in the total budget (never negative).

        ``None`` means the run has no total budget.
        """
        if self.total is not None:
            return max(0.0, self.total - self.elapsed())
        return None

    def expired(self) -> bool:
        """Return True once a configured total budget has been used up."""
        if self.total is None:
            return False
        return self.elapsed() >= self.total

    def call_timeout(self) -> int | None:
        """Return the timeout for one agent call, or ``None`` if uncapped.

        The value is the tighter of the phase budget and the remaining total
        budget, truncated to whole seconds and floored at 1. The adapter takes
        the min of this and its own per-agent timeout, so that one is not
        considered here.
        """
        left = self.remaining()
        limits = [float(cap) for cap in (self.phase, left) if cap is not None]
        if not limits:
            return None
        tightest = min(limits)
        return max(1, int(tightest))
70def _run_with_retry(
71 adapter: Adapter,
72 prompt: str,
73 phase: str,
74 budget: RunBudget,
75 retries: int,
76 log,
77) -> AgentResult:
78 """Run one agent for one phase, retrying transient failures (issue #30).
80 Retries only failures whose typed error code is in
81 ``RETRYABLE_ERROR_CODES`` (timeout/rate-limit/spawn), up to ``retries`` extra
82 attempts. A deterministic failure (auth, missing CLI, empty output, generic
83 nonzero exit) is returned immediately. The returned result's ``attempts``
84 records how many tries were made. Retrying stops early when the run budget is
85 exhausted so a retry never overruns the total timeout.
86 """
87 max_attempts = max(1, retries + 1)
88 result = adapter.run(prompt, phase=phase, timeout=budget.call_timeout())
89 attempts = 1
90 while (
91 not result.ok
92 and result.error_code in RETRYABLE_ERROR_CODES
93 and attempts < max_attempts
94 and not budget.expired()
95 ):
96 log(
97 f"{adapter.name}: {phase} attempt {attempts} failed "
98 f"({result.error_code}); retrying"
99 )
100 result = adapter.run(
101 prompt, phase=phase, timeout=budget.call_timeout()
102 )
103 attempts += 1
104 result.attempts = attempts
105 return result
108def _order_by_agents(results: list[AgentResult], order: list[str]) -> list[AgentResult]:
109 """Reorder phase results into the configured/enabled agent order.
111 Round phases run agents concurrently (ThreadPoolExecutor.map), so the order
112 in which results arrive is not guaranteed across runs. The report and all
113 downstream consumers must NOT depend on thread-completion order, so we sort
114 every phase's results by each agent's index in ``order`` (the stable
115 enabled-agent list). Agents not present in ``order`` (should not happen)
116 sort to the end, preserving their relative arrival order as a stable
117 tiebreak so the sort is total and deterministic.
118 """
119 index = {name: i for i, name in enumerate(order)}
120 fallback = len(order)
121 return sorted(results, key=lambda r: index.get(r.agent, fallback))
@dataclass
class JuryOutcome:
    """Everything one jury run produced, bundled for the report layer.

    The first four fields are required and positional; the rest default to
    "nothing happened" values so partial runs can still be represented.
    """

    # Round-1 review results, one per agent that ran.
    reviews: list[AgentResult]
    # Results of the debate round kept by the run (empty when no debate ran).
    debate: list[AgentResult]
    # The chair's synthesis result, or None when synthesis was skipped.
    synthesis: AgentResult | None
    # Name of the agent resolved as chair for the run.
    chair: str
    # Findings aggregated across reviews (plus any synthetic ones).
    findings: list[Finding] = field(default_factory=list)
    # Human-readable warnings collected while the run executed.
    warnings: list[str] = field(default_factory=list)
    # Consensus groups built from ``findings``.
    groups: list[FindingGroup] = field(default_factory=list)
    # The chair's verification result, or None when verification did not run.
    verify: AgentResult | None = None
    # Verdicts parsed from the verification output.
    verdicts: list[Verdict] = field(default_factory=list)
    # Context policy in effect for the run; "diff-only" is the default.
    context_mode: str = "diff-only"
    # Whether secret redaction was enabled, and how many redactions were made.
    redact_secrets: bool = True
    redaction_count: int = 0
    # Hits reported by the prompt-injection heuristic.
    injection_hits: list = field(default_factory=list)
    # Execution/partial-result signals (issue #30): agents skipped because their
    # CLI was unavailable (name, reason), and whether the run budget was
    # exhausted before all phases completed.
    skipped: list = field(default_factory=list)
    budget_exhausted: bool = False
    # Adaptive-rounds signals (issue #40): rounds actually executed and a short
    # human-readable reason for why the debate ran / stopped.
    rounds_executed: int = 1
    stop_reason: str = ""
    # Set when this outcome was served from the local result cache (issue #33),
    # so the report/metadata can mark it as cached rather than freshly computed.
    from_cache: bool = False
def _run_phase(
    adapters: list[Adapter],
    prompt_for: dict[str, str],
    phase: str,
    parallel: bool,
    *,
    budget: RunBudget,
    retries: int,
    log,
) -> list[AgentResult]:
    """Run every adapter once for ``phase`` and return their results.

    Each adapter receives the prompt stored under its name in ``prompt_for``
    and goes through the retry wrapper. With ``parallel`` set and more than
    one adapter, the calls run on a thread pool with one worker per adapter;
    otherwise they run one after another.
    """

    def run_one(adapter: Adapter) -> AgentResult:
        return _run_with_retry(
            adapter, prompt_for[adapter.name], phase, budget, retries, log
        )

    use_pool = parallel and len(adapters) > 1
    if not use_pool:
        return [run_one(adapter) for adapter in adapters]
    with ThreadPoolExecutor(max_workers=len(adapters)) as pool:
        return list(pool.map(run_one, adapters))
172def _others(reviews: list[AgentResult], me: str) -> str:
173 """Identity-labeled peer reviews (legacy path; ``anonymize_debate = false``).
175 Renders each *other* reviewer's round-1 output with its real agent/vendor
176 identity in the stable enabled-agent order. This is the pre-#37 behaviour and
177 leaks both identity and position; the anonymizing path below is the default.
178 """
179 chunks = [
180 f"### {r.agent} ({r.vendor})\n{r.output}"
181 for r in reviews
182 if r.agent != me and r.ok and r.output
183 ]
184 return "\n\n".join(chunks) if chunks else "_(no other reviews available)_"
187def _anon_label(i: int) -> str:
188 """Stable anonymous reviewer label: 0->'A', 1->'B', ... 26->'AA'."""
189 letters = string.ascii_uppercase
190 label = ""
191 i += 1
192 while i > 0:
193 i, rem = divmod(i - 1, 26)
194 label = letters[rem] + label
195 return label
198def _anonymize_peers(
199 reviews: list[AgentResult], me: str, rng: random.Random
200) -> tuple[str, dict[str, str]]:
201 """Chatham House peer view for a debater (#37).
203 Returns ``(prompt_text, label_to_agent)`` where the prompt text renders each
204 *other* successful reviewer's round-1 output under an anonymous
205 ``### Reviewer A`` / ``### Reviewer B`` heading — NO vendor or agent name.
206 The debater's OWN review is excluded (it is passed separately as
207 ``own_review``). Presentation order is shuffled DETERMINISTICALLY using the
208 shared run RNG so neither identity nor position is a stable signal; the same
209 seed yields the same order, different seeds may differ.
211 ``label_to_agent`` keeps the anonymous-label -> real-agent mapping internal so
212 callers can still recover authorship (the report attributes by real name).
213 """
214 peers = [r for r in reviews if r.agent != me and r.ok and r.output]
215 if not peers:
216 return "_(no other reviews available)_", {}
217 # Deterministic per-debater shuffle from the shared run RNG. We shuffle a
218 # copy so the caller's review list (used elsewhere) is untouched.
219 order = list(peers)
220 rng.shuffle(order)
221 chunks: list[str] = []
222 label_to_agent: dict[str, str] = {}
223 for i, r in enumerate(order):
224 label = f"Reviewer {_anon_label(i)}"
225 label_to_agent[label] = r.agent
226 chunks.append(f"### {label}\n{r.output}")
227 return "\n\n".join(chunks), label_to_agent
def _debate_round(
    debaters: list[Adapter],
    reviews: list[AgentResult],
    diff: str,
    config: JuryConfig,
    run_rng: random.Random,
    agent_order: list[str],
    prior: list[AgentResult],
    budget: RunBudget,
    retries: int,
    log,
    round_no: int,
    template: str = prompts.DEBATE,
) -> list[AgentResult]:
    """Execute a single debate round; results come back in ``agent_order``.

    Each debater's prompt contains the diff, its own round-1 review and the
    other reviewers' reviews: anonymized and shuffled when
    ``config.anonymize_debate`` is set (#37), identity-labelled otherwise.
    When ``prior`` (the previous debate round's results) has any successful
    output, that output is appended as a "prior debate" section so later
    adaptive rounds (issue #40) build on earlier cross-examination.
    """
    log(f"round {round_no}: {len(debaters)} agents cross-examining")

    own_by_agent: dict[str, str] = {}
    for review in reviews:
        if review.ok:
            own_by_agent[review.agent] = review.output

    # The prior-debate section is identical for every debater, so build it once.
    earlier = [f"### {r.agent}\n{r.output}" for r in prior if r.ok and r.output]
    prior_block = "\n\n".join(earlier)
    addendum = ""
    if prior_block:
        addendum = (
            "\n\n=== PRIOR DEBATE (earlier round) ===\n"
            "Build on this; do not just repeat it. Only keep a DISPUTE or "
            "MISSED item if it is still unresolved.\n\n" + prior_block + "\n"
        )

    prompts_by_agent: dict[str, str] = {}
    for debater in debaters:
        if not config.anonymize_debate:
            peers_text = _others(reviews, debater.name)
        else:
            # One draw from the shared run RNG per debater, in debater order,
            # seeds an independent child RNG for that debater's peer shuffle.
            child_rng = random.Random(run_rng.random())
            peers_text, _ = _anonymize_peers(reviews, debater.name, child_rng)
        rendered = template.format(
            name=debater.name,
            diff=diff,
            own_review=own_by_agent.get(
                debater.name, "_(your review was unavailable)_"
            ),
            other_reviews=peers_text,
            notice=prompts._UNTRUSTED_NOTICE,
        )
        prompts_by_agent[debater.name] = rendered + addendum

    round_results = _run_phase(
        debaters,
        prompts_by_agent,
        "debate",
        config.parallel,
        budget=budget,
        retries=retries,
        log=log,
    )
    # Same fixed-order guarantee as the review round.
    return _order_by_agents(round_results, agent_order)
def run_jury(
    config: JuryConfig,
    diff: str,
    *,
    context: str = "",
    mock: bool = False,
    strict: bool = False,
    seed: int | None = None,
    policy: ReviewPolicy | None = None,
    log=lambda _msg: None,
    budget: RunBudget | None = None,
    on_event=None,
    mode: str = "code",
) -> JuryOutcome:
    """Run one full jury: review, optional debate, optional verify, synthesis.

    Args:
        config: Agent/runtime configuration (enabled agents, rounds, budgets,
            chair, anonymization, verification, ...).
        diff: The untrusted text under review.
        context: Extra untrusted context; discarded when the context mode is
            "diff-only".
        mock: Passed to ``make_adapter`` for every enabled agent.
        strict: Turn least-privilege warnings and unavailable agent CLIs into
            errors instead of warnings/skips.
        seed: Seed for the shared run RNG; falls back to ``config.seed``.
        policy: Optional trusted review policy rendered into review prompts.
        log: Callable receiving progress messages; defaults to a no-op.
        budget: Shared run budget; when ``None`` one is built from
            ``config.total_timeout`` and ``config.phase_timeout``.
        on_event: Optional callback ``on_event(kind, result, round_no=None)``
            fired after each phase result.
        mode: Selects the prompt templates via ``prompts.for_mode``.

    Returns:
        A ``JuryOutcome`` holding every phase result plus run signals.

    Raises:
        RuntimeError: in ``strict`` mode when the least-privilege audit
            reports warnings or an agent CLI is unavailable, and in any mode
            when no agent is usable.
    """
    # Jury mode (issue #221): "code" (default) reviews a diff with the code-review
    # rubric; "issue" reviews a GitHub issue's prose for completeness/clarity.
    # Only the prompt TEMPLATES differ — the round structure, consensus, voting,
    # verification, ordering, and determinism are identical. ``tmpl`` selects the
    # four phase templates; each is threaded into the phase that uses it so the
    # call sites are otherwise unchanged.
    tmpl = prompts.for_mode(mode)
    # Live play-by-play hook (issue #210): an optional callback fired after each
    # phase result is produced — ``on_event(kind, result, round_no=None)`` with
    # kind in {"review", "debate", "verify", "synthesis"}. It lets a caller stream
    # the deliberation as it happens (CLI ``--live``) without the orchestrator
    # doing any I/O itself. Fired in stable per-phase order (not thread-completion
    # order) so the event sequence is deterministic. Defaults to a no-op.
    emit = on_event or (lambda *_a, **_k: None)
    # Repository review policy (optional, #8): maintainer-authored, TRUSTED
    # content rendered into each REVIEW prompt in a clearly separated section.
    # When ``policy`` is None a sentinel placeholder is used, so the prompt is
    # unchanged except for that section. The policy is distinct from the
    # agent-runtime ``config`` and never enters the untrusted diff/context fences.
    policy_section = render_policy_section(policy)
    # Run reproducibility: a single shared RNG seeds every randomized
    # orchestration decision (future: anonymized-rebuttal order, rotating
    # chair, tie-breaks). The seed comes from the explicit ``seed`` argument if
    # given, else from ``config.seed``. We construct a dedicated
    # ``random.Random`` instance rather than touching the global ``random``
    # module so seeding a jury run never perturbs unrelated global state.
    # When the seed is None the RNG is unseeded (still deterministic
    # orchestration; randomness, if any, is just not reproducible run-to-run).
    # LLM output itself is never made deterministic by this — only the
    # orchestration around it. ``run_rng`` is the shared run RNG: pass it to
    # any feature that needs reproducible randomness instead of using ``random``.
    run_seed = seed if seed is not None else config.seed
    run_rng = random.Random(run_seed)  # shared run RNG (see the comment above)

    # Run budget (issue #30): a single wall-clock budget threaded through every
    # phase. Defaults (both None) leave behaviour identical to no budget, with
    # each agent bounded only by its own per-agent timeout. ``retries`` is the
    # number of extra attempts for transient (retryable) failures. A caller may
    # pass a SHARED budget so ``total_timeout`` spans a whole chunked review
    # rather than resetting per chunk (issue #31 / review finding).
    if budget is None:
        budget = RunBudget(config.total_timeout, config.phase_timeout)
    retries = config.retries

    # Context policy: diff-only sends only the diff; expanded includes context.
    # ``getattr`` with defaults tolerates a config that has no ``context``
    # section at all.
    ctx_cfg = getattr(config, "context", None)
    context_mode = getattr(ctx_cfg, "mode", "diff-only") if ctx_cfg else "diff-only"
    redact_on = getattr(ctx_cfg, "redact_secrets", True) if ctx_cfg else True
    if context_mode == "diff-only":
        context = ""
    redaction_count = 0
    if redact_on:
        # Redact both inputs before anything is sent to an agent; the counts
        # are summed for the report.
        diff, _n1 = redact(diff)
        context, _n2 = redact(context)
        redaction_count = _n1 + _n2
        if redaction_count:
            log(f"redacted {redaction_count} secret(s) before sending to agents")

    # Prompt-injection heuristic (OWASP LLM01): scan untrusted diff/context for
    # patterns that try to override instructions, then SURFACE them as a synthetic
    # finding/warning. We never act on them; the CI gate is derived from
    # structured consensus (see ci.evaluate_ci), so an injected "APPROVE"
    # cannot flip the verdict.
    injection_hits = injection.scan_inputs(diff, context)
    injection_findings: list[Finding] = []
    if injection_hits:
        log(f"prompt-injection heuristic: {len(injection_hits)} suspicious pattern(s) flagged")
        syn = injection.hits_to_finding(injection_hits)
        if syn is not None:
            injection_findings.append(syn)

    # Least-privilege audit: warn when a configured agent could perform
    # write/tool actions while reviewing attacker-controlled content.
    privilege_warnings = audit_privilege(config.enabled_agents)
    for w in privilege_warnings:
        log(f"least-privilege warning: {w}")
    if strict and privilege_warnings:
        raise RuntimeError(
            "least-privilege check failed (--strict): "
            + "; ".join(privilege_warnings)
        )

    specs = config.enabled_agents
    adapters = [make_adapter(s, mock=mock) for s in specs]

    # Filter to available agents (unless strict, where a missing CLI is fatal).
    # Skipped agents are recorded (name, reason) so the report can state exactly
    # which agents never ran — part of the partial-result policy (issue #30).
    usable: list[Adapter] = []
    skipped: list[tuple[str, str]] = []
    for a in adapters:
        if a.available():
            usable.append(a)
        elif strict:
            raise RuntimeError(f"agent '{a.name}' CLI not available: {a.spec.command}")
        else:
            reason = f"CLI not found ({a.spec.command})"
            log(f"skipping '{a.name}': {reason}")
            skipped.append((a.name, reason))
    if not usable:
        raise RuntimeError("no usable agents — install at least one agent CLI or use --mock")

    usable_names = [a.name for a in usable]

    # Round 1: independent reviews.
    log(f"round 1: {len(usable)} agents reviewing")
    review_prompt = {
        a.name: tmpl["review"].format(
            name=a.name,
            context=context or "_(none)_",
            diff=diff,
            policy=policy_section,
            notice=prompts._UNTRUSTED_NOTICE,
        )
        for a in usable
    }
    reviews = _run_phase(
        usable, review_prompt, "review", config.parallel,
        budget=budget, retries=retries, log=log,
    )
    # Stable ordering: reorder to the enabled-agent order so the report (and
    # every downstream consumer) is independent of which thread finished first.
    # NOTE(review): ``Executor.map`` already yields results in input order, so
    # this reorder is a defensive guarantee rather than a required fix-up.
    agent_order = [a.name for a in usable]
    reviews = _order_by_agents(reviews, agent_order)

    # Parse structured findings from each successful review and aggregate them.
    # Seed with the synthetic injection finding/warnings so they surface in the
    # report and outcome.warnings without ever influencing agent behaviour.
    all_findings: list[Finding] = list(injection_findings)
    all_warnings: list[str] = injection.hits_to_warnings(injection_hits)
    all_warnings.extend(privilege_warnings)
    for r in reviews:
        if not r.ok:
            continue
        found, warns = parse_findings(r.output, r.agent)
        # Parsed data is also attached to the result itself.
        r.findings = found
        r.warnings = warns
        all_findings.extend(found)
        all_warnings.extend(warns)

    # Stream round-1 reviews as they're now finalized (stable order).
    for r in reviews:
        emit("review", r)

    # Deterministic consensus grouping across reviewers.
    groups = group_findings(all_findings, len(reviews))

    # Names of agents whose round-1 review succeeded — the chair resolver uses
    # this to (optionally) prefer a non-reviewer chair (#38).
    reviewer_names = [r.agent for r in reviews if r.ok]

    # Resolve the chair ONCE for the whole run so verify and synthesis use the
    # SAME chair. ``chair = "rotate"`` and prefer-non-reviewer both consume the
    # shared run RNG / reviewer info, so resolving once (rather than recomputing
    # per phase) is what keeps a rotating chair stable within a run (#38).
    chair_name = resolve_chair(config, usable_names, reviewer_names, run_rng)

    # Round 2+: debate. Only agents whose round-1 review succeeded participate.
    # Two modes (issue #40):
    #   - fixed (early_stop = false): honour ``rounds`` exactly — run one debate
    #     round iff rounds >= 2. Reproducible fixed-N behaviour for benchmarking.
    #   - adaptive (early_stop = true): skip the debate when round-1 reviewers
    #     already agree, otherwise run debate up to ``max_rounds`` rounds and stop
    #     as soon as a round resolves all disputes.
    debate: list[AgentResult] = []
    rounds_executed = 1
    stop_reason = ""
    budget_exhausted = False
    debaters = [a for a in usable if any(r.agent == a.name and r.ok for r in reviews)]
    can_debate = len(debaters) >= 2

    if config.early_stop:
        max_rounds = config.effective_max_rounds
        if not can_debate:
            stop_reason = "stopped after round 1: need >=2 successful reviews to debate"
            log(stop_reason)
        elif max_rounds < 2:
            stop_reason = "stopped after round 1: max_rounds < 2"
            log(stop_reason)
        else:
            converged, why = convergence.review_convergence(groups, len(reviews))
            if converged:
                stop_reason = f"early stop after round 1: {why}"
                log(stop_reason)
            else:
                log(f"early stop active: {why}; running debate up to {max_rounds} round(s)")
                prior: list[AgentResult] = []
                # Round 1 was the review round, so debate rounds start at 2.
                round_no = 1
                while round_no < max_rounds:
                    if budget.expired():
                        budget_exhausted = True
                        stop_reason = f"stopped at round {rounds_executed}: run budget exhausted"
                        log(stop_reason)
                        break
                    round_no += 1
                    debate = _debate_round(
                        debaters, reviews, diff, config, run_rng, agent_order,
                        prior, budget, retries, log, round_no,
                        template=tmpl["debate"],
                    )
                    rounds_executed = round_no
                    for r in debate:
                        emit("debate", r, round_no)
                    dconv, dwhy = convergence.debate_convergence(debate)
                    if dconv:
                        stop_reason = f"converged after round {round_no}: {dwhy}"
                        log(stop_reason)
                        break
                    # Not converged: this round feeds the next one.
                    prior = debate
                    stop_reason = f"ran {round_no} rounds: {dwhy}"
                else:
                    # while-else: reached only when the loop ends without a
                    # ``break`` (no convergence, no budget stop). Any reason
                    # already recorded by the last round is kept.
                    stop_reason = stop_reason or (
                        f"reached max_rounds ({max_rounds}) with disagreement remaining"
                    )
    else:
        # Fixed-N: exactly the historical behaviour.
        if config.rounds >= 2 and can_debate:
            if budget.expired():
                budget_exhausted = True
                stop_reason = "round 2 skipped: run budget exhausted"
                log(stop_reason)
            else:
                debate = _debate_round(
                    debaters, reviews, diff, config, run_rng, agent_order,
                    [], budget, retries, log, 2,
                    template=tmpl["debate"],
                )
                rounds_executed = 2
                for r in debate:
                    emit("debate", r, 2)
        elif config.rounds >= 2:
            stop_reason = "round 2 skipped: need >=2 successful reviews to debate"
            log(stop_reason)
        else:
            stop_reason = "single round (rounds = 1)"

    # Verification: the chair judges candidate findings to reduce false
    # positives. Skipped when the run budget is exhausted (issue #30) so a
    # partial run still returns what completed instead of overrunning.
    verify_result: AgentResult | None = None
    verdicts: list[Verdict] = []
    if config.verify:
        if budget.expired():
            budget_exhausted = True
            msg = "verification skipped: run budget exhausted"
            log(msg)
            all_warnings.append(msg)
        else:
            verify_result, verdicts, verify_warnings = _verify(
                chair_name, usable, all_findings, diff, context, budget, retries, log,
                template=tmpl["verify"],
            )
            all_warnings.extend(verify_warnings)
            # Verdicts update the consensus groups in place.
            _apply_verdicts(groups, verdicts)
            if verify_result is not None:
                emit("verify", verify_result)

    # Synthesis: the chair consolidates. When the resolved chair is ALSO a
    # round-1 reviewer, feed it an anonymized view of the reviews (#38 guardrail)
    # so it cannot preferentially weight its own findings; the report still
    # attributes by real name because it renders the real outcome data, not this
    # synthesis prompt.
    synthesis: AgentResult | None = None
    if budget.expired():
        budget_exhausted = True
        msg = "synthesis skipped: run budget exhausted"
        log(msg)
        if msg not in all_warnings:
            all_warnings.append(msg)
    else:
        chair_is_reviewer = chair_name in reviewer_names
        anonymize_synthesis = config.anonymize_debate and chair_is_reviewer
        synthesis = _synthesize(
            chair_name,
            usable,
            reviews,
            debate,
            diff,
            budget,
            retries,
            log,
            verdicts=verdicts,
            anonymize_reviews=anonymize_synthesis,
            rng=run_rng,
            template=tmpl["synthesis"],
        )
        if synthesis is not None:
            emit("synthesis", synthesis)

    return JuryOutcome(
        reviews=reviews,
        debate=debate,
        synthesis=synthesis,
        chair=chair_name,
        findings=all_findings,
        warnings=all_warnings,
        groups=groups,
        verify=verify_result,
        verdicts=verdicts,
        context_mode=context_mode,
        redact_secrets=redact_on,
        redaction_count=redaction_count,
        injection_hits=injection_hits,
        skipped=skipped,
        budget_exhausted=budget_exhausted,
        rounds_executed=rounds_executed,
        stop_reason=stop_reason,
    )
def resolve_chair(
    config: JuryConfig,
    usable: list[str],
    reviewers: list[str],
    rng: random.Random,
) -> str:
    """Pick the chair for a run from plain names, with no I/O (#38).

    Decision order:
      0. No usable agents at all: return ``config.chair`` unchanged.
      1. ``config.chair == "rotate"``: draw one name from the sorted,
         de-duplicated usable names with the shared run ``rng``. The same
         seed and the same set of agents give the same chair.
      2. ``config.chair`` names a usable agent: that agent is the chair.
      3. ``config.prefer_non_reviewer_chair`` is set and some usable agent is
         not among ``reviewers``: the first such agent, in ``usable`` order.
      4. Otherwise the first usable agent.

    Only step 1 consumes randomness. Because the function works on names
    only, the caller can resolve the chair once and reuse the answer for both
    verification and synthesis.
    """
    if not usable:
        return config.chair

    requested = config.chair
    if requested == "rotate":
        # Sorting makes the pick independent of the order ``usable`` came in.
        pool = sorted(set(usable))
        return pool[rng.randrange(len(pool))]
    if requested in set(usable):
        return requested

    if config.prefer_non_reviewer_chair:
        seen_reviewing = set(reviewers)
        for name in usable:
            if name not in seen_reviewing:
                return name
    return usable[0]
664def _format_findings_for_verify(findings: list[Finding]) -> str:
665 """Render candidate findings for the chair's verification prompt.
667 Reviewer identity is omitted (#250) so the chair can't favour its own
668 findings while judging them — parity with the #37/#38 anonymization.
669 Verdicts match back by file/line/claim, so dropping it is safe.
670 """
671 if not findings:
672 return "_(no candidate findings)_"
673 lines = []
674 for f in findings:
675 loc = f.file or "?"
676 if f.line is not None:
677 loc = f"{loc}:{f.line}"
678 lines.append(f"- [{f.severity}] {loc} — {f.claim}")
679 return "\n".join(lines)
682def _format_verdicts(verdicts: list[Verdict]) -> str:
683 if not verdicts:
684 return "_(no verification verdicts)_"
685 lines = []
686 for v in verdicts:
687 loc = v.file or "?"
688 if v.line is not None:
689 loc = f"{loc}:{v.line}"
690 lines.append(f"- [{v.status}] {loc} — {v.claim}: {v.reasoning}")
691 return "\n".join(lines)
def _verify(
    chair_name, usable, findings, diff, context, budget, retries, log,
    template=prompts.VERIFY,
) -> tuple[AgentResult | None, list[Verdict], list[str]]:
    """Ask the chair to judge the candidate findings.

    Returns ``(result, verdicts, warnings)``:
      * chair not among ``usable``: ``(None, [], [])`` and nothing is run;
      * chair call failed: the failed result, no verdicts and one
        "verification failed" warning;
      * otherwise: the result plus whatever ``parse_verdicts`` extracts.
    """
    for candidate in usable:
        if candidate.name == chair_name:
            chair = candidate
            break
    else:
        return None, [], []

    log(f"verification: chair '{chair_name}' judging {len(findings)} candidate findings")
    rendered_findings = _format_findings_for_verify(findings)
    prompt = template.format(
        diff=diff,
        findings=rendered_findings,
        context=context or "_(none)_",
        notice=prompts._UNTRUSTED_NOTICE,
    )
    result = _run_with_retry(chair, prompt, "verify", budget, retries, log)
    if result.ok:
        verdicts, warnings = parse_verdicts(result.output, chair_name)
        return result, verdicts, warnings
    return result, [], [f"verification failed: {result.error}"]
def _verdict_matches_group(verdict: Verdict, group: FindingGroup) -> bool:
    """Decide whether a verdict refers to the finding a group represents.

    The verdict must name the same normalized file as the group's
    representative finding and, when both carry a line number, lie within 3
    lines of it. The claims then have to agree: an empty normalized verdict
    claim or an exact match counts as agreement, otherwise the word sets of
    the two claims need a Jaccard overlap of at least 0.5.
    """
    from .consensus import _normalize_claim, _normalize_path

    anchor = group.representative
    same_file = _normalize_path(verdict.file) == _normalize_path(anchor.file)
    if not same_file:
        return False

    both_have_lines = verdict.line is not None and anchor.line is not None
    if both_have_lines and abs(verdict.line - anchor.line) > 3:
        return False

    verdict_claim = _normalize_claim(verdict.claim)
    group_claim = _normalize_claim(anchor.claim)
    if not verdict_claim or verdict_claim == group_claim:
        return True

    verdict_words = set(verdict_claim.split())
    group_words = set(group_claim.split())
    if not (verdict_words and group_words):
        return False
    shared = len(verdict_words & group_words)
    combined = len(verdict_words | group_words)
    if not combined:
        return False
    return shared / combined >= 0.5
735def _apply_verdicts(groups: list[FindingGroup], verdicts: list[Verdict]) -> None:
736 """Attach verification statuses to consensus groups.
738 unsupported -> bucket 'rejected'; needs_human_decision -> bucket 'disputed';
739 verified -> status recorded, bucket unchanged.
740 """
741 for verdict in verdicts:
742 # Apply to every matching group: when reviewers phrase the same issue
743 # slightly differently it can land in more than one group, and all of
744 # them should carry the verifier's judgement.
745 for group in groups:
746 if group.status:
747 continue
748 if _verdict_matches_group(verdict, group):
749 group.status = verdict.status
750 group.status_reasoning = verdict.reasoning
751 if verdict.status == "unsupported":
752 group.bucket = "rejected"
753 elif verdict.status == "needs_human_decision":
754 group.bucket = "disputed"
def _synthesize(
    chair_name,
    usable,
    reviews,
    debate,
    diff,
    budget,
    retries,
    log,
    verdicts=None,
    anonymize_reviews=False,
    rng=None,
    template=prompts.SYNTHESIS,
) -> AgentResult | None:
    """Have the chair consolidate reviews, debate and verdicts into one result.

    Returns ``None`` without running anything when ``chair_name`` is not among
    ``usable``. With ``anonymize_reviews`` set, the round-1 reviews are shown
    to the chair under shuffled anonymous labels (#38) so it cannot tell
    which one is its own; otherwise they are labelled by agent and vendor.
    Verification verdicts, when present, are appended after the template.
    """
    chair = None
    for candidate in usable:
        if candidate.name == chair_name:
            chair = candidate
            break
    if chair is None:
        return None
    log(f"synthesis: chair '{chair_name}' consolidating verdict")

    if not anonymize_reviews:
        labelled = [
            f"### {r.agent} ({r.vendor})\n{r.output}"
            for r in reviews
            if r.ok and r.output
        ]
        reviews_txt = "\n\n".join(labelled) or "_(no reviews)_"
    else:
        # ``me=None`` excludes nobody: identity is stripped from ALL reviews.
        # The child RNG is seeded from the shared run RNG when one is given.
        if rng is not None:
            shuffle_rng = random.Random(rng.random())
        else:
            shuffle_rng = random.Random()
        reviews_txt, _ = _anonymize_peers(reviews, None, shuffle_rng)

    debate_sections = [
        f"### {r.agent}\n{r.output}" for r in debate if r.ok and r.output
    ]
    debate_txt = "\n\n".join(debate_sections) or "_(no debate round)_"

    prompt = template.format(
        diff=diff,
        reviews=reviews_txt,
        debate=debate_txt,
        notice=prompts._UNTRUSTED_NOTICE,
    )
    if verdicts:
        prompt += f"\n\n=== VERIFICATION VERDICTS ===\n{_format_verdicts(verdicts)}\n"
    return _run_with_retry(chair, prompt, "synthesis", budget, retries, log)
def _merge_results_by_agent(phase_lists: list[list[AgentResult]]) -> list[AgentResult]:
    """Collapse each agent's per-chunk results into a single result (issue #31).

    Every agent that appears in ``phase_lists`` yields exactly one
    ``AgentResult``:

    - output: the successful, non-empty chunk outputs joined by blank lines,
      each under a ``#### chunk N`` header. ``N`` is the 1-based position of
      the result among that agent's results, so a failed or empty chunk is
      omitted but still consumes its number;
    - duration: the sum over all of the agent's results, rounded to 3 places;
    - ``ok``: true when the agent succeeded on at least one chunk;
    - ``error`` / ``error_code``: copied from the agent's first failed result,
      and only when no chunk succeeded (otherwise ``None``);
    - ``attempts``: the maximum across chunks, so a retried chunk stays visible;
    - vendor: taken from the agent's first result.

    Agents are returned in order of first appearance.
    """
    # Dicts keep insertion order, so grouping alone records first appearance.
    per_agent: dict[str, list[AgentResult]] = {}
    for results in phase_lists:
        for result in results:
            per_agent.setdefault(result.agent, []).append(result)

    merged: list[AgentResult] = []
    for agent, chunks in per_agent.items():
        succeeded = any(c.ok for c in chunks)
        sections = [
            f"#### chunk {idx}\n{c.output}"
            for idx, c in enumerate(chunks, 1)
            if c.ok and c.output
        ]
        if succeeded:
            # A partial success hides the failure details of the other chunks.
            error = None
            error_code = None
        else:
            failure = next((c for c in chunks if not c.ok), None)
            error = failure.error if failure else None
            error_code = failure.error_code if failure else None
        total_duration = round(sum(c.duration_s for c in chunks), 3)
        merged.append(
            AgentResult(
                agent,
                chunks[0].vendor,
                succeeded,
                "\n\n".join(sections),
                total_duration,
                error=error,
                error_code=error_code,
                attempts=max(c.attempts for c in chunks),
            )
        )
    return merged
def _combine_chair_results(
    results: list[AgentResult], chair: str
) -> AgentResult | None:
    """Combine per-chunk chair results (verify/synthesis) into one labelled result.

    Args:
        results: The chair's results, one per chunk, in chunk order. Entries
            may have failed or produced no output.
        chair: Agent name to stamp on the combined result.

    Returns:
        ``None`` when ``results`` is empty. When no entry both succeeded and
        produced output, the first entry is returned unchanged so its failure
        details survive. Otherwise a new successful ``AgentResult`` whose
        output joins the usable entries under ``### chunk N`` headers, whose
        vendor comes from the first usable entry, and whose duration is the
        sum over the usable entries rounded to 3 places.
    """
    # Number by position in ``results`` *before* filtering, so a failed or
    # empty chunk leaves a gap instead of silently renumbering the chunks
    # after it (matches how ``_merge_results_by_agent`` labels chunks).
    usable = [
        (idx, r) for idx, r in enumerate(results, 1) if r.ok and r.output
    ]
    if not usable:
        return results[0] if results else None
    vendor = usable[0][1].vendor
    body = "\n\n".join(f"### chunk {idx}\n{r.output}" for idx, r in usable)
    duration = round(sum(r.duration_s for _, r in usable), 3)
    return AgentResult(chair, vendor, True, body, duration)
def _merge_chunk_outcomes(outcomes: list[JuryOutcome]) -> JuryOutcome:
    """Fold per-chunk outcomes (issue #31) into one renderable JuryOutcome.

    A single outcome is returned as-is. Otherwise:

    - reviews and debate results are merged per agent with chunk labels
      (debate stays empty when no chunk had a debate round);
    - findings are unioned and re-grouped across all chunks so the consensus
      view is global, then the unioned verdicts are applied to those groups;
    - warnings and injection hits are concatenated, redaction counts summed;
    - synthesis and verify results are combined from the chunks that have one;
    - ``budget_exhausted`` is true if any chunk exhausted the budget and
      ``rounds_executed`` is the maximum over chunks;
    - ``chair``, ``context_mode``, ``redact_secrets`` and ``skipped`` are
      copied from the first chunk's outcome;
    - ``stop_reason`` is replaced by a note stating the number of parts.
    """
    if len(outcomes) == 1:
        return outcomes[0]
    head = outcomes[0]

    def _flatten(lists):
        # Concatenate per-chunk lists, preserving chunk order.
        return [item for lst in lists for item in lst]

    merged_reviews = _merge_results_by_agent([o.reviews for o in outcomes])
    if any(o.debate for o in outcomes):
        merged_debate = _merge_results_by_agent([o.debate for o in outcomes])
    else:
        merged_debate = []

    all_findings = _flatten(o.findings for o in outcomes)
    merged_groups = group_findings(all_findings, len(merged_reviews))
    all_verdicts = _flatten(o.verdicts for o in outcomes)
    # Mutates the freshly built groups in place.
    _apply_verdicts(merged_groups, all_verdicts)
    all_warnings = _flatten(o.warnings for o in outcomes)

    chunk_syntheses = [o.synthesis for o in outcomes if o.synthesis]
    merged_synthesis = _combine_chair_results(chunk_syntheses, head.chair)
    chunk_verifies = [o.verify for o in outcomes if o.verify]
    merged_verify = _combine_chair_results(chunk_verifies, head.chair)

    return JuryOutcome(
        reviews=merged_reviews,
        debate=merged_debate,
        synthesis=merged_synthesis,
        chair=head.chair,
        findings=all_findings,
        warnings=all_warnings,
        groups=merged_groups,
        verify=merged_verify,
        verdicts=all_verdicts,
        context_mode=head.context_mode,
        redact_secrets=head.redact_secrets,
        redaction_count=sum(o.redaction_count for o in outcomes),
        injection_hits=_flatten(o.injection_hits for o in outcomes),
        skipped=head.skipped,
        budget_exhausted=any(o.budget_exhausted for o in outcomes),
        rounds_executed=max(o.rounds_executed for o in outcomes),
        stop_reason=f"chunked review across {len(outcomes)} part(s)",
    )
def review_diff(
    config: JuryConfig,
    diff: str,
    *,
    context: str = "",
    mock: bool = False,
    strict: bool = False,
    seed: int | None = None,
    policy: ReviewPolicy | None = None,
    log=lambda _msg: None,
    on_event=None,
) -> tuple[JuryOutcome, largediff.DiffPlan]:
    """Plan how to handle a diff, then run the jury over it (issue #31).

    This is the CLI's single entry point. The diff is filtered and measured
    with the ``config.diff`` settings, the size and chosen mode are logged,
    and the work is dispatched by mode:

    - ``full``: the filtered diff is reviewed in one ``run_jury`` pass;
    - ``chunked``: every chunk is reviewed in turn and the outcomes merged;
    - ``too_large``: ``RuntimeError`` is raised with the plan's reason.

    ``RuntimeError`` is also raised when the filters leave no chunks at all.

    Returns ``(outcome, plan)`` so the caller can surface the plan. Existing
    callers of :func:`run_jury` are unaffected.
    """
    diff_cfg = config.diff
    plan = largediff.plan_diff(
        diff,
        max_bytes=diff_cfg.max_bytes,
        chunk=diff_cfg.chunk,
        chunk_max_bytes=diff_cfg.chunk_max_bytes,
        exclude_generated=diff_cfg.exclude_generated,
        exclude=diff_cfg.exclude,
        include=diff_cfg.include,
    )

    size_summary = (
        f"diff size: {plan.total_bytes} B total, {plan.kept_bytes} B after filters "
        f"({len(plan.kept)} file(s) kept, {len(plan.excluded)} excluded); "
        f"mode: {plan.mode}"
    )
    log(size_summary)
    if plan.excluded:
        excluded_items = ", ".join(f"{path} [{why}]" for path, why in plan.excluded)
        log("excluded: " + excluded_items)
    log(plan.reason)

    if plan.mode == largediff.MODE_TOO_LARGE:
        raise RuntimeError(f"diff too large to review: {plan.reason}")
    if not plan.chunks:
        raise RuntimeError(
            "nothing to review after filters — all files were excluded "
            "(check [jury.diff] include/exclude patterns)"
        )

    # A single budget is shared by every chunk so that ``total_timeout`` caps
    # the review as a whole rather than each chunk separately. Its clock starts
    # here, at construction, so it is created before any further work.
    shared_budget = RunBudget(config.total_timeout, config.phase_timeout)

    # Redact the shared context exactly once, before fanning out (#249). Every
    # chunk is reviewed against the same context; if each per-chunk
    # ``run_jury`` redacted it, its secrets would be counted once per chunk and
    # the merged ``redaction_count`` would be inflated by the chunk count.
    # After this pass the ``[REDACTED:…]`` placeholders no longer match, so the
    # per-chunk redaction of the context counts nothing and the one-time count
    # is added back at the end. Redactions in the diff itself are still
    # counted per chunk and summed, which is right: each chunk's diff differs.
    #
    # The settings live on ``config.context`` (the ``[jury.context]``
    # sub-table), not ``config.jury.context``: ``_from_dict`` flattens the
    # ``[jury]`` table onto JuryConfig, mirroring how run_jury() reads it.
    context_cfg = getattr(config, "context", None)
    if context_cfg:
        context_mode = getattr(context_cfg, "mode", "diff-only")
        redaction_enabled = getattr(context_cfg, "redact_secrets", True)
    else:
        context_mode = "diff-only"
        redaction_enabled = True
    context_redactions = 0
    if redaction_enabled and context_mode != "diff-only" and context:
        context, context_redactions = redact(context)

    def _review(piece: str) -> JuryOutcome:
        # Every chunk gets the same settings, context and shared budget.
        return run_jury(
            config,
            piece,
            context=context,
            mock=mock,
            strict=strict,
            seed=seed,
            policy=policy,
            log=log,
            budget=shared_budget,
            on_event=on_event,
        )

    if plan.mode == largediff.MODE_FULL:
        outcome = _review(plan.chunks[0])
    else:
        chunk_outcomes = []
        for index, piece in enumerate(plan.chunks, 1):
            log(f"reviewing chunk {index}/{len(plan.chunks)}")
            chunk_outcomes.append(_review(piece))
        outcome = _merge_chunk_outcomes(chunk_outcomes)

    # The per-chunk runs saw an already-redacted context and counted nothing
    # for it, so add the one-time context count back here.
    if context_redactions:
        outcome = replace(
            outcome,
            redaction_count=outcome.redaction_count + context_redactions,
        )
    return outcome, plan