
Commit a74ff90

feat: wire dormant orchestration 2.0 components into production flow (#669)
Connect six previously-dormant Orch 2.0 components to the execution pipeline:

- File Lock Registry: the bridge claims file locks before instance creation and releases them on all exit paths; uses gate.Release (not Fail) for lock conflicts to avoid burning retries under concurrent scaling
- Context Propagation: injects prior discoveries into task prompts and shares completion info for cross-instance awareness
- Mailbox Event Publishing: adds a WithBus functional option so all inter-instance messages publish MailboxMessageEvent to the event bus
- Adaptive Lead Observability: logs scaling signal recommendations in the pipeline executor
- Approval Auto-Approve: immediately approves gated tasks in the bridge to prevent stuck states while preserving the gate infrastructure
- Debate Protocol: identifies conflicting task outcomes between the execution and review phases and records structured debate sessions for reviewer context (opt-in via the WithDebate pipeline option)
1 parent 842d514 commit a74ff90

17 files changed, +793 -23 lines


AGENTS.md

Lines changed: 1 addition & 0 deletions
@@ -340,6 +340,7 @@ These are real issues agents have encountered in this codebase. Package-specific
 - **Release locks before blocking on Stop()** — When stopping a component that holds a mutex, copy shared state (e.g., a slice of bridges) under the lock, release the lock, then perform blocking cleanup. Holding a lock while calling `bridge.Stop()` (which calls `wg.Wait()`) blocks goroutines that need the same lock. See `PipelineExecutor.Stop()` in `bridgewire/executor.go`.
 - **Two-phase event publishing for cascading state changes** — When an event handler (`onTeamCompleted`) modifies state that triggers further events of the same type, use a two-phase approach: (1) collect state changes under the lock, (2) publish events outside the lock. Repeat until no new transitions occur. Publishing `TeamCompletedEvent` from within the `onTeamCompleted` handler would re-enter the handler via the synchronous bus, deadlocking on `m.mu`. See `team.Manager.checkBlockedTeamsLocked`.
 - **Semaphore slot lifecycle in bridge** — When the bridge acquires a semaphore slot before `ClaimNext`, it must release on every non-monitor path (claim error, nil task, create/start failure). The monitor goroutine takes ownership of the slot via `defer b.sem.Release()`. Missing a release on any early-return path causes a permanent slot leak that eventually deadlocks the claim loop.
+- **Release vs Fail for scheduling conflicts** — When a task fails due to a scheduling conflict (file lock contention), use `gate.Release()` to return it to pending instead of `gate.Fail()`. `Fail` decrements the retry counter; with scaling enabled, multiple tasks competing for the same resource can exhaust all retries and permanently fail. `Release` puts the task back without consuming retries. Always pair Release with `waitForWake` to prevent hot retry loops.
 
 ---
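
The two-phase publishing gotcha above describes the pattern without showing it. Below is a minimal sketch under illustrative names; the types and methods are stand-ins, not the actual `team.Manager` API (the real implementation lives in `team.Manager.checkBlockedTeamsLocked`).

```go
package example

import "sync"

// TeamCompleted stands in for the real completion event type.
type TeamCompleted struct{ TeamID string }

// Bus stands in for the synchronous event bus: Publish runs handlers inline.
type Bus interface{ Publish(e TeamCompleted) }

type Manager struct {
	mu      sync.Mutex
	bus     Bus
	blocked map[string]bool // teams waiting on dependencies
}

// onTeamCompleted may unblock further teams whose completion must also be
// published. It never publishes while holding m.mu: the synchronous bus
// would re-enter this handler and deadlock on the lock.
func (m *Manager) onTeamCompleted(e TeamCompleted) {
	for {
		// Phase 1: collect state transitions under the lock.
		m.mu.Lock()
		var done []string
		for id := range m.blocked {
			if m.readyLocked(id) {
				delete(m.blocked, id)
				done = append(done, id)
			}
		}
		m.mu.Unlock()

		// Repeat until no new transitions occur.
		if len(done) == 0 {
			return
		}

		// Phase 2: publish outside the lock; handlers run inline and may
		// themselves trigger further transitions, picked up on the next pass.
		for _, id := range done {
			m.bus.Publish(TeamCompleted{TeamID: id})
		}
	}
}

// readyLocked reports whether a blocked team can transition; placeholder logic.
func (m *Manager) readyLocked(id string) bool { return false }
```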

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- **Wire Dormant Orchestration 2.0 Components** - Connected six previously-dormant Orch 2.0 components to the production execution flow: (1) **File Lock Registry** in Bridge prevents concurrent file edits by claiming locks before instance creation and releasing on all exit paths, using Release instead of Fail for lock conflicts to avoid burning retries; (2) **Context Propagation** injects prior discoveries into task prompts and shares completion info for cross-instance awareness; (3) **Mailbox Event Publishing** makes all inter-instance messages visible to the event bus via `MailboxMessageEvent` using a `WithBus` functional option; (4) **Adaptive Lead Observability** logs scaling signal recommendations in the pipeline executor; (5) **Approval Auto-Approve** immediately approves gated tasks to prevent stuck states while preserving gate infrastructure for future interactive use; (6) **Debate Protocol Integration** identifies conflicting task outcomes between execution and review phases and records structured debate sessions for reviewer context (opt-in via `WithDebate()`).
+
 - **Orchestration 2.0 Default Execution** - Made Orch 2.0 the default for both UltraPlan and TripleShot. UltraPlan flips `UsePipeline` default to `true`. TripleShot uses `teamwire.TeamCoordinator` with callback-driven execution (replacing file polling), falling back to legacy for adversarial mode or `tripleshot.use_legacy` config. Added `tripleshot.Runner` interface for dual-coordinator coexistence, channel bridge for teamwire callbacks into Bubble Tea, and `NewTripleShotAdapters()` factory to avoid import cycles.
 
 - **Pipeline Execution Path** - Wired the Orchestration 2.0 pipeline stack into `Coordinator.StartExecution()`. Added `ExecutionRunner` interface in `orchestrator` (implemented by `bridgewire.PipelineRunner`) with factory-based injection to avoid import cycles. When `UsePipeline` config is enabled, the Coordinator delegates execution to the pipeline backend instead of the legacy `ExecutionOrchestrator`. Subscribes to `pipeline.completed` events for synthesis/failure handling. Guards legacy-only methods (`RetryFailedTasks`, `RetriggerGroup`, `ResumeWithPartialWork`) when pipeline is active.

internal/bridge/AGENTS.md

Lines changed: 7 additions & 1 deletion
@@ -11,9 +11,12 @@ The bridge package connects team Hubs (Orchestration 2.0's task pipeline) to rea
 
 **Core Flow:**
 ```
-Gate.ClaimNext() → InstanceFactory.CreateInstance() → StartInstance()
+Gate.ClaimNext() → FileLockRegistry.ClaimMultiple() → ContextPropagation
+  → InstanceFactory.CreateInstance() → StartInstance()
+  → Gate.MarkRunning() → (auto-approve if gated)
   → monitor loop (poll CompletionChecker)
   → Gate.Complete/Fail() + SessionRecorder
+  → FileLockRegistry.ReleaseAll() + ContextPropagation.ShareDiscovery()
 ```
 
 **Interfaces (Ports):**
@@ -43,6 +46,9 @@ These interfaces are implemented by adapters in `internal/orchestrator/bridgewir
 - **Retry limit on completion check errors** — The monitor gives up after `maxCheckErrors` (10) consecutive `CheckCompletion` failures and fails the task. Without this, a bad worktree path would cause indefinite retries.
 - **TaskQueue retry interacts with bridge claim loop** — `TaskQueue.Fail()` has retry logic (`defaultMaxRetries=2`). When the bridge monitor calls `gate.Fail()`, the task may return to `TaskPending` (not permanently failed), and the claim loop re-claims it. Tests that assert on `Running()` after failure must either disable retries via `SetMaxRetries(taskID, 0)` or account for the re-claim cycle.
 - **Always log gate.Fail errors** — `gate.Fail()` can fail if the task has already transitioned. Always check and log the return error rather than discarding with `_ =`.
+- **File lock conflicts use Release, not Fail** — When `ClaimMultiple` returns `ErrAlreadyClaimed`, use `gate.Release` to return the task to pending without burning retries. Using `gate.Fail` would consume retry attempts, and with scaling enabled (semaphore > 1), multiple tasks competing for the same file lock would exhaust retries and permanently fail. After releasing, call `waitForWake` to avoid a hot retry loop.
+- **Record completion/failure before file lock release** — `recorder.RecordCompletion`/`RecordFailure` must be called immediately after `gate.Complete`/`gate.Fail`, before `reg.ReleaseAll` and `shareCompletion`. The gate transition triggers a synchronous event cascade that can complete the pipeline before the monitor goroutine reaches subsequent lines. If the recorder call comes after file lock I/O, tests (and observers) see the pipeline complete before the recorder fires.
+- **Scaling monitor increases semaphore concurrency** — The hub's `ScalingMonitor` reacts to `QueueDepthChangedEvent` and may increase the bridge's semaphore limit via the `OnDecision` callback. Code that assumes semaphore=1 (sequential task execution) is incorrect when scaling is active. File lock claims are the safety net for concurrent access to the same files.
 
 ## Testing
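
A minimal self-contained sketch of the Release-vs-Fail decision described in the gotchas above. `Gate` and `ErrAlreadyClaimed` are stand-ins for the real `approval` and `filelock` types; the production version is in the `internal/bridge/bridge.go` diff below.

```go
package example

import "errors"

// ErrAlreadyClaimed stands in for filelock.ErrAlreadyClaimed.
var ErrAlreadyClaimed = errors.New("file already claimed")

// Gate is the subset of the approval gate the claim loop needs here.
type Gate interface {
	Release(taskID, reason string) error // return task to pending; retries untouched
	Fail(taskID, reason string) error    // consumes a retry attempt
}

// handleClaimError routes a file-lock error: Release for scheduling conflicts,
// Fail for genuine errors. The caller still releases its semaphore slot and
// blocks on waitForWake before re-entering the claim loop.
func handleClaimError(gate Gate, taskID string, err error) error {
	if errors.Is(err, ErrAlreadyClaimed) {
		// Conflict: another task holds the lock. Re-queue without burning a retry.
		return gate.Release(taskID, "file lock conflict")
	}
	// Anything else counts against the task's retries.
	return gate.Fail(taskID, "file lock: "+err.Error())
}
```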

internal/bridge/bridge.go

Lines changed: 117 additions & 4 deletions
@@ -2,13 +2,16 @@ package bridge
 
 import (
     "context"
+    "errors"
     "fmt"
     "strings"
     "sync"
     "time"
 
     "github.com/Iron-Ham/claudio/internal/event"
+    "github.com/Iron-Ham/claudio/internal/filelock"
     "github.com/Iron-Ham/claudio/internal/logging"
+    "github.com/Iron-Ham/claudio/internal/mailbox"
     "github.com/Iron-Ham/claudio/internal/team"
 )
 
@@ -187,11 +190,45 @@ func (b *Bridge) claimLoop() {
             continue
         }
 
-        // Build a prompt and create an instance.
-        prompt := BuildTaskPrompt(task.Title, task.Description, task.Files)
+        hub := b.team.Hub()
+
+        // Claim file locks to prevent concurrent edits.
+        if len(task.Files) > 0 {
+            if err := hub.FileLockRegistry().ClaimMultiple(task.ID, task.Files); err != nil {
+                b.sem.Release()
+                if errors.Is(err, filelock.ErrAlreadyClaimed) {
+                    // File is held by another task — release back to the
+                    // queue without burning a retry. The task will be
+                    // re-claimed once the lock holder finishes.
+                    b.logger.Debug("bridge: file lock conflict, releasing task",
+                        "team", b.team.Spec().ID, "task", task.ID, "error", err)
+                    if relErr := gate.Release(task.ID, "file lock conflict"); relErr != nil {
+                        b.logger.Error("bridge: gate.Release failed",
+                            "task", task.ID, "error", relErr)
+                    }
+                } else {
+                    b.logger.Error("bridge: file lock claim failed",
+                        "team", b.team.Spec().ID, "task", task.ID, "error", err)
+                    if failErr := gate.Fail(task.ID, fmt.Sprintf("file lock: %v", err)); failErr != nil {
+                        b.logger.Error("bridge: gate.Fail also failed",
+                            "task", task.ID, "error", failErr)
+                    }
+                }
+                b.waitForWake(wake)
+                continue
+            }
+        }
+
+        // Retrieve prior discoveries for context injection.
+        prompt := BuildTaskPromptWithContext(
+            task.Title, task.Description, task.Files,
+            b.getInstanceContext(task.ID),
+        )
+
         inst, err := b.factory.CreateInstance(prompt)
         if err != nil {
             b.sem.Release()
+            hub.FileLockRegistry().ReleaseAll(task.ID) //nolint:errcheck // best-effort cleanup
             b.logger.Error("bridge: failed to create instance",
                 "team", b.team.Spec().ID, "task", task.ID, "error", err)
             if failErr := gate.Fail(task.ID, fmt.Sprintf("create instance: %v", err)); failErr != nil {
@@ -203,6 +240,7 @@ func (b *Bridge) claimLoop() {
 
         if err := b.factory.StartInstance(inst); err != nil {
             b.sem.Release()
+            hub.FileLockRegistry().ReleaseAll(task.ID) //nolint:errcheck // best-effort cleanup
             b.logger.Error("bridge: failed to start instance",
                 "team", b.team.Spec().ID, "task", task.ID, "error", err)
             if failErr := gate.Fail(task.ID, fmt.Sprintf("start instance: %v", err)); failErr != nil {
@@ -215,6 +253,7 @@ func (b *Bridge) claimLoop() {
         // Transition the task to running.
         if err := gate.MarkRunning(task.ID); err != nil {
             b.sem.Release()
+            hub.FileLockRegistry().ReleaseAll(task.ID) //nolint:errcheck // best-effort cleanup
             b.logger.Error("bridge: failed to mark running",
                 "team", b.team.Spec().ID, "task", task.ID, "error", err)
             if failErr := gate.Fail(task.ID, fmt.Sprintf("mark running: %v", err)); failErr != nil {
@@ -224,6 +263,23 @@ func (b *Bridge) claimLoop() {
             continue
         }
 
+        // Auto-approve gated tasks to prevent stuck states.
+        if gate.IsAwaitingApproval(task.ID) {
+            if approveErr := gate.Approve(task.ID); approveErr != nil {
+                b.sem.Release()
+                hub.FileLockRegistry().ReleaseAll(task.ID) //nolint:errcheck // best-effort cleanup
+                b.logger.Error("bridge: failed to auto-approve gated task",
+                    "team", b.team.Spec().ID, "task", task.ID, "error", approveErr)
+                if failErr := gate.Fail(task.ID, fmt.Sprintf("auto-approve: %v", approveErr)); failErr != nil {
+                    b.logger.Error("bridge: gate.Fail also failed",
+                        "task", task.ID, "error", failErr)
+                }
+                continue
+            }
+            b.logger.Debug("bridge: auto-approved gated task",
+                "team", b.team.Spec().ID, "task", task.ID)
+        }
+
         // Record assignment and publish event.
         b.recorder.AssignTask(task.ID, inst.ID())
 
@@ -267,6 +323,9 @@ func (b *Bridge) monitorInstance(taskID string, inst Instance) {
 
     consecutiveErrors := 0
 
+    hub := b.team.Hub()
+    reg := hub.FileLockRegistry()
+
     for {
         select {
         case <-b.ctx.Done():
@@ -275,6 +334,7 @@ func (b *Bridge) monitorInstance(taskID string, inst Instance) {
             b.mu.Lock()
             delete(b.running, taskID)
             b.mu.Unlock()
+            reg.ReleaseAll(taskID) //nolint:errcheck // best-effort cleanup
             return
         case <-ticker.C:
         }
@@ -288,7 +348,7 @@ func (b *Bridge) monitorInstance(taskID string, inst Instance) {
             if consecutiveErrors >= maxCheckErrors {
                 b.logger.Error("bridge: max check errors reached, failing task",
                     "task", taskID, "limit", maxCheckErrors)
-                gate := b.team.Hub().Gate()
+                gate := hub.Gate()
                 reason := fmt.Sprintf("completion check failed %d times: %v", maxCheckErrors, err)
                 if failErr := gate.Fail(taskID, reason); failErr != nil {
                     b.logger.Error("bridge: gate.Fail failed after check errors",
@@ -300,6 +360,7 @@ func (b *Bridge) monitorInstance(taskID string, inst Instance) {
                 delete(b.running, taskID)
                 b.mu.Unlock()
                 b.recorder.RecordFailure(taskID, reason)
+                reg.ReleaseAll(taskID) //nolint:errcheck // best-effort cleanup
                 return
             }
             continue
@@ -315,7 +376,7 @@ func (b *Bridge) monitorInstance(taskID string, inst Instance) {
                 taskID, inst.ID(), inst.WorktreePath(), inst.Branch(),
             )
 
-            gate := b.team.Hub().Gate()
+            gate := hub.Gate()
             teamID := b.team.Spec().ID
 
             // Clean up running map before recording/publishing so observers see
@@ -329,7 +390,15 @@ func (b *Bridge) monitorInstance(taskID string, inst Instance) {
                     b.logger.Error("bridge: failed to complete task",
                         "task", taskID, "error", completeErr)
                 }
+                // Record completion immediately after gate transition so that
+                // observers who react to the synchronous event cascade see the
+                // recorder state before any I/O-heavy cleanup runs.
                 b.recorder.RecordCompletion(taskID, commitCount)
+                reg.ReleaseAll(taskID) //nolint:errcheck // best-effort cleanup
+
+                // Share completion as a discovery for context propagation.
+                b.shareCompletion(taskID, inst)
+
                 b.bus.Publish(event.NewBridgeTaskCompletedEvent(
                     teamID, taskID, inst.ID(), true, commitCount, "",
                 ))
@@ -342,7 +411,10 @@ func (b *Bridge) monitorInstance(taskID string, inst Instance) {
                     b.logger.Error("bridge: failed to fail task",
                         "task", taskID, "error", failErr)
                 }
+                // Record failure immediately after gate transition (same
+                // reasoning as the success path above).
                 b.recorder.RecordFailure(taskID, reason)
+                reg.ReleaseAll(taskID) //nolint:errcheck // best-effort cleanup
                 b.bus.Publish(event.NewBridgeTaskCompletedEvent(
                     teamID, taskID, inst.ID(), false, commitCount, reason,
                 ))
@@ -392,3 +464,44 @@ func BuildTaskPrompt(title, description string, files []string) string {
 
     return sb.String()
 }
+
+// BuildTaskPromptWithContext builds a task prompt and appends prior discoveries
+// from context propagation. If priorContext is empty, it returns the same
+// result as BuildTaskPrompt.
+func BuildTaskPromptWithContext(title, description string, files []string, priorContext string) string {
+    prompt := BuildTaskPrompt(title, description, files)
+    if priorContext == "" {
+        return prompt
+    }
+    return prompt + "\n\n## Prior Discoveries\n" + priorContext
+}
+
+// maxContextMessages limits the number of prior messages injected into an
+// instance's prompt to prevent unbounded context growth in large sessions.
+const maxContextMessages = 50
+
+// getInstanceContext retrieves prior discoveries from the context propagator.
+// Returns an empty string if no relevant context exists or on error.
+func (b *Bridge) getInstanceContext(taskID string) string {
+    ctx, err := b.team.Hub().Propagator().GetContextForInstance(taskID, mailbox.FilterOptions{
+        Types:       []mailbox.MessageType{mailbox.MessageDiscovery, mailbox.MessageWarning},
+        MaxMessages: maxContextMessages,
+    })
+    if err != nil {
+        b.logger.Warn("bridge: failed to get instance context",
+            "task", taskID, "error", err)
+        return ""
+    }
+    return ctx
+}
+
+// shareCompletion broadcasts a completion discovery so future instances have
+// awareness of what has been done. Only called on success paths.
+func (b *Bridge) shareCompletion(taskID string, inst Instance) {
+    body := fmt.Sprintf("Task completed: %s (instance: %s, worktree: %s)",
+        taskID, inst.ID(), inst.WorktreePath())
+    if err := b.team.Hub().Propagator().ShareDiscovery(taskID, body, nil); err != nil {
+        b.logger.Warn("bridge: failed to share completion discovery",
+            "task", taskID, "error", err)
+    }
+}

internal/coordination/hub.go

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ func NewHub(cfg Config, opts ...Option) (*Hub, error) {
         policy = scaling.NewPolicy(policyOpts...)
     }
 
-    mb := mailbox.NewMailbox(cfg.SessionDir)
+    mb := mailbox.NewMailbox(cfg.SessionDir, mailbox.WithBus(cfg.Bus))
     queue := taskqueue.NewFromPlan(cfg.Plan)
     eq := taskqueue.NewEventQueue(queue, cfg.Bus)
     gate := approval.NewGate(eq, cfg.Bus, lookup)

internal/mailbox/AGENTS.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ See `doc.go` for package overview and API usage.
 - **O_APPEND atomicity** — File writes use `O_APPEND` which is atomic for writes smaller than `PIPE_BUF` (4096 bytes on most systems), but is not crash-safe without `fsync`. This is an accepted trade-off — messages may be lost on hard crash but won't be corrupted or interleaved.
 - **Message ID uniqueness** — `time.UnixNano()` alone is not unique under concurrent access. IDs are generated using an atomic counter combined with PID and timestamp. If you modify ID generation, ensure uniqueness under parallel `Send()` calls.
 - **Store mutex scope** — The `Store` holds a `sync.Mutex` for in-process thread safety. Any method that reads or writes the JSONL file must hold the lock for the entire operation, including the JSON marshal/unmarshal step — not just the file I/O.
+- **WithBus event publishing is synchronous** — When a `Mailbox` is created with `WithBus(bus)`, every successful `Send()` publishes a `MailboxMessageEvent` on the event bus synchronously. Since `event.Bus.Publish` runs handlers inline, callers of `Send` should be aware that handlers may execute significant work in their goroutine. The Hub passes its bus to `NewMailbox` automatically.
 
 ## File Layout
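
The commit only shows the call site (`mailbox.NewMailbox(cfg.SessionDir, mailbox.WithBus(cfg.Bus))` in `hub.go`), not the option itself. A plausible sketch of such a functional option follows; everything except the `WithBus` call shape and the `MailboxMessageEvent` name is an assumption, not the real `mailbox` package internals.

```go
package mailbox

// Publisher is assumed as the minimal bus surface the mailbox needs;
// the real event.Bus type may differ.
type Publisher interface {
	Publish(e any)
}

// MailboxMessageEvent is the event named in the commit; its fields here are illustrative.
type MailboxMessageEvent struct {
	From, To, Body string
}

// Option mutates a Mailbox during construction (functional-option pattern).
type Option func(*Mailbox)

// WithBus attaches an event bus; when set, Send publishes a
// MailboxMessageEvent synchronously after a successful write.
func WithBus(bus Publisher) Option {
	return func(m *Mailbox) { m.bus = bus }
}

type Mailbox struct {
	dir string
	bus Publisher // nil when no bus was provided
}

func NewMailbox(dir string, opts ...Option) *Mailbox {
	m := &Mailbox{dir: dir}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

// Send appends the message (persistence elided in this sketch) and, if a bus
// is configured, publishes the event inline; handlers run in the caller's goroutine.
func (m *Mailbox) Send(from, to, body string) error {
	// ... append to the JSONL store here ...
	if m.bus != nil {
		m.bus.Publish(MailboxMessageEvent{From: from, To: to, Body: body})
	}
	return nil
}
```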
