@@ -59,29 +59,37 @@ type SaturationDetector interface {
59
59
IsSaturated (ctx context.Context , candidatePods []backendmetrics.PodMetrics ) bool
60
60
}
61
61
62
+ type RequestControlPlugins struct {
63
+ preRequestPlugins []PreRequest
64
+ responseReceivedPlugins []ResponseReceived
65
+ responseStreamingPlugins []ResponseStreaming
66
+ responseCompletePlugins []ResponseComplete
67
+ }
68
+
62
69
// NewDirectorWithConfig creates a new Director instance with all dependencies.
63
70
func NewDirectorWithConfig (datastore Datastore , scheduler Scheduler , saturationDetector SaturationDetector , config * Config ) * Director {
71
+ RCPlugins := RequestControlPlugins {
72
+ preRequestPlugins : config .preRequestPlugins ,
73
+ responseReceivedPlugins : config .responseReceivedPlugins ,
74
+ responseStreamingPlugins : config .responseStreamingPlugins ,
75
+ responseCompletePlugins : config .responseCompletePlugins ,
76
+ }
77
+
64
78
return & Director {
65
- datastore : datastore ,
66
- scheduler : scheduler ,
67
- saturationDetector : saturationDetector ,
68
- preRequestPlugins : config .preRequestPlugins ,
69
- postResponseReceivedPlugins : config .postResponseReceivedPlugins ,
70
- postResponseStreamingPlugins : config .postResponseStreamingPlugins ,
71
- postResponseCompletePlugins : config .postResponseCompletePlugins ,
72
- defaultPriority : 0 , // define default priority explicitly
79
+ datastore : datastore ,
80
+ scheduler : scheduler ,
81
+ saturationDetector : saturationDetector ,
82
+ requestControlPlugins : RCPlugins ,
83
+ defaultPriority : 0 , // define default priority explicitly
73
84
}
74
85
}
75
86
76
87
// Director orchestrates the request handling flow, including scheduling.
77
88
type Director struct {
78
- datastore Datastore
79
- scheduler Scheduler
80
- saturationDetector SaturationDetector
81
- preRequestPlugins []PreRequest
82
- postResponseReceivedPlugins []PostResponseReceived
83
- postResponseStreamingPlugins []PostResponseStreaming
84
- postResponseCompletePlugins []PostResponseComplete
89
+ datastore Datastore
90
+ scheduler Scheduler
91
+ saturationDetector SaturationDetector
92
+ requestControlPlugins RequestControlPlugins
85
93
// we just need a pointer to an int variable since priority is a pointer in InferenceObjective
86
94
// no need to set this in the constructor, since the value we want is the default int val
87
95
// and value types cannot be nil
@@ -291,7 +299,7 @@ func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers.
291
299
292
300
// TODO: to extend fallback functionality, handle cases where target pod is unavailable
293
301
// https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224
294
- d .runPostResponseReceivedPlugins (ctx , reqCtx .SchedulingRequest , response , reqCtx .TargetPod )
302
+ d .runResponseReceivedPlugins (ctx , reqCtx .SchedulingRequest , response , reqCtx .TargetPod )
295
303
296
304
return reqCtx , nil
297
305
}
@@ -305,7 +313,7 @@ func (d *Director) HandleResponseBodyStreaming(ctx context.Context, reqCtx *hand
305
313
Headers : reqCtx .Response .Headers ,
306
314
}
307
315
308
- d .runPostResponseStreamingPlugins (ctx , reqCtx .SchedulingRequest , response , reqCtx .TargetPod )
316
+ d .runResponseStreamingPlugins (ctx , reqCtx .SchedulingRequest , response , reqCtx .TargetPod )
309
317
logger .V (logutil .TRACE ).Info ("Exiting HandleResponseBodyChunk" )
310
318
return reqCtx , nil
311
319
}
@@ -319,7 +327,7 @@ func (d *Director) HandleResponseBodyComplete(ctx context.Context, reqCtx *handl
319
327
Headers : reqCtx .Response .Headers ,
320
328
}
321
329
322
- d .runPostResponseCompletePlugins (ctx , reqCtx .SchedulingRequest , response , reqCtx .TargetPod )
330
+ d .runResponseCompletePlugins (ctx , reqCtx .SchedulingRequest , response , reqCtx .TargetPod )
323
331
324
332
logger .V (logutil .DEBUG ).Info ("Exiting HandleResponseBodyComplete" )
325
333
return reqCtx , nil
@@ -338,7 +346,7 @@ func (d *Director) GetRandomPod() *backend.Pod {
338
346
func (d * Director ) runPreRequestPlugins (ctx context.Context , request * schedulingtypes.LLMRequest ,
339
347
schedulingResult * schedulingtypes.SchedulingResult , targetPort int ) {
340
348
loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
341
- for _ , plugin := range d .preRequestPlugins {
349
+ for _ , plugin := range d .requestControlPlugins . preRequestPlugins {
342
350
loggerDebug .Info ("Running pre-request plugin" , "plugin" , plugin .TypedName ())
343
351
before := time .Now ()
344
352
plugin .PreRequest (ctx , request , schedulingResult , targetPort )
@@ -347,34 +355,34 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling
347
355
}
348
356
}
349
357
350
- func (d * Director ) runPostResponseReceivedPlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
358
+ func (d * Director ) runResponseReceivedPlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
351
359
loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
352
- for _ , plugin := range d .postResponseReceivedPlugins {
360
+ for _ , plugin := range d .requestControlPlugins . responseReceivedPlugins {
353
361
loggerDebug .Info ("Running post-response plugin" , "plugin" , plugin .TypedName ())
354
362
before := time .Now ()
355
- plugin .PostResponseReceived (ctx , request , response , targetPod )
356
- metrics .RecordPluginProcessingLatency (PostResponseReceivedExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
363
+ plugin .ResponseReceived (ctx , request , response , targetPod )
364
+ metrics .RecordPluginProcessingLatency (ResponseReceivedExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
357
365
loggerDebug .Info ("Completed running post-response plugin successfully" , "plugin" , plugin .TypedName ())
358
366
}
359
367
}
360
368
361
- func (d * Director ) runPostResponseStreamingPlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
369
+ func (d * Director ) runResponseStreamingPlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
362
370
loggerTrace := log .FromContext (ctx ).V (logutil .TRACE )
363
- for _ , plugin := range d .postResponseStreamingPlugins {
371
+ for _ , plugin := range d .requestControlPlugins . responseStreamingPlugins {
364
372
loggerTrace .Info ("Running post-response chunk plugin" , "plugin" , plugin .TypedName ())
365
373
before := time .Now ()
366
- plugin .PostResponseStreaming (ctx , request , response , targetPod )
367
- metrics .RecordPluginProcessingLatency (PostResponseStreamingExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
374
+ plugin .ResponseStreaming (ctx , request , response , targetPod )
375
+ metrics .RecordPluginProcessingLatency (ResponseStreamingExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
368
376
}
369
377
}
370
378
371
- func (d * Director ) runPostResponseCompletePlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
379
+ func (d * Director ) runResponseCompletePlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
372
380
loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
373
- for _ , plugin := range d .postResponseCompletePlugins {
381
+ for _ , plugin := range d .requestControlPlugins . responseCompletePlugins {
374
382
loggerDebug .Info ("Running post-response complete plugin" , "plugin" , plugin .TypedName ())
375
383
before := time .Now ()
376
- plugin .PostResponseComplete (ctx , request , response , targetPod )
377
- metrics .RecordPluginProcessingLatency (PostResponseCompleteExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
384
+ plugin .ResponseComplete (ctx , request , response , targetPod )
385
+ metrics .RecordPluginProcessingLatency (ResponseCompleteExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
378
386
loggerDebug .Info ("Completed running post-response complete plugin successfully" , "plugin" , plugin .TypedName ())
379
387
}
380
388
}
0 commit comments