Skip to content

Commit 0bbab02

Browse files
committed
feat: support dynamic router based on picked endpoint
Signed-off-by: bitliu <[email protected]>
1 parent 4b1fdaa commit 0bbab02

File tree

4 files changed

+77
-99
lines changed

4 files changed

+77
-99
lines changed

config/envoy.yaml

Lines changed: 12 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -38,39 +38,11 @@ static_resources:
3838
- name: local_service
3939
domains: ["*"]
4040
routes:
41-
# Dynamic routing based on selected endpoint header
41+
# Single route using original destination cluster
4242
- match:
4343
prefix: "/"
44-
headers:
45-
- name: "x-semantic-destination-endpoint"
46-
string_match:
47-
exact: "endpoint1"
4844
route:
49-
cluster: vllm_endpoint1
50-
timeout: 300s
51-
- match:
52-
prefix: "/"
53-
headers:
54-
- name: "x-semantic-destination-endpoint"
55-
string_match:
56-
exact: "endpoint2"
57-
route:
58-
cluster: vllm_endpoint2
59-
timeout: 300s
60-
- match:
61-
prefix: "/"
62-
headers:
63-
- name: "x-semantic-destination-endpoint"
64-
string_match:
65-
exact: "endpoint3"
66-
route:
67-
cluster: vllm_endpoint3
68-
timeout: 300s
69-
# Fallback route - will be routed by the external processor
70-
- match:
71-
prefix: "/"
72-
route:
73-
cluster: vllm_endpoint1 # Default fallback
45+
cluster: vllm_dynamic_cluster
7446
timeout: 300s
7547
http_filters:
7648
- name: envoy.filters.http.ext_proc
@@ -85,6 +57,8 @@ static_resources:
8557
response_header_mode: "SEND"
8658
request_body_mode: "BUFFERED"
8759
response_body_mode: "BUFFERED"
60+
request_trailer_mode: "SKIP"
61+
response_trailer_mode: "SKIP"
8862
failure_mode_allow: true
8963
message_timeout: 300s
9064
- name: envoy.filters.http.router
@@ -95,6 +69,7 @@ static_resources:
9569
clusters:
9670
- name: extproc_service
9771
connect_timeout: 300s
72+
per_connection_buffer_limit_bytes: 52428800
9873
type: STATIC
9974
lb_policy: ROUND_ROBIN
10075
typed_extension_protocol_options:
@@ -114,64 +89,17 @@ static_resources:
11489
socket_address:
11590
address: 127.0.0.1
11691
port_value: 50051
117-
118-
# Multiple vLLM backend clusters
119-
- name: vllm_endpoint1
120-
connect_timeout: 300s
121-
type: STRICT_DNS
122-
lb_policy: ROUND_ROBIN
123-
typed_extension_protocol_options:
124-
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
125-
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
126-
explicit_http_config:
127-
http_protocol_options: {}
128-
load_assignment:
129-
cluster_name: vllm_endpoint1
130-
endpoints:
131-
- lb_endpoints:
132-
- endpoint:
133-
address:
134-
socket_address:
135-
address: 192.168.12.90
136-
port_value: 11434
137-
load_balancing_weight: 1
13892

139-
- name: vllm_endpoint2
93+
# Dynamic vLLM cluster using original destination
94+
- name: vllm_dynamic_cluster
14095
connect_timeout: 300s
141-
type: STRICT_DNS
142-
lb_policy: ROUND_ROBIN
96+
type: ORIGINAL_DST
97+
lb_policy: CLUSTER_PROVIDED
98+
original_dst_lb_config:
99+
use_http_header: true
100+
http_header_name: "x-semantic-destination-endpoint"
143101
typed_extension_protocol_options:
144102
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
145103
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
146104
explicit_http_config:
147105
http_protocol_options: {}
148-
load_assignment:
149-
cluster_name: vllm_endpoint2
150-
endpoints:
151-
- lb_endpoints:
152-
- endpoint:
153-
address:
154-
socket_address:
155-
address: 192.168.12.90
156-
port_value: 11434
157-
load_balancing_weight: 1
158-
159-
- name: vllm_endpoint3
160-
connect_timeout: 300s
161-
type: STRICT_DNS
162-
lb_policy: ROUND_ROBIN
163-
typed_extension_protocol_options:
164-
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
165-
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
166-
explicit_http_config:
167-
http_protocol_options: {}
168-
load_assignment:
169-
cluster_name: vllm_endpoint3
170-
endpoints:
171-
- lb_endpoints:
172-
- endpoint:
173-
address:
174-
socket_address:
175-
address: 192.168.12.90
176-
port_value: 11434
177-
load_balancing_weight: 2 # Higher weight for more powerful endpoint

src/semantic-router/pkg/config/config.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,30 @@ func (c *RouterConfig) SelectBestEndpointForModel(modelName string) (string, boo
458458
return bestEndpoint.Name, true
459459
}
460460

461+
// SelectBestEndpointAddressForModel selects the best endpoint for a model and returns the address:port
462+
// Returns the endpoint address:port string and whether selection was successful
463+
func (c *RouterConfig) SelectBestEndpointAddressForModel(modelName string) (string, bool) {
464+
endpoints := c.GetEndpointsForModel(modelName)
465+
if len(endpoints) == 0 {
466+
return "", false
467+
}
468+
469+
// If only one endpoint, return it
470+
if len(endpoints) == 1 {
471+
return fmt.Sprintf("%s:%d", endpoints[0].Address, endpoints[0].Port), true
472+
}
473+
474+
// Select endpoint with highest weight
475+
bestEndpoint := endpoints[0]
476+
for _, endpoint := range endpoints[1:] {
477+
if endpoint.Weight > bestEndpoint.Weight {
478+
bestEndpoint = endpoint
479+
}
480+
}
481+
482+
return fmt.Sprintf("%s:%d", bestEndpoint.Address, bestEndpoint.Port), true
483+
}
484+
461485
// ValidateEndpoints validates that all configured models have at least one endpoint
462486
func (c *RouterConfig) ValidateEndpoints() error {
463487
// Get all models from categories

src/semantic-router/pkg/extproc/endpoint_selection_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ var _ = Describe("Endpoint Selection", func() {
7777
for _, header := range headerMutation.SetHeaders {
7878
if header.Header.Key == "x-semantic-destination-endpoint" {
7979
endpointHeaderFound = true
80-
// Should be one of the configured endpoints
81-
Expect(header.Header.Value).To(BeElementOf("test-endpoint1", "test-endpoint2"))
80+
// Should be one of the configured endpoint addresses
81+
Expect(header.Header.Value).To(BeElementOf("127.0.0.1:8000", "127.0.0.1:8001"))
8282
}
8383
if header.Header.Key == "x-selected-model" {
8484
modelHeaderFound = true
@@ -148,7 +148,7 @@ var _ = Describe("Endpoint Selection", func() {
148148

149149
if endpointHeaderFound {
150150
// model-a should be routed to test-endpoint1 based on preferred endpoints
151-
Expect(selectedEndpoint).To(Equal("test-endpoint1"))
151+
Expect(selectedEndpoint).To(Equal("127.0.0.1:8000"))
152152
}
153153
}
154154
})
@@ -207,7 +207,7 @@ var _ = Describe("Endpoint Selection", func() {
207207

208208
if endpointHeaderFound {
209209
// model-b should be routed to test-endpoint2 (higher weight) or test-endpoint1
210-
Expect(selectedEndpoint).To(BeElementOf("test-endpoint1", "test-endpoint2"))
210+
Expect(selectedEndpoint).To(BeElementOf("127.0.0.1:8000", "127.0.0.1:8001"))
211211
}
212212
}
213213
})
@@ -256,6 +256,11 @@ var _ = Describe("Endpoint Selection", func() {
256256
bestEndpoint, found := cfg.SelectBestEndpointForModel("model-b")
257257
Expect(found).To(BeTrue())
258258
Expect(bestEndpoint).To(BeElementOf("test-endpoint1", "test-endpoint2"))
259+
260+
// Test best endpoint address selection
261+
bestEndpointAddress, found := cfg.SelectBestEndpointAddressForModel("model-b")
262+
Expect(found).To(BeTrue())
263+
Expect(bestEndpointAddress).To(BeElementOf("127.0.0.1:8000", "127.0.0.1:8001"))
259264
})
260265
})
261266

src/semantic-router/pkg/extproc/request_handler.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,10 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
344344
actualModel = matchedModel
345345

346346
// Select the best endpoint for this model
347-
endpoint, endpointFound := r.Config.SelectBestEndpointForModel(matchedModel)
347+
endpointAddress, endpointFound := r.Config.SelectBestEndpointAddressForModel(matchedModel)
348348
if endpointFound {
349-
selectedEndpoint = endpoint
350-
log.Printf("Selected endpoint: %s for model: %s", selectedEndpoint, matchedModel)
349+
selectedEndpoint = endpointAddress
350+
log.Printf("Selected endpoint address: %s for model: %s", selectedEndpoint, matchedModel)
351351
} else {
352352
log.Printf("Warning: No endpoint found for model %s, using fallback", matchedModel)
353353
}
@@ -386,7 +386,6 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
386386
setHeaders = append(setHeaders, &core.HeaderValueOption{
387387
Header: &core.HeaderValue{
388388
Key: "x-semantic-destination-endpoint",
389-
Value: selectedEndpoint,
390389
RawValue: []byte(selectedEndpoint),
391390
},
392391
})
@@ -416,10 +415,9 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
416415
Response: &ext_proc.ProcessingResponse_RequestBody{
417416
RequestBody: &ext_proc.BodyResponse{
418417
Response: &ext_proc.CommonResponse{
419-
ClearRouteCache: true,
420-
Status: ext_proc.CommonResponse_CONTINUE_AND_REPLACE,
421-
HeaderMutation: headerMutation,
422-
BodyMutation: bodyMutation,
418+
Status: ext_proc.CommonResponse_CONTINUE,
419+
HeaderMutation: headerMutation,
420+
BodyMutation: bodyMutation,
423421
},
424422
},
425423
},
@@ -444,13 +442,36 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
444442
}
445443

446444
// Select the best endpoint for the specified model
447-
endpoint, endpointFound := r.Config.SelectBestEndpointForModel(originalModel)
445+
endpointAddress, endpointFound := r.Config.SelectBestEndpointAddressForModel(originalModel)
448446
if endpointFound {
449-
selectedEndpoint = endpoint
450-
log.Printf("Selected endpoint: %s for model: %s", selectedEndpoint, originalModel)
447+
selectedEndpoint = endpointAddress
448+
log.Printf("Selected endpoint address: %s for model: %s", selectedEndpoint, originalModel)
451449
} else {
450+
// TOOD(Xunzhuo): pick a random endpoint from the list of all available endpoints
452451
log.Printf("Warning: No endpoint found for model %s, using fallback", originalModel)
453452
}
453+
setHeaders := []*core.HeaderValueOption{}
454+
if selectedEndpoint != "" {
455+
setHeaders = append(setHeaders, &core.HeaderValueOption{
456+
Header: &core.HeaderValue{
457+
Key: "x-semantic-destination-endpoint",
458+
RawValue: []byte(selectedEndpoint),
459+
},
460+
})
461+
}
462+
// Set the response with body mutation and content-length removal
463+
response = &ext_proc.ProcessingResponse{
464+
Response: &ext_proc.ProcessingResponse_RequestBody{
465+
RequestBody: &ext_proc.BodyResponse{
466+
Response: &ext_proc.CommonResponse{
467+
Status: ext_proc.CommonResponse_CONTINUE,
468+
HeaderMutation: &ext_proc.HeaderMutation{
469+
SetHeaders: setHeaders,
470+
},
471+
},
472+
},
473+
},
474+
}
454475
}
455476

456477
// Save the actual model that will be used for token tracking

0 commit comments

Comments
 (0)