
Commit 9acd73b

Copilot and rootfs committed
Refactor ensemble as independent service, remove extproc integration
Co-authored-by: rootfs <[email protected]>
1 parent 239c0e8 commit 9acd73b

File tree

10 files changed: +263 -174 lines changed


ENSEMBLE_IMPLEMENTATION.md

Lines changed: 22 additions & 16 deletions
@@ -4,6 +4,19 @@

 This document summarizes the implementation of ensemble orchestration support in the semantic-router. The feature enables parallel model inference with configurable aggregation strategies, allowing improved reliability, accuracy, and flexible cost-performance trade-offs.

+## Architecture
+
+The ensemble service is implemented as an **independent OpenAI-compatible API server** that runs alongside the semantic router. This design allows:
+- Clean separation of concerns (extproc doesn't handle multiple downstream endpoints)
+- Scalable deployment (ensemble service can be scaled independently)
+- Flexibility (can be used standalone or integrated with semantic router)
+
+```
+Client → Semantic Router ExtProc → Ensemble Service (Port 8081) → Model Endpoints
+                  ↓                              ↓
+            (Set Headers)         (Parallel Query + Aggregation)
+```
+
 ## Implementation Summary

 ### Files Created
@@ -17,14 +30,17 @@ This document summarizes the implementation of ensemble orchestration support in
    - Parallel model querying with semaphore-based concurrency control
    - Multiple aggregation strategies implementation
    - Authentication header forwarding
+   - Helper methods for default values

 3. **src/semantic-router/pkg/ensemble/factory_test.go**
    - Comprehensive test suite covering all factory operations
    - 100% test coverage for core ensemble functionality

-4. **src/semantic-router/pkg/extproc/req_filter_ensemble.go**
-   - Request filter for ensemble orchestration in extproc flow
-   - Integration with OpenAIRouter
+4. **src/semantic-router/pkg/ensembleserver/server.go**
+   - Independent HTTP server for ensemble orchestration
+   - OpenAI-compatible /v1/chat/completions endpoint
+   - Health check endpoint
+   - Header-based control of ensemble behavior

 5. **config/ensemble/ensemble-example.yaml**
    - Example configuration file demonstrating all ensemble options
@@ -46,19 +62,9 @@ This document summarizes the implementation of ensemble orchestration support in
 3. **config/config.yaml**
    - Added ensemble configuration section (disabled by default)

-4. **src/semantic-router/pkg/extproc/router.go**
-   - Added EnsembleFactory field to OpenAIRouter
-   - Initialize ensemble factory from configuration
-
-5. **src/semantic-router/pkg/extproc/processor_req_header.go**
-   - Parse ensemble headers from incoming requests
-   - Added ensemble fields to RequestContext
-
-6. **src/semantic-router/pkg/extproc/processor_req_body.go**
-   - Integrate ensemble request handling into request flow
-
-7. **src/semantic-router/pkg/extproc/processor_res_header.go**
-   - Add ensemble metadata to response headers
+4. **src/semantic-router/cmd/main.go**
+   - Start ensemble server when enabled in configuration
+   - Support for -ensemble-port flag (default: 8081)

 ## Key Features
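The "parallel model querying with semaphore-based concurrency control" called out above is the core of the factory. Below is a minimal sketch of that pattern in Go; it is illustrative only (the function, its arguments, and the placeholder model names are not the actual pkg/ensemble code), showing how a buffered channel caps in-flight requests while responses are collected for aggregation.

```go
package main

import (
	"fmt"
	"sync"
)

// queryAll fans out to every model but never runs more than maxConcurrent
// queries at once; failed queries are simply dropped from the result set.
func queryAll(models []string, maxConcurrent int, query func(model string) (string, error)) []string {
	sem := make(chan struct{}, maxConcurrent) // counting semaphore
	var (
		mu        sync.Mutex
		wg        sync.WaitGroup
		responses []string
	)
	for _, m := range models {
		wg.Add(1)
		go func(model string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			if resp, err := query(model); err == nil {
				mu.Lock()
				responses = append(responses, resp)
				mu.Unlock()
			}
		}(m)
	}
	wg.Wait()
	return responses // caller checks len(responses) against the min-responses threshold before aggregating
}

func main() {
	out := queryAll([]string{"model-a", "model-b", "model-c"}, 2, func(m string) (string, error) {
		return "reply from " + m, nil // stand-in for the HTTP call to the model endpoint
	})
	fmt.Println(out)
}
```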

config/ensemble/README.md

Lines changed: 28 additions & 1 deletion
@@ -9,6 +9,16 @@ The ensemble orchestration feature allows you to:
 - Combine their outputs using various aggregation strategies
 - Improve reliability, accuracy, and cost-performance trade-offs

+## Architecture
+
+The ensemble service runs as an **independent OpenAI-compatible API server** (default port: 8081). The semantic router extproc sets ensemble headers and routes requests to this service, which then queries multiple model endpoints and returns the aggregated response.
+
+```
+Client Request → Semantic Router ExtProc → Ensemble Service → Model Endpoints
+                          ↓                       ↓
+                   (Set Headers)    (Parallel Queries + Aggregation)
+```
+
 ## Configuration

 ### Basic Setup
@@ -52,10 +62,27 @@ Control ensemble behavior using HTTP headers:
 | `x-ensemble-strategy` | Aggregation strategy | `voting` |
 | `x-ensemble-min-responses` | Minimum responses required | `2` |

+### Service Startup
+
+When ensemble is enabled, the router automatically starts the ensemble service:
+
+```bash
+# Start the router (includes ensemble service on port 8081 if enabled)
+./bin/router -config=config/config.yaml
+```
+
+To specify a custom ensemble port:
+
+```bash
+./bin/router -config=config/config.yaml -ensemble-port=8082
+```
+
 ### Example Request

+Send requests directly to the ensemble service:
+
 ```bash
-curl -X POST http://localhost:8080/v1/chat/completions \
+curl -X POST http://localhost:8081/v1/chat/completions \
   -H "Content-Type: application/json" \
   -H "x-ensemble-enable: true" \
   -H "x-ensemble-models: model-a,model-b,model-c" \
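For comparison with the curl call above, here is a rough Go client sketch. The OpenAI-style request body, the prompt, and the model names are placeholders; ensemble metadata (models queried, responses received) is returned via response headers set by the service.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Ensemble behavior is controlled entirely through request headers;
	// the body is forwarded to each selected model endpoint.
	body := []byte(`{"messages":[{"role":"user","content":"What is 2+2?"}]}`)

	req, err := http.NewRequest(http.MethodPost, "http://localhost:8081/v1/chat/completions", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("x-ensemble-enable", "true")
	req.Header.Set("x-ensemble-models", "model-a,model-b,model-c")
	req.Header.Set("x-ensemble-strategy", "voting")
	req.Header.Set("x-ensemble-min-responses", "2")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	out, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status)
	fmt.Println(string(out)) // aggregated response from the ensemble service
}
```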

src/semantic-router/cmd/main.go

Lines changed: 14 additions & 0 deletions
@@ -15,6 +15,7 @@ import (
     candle_binding "github.com/vllm-project/semantic-router/candle-binding"
     "github.com/vllm-project/semantic-router/src/semantic-router/pkg/apiserver"
     "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config"
+    "github.com/vllm-project/semantic-router/src/semantic-router/pkg/ensembleserver"
     "github.com/vllm-project/semantic-router/src/semantic-router/pkg/extproc"
     "github.com/vllm-project/semantic-router/src/semantic-router/pkg/k8s"
     "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging"
@@ -157,6 +158,19 @@ func main() {
         }()
     }

+    // Start Ensemble server if enabled in configuration
+    ensemblePort := flag.Int("ensemble-port", 8081, "Port to listen on for Ensemble API")
+    flag.Parse() // Re-parse to pick up ensemble-port
+
+    if cfg.Ensemble.Enabled {
+        go func() {
+            logging.Infof("Starting Ensemble server on port %d", *ensemblePort)
+            if err := ensembleserver.Init(cfg, *ensemblePort); err != nil {
+                logging.Errorf("Start Ensemble server error: %v", err)
+            }
+        }()
+    }
+
     // Start Kubernetes controller if ConfigSource is kubernetes
     if cfg.ConfigSource == config.ConfigSourceKubernetes {
         logging.Infof("ConfigSource is kubernetes, starting Kubernetes controller")

src/semantic-router/pkg/ensemble/factory.go

Lines changed: 10 additions & 0 deletions
@@ -56,6 +56,16 @@ func (f *Factory) RegisterEndpoint(modelName, endpointURL string) {
     logging.Infof("Registered ensemble endpoint: %s -> %s", modelName, endpointURL)
 }

+// GetDefaultStrategy returns the configured default strategy
+func (f *Factory) GetDefaultStrategy() Strategy {
+    return f.config.DefaultStrategy
+}
+
+// GetDefaultMinResponses returns the configured default minimum responses
+func (f *Factory) GetDefaultMinResponses() int {
+    return f.config.DefaultMinResponses
+}
+
 // Execute performs ensemble orchestration for the given request
 func (f *Factory) Execute(req *Request) *Response {
     if !f.config.Enabled {
src/semantic-router/pkg/ensembleserver/server.go

Lines changed: 189 additions & 0 deletions

@@ -0,0 +1,189 @@
+package ensembleserver
+
+import (
+    "encoding/json"
+    "fmt"
+    "io"
+    "net/http"
+    "strconv"
+    "strings"
+    "time"
+
+    "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config"
+    "github.com/vllm-project/semantic-router/src/semantic-router/pkg/ensemble"
+    "github.com/vllm-project/semantic-router/src/semantic-router/pkg/headers"
+    "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging"
+)
+
+// EnsembleServer handles OpenAI-compatible ensemble requests
+type EnsembleServer struct {
+    factory *ensemble.Factory
+    config  *config.RouterConfig
+}
+
+// Init starts the ensemble API server
+func Init(cfg *config.RouterConfig, port int) error {
+    if cfg == nil {
+        return fmt.Errorf("configuration not initialized")
+    }
+
+    if !cfg.Ensemble.Enabled {
+        logging.Infof("Ensemble service is disabled in configuration")
+        return nil
+    }
+
+    // Initialize ensemble factory
+    ensembleConfig := &ensemble.Config{
+        Enabled:               cfg.Ensemble.Enabled,
+        DefaultStrategy:       ensemble.Strategy(cfg.Ensemble.DefaultStrategy),
+        DefaultMinResponses:   cfg.Ensemble.DefaultMinResponses,
+        TimeoutSeconds:        cfg.Ensemble.TimeoutSeconds,
+        MaxConcurrentRequests: cfg.Ensemble.MaxConcurrentRequests,
+    }
+    factory := ensemble.NewFactory(ensembleConfig)
+
+    // Register endpoint mappings from config
+    for modelName, endpoint := range cfg.Ensemble.EndpointMappings {
+        factory.RegisterEndpoint(modelName, endpoint)
+    }
+
+    server := &EnsembleServer{
+        factory: factory,
+        config:  cfg,
+    }
+
+    // Create HTTP server
+    mux := server.setupRoutes()
+    httpServer := &http.Server{
+        Addr:         fmt.Sprintf(":%d", port),
+        Handler:      mux,
+        ReadTimeout:  60 * time.Second,
+        WriteTimeout: 60 * time.Second,
+        IdleTimeout:  120 * time.Second,
+    }
+
+    logging.Infof("Ensemble API server listening on port %d", port)
+    return httpServer.ListenAndServe()
+}
+
+// setupRoutes configures HTTP routes
+func (s *EnsembleServer) setupRoutes() *http.ServeMux {
+    mux := http.NewServeMux()
+
+    // OpenAI-compatible endpoints
+    mux.HandleFunc("/v1/chat/completions", s.handleChatCompletions)
+    mux.HandleFunc("/health", s.handleHealth)
+
+    return mux
+}
+
+// handleHealth returns service health status
+func (s *EnsembleServer) handleHealth(w http.ResponseWriter, r *http.Request) {
+    w.Header().Set("Content-Type", "application/json")
+    w.WriteHeader(http.StatusOK)
+    json.NewEncoder(w).Encode(map[string]interface{}{
+        "status":  "healthy",
+        "service": "ensemble",
+    })
+}
+
+// handleChatCompletions processes OpenAI chat completion requests with ensemble
+func (s *EnsembleServer) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
+    if r.Method != http.MethodPost {
+        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+        return
+    }
+
+    // Read request body
+    body, err := io.ReadAll(r.Body)
+    if err != nil {
+        logging.Errorf("Failed to read request body: %v", err)
+        http.Error(w, "Failed to read request body", http.StatusBadRequest)
+        return
+    }
+    defer r.Body.Close()
+
+    // Parse ensemble headers
+    ensembleEnabled := strings.ToLower(r.Header.Get(headers.EnsembleEnable)) == "true"
+    if !ensembleEnabled {
+        http.Error(w, "Ensemble not enabled in request headers", http.StatusBadRequest)
+        return
+    }
+
+    // Parse models list
+    modelsHeader := r.Header.Get(headers.EnsembleModels)
+    if modelsHeader == "" {
+        http.Error(w, "No models specified in ensemble header", http.StatusBadRequest)
+        return
+    }
+
+    var models []string
+    for _, model := range strings.Split(modelsHeader, ",") {
+        trimmedModel := strings.TrimSpace(model)
+        if trimmedModel != "" {
+            models = append(models, trimmedModel)
+        }
+    }
+
+    if len(models) == 0 {
+        http.Error(w, "No valid models specified", http.StatusBadRequest)
+        return
+    }
+
+    // Parse strategy
+    strategy := ensemble.Strategy(r.Header.Get(headers.EnsembleStrategy))
+    if strategy == "" {
+        strategy = s.factory.GetDefaultStrategy()
+    }
+
+    // Parse min responses
+    minResponses := s.factory.GetDefaultMinResponses()
+    if minRespHeader := r.Header.Get(headers.EnsembleMinResponses); minRespHeader != "" {
+        if parsed, err := strconv.Atoi(minRespHeader); err == nil && parsed > 0 {
+            minResponses = parsed
+        }
+    }
+
+    logging.Infof("Ensemble request: models=%v, strategy=%s, minResponses=%d", models, strategy, minResponses)
+
+    // Forward headers for authentication
+    headerMap := make(map[string]string)
+    for key, values := range r.Header {
+        if len(values) > 0 {
+            headerMap[key] = values[0]
+        }
+    }
+
+    // Build ensemble request
+    ensembleReq := &ensemble.Request{
+        Models:          models,
+        Strategy:        strategy,
+        MinResponses:    minResponses,
+        OriginalRequest: body,
+        Headers:         headerMap,
+        Context:         r.Context(),
+    }
+
+    // Execute ensemble orchestration
+    ensembleResp := s.factory.Execute(ensembleReq)
+
+    // Check for errors
+    if ensembleResp.Error != nil {
+        logging.Errorf("Ensemble execution failed: %v", ensembleResp.Error)
+        http.Error(w, fmt.Sprintf("Ensemble orchestration failed: %v", ensembleResp.Error), http.StatusInternalServerError)
+        return
+    }
+
+    // Add ensemble metadata headers
+    w.Header().Set(headers.VSREnsembleUsed, "true")
+    w.Header().Set(headers.VSREnsembleModelsQueried, strconv.Itoa(ensembleResp.ModelsQueried))
+    w.Header().Set(headers.VSREnsembleResponsesReceived, strconv.Itoa(ensembleResp.ResponsesReceived))
+    w.Header().Set("Content-Type", "application/json")
+
+    // Return the aggregated response
+    logging.Infof("Ensemble execution successful: queried=%d, received=%d, strategy=%s",
+        ensembleResp.ModelsQueried, ensembleResp.ResponsesReceived, ensembleResp.Strategy)
+
+    w.WriteHeader(http.StatusOK)
+    w.Write(ensembleResp.FinalResponse)
+}
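A minimal in-package test sketch (hypothetical, e.g. a server_test.go that is not part of this commit) illustrating the handler contract above: the health endpoint always answers 200, and chat completion requests without the enable header are rejected before the factory is ever consulted.

```go
package ensembleserver

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

func TestHandleHealth(t *testing.T) {
	s := &EnsembleServer{} // factory not needed for the health path
	rec := httptest.NewRecorder()
	s.handleHealth(rec, httptest.NewRequest(http.MethodGet, "/health", nil))
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", rec.Code)
	}
}

func TestChatCompletionsRequiresEnableHeader(t *testing.T) {
	s := &EnsembleServer{}
	req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
	// No x-ensemble-enable header set, so the handler should return 400.
	rec := httptest.NewRecorder()
	s.handleChatCompletions(rec, req)
	if rec.Code != http.StatusBadRequest {
		t.Fatalf("expected 400, got %d", rec.Code)
	}
}
```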

src/semantic-router/pkg/extproc/processor_req_body.go

Lines changed: 0 additions & 5 deletions
@@ -61,11 +61,6 @@ func (r *OpenAIRouter) handleRequestBody(v *ext_proc.ProcessingRequest_RequestBo
         ctx.RequestModel = originalModel
     }

-    // Check if ensemble mode is requested
-    if response, shouldReturn := r.handleEnsembleRequest(ctx); shouldReturn {
-        return response, nil
-    }
-
     // Get content from messages
     userContent, nonUserMessages := extractUserAndNonUserContent(openAIRequest)
