Commit 6911b9e

feat: make orchestration 2.0 the default execution path
Make Orch 2.0 the default for both UltraPlan and TripleShot sessions:

- UltraPlan: flip UsePipeline default to true, routing execution through
  the pipeline backend instead of the legacy ExecutionOrchestrator
- TripleShot: wire teamwire.TeamCoordinator as default, replacing file
  polling with callback-driven execution via a buffered channel bridge
- Adversarial mode and session restoration stay on the legacy path

Add a tripleshot.Runner interface for dual-coordinator coexistence, a
NewTripleShotAdapters() factory to break import cycles, and
pipeline_wire.go wiring files for lazy PipelineRunner creation.

Fix channel safety issues: nil-guard close to prevent double-close panic,
stop re-subscribing after completion to prevent goroutine leak, close the
old channel on overwrite. Log SaveSession() errors instead of discarding
them. Improve pipeline factory error messages for users.
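A minimal sketch of the buffered channel bridge described above, using the standard Bubble Tea pattern for delivering external events into the update loop. All identifiers here (`teamEvent`, `eventCh`, `onTeamEvent`, `waitForTeamEvent`) are illustrative stand-ins, not the commit's actual code:

```go
package tuibridge

import tea "github.com/charmbracelet/bubbletea"

// teamEvent stands in for whatever payload the teamwire callbacks deliver.
type teamEvent struct{ phase string }

// eventCh buffers callback-driven events so a teamwire goroutine never
// blocks on the TUI's update loop.
var eventCh = make(chan teamEvent, 64)

// onTeamEvent is the callback registered with the coordinator; it runs on
// teamwire's goroutines and only performs a buffered send.
func onTeamEvent(phase string) {
	eventCh <- teamEvent{phase: phase}
}

// waitForTeamEvent adapts the channel to Bubble Tea: each received event
// becomes a tea.Msg. The model re-issues this command after each message to
// keep listening, and stops re-issuing it once the triple-shot completes
// (the goroutine-leak fix in this commit).
func waitForTeamEvent() tea.Cmd {
	return func() tea.Msg { return <-eventCh }
}
```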
1 parent 65216a4 commit 6911b9e

25 files changed: +1562 −119 lines

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
```diff
@@ -9,12 +9,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- **Orchestration 2.0 Default Execution** - Made Orch 2.0 the default for both UltraPlan and TripleShot. UltraPlan flips `UsePipeline` default to `true`. TripleShot uses `teamwire.TeamCoordinator` with callback-driven execution (replacing file polling), falling back to legacy for adversarial mode or `tripleshot.use_legacy` config. Added `tripleshot.Runner` interface for dual-coordinator coexistence, channel bridge for teamwire callbacks into Bubble Tea, and `NewTripleShotAdapters()` factory to avoid import cycles.
+
+- **Pipeline Execution Path** - Wired the Orchestration 2.0 pipeline stack into `Coordinator.StartExecution()`. Added `ExecutionRunner` interface in `orchestrator` (implemented by `bridgewire.PipelineRunner`) with factory-based injection to avoid import cycles. When `UsePipeline` config is enabled, the Coordinator delegates execution to the pipeline backend instead of the legacy `ExecutionOrchestrator`. Subscribes to `pipeline.completed` events for synthesis/failure handling. Guards legacy-only methods (`RetryFailedTasks`, `RetriggerGroup`, `ResumeWithPartialWork`) when pipeline is active.
+
 - **Dynamic Team Sizing** - Connected the scaling feedback loop so bridges adjust concurrency based on workload and budget. Added `dynamicSemaphore` to the bridge for context-aware, resizable concurrency limiting. Added `SetMaxConcurrency`/`MaxConcurrency`/`ActiveInstances` to Bridge. Added `TeamScaledEvent` for TUI observability. Added `MinInstances`/`MaxInstances` to team Spec and `WithMinInstances`/`WithMaxInstances` coordination options. Decompose now populates scaling bounds from `DecomposeConfig`. `PipelineExecutor.wireScalingFeedback` connects scaling monitor decisions to bridge concurrency with budget-aware veto. (#646)
 
 - **Remaining E2E Test Scenarios** - Added 5 new E2E integration tests for the pipeline executor: dependency ordering (linear chain), dependency failure cascade, partial failure (mixed success/failure), budget exhaustion event, and context cancellation. Added `selectiveFactory` mock for per-task failure control and `waitForTeamPhase` helper. (#649)
 
 ### Fixed
 
+- **Teamwire Channel Safety** - Fixed potential panic from closing `teamwireEventCh` while callbacks may still write to it (nil-guard before close), goroutine leak from re-subscribing after triple-shot completion, and channel overwrite leak when starting multiple sessions. Surfaced session error details in `PhaseFailed` handler instead of generic "Triple-shot failed" message.
+
+- **Pipeline SaveSession Error Logging** - Replaced silent `_ = SaveSession()` calls in `onPipelineCompleted` and `Cancel()` with logged errors, preventing session state loss from going undetected. Improved pipeline factory error message for user-facing context.
+
 - **Failed-Dependency Cascade** - Teams blocked on a failed dependency now transition to `PhaseFailed` instead of staying blocked forever. The `Manager.onTeamCompleted` handler detects permanently blocked teams and cascades failure through multi-hop dependency chains (A fails → B fails → C fails). Uses a two-phase approach (collect state under lock, publish events outside) to avoid re-entrancy deadlock with the synchronous event bus. (#649)
 
 ### Changed
```
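The Teamwire Channel Safety entry above boils down to an ownership discipline on `teamwireEventCh`. A minimal sketch, assuming a hypothetical `model` owner and reusing the illustrative `teamEvent` type from the earlier sketch; the method names are stand-ins:

```go
package tuibridge

// teamEvent mirrors the payload type from the previous sketch.
type teamEvent struct{ phase string }

// model owns the channel; all close/replace operations go through it.
type model struct {
	teamwireEventCh chan teamEvent
}

// startSession replaces any previous channel, closing the old one instead
// of leaking its blocked reader (the overwrite-leak fix).
func (m *model) startSession() {
	if m.teamwireEventCh != nil {
		close(m.teamwireEventCh)
	}
	m.teamwireEventCh = make(chan teamEvent, 64)
}

// teardown nil-guards before close so a second teardown cannot trigger a
// double-close panic (the nil-guard fix).
func (m *model) teardown() {
	if m.teamwireEventCh == nil {
		return
	}
	close(m.teamwireEventCh)
	m.teamwireEventCh = nil
}
```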
Lines changed: 27 additions & 0 deletions (new file)

```go
package session

import (
	"github.com/Iron-Ham/claudio/internal/logging"
	"github.com/Iron-Ham/claudio/internal/orchestrator"
	"github.com/Iron-Ham/claudio/internal/orchestrator/bridgewire"
)

// registerPipelineFactory registers a PipelineRunnerFactory on the given
// Coordinator so that the Pipeline-based execution backend can be created
// lazily when UsePipeline is enabled. See tui/pipeline_wire.go for the
// equivalent TUI-side registration.
func registerPipelineFactory(coordinator *orchestrator.Coordinator, orch *orchestrator.Orchestrator, logger *logging.Logger) {
	coordinator.SetPipelineFactory(func(deps orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error) {
		recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{})
		return bridgewire.NewPipelineRunner(bridgewire.PipelineRunnerConfig{
			Orch:        deps.Orch,
			Session:     deps.Session,
			Verifier:    deps.Verifier,
			Plan:        deps.Plan,
			Bus:         orch.EventBus(),
			Logger:      logger,
			Recorder:    recorder,
			MaxParallel: deps.MaxParallel,
		})
	})
}
```
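The Coordinator side of this registration is not part of this hunk. The following is a hypothetical sketch of how a registered factory would be consumed when `UsePipeline` is enabled; the `coordinatorSketch` type and its fields are assumptions, and only `PipelineRunnerDeps`, `ExecutionRunner`, and the factory signature come from the diff:

```go
package sketch

import (
	"context"
	"fmt"

	"github.com/Iron-Ham/claudio/internal/orchestrator"
)

// coordinatorSketch is a stand-in for the real Coordinator.
type coordinatorSketch struct {
	usePipeline bool
	factory     func(orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error)
	runner      orchestrator.ExecutionRunner
}

func (c *coordinatorSketch) startExecution(ctx context.Context, deps orchestrator.PipelineRunnerDeps) error {
	if c.usePipeline && c.factory != nil {
		runner, err := c.factory(deps) // built lazily, only when the pipeline path is taken
		if err != nil {
			return fmt.Errorf("create pipeline runner: %w", err)
		}
		c.runner = runner
		return runner.Start(ctx)
	}
	// Otherwise fall through to the legacy ExecutionOrchestrator path (elided).
	return nil
}
```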

internal/cmd/session/start.go

Lines changed: 3 additions & 0 deletions
```diff
@@ -511,6 +511,9 @@ func resumeUltraplanSession(orch *orchestrator.Orchestrator, sess *orchestrator.
 	// Create coordinator from the loaded session state
 	coordinator := orchestrator.NewCoordinator(orch, sess, ultraSession, logger)
 
+	// Register pipeline factory for lazy PipelineRunner creation
+	registerPipelineFactory(coordinator, orch, logger)
+
 	// Resume based on current phase
 	switch ultraSession.Phase {
 	case orchestrator.PhasePlanning:
```

internal/config/config.go

Lines changed: 4 additions & 0 deletions
```diff
@@ -226,6 +226,8 @@ type TripleshotConfig struct {
 	AutoApprove bool `mapstructure:"auto_approve"`
 	// Adversarial enables adversarial review mode where each implementer must pass review before completion (default: false)
 	Adversarial bool `mapstructure:"adversarial"`
+	// UseLegacy forces the legacy polling-based coordinator instead of team-based Orch 2.0 (default: false)
+	UseLegacy bool `mapstructure:"use_legacy"`
 }
 
 // AdversarialConfig controls adversarial review mode behavior.
@@ -471,6 +473,7 @@ func Default() *Config {
 		Tripleshot: TripleshotConfig{
 			AutoApprove: false,
 			Adversarial: false,
+			UseLegacy:   false,
 		},
 		Adversarial: AdversarialConfig{
 			MaxIterations: 10, // Reasonable default to prevent infinite loops
@@ -598,6 +601,7 @@ func SetDefaults() {
 	// Tripleshot defaults
 	viper.SetDefault("tripleshot.auto_approve", defaults.Tripleshot.AutoApprove)
 	viper.SetDefault("tripleshot.adversarial", defaults.Tripleshot.Adversarial)
+	viper.SetDefault("tripleshot.use_legacy", defaults.Tripleshot.UseLegacy)
 
 	// Adversarial defaults
 	viper.SetDefault("adversarial.max_iterations", defaults.Adversarial.MaxIterations)
```
Lines changed: 193 additions & 0 deletions (new file)

```go
package bridgewire

import (
	"context"
	"fmt"

	"github.com/Iron-Ham/claudio/internal/bridge"
	"github.com/Iron-Ham/claudio/internal/event"
	"github.com/Iron-Ham/claudio/internal/logging"
	"github.com/Iron-Ham/claudio/internal/orchestrator"
	"github.com/Iron-Ham/claudio/internal/pipeline"
	"github.com/Iron-Ham/claudio/internal/ultraplan"
)

// PipelineRunnerConfig holds the dependencies for constructing a PipelineRunner.
type PipelineRunnerConfig struct {
	Orch     *orchestrator.Orchestrator
	Session  *orchestrator.Session
	Verifier orchestrator.Verifier
	Plan     *orchestrator.PlanSpec // orchestrator's PlanSpec (converted internally)
	Bus      *event.Bus
	Logger   *logging.Logger
	Recorder bridge.SessionRecorder
	BaseDir  string // Base directory for pipeline state files (defaults to Session.BaseRepo)

	MaxParallel int // from UltraPlanConfig.MaxParallel
}

// PipelineRunner implements orchestrator.ExecutionRunner using the
// Pipeline-based execution backend (Orchestration 2.0).
//
// It encapsulates the full lifecycle: plan conversion, pipeline creation,
// decomposition, and PipelineExecutor wiring.
type PipelineRunner struct {
	pipe *pipeline.Pipeline
	exec *PipelineExecutor
}

// NewPipelineRunner creates a PipelineRunner from the given config.
// It converts the orchestrator PlanSpec to ultraplan.PlanSpec, creates the
// Pipeline, decomposes the plan into teams, and wires a PipelineExecutor.
func NewPipelineRunner(cfg PipelineRunnerConfig) (*PipelineRunner, error) {
	if cfg.Plan == nil {
		return nil, fmt.Errorf("bridgewire: PipelineRunner requires a non-nil Plan")
	}
	if cfg.Bus == nil {
		return nil, fmt.Errorf("bridgewire: PipelineRunner requires a non-nil Bus")
	}
	if cfg.Orch == nil {
		return nil, fmt.Errorf("bridgewire: PipelineRunner requires a non-nil Orch")
	}
	if cfg.Session == nil {
		return nil, fmt.Errorf("bridgewire: PipelineRunner requires a non-nil Session")
	}

	// Convert orchestrator.PlanSpec → ultraplan.PlanSpec
	uplan := convertPlan(cfg.Plan)

	// Use explicit BaseDir if provided, otherwise fall back to Session.BaseRepo
	baseDir := cfg.BaseDir
	if baseDir == "" {
		baseDir = cfg.Session.BaseRepo
	}

	// Create the pipeline
	pipe, err := pipeline.NewPipeline(pipeline.PipelineConfig{
		Bus:     cfg.Bus,
		BaseDir: baseDir,
		Plan:    uplan,
	})
	if err != nil {
		return nil, fmt.Errorf("bridgewire: create pipeline: %w", err)
	}

	// Decompose the plan into execution teams only (no planning/review/consolidation
	// phases — those are handled by the Coordinator's existing methods).
	maxPar := cfg.MaxParallel
	if maxPar < 1 {
		maxPar = 3
	}

	_, err = pipe.Decompose(pipeline.DecomposeConfig{
		DefaultTeamSize:  maxPar,
		MinTeamInstances: 1,
		MaxTeamInstances: maxPar,
	})
	if err != nil {
		return nil, fmt.Errorf("bridgewire: decompose plan: %w", err)
	}

	// Create the PipelineExecutor
	logger := cfg.Logger
	if logger == nil {
		logger = logging.NopLogger()
	}

	exec, err := NewPipelineExecutorFromOrch(
		cfg.Orch, cfg.Session, cfg.Verifier,
		cfg.Bus, pipe, cfg.Recorder, logger,
	)
	if err != nil {
		return nil, fmt.Errorf("bridgewire: create executor: %w", err)
	}

	return &PipelineRunner{
		pipe: pipe,
		exec: exec,
	}, nil
}

// Start begins execution: starts the executor (which subscribes to pipeline
// phase events and creates bridges) then starts the pipeline itself.
func (r *PipelineRunner) Start(ctx context.Context) error {
	if err := r.exec.Start(ctx); err != nil {
		return fmt.Errorf("bridgewire: start executor: %w", err)
	}
	if err := r.pipe.Start(ctx); err != nil {
		r.exec.Stop()
		return fmt.Errorf("bridgewire: start pipeline: %w", err)
	}
	return nil
}

// Stop tears down both the executor and pipeline.
func (r *PipelineRunner) Stop() {
	r.exec.Stop()
	_ = r.pipe.Stop()
}

// convertPlan converts an orchestrator.PlanSpec to an ultraplan.PlanSpec.
// The two types have identical shapes (by design) so this is a field-by-field copy.
func convertPlan(src *orchestrator.PlanSpec) *ultraplan.PlanSpec {
	tasks := make([]ultraplan.PlannedTask, len(src.Tasks))
	for i, t := range src.Tasks {
		var files []string
		if len(t.Files) > 0 {
			files = make([]string, len(t.Files))
			copy(files, t.Files)
		}
		var deps []string
		if len(t.DependsOn) > 0 {
			deps = make([]string, len(t.DependsOn))
			copy(deps, t.DependsOn)
		}
		tasks[i] = ultraplan.PlannedTask{
			ID:            t.ID,
			Title:         t.Title,
			Description:   t.Description,
			Files:         files,
			DependsOn:     deps,
			Priority:      t.Priority,
			EstComplexity: ultraplan.TaskComplexity(t.EstComplexity),
			IssueURL:      t.IssueURL,
			NoCode:        t.NoCode,
		}
	}

	depGraph := make(map[string][]string, len(src.DependencyGraph))
	for k, v := range src.DependencyGraph {
		cp := make([]string, len(v))
		copy(cp, v)
		depGraph[k] = cp
	}

	execOrder := make([][]string, len(src.ExecutionOrder))
	for i, group := range src.ExecutionOrder {
		execOrder[i] = make([]string, len(group))
		copy(execOrder[i], group)
	}

	var insights []string
	if len(src.Insights) > 0 {
		insights = make([]string, len(src.Insights))
		copy(insights, src.Insights)
	}
	var constraints []string
	if len(src.Constraints) > 0 {
		constraints = make([]string, len(src.Constraints))
		copy(constraints, src.Constraints)
	}

	return &ultraplan.PlanSpec{
		ID:              src.ID,
		Objective:       src.Objective,
		Summary:         src.Summary,
		Tasks:           tasks,
		DependencyGraph: depGraph,
		ExecutionOrder:  execOrder,
		Insights:        insights,
		Constraints:     constraints,
		CreatedAt:       src.CreatedAt,
	}
}
```
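A sketch of driving the runner directly with the API defined above; the config values are assumed to be constructed by the caller, much as `registerPipelineFactory` does earlier in this commit:

```go
package example

import (
	"context"

	"github.com/Iron-Ham/claudio/internal/orchestrator/bridgewire"
)

// runPipeline sketches direct use of PipelineRunner. The cfg dependencies
// (Orch, Session, Verifier, Plan, Bus, Recorder) are assumed to be built
// elsewhere; NewPipelineRunner validates the required ones.
func runPipeline(ctx context.Context, cfg bridgewire.PipelineRunnerConfig) error {
	runner, err := bridgewire.NewPipelineRunner(cfg)
	if err != nil {
		return err
	}
	if err := runner.Start(ctx); err != nil { // executor first, then pipeline
		return err
	}
	defer runner.Stop() // tears down executor and pipeline
	// ... wait for the pipeline.completed event on the bus (see the
	// CHANGELOG "Pipeline Execution Path" entry) before returning ...
	return nil
}
```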
