Skip to content

Commit 34b7850

Browse files
authored
fix: resolve tripleshot judge race that misreports attempt as failed (#663)
The bridge's gate.Complete() triggers TeamCompletedEvent (which dispatches startJudge as a goroutine) before publishing BridgeTaskCompletedEvent (which sets Attempts[i].Status). The startJudge goroutine wins the race and snapshots the last attempt as "working", causing the judge to always report one attempt failed. Fix: eagerly set attempt status in onTeamCompleted under the lock, before dispatching the goroutine. Also addressed pre-existing issues: - Log discarded tmManager.Stop() error instead of _ = - Extract abortStart() to stop already-started bridges on partial failure - Upgrade SetMaxRetries failure log level from Warn to Error - Skip redundant status/timestamp overwrite in onBridgeTaskCompleted - Guard unknown team IDs in bridge event handlers
1 parent 8979c19 commit 34b7850

File tree

4 files changed

+181
-18
lines changed

4 files changed

+181
-18
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919

2020
### Fixed
2121

22+
- **Triple-Shot Judge Misreports Failed Attempt** - Fixed race condition where the judge always reported one attempt as failed despite all three succeeding. The bridge's `gate.Complete()` triggers `TeamCompletedEvent` (which dispatches `startJudge` as a goroutine) before publishing `BridgeTaskCompletedEvent` (which sets `Attempts[i].Status`). The `startJudge` goroutine would snapshot the last attempt's status as "working" before `onBridgeTaskCompleted` could set it to "completed". Fixed by eagerly setting attempt status in `onTeamCompleted` under the lock, before dispatching the goroutine. Also fixed: discarded `Stop()` error now logged, `SetMaxRetries` failures upgraded from Warn to Error, `onBridgeTaskCompleted` skips redundant status overwrites, and unknown team IDs are guarded in event handlers.
23+
2224
- **Triple-Shot Spurious Second Pass** - Fixed duplicate instance creation in triple-shot workflows. Two root causes: (1) TaskQueue's `defaultMaxRetries=2` caused failed attempt/judge tasks to retry, spawning new instances. Fixed by calling `SetMaxRetries(taskID, 0)` after team creation. (2) Judge "merge" recommendations caused `json.Unmarshal` to fail when the LLM wrote `suggested_changes` as a string instead of `[]string`. The evaluation file parse failure triggered a retry, creating a second judge. Fixed by adding `FlexibleStringSlice` type (mirrors existing `FlexibleString`) to tolerate string/array mismatches in all LLM-parsed sentinel file structs (`Evaluation`, `AttemptEvaluationItem`, `AdversarialReviewFile`).
2325

2426
- **Teamwire TUI Freeze** - Fixed TUI freeze when starting a triple-shot in teamwire mode. `coordinator.Start()` was called synchronously in the Bubble Tea `Update()` handler, blocking the event loop while bridges created git worktrees. Moved startup to an async `tea.Cmd` so the UI remains responsive during initialization.

internal/orchestrator/workflows/tripleshot/teamwire/AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ TeamCoordinator
3232
- **Two-phase Start**`Start()` must not hold `tc.mu` when calling `Bridge.Start()`. The bridge's claim loop publishes `BridgeTaskStartedEvent` synchronously, and the handler `onBridgeTaskStarted` acquires `tc.mu`. Holding the lock through `Start()` → bridge claim → event publish → handler → lock = deadlock. The fix: `registerStart()` holds/releases the lock, then `Start()` creates bridges outside it.
3333
- **Event subscription timing** — Subscriptions must happen before `Bridge.Start()` launches the claim loop. Currently done in `registerStart()` (Phase 1, under lock, before Phase 2 bridge creation) — this is the safe window. Don't move subscriptions after Phase 2 begins. For test assertions where you need events, subscribe before calling `Start()`. For production callbacks, use `SetCallbacks` before `Start`.
3434
- **`onTeamCompleted` dispatches to goroutine** — The handler for `team.completed` dispatches `startJudge()` via `go` to avoid deadlock. The synchronous event bus would block if `startJudge` tried to publish events while the bus's `Publish` goroutine holds a lock.
35+
- **`onTeamCompleted` must set attempt status eagerly** — The bridge calls `gate.Complete(taskID)` before publishing `BridgeTaskCompletedEvent`. `gate.Complete` triggers `TeamCompletedEvent` synchronously (via the team detecting all tasks done), which dispatches `go startJudge()`. `BridgeTaskCompletedEvent` fires next and runs `onBridgeTaskCompleted` (which sets `Attempts[i].Status`). Since `startJudge` runs in a separate goroutine, it races with `onBridgeTaskCompleted` for the last-completing attempt. To prevent `startJudge` from snapshotting a stale "working" status, `onTeamCompleted` sets `Attempts[i].Status` itself under the lock, before dispatching the goroutine.
3536
- **Retries disabled for tripleshot tasks**`registerStart()` and `startJudge()` call `SetMaxRetries(taskID, 0)` to disable TaskQueue's default retry logic (`defaultMaxRetries=2`). Without this, failed attempt/judge tasks would return to Pending and spawn duplicate instances, appearing as a spurious "second pass." The triple-shot workflow has its own redundancy (3 independent attempts), so retrying individual tasks is counterproductive.
3637
- **Every `onJudgeCompleted` failure path must publish `TripleShotJudgeCompletedEvent`** — Use the `failJudge()` helper, which sets session error, transitions to `PhaseFailed`, fires callbacks, and publishes the event. Forgetting the event on one path breaks downstream listeners.
3738
- **Session mutation lock discipline**`tsManager.Session()` returns a raw `*Session` pointer; the `tsManager.mu` RLock only protects the pointer swap, not field access. All session field reads *and* mutations (`JudgeID`, `CompletedAt`, `Error`, `Attempts[i].*`) must hold `tc.mu`. `GetWinningBranch()` also holds `tc.mu` for reads. The lock order `tc.mu → tsManager.mu` is safe (no reverse path exists). Functions like `failJudge` and `startJudge` error paths acquire `tc.mu` for mutations, then release before `notifyCallbacks`/`bus.Publish` to avoid deadlock.

internal/orchestrator/workflows/tripleshot/teamwire/teamcoordinator.go

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -154,22 +154,14 @@ func (tc *TeamCoordinator) Start(ctx context.Context) error {
154154
teamID := tc.attemptTeamIDs[i]
155155
t := mgr.Team(teamID)
156156
if t == nil { // Coverage: defensive — Team() returns nil if concurrent Stop() clears state.
157-
tc.mu.Lock()
158-
tc.unsubscribeEvents()
159-
tc.cancel()
160-
tc.started = false
161-
tc.mu.Unlock()
157+
tc.abortStart()
162158
return fmt.Errorf("teamcoordinator: team %q not found after AddTeam", teamID)
163159
}
164160

165161
recorder := tc.buildAttemptRecorder(i)
166162
b := bridge.New(t, factory, checker, recorder, tc.bus, tc.bridgeOpts...)
167163
if err := b.Start(tc.ctx); err != nil { // Coverage: Bridge.Start only fails if already started.
168-
tc.mu.Lock()
169-
tc.unsubscribeEvents()
170-
tc.cancel()
171-
tc.started = false
172-
tc.mu.Unlock()
164+
tc.abortStart()
173165
return fmt.Errorf("teamcoordinator: start bridge for %q: %w", teamID, err)
174166
}
175167

@@ -255,7 +247,7 @@ func (tc *TeamCoordinator) registerStart(ctx context.Context) (*team.Manager, er
255247
if t != nil {
256248
taskID := fmt.Sprintf("attempt-%d-task", i)
257249
if err := t.Hub().TaskQueue().SetMaxRetries(taskID, 0); err != nil {
258-
tc.logger.Warn("failed to disable retries for attempt task",
250+
tc.logger.Error("failed to disable retries for attempt task",
259251
"task_id", taskID, "error", err)
260252
}
261253
}
@@ -315,7 +307,26 @@ func (tc *TeamCoordinator) Stop() {
315307

316308
// Stop the team manager.
317309
if tc.tmManager != nil {
318-
_ = tc.tmManager.Stop()
310+
if err := tc.tmManager.Stop(); err != nil {
311+
tc.logger.Warn("failed to stop team manager", "error", err)
312+
}
313+
}
314+
}
315+
316+
// abortStart cleans up partial state when the bridge-creation loop in Start()
317+
// encounters an error. It stops already-started bridges, unsubscribes events,
318+
// cancels the context, and marks the coordinator as not started.
319+
func (tc *TeamCoordinator) abortStart() {
320+
tc.mu.Lock()
321+
bridges := make([]*bridge.Bridge, len(tc.bridges))
322+
copy(bridges, tc.bridges)
323+
tc.bridges = nil
324+
tc.unsubscribeEvents()
325+
tc.cancel()
326+
tc.started = false
327+
tc.mu.Unlock()
328+
for _, b := range bridges {
329+
b.Stop()
319330
}
320331
}
321332

@@ -393,6 +404,21 @@ func (tc *TeamCoordinator) onTeamCompleted(tce event.TeamCompletedEvent) {
393404
}
394405
completed := tc.completedAttempts
395406

407+
// Set attempt status now, while we still hold the lock. The bridge
408+
// publishes BridgeTaskCompletedEvent (which also sets this status) AFTER
409+
// gate.Complete returns — but gate.Complete is what triggers this
410+
// TeamCompletedEvent. If we wait for onBridgeTaskCompleted, startJudge
411+
// (dispatched below as a goroutine) races with it and may snapshot the
412+
// status as "working" instead of "completed".
413+
session := tc.tsManager.Session()
414+
now := time.Now()
415+
session.Attempts[attemptIndex].CompletedAt = &now
416+
if tce.Success {
417+
session.Attempts[attemptIndex].Status = ts.AttemptStatusCompleted
418+
} else {
419+
session.Attempts[attemptIndex].Status = ts.AttemptStatusFailed
420+
}
421+
396422
tc.logger.Info("attempt team completed",
397423
"team_id", tce.TeamID,
398424
"attempt_index", attemptIndex,
@@ -448,7 +474,11 @@ func (tc *TeamCoordinator) onBridgeTaskStarted(bse event.BridgeTaskStartedEvent)
448474
}
449475
}
450476

451-
// Must be the judge team.
477+
if bse.TeamID != "judge" {
478+
tc.logger.Warn("unexpected team ID in bridge.task_started", "team_id", bse.TeamID)
479+
tc.mu.Unlock()
480+
return
481+
}
452482
tc.mu.Unlock()
453483
tc.notifyCallbacks(func(cb *ts.CoordinatorCallbacks) {
454484
if cb.OnJudgeStart != nil {
@@ -467,20 +497,30 @@ func (tc *TeamCoordinator) onBridgeTaskCompleted(bce event.BridgeTaskCompletedEv
467497

468498
for i, id := range tc.attemptTeamIDs {
469499
if bce.TeamID == id {
500+
// Status and CompletedAt may already be set by onTeamCompleted
501+
// (which fires before this handler for the last-completing attempt).
502+
// Only update if not already terminal to avoid overwriting the
503+
// earlier, more accurate CompletedAt timestamp.
470504
session := tc.tsManager.Session()
471-
now := time.Now()
472-
session.Attempts[i].CompletedAt = &now
505+
status := session.Attempts[i].Status
506+
if status != ts.AttemptStatusCompleted && status != ts.AttemptStatusFailed {
507+
now := time.Now()
508+
session.Attempts[i].CompletedAt = &now
509+
if bce.Success {
510+
session.Attempts[i].Status = ts.AttemptStatusCompleted
511+
} else {
512+
session.Attempts[i].Status = ts.AttemptStatusFailed
513+
}
514+
}
473515

474516
if bce.Success {
475-
session.Attempts[i].Status = ts.AttemptStatusCompleted
476517
tc.mu.Unlock()
477518
tc.notifyCallbacks(func(cb *ts.CoordinatorCallbacks) {
478519
if cb.OnAttemptComplete != nil {
479520
cb.OnAttemptComplete(i)
480521
}
481522
})
482523
} else {
483-
session.Attempts[i].Status = ts.AttemptStatusFailed
484524
tc.mu.Unlock()
485525
tc.notifyCallbacks(func(cb *ts.CoordinatorCallbacks) {
486526
if cb.OnAttemptFailed != nil {
@@ -493,6 +533,11 @@ func (tc *TeamCoordinator) onBridgeTaskCompleted(bce event.BridgeTaskCompletedEv
493533
}
494534

495535
// Judge team completion.
536+
if bce.TeamID != "judge" {
537+
tc.logger.Warn("unexpected team ID in bridge.task_completed", "team_id", bce.TeamID)
538+
tc.mu.Unlock()
539+
return
540+
}
496541
tc.mu.Unlock()
497542
tc.onJudgeCompleted(bce)
498543
}
@@ -665,7 +710,7 @@ func (tc *TeamCoordinator) startJudge() {
665710

666711
// Disable retries for the judge task — same rationale as attempt tasks.
667712
if err := judgeTeam.Hub().TaskQueue().SetMaxRetries("judge-task", 0); err != nil {
668-
tc.logger.Warn("failed to disable retries for judge task", "error", err)
713+
tc.logger.Error("failed to disable retries for judge task", "error", err)
669714
}
670715

671716
factory := newAttemptFactory(tc.orch, tc.session)

internal/orchestrator/workflows/tripleshot/teamwire/teamcoordinator_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,121 @@ func TestTeamCoordinator_OnTeamCompleted_UnknownTeam(t *testing.T) {
971971
}
972972
}
973973

974+
// TestTeamCoordinator_OnTeamCompleted_SetsAttemptStatus verifies that
975+
// onTeamCompleted sets the attempt status eagerly. Without this, startJudge
976+
// (dispatched as a goroutine from onTeamCompleted) races with
977+
// onBridgeTaskCompleted and may snapshot the last attempt as "working".
978+
func TestTeamCoordinator_OnTeamCompleted_SetsAttemptStatus(t *testing.T) {
979+
setup := func(t *testing.T) *TeamCoordinator {
980+
t.Helper()
981+
bus := event.NewBus()
982+
orch := newTestOrch(t)
983+
session := newTestSession()
984+
985+
tc, _ := NewTeamCoordinator(TeamCoordinatorConfig{
986+
Orchestrator: orch,
987+
BaseSession: session,
988+
Bus: bus,
989+
BaseDir: t.TempDir(),
990+
Task: "test task",
991+
})
992+
993+
tc.mu.Lock()
994+
tc.started = true
995+
tc.attemptTeamIDs = [3]string{"attempt-0", "attempt-1", "attempt-2"}
996+
tc.mu.Unlock()
997+
return tc
998+
}
999+
1000+
t.Run("success", func(t *testing.T) {
1001+
tc := setup(t)
1002+
1003+
tce := event.NewTeamCompletedEvent("attempt-1", "Attempt 2", true, 1, 0)
1004+
tc.onTeamCompleted(tce)
1005+
1006+
// The status should be set immediately by onTeamCompleted, without
1007+
// needing onBridgeTaskCompleted to fire.
1008+
s := tc.Session()
1009+
if s.Attempts[1].Status != ts.AttemptStatusCompleted {
1010+
t.Errorf("Attempts[1].Status = %q, want %q", s.Attempts[1].Status, ts.AttemptStatusCompleted)
1011+
}
1012+
if s.Attempts[1].CompletedAt == nil {
1013+
t.Error("Attempts[1].CompletedAt is nil, want non-nil")
1014+
}
1015+
// Unrelated attempts should be unaffected.
1016+
if s.Attempts[0].Status != "" {
1017+
t.Errorf("Attempts[0].Status = %q, want empty", s.Attempts[0].Status)
1018+
}
1019+
})
1020+
1021+
t.Run("failure", func(t *testing.T) {
1022+
tc := setup(t)
1023+
1024+
tce := event.NewTeamCompletedEvent("attempt-2", "Attempt 3", false, 0, 1)
1025+
tc.onTeamCompleted(tce)
1026+
1027+
s := tc.Session()
1028+
if s.Attempts[2].Status != ts.AttemptStatusFailed {
1029+
t.Errorf("Attempts[2].Status = %q, want %q", s.Attempts[2].Status, ts.AttemptStatusFailed)
1030+
}
1031+
if s.Attempts[2].CompletedAt == nil {
1032+
t.Error("Attempts[2].CompletedAt is nil, want non-nil")
1033+
}
1034+
})
1035+
}
1036+
1037+
// TestTeamCoordinator_OnBridgeTaskCompleted_SkipsTerminalStatus verifies that
1038+
// onBridgeTaskCompleted does not overwrite status/CompletedAt when the attempt
1039+
// is already in a terminal state (set earlier by onTeamCompleted).
1040+
func TestTeamCoordinator_OnBridgeTaskCompleted_SkipsTerminalStatus(t *testing.T) {
1041+
bus := event.NewBus()
1042+
orch := newTestOrch(t)
1043+
session := newTestSession()
1044+
1045+
tc, _ := NewTeamCoordinator(TeamCoordinatorConfig{
1046+
Orchestrator: orch,
1047+
BaseSession: session,
1048+
Bus: bus,
1049+
BaseDir: t.TempDir(),
1050+
Task: "test task",
1051+
})
1052+
1053+
completedCh := make(chan int, 1)
1054+
tc.SetCallbacks(&ts.CoordinatorCallbacks{
1055+
OnAttemptComplete: func(idx int) { completedCh <- idx },
1056+
})
1057+
1058+
tc.mu.Lock()
1059+
tc.started = true
1060+
tc.attemptTeamIDs = [3]string{"attempt-0", "attempt-1", "attempt-2"}
1061+
tc.mu.Unlock()
1062+
1063+
// Pre-set attempt-1 as completed (simulates onTeamCompleted having run first).
1064+
s := tc.Session()
1065+
earlyTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
1066+
s.Attempts[1].Status = ts.AttemptStatusCompleted
1067+
s.Attempts[1].CompletedAt = &earlyTime
1068+
1069+
// Fire onBridgeTaskCompleted — should NOT overwrite the earlier timestamp.
1070+
bce := event.NewBridgeTaskCompletedEvent("attempt-1", "task-1", "inst-1", true, 1, "")
1071+
tc.onBridgeTaskCompleted(bce)
1072+
1073+
// Callback should still fire.
1074+
select {
1075+
case idx := <-completedCh:
1076+
if idx != 1 {
1077+
t.Errorf("callback index = %d, want 1", idx)
1078+
}
1079+
case <-time.After(2 * time.Second):
1080+
t.Fatal("timed out waiting for callback")
1081+
}
1082+
1083+
// CompletedAt should retain the earlier timestamp.
1084+
if !s.Attempts[1].CompletedAt.Equal(earlyTime) {
1085+
t.Errorf("CompletedAt was overwritten: got %v, want %v", s.Attempts[1].CompletedAt, earlyTime)
1086+
}
1087+
}
1088+
9741089
// startInfo captures an attempt start event for test assertions.
9751090
type startInfo struct {
9761091
idx int

0 commit comments

Comments
 (0)