Skip to content

Commit 82105a6

Browse files
authored
move PostResponse plugins to requestcontrol instead of scheduler (#914)
* move PostResponse plugins to requestcontrol instead of scheduler Signed-off-by: Nir Rozenbaum <[email protected]> * typo Signed-off-by: Nir Rozenbaum <[email protected]> * fixed typo raised by elevran in code review Signed-off-by: Nir Rozenbaum <[email protected]> * added general Plugin interface in requestcontrol layer Signed-off-by: Nir Rozenbaum <[email protected]> * removed LLMResponse from scheduler Signed-off-by: Nir Rozenbaum <[email protected]> * added metric for request-control plugin and fixed a copy paste typo when recording plugin time Signed-off-by: Nir Rozenbaum <[email protected]> * converged to a single plugins interface based on code review Signed-off-by: Nir Rozenbaum <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 1fb2a23 commit 82105a6

File tree

15 files changed

+229
-213
lines changed

15 files changed

+229
-213
lines changed

cmd/epp/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4141
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4242
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
43+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4445
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4546
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
@@ -227,6 +228,8 @@ func run() error {
227228

228229
saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)
229230

231+
director := requestcontrol.NewDirector(datastore, scheduler, saturationDetector) // can call "director.WithPostResponsePlugins" to add post response plugins
232+
230233
// --- Setup ExtProc Server Runner ---
231234
serverRunner := &runserver.ExtProcServerRunner{
232235
GrpcPort: *grpcPort,
@@ -237,7 +240,7 @@ func run() error {
237240
SecureServing: *secureServing,
238241
CertPath: *certPath,
239242
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
240-
Scheduler: scheduler,
243+
Director: director,
241244
SaturationDetector: saturationDetector,
242245
}
243246
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {

pkg/epp/handlers/server.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3333
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
3434
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
35+
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3536
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
3637
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3738
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
@@ -79,7 +80,7 @@ type StreamingServer struct {
7980
// Specifically, there are fields related to the ext-proc protocol, and then fields related to the lifecycle of the request.
8081
// We should split these apart as this monolithic object exposes too much data to too many layers.
8182
type RequestContext struct {
82-
TargetPod string
83+
TargetPod *backend.Pod
8384
TargetEndpoint string
8485
Model string
8586
ResolvedTargetModel string
@@ -93,6 +94,8 @@ type RequestContext struct {
9394
RequestRunning bool
9495
Request *Request
9596

97+
SchedulingRequest *schedulingtypes.LLMRequest
98+
9699
RequestState StreamRequestState
97100
modelServerStreaming bool
98101

pkg/epp/metrics/metrics.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,18 @@ var (
202202
[]string{"plugin_type", "plugin_name"},
203203
)
204204

205+
RequestControlPluginProcessingLatencies = prometheus.NewHistogramVec(
206+
prometheus.HistogramOpts{
207+
Subsystem: InferenceExtension,
208+
Name: "request_control_plugin_duration_seconds",
209+
Help: metricsutil.HelpMsgWithStability("RequestControl plugin processing latency distribution in seconds for each plugin type and plugin name.", compbasemetrics.ALPHA),
210+
Buckets: []float64{
211+
0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
212+
},
213+
},
214+
[]string{"plugin_type", "plugin_name"},
215+
)
216+
205217
// Prefix indexer Metrics
206218
PrefixCacheSize = prometheus.NewGaugeVec(
207219
prometheus.GaugeOpts{
@@ -263,6 +275,7 @@ func Register(customCollectors ...prometheus.Collector) {
263275
metrics.Registry.MustRegister(inferencePoolReadyPods)
264276
metrics.Registry.MustRegister(SchedulerPluginProcessingLatencies)
265277
metrics.Registry.MustRegister(SchedulerE2ELatency)
278+
metrics.Registry.MustRegister(RequestControlPluginProcessingLatencies)
266279
metrics.Registry.MustRegister(InferenceExtensionInfo)
267280
metrics.Registry.MustRegister(PrefixCacheSize)
268281
metrics.Registry.MustRegister(PrefixCacheHitRatio)
@@ -289,6 +302,7 @@ func Reset() {
289302
inferencePoolReadyPods.Reset()
290303
SchedulerPluginProcessingLatencies.Reset()
291304
SchedulerE2ELatency.Reset()
305+
RequestControlPluginProcessingLatencies.Reset()
292306
InferenceExtensionInfo.Reset()
293307
PrefixCacheSize.Reset()
294308
PrefixCacheHitRatio.Reset()
@@ -400,6 +414,11 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
400414
SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
401415
}
402416

417+
// RecordRequestControlPluginProcessingLatency records the processing latency for a request-control plugin.
418+
func RecordRequestControlPluginProcessingLatency(pluginType, pluginName string, duration time.Duration) {
419+
RequestControlPluginProcessingLatencies.WithLabelValues(pluginType, pluginName).Observe(duration.Seconds())
420+
}
421+
403422
// RecordPrefixCacheSize records the size of the prefix indexer in megabytes.
404423
func RecordPrefixCacheSize(size int64) {
405424
PrefixCacheSize.WithLabelValues().Set(float64(size))

pkg/epp/plugins/plugins.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package plugins
18+
19+
// Plugin defines the interface for a plugin.
20+
// This interface should be embedded in all plugins across the code.
21+
type Plugin interface {
22+
// Name returns the name of the plugin.
23+
Name() string
24+
}

pkg/epp/requestcontrol/director.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ import (
2323
"fmt"
2424
"math/rand"
2525
"strconv"
26+
"time"
2627

2728
"github.com/go-logr/logr"
2829
"sigs.k8s.io/controller-runtime/pkg/log"
2930
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3031
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
3132
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3233
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
34+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3335
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3436
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
3537
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -39,24 +41,32 @@ import (
3941
// Scheduler defines the interface required by the Director for scheduling.
4042
type Scheduler interface {
4143
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result map[string]*schedulingtypes.Result, err error)
42-
OnResponse(ctx context.Context, resp *schedulingtypes.LLMResponse, targetPodName string)
4344
}
4445

4546
// SaturationDetector provides a signal indicating whether the backends are considered saturated.
4647
type SaturationDetector interface {
4748
IsSaturated(ctx context.Context) bool
4849
}
4950

51+
// NewDirector creates a new Director instance with all dependencies.
52+
// postResponsePlugins remains nil as this is an optional field that can be set using the "WithPostResponsePlugins" function.
53+
func NewDirector(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector) *Director {
54+
return &Director{datastore: datastore, scheduler: scheduler, saturationDetector: saturationDetector}
55+
}
56+
5057
// Director orchestrates the request handling flow, including scheduling.
5158
type Director struct {
52-
datastore datastore.Datastore
53-
scheduler Scheduler
54-
saturationDetector SaturationDetector
59+
datastore datastore.Datastore
60+
scheduler Scheduler
61+
saturationDetector SaturationDetector
62+
postResponsePlugins []PostResponsePlugin
5563
}
5664

57-
// NewDirector creates a new Director instance with all dependencies.
58-
func NewDirector(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector) *Director {
59-
return &Director{datastore, scheduler, saturationDetector}
65+
// WithPostResponsePlugins sets the given plugins as the PostResponse plugins.
66+
// If the Director has PostResponse plugins already, this call replaces the existing plugins with the given ones.
67+
func (d *Director) WithPostResponsePlugins(plugins ...PostResponsePlugin) *Director {
68+
d.postResponsePlugins = plugins
69+
return d
6070
}
6171

6272
// HandleRequest orchestrates the request lifecycle:
@@ -104,7 +114,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
104114
}
105115

106116
// Prepare LLMRequest (needed for both saturation detection and Scheduler)
107-
llmReq := &schedulingtypes.LLMRequest{
117+
reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{
108118
TargetModel: reqCtx.ResolvedTargetModel,
109119
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
110120
Critical: requestCriticality == v1alpha2.Critical,
@@ -113,7 +123,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
113123
}
114124
logger = logger.WithValues(
115125
"model", reqCtx.Model,
116-
"resolvedTargetModel", llmReq.TargetModel,
126+
"resolvedTargetModel", reqCtx.ResolvedTargetModel,
117127
"criticality", requestCriticality,
118128
)
119129
ctx = log.IntoContext(ctx, logger)
@@ -126,7 +136,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
126136
}
127137

128138
// --- 3. Dispatch (Calls Scheduler) ---
129-
results, dispatchErr := d.Dispatch(ctx, llmReq)
139+
results, dispatchErr := d.Dispatch(ctx, reqCtx.SchedulingRequest)
130140
if dispatchErr != nil {
131141
return reqCtx, dispatchErr
132142
}
@@ -193,22 +203,19 @@ func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestCon
193203
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
194204
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)
195205

196-
reqCtx.TargetPod = targetPod.NamespacedName.String()
206+
reqCtx.TargetPod = targetPod
197207
reqCtx.TargetEndpoint = endpoint
198208

199209
return reqCtx, nil
200210
}
201211

202212
func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
203-
logger := log.FromContext(ctx)
204-
205-
llmResp := &schedulingtypes.LLMResponse{
213+
response := &Response{
206214
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
207215
Headers: reqCtx.Response.Headers,
208216
}
209-
logger.V(logutil.DEBUG).Info("LLM response assembled", "response", llmResp)
210217

211-
d.scheduler.OnResponse(ctx, llmResp, reqCtx.TargetPod)
218+
d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)
212219

213220
return reqCtx, nil
214221
}
@@ -253,3 +260,12 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
253260
}
254261
return ""
255262
}
263+
264+
func (d *Director) runPostResponsePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
265+
for _, plugin := range d.postResponsePlugins {
266+
log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Name())
267+
before := time.Now()
268+
plugin.PostResponse(ctx, request, response, targetPod)
269+
metrics.RecordRequestControlPluginProcessingLatency(PostResponsePluginType, plugin.Name(), time.Since(before))
270+
}
271+
}

0 commit comments

Comments
 (0)