@@ -60,27 +60,13 @@ type SaturationDetector interface {
60
60
IsSaturated (ctx context.Context , candidatePods []backendmetrics.PodMetrics ) bool
61
61
}
62
62
63
- type RequestControlPlugins struct {
64
- preRequestPlugins []PreRequest
65
- responseReceivedPlugins []ResponseReceived
66
- responseStreamingPlugins []ResponseStreaming
67
- responseCompletePlugins []ResponseComplete
68
- }
69
-
70
63
// NewDirectorWithConfig creates a new Director instance with all dependencies.
// datastore, scheduler and saturationDetector are stored on the Director as given.
// config supplies the request-control plugin lists; it is dereferenced here, so
// it must be non-nil (a nil config panics).
71
64
func NewDirectorWithConfig (datastore Datastore , scheduler Scheduler , saturationDetector SaturationDetector , config * Config ) * Director {
72
- RCPlugins := RequestControlPlugins {
73
- preRequestPlugins : config .preRequestPlugins ,
74
- responseReceivedPlugins : config .responseReceivedPlugins ,
75
- responseStreamingPlugins : config .responseStreamingPlugins ,
76
- responseCompletePlugins : config .responseCompletePlugins ,
77
- }
78
-
79
65
return & Director {
80
66
datastore : datastore ,
81
67
scheduler : scheduler ,
82
68
saturationDetector : saturationDetector ,
83
- requestControlPlugins : RCPlugins ,
69
// The Config struct is copied by value into the Director (shallow copy of *config).
+ requestControlPlugins : * config ,
84
70
defaultPriority : 0 , // define default priority explicitly
85
71
}
86
72
}
@@ -90,7 +76,7 @@ type Director struct {
90
76
datastore Datastore
91
77
scheduler Scheduler
92
78
saturationDetector SaturationDetector
93
- requestControlPlugins RequestControlPlugins
79
+ requestControlPlugins Config
94
80
// we just need a pointer to an int variable since priority is a pointer in InferenceObjective
95
81
// no need to set this in the constructor, since the value we want is the default int val
96
82
// and value types cannot be nil
@@ -291,7 +277,7 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch
291
277
return pm
292
278
}
293
279
294
- // HandleResponseReceived is called when the first chunk of the response arrives .
280
+ // HandleResponseReceived is called when the response headers are received .
295
281
func (d * Director ) HandleResponseReceived (ctx context.Context , reqCtx * handlers.RequestContext ) (* handlers.RequestContext , error ) {
296
282
response := & Response {
297
283
RequestId : reqCtx .Request .Headers [requtil .RequestIdHeaderKey ],
@@ -347,22 +333,22 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling
347
333
schedulingResult * schedulingtypes.SchedulingResult , targetPort int ) {
348
334
loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
349
335
for _ , plugin := range d .requestControlPlugins .preRequestPlugins {
350
- loggerDebug .Info ("Running pre-request plugin" , "plugin" , plugin .TypedName ())
336
+ loggerDebug .Info ("Running PreRequest plugin" , "plugin" , plugin .TypedName ())
351
337
before := time .Now ()
352
338
plugin .PreRequest (ctx , request , schedulingResult , targetPort )
353
339
metrics .RecordPluginProcessingLatency (PreRequestExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
354
- loggerDebug .Info ("Completed running pre-request plugin successfully" , "plugin" , plugin .TypedName ())
340
+ loggerDebug .Info ("Completed running PreRequest plugin successfully" , "plugin" , plugin .TypedName ())
355
341
}
356
342
}
357
343
358
344
// runResponseReceivedPlugins calls ResponseReceived on every plugin in
// d.requestControlPlugins.responseReceivedPlugins, in slice order, passing
// ctx, request, response and targetPod through unchanged. Each call is timed
// and reported via metrics.RecordPluginProcessingLatency under
// ResponseReceivedExtensionPoint, and is bracketed by DEBUG-verbosity log lines.
func (d * Director ) runResponseReceivedPlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
359
345
loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
360
346
for _ , plugin := range d .requestControlPlugins .responseReceivedPlugins {
361
- loggerDebug .Info ("Running post-response plugin" , "plugin" , plugin .TypedName ())
347
+ loggerDebug .Info ("Running ResponseReceived plugin" , "plugin" , plugin .TypedName ())
362
348
// Start the clock immediately before the plugin call so the recorded
// latency covers only the plugin's own work, not the logging around it.
before := time .Now ()
363
349
plugin .ResponseReceived (ctx , request , response , targetPod )
364
350
metrics .RecordPluginProcessingLatency (ResponseReceivedExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
365
- loggerDebug .Info ("Completed running post-response plugin successfully" , "plugin" , plugin .TypedName ())
351
// No result from the plugin call is used, so completion is logged unconditionally.
+ loggerDebug .Info ("Completed running ResponseReceived plugin successfully" , "plugin" , plugin .TypedName ())
366
352
}
367
353
}
368
354
@@ -373,16 +359,17 @@ func (d *Director) runResponseStreamingPlugins(ctx context.Context, request *sch
373
359
before := time .Now ()
374
360
plugin .ResponseStreaming (ctx , request , response , targetPod )
375
361
metrics .RecordPluginProcessingLatency (ResponseStreamingExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
362
+ loggerTrace .Info ("Completed running ResponseStreaming plugin successfully" , "plugin" , plugin .TypedName ())
376
363
}
377
364
}
378
365
379
366
// runResponseCompletePlugins calls ResponseComplete on every plugin in
// d.requestControlPlugins.responseCompletePlugins, in slice order, passing
// ctx, request, response and targetPod through unchanged. Each call is timed
// and reported via metrics.RecordPluginProcessingLatency under
// ResponseCompleteExtensionPoint, and is bracketed by DEBUG-verbosity log lines.
func (d * Director ) runResponseCompletePlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
380
367
loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
381
368
for _ , plugin := range d .requestControlPlugins .responseCompletePlugins {
382
- loggerDebug .Info ("Running post-response complete plugin" , "plugin" , plugin .TypedName ())
369
+ loggerDebug .Info ("Running ResponseComplete plugin" , "plugin" , plugin .TypedName ())
383
370
// Start the clock immediately before the plugin call so the recorded
// latency covers only the plugin's own work, not the logging around it.
before := time .Now ()
384
371
plugin .ResponseComplete (ctx , request , response , targetPod )
385
372
metrics .RecordPluginProcessingLatency (ResponseCompleteExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
386
- loggerDebug .Info ("Completed running post-response complete plugin successfully" , "plugin" , plugin .TypedName ())
373
// No result from the plugin call is used, so completion is logged unconditionally.
+ loggerDebug .Info ("Completed running ResponseComplete plugin successfully" , "plugin" , plugin .TypedName ())
387
374
}
388
375
}
0 commit comments