From f9ba10f208a5ab528ac6c3d7a2e9505ef0b87d1a Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Sat, 27 Sep 2025 00:31:57 +0000 Subject: [PATCH 1/9] Break out PostResponse plugin into 3 constituent plugins for request recieved, streaming, and complete --- pkg/epp/handlers/response.go | 12 +- pkg/epp/handlers/server.go | 4 +- pkg/epp/requestcontrol/director.go | 89 ++++++++--- pkg/epp/requestcontrol/director_test.go | 138 ++++++++++++++++-- pkg/epp/requestcontrol/plugins.go | 24 ++- .../requestcontrol/request_control_config.go | 44 +++++- pkg/epp/server/server_test.go | 10 +- 7 files changed, 277 insertions(+), 44 deletions(-) diff --git a/pkg/epp/handlers/response.go b/pkg/epp/handlers/response.go index 7dfaf3b2e..fa01b3bc4 100644 --- a/pkg/epp/handlers/response.go +++ b/pkg/epp/handlers/response.go @@ -61,16 +61,26 @@ func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *Reques reqCtx.ResponseComplete = true reqCtx.respBodyResp = generateResponseBodyResponses(responseBytes, true) + if s.director != nil { + s.director.HandleResponseBodyComplete(ctx, reqCtx) + } return reqCtx, nil } // The function is to handle streaming response if the modelServer is streaming. 
func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context, reqCtx *RequestContext, responseText string) { + if s.director != nil { + s.director.HandleResponseBodyStreaming(ctx, reqCtx) + } if strings.Contains(responseText, streamingEndMsg) { + reqCtx.ResponseComplete = true resp := parseRespForUsage(ctx, responseText) reqCtx.Usage = resp.Usage metrics.RecordInputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.PromptTokens) metrics.RecordOutputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.CompletionTokens) + if s.director != nil { + s.director.HandleResponseBodyComplete(ctx, reqCtx) + } } } @@ -83,7 +93,7 @@ func (s *StreamingServer) HandleResponseHeaders(ctx context.Context, reqCtx *Req } } - reqCtx, err := s.director.HandleResponse(ctx, reqCtx) + reqCtx, err := s.director.HandleResponseRecieved(ctx, reqCtx) return reqCtx, err } diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index ddfb3316c..213ff78e8 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -54,7 +54,9 @@ func NewStreamingServer(datastore Datastore, director Director) *StreamingServer type Director interface { HandleRequest(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) - HandleResponse(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) + HandleResponseRecieved(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) + HandleResponseBodyStreaming(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) + HandleResponseBodyComplete(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) GetRandomPod() *backend.Pod } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index a3e2d6d13..53a1d064b 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -62,22 +62,26 @@ type SaturationDetector interface { // NewDirectorWithConfig creates a new Director 
instance with all dependencies. func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director { return &Director{ - datastore: datastore, - scheduler: scheduler, - saturationDetector: saturationDetector, - preRequestPlugins: config.preRequestPlugins, - postResponsePlugins: config.postResponsePlugins, - defaultPriority: 0, // define default priority explicitly + datastore: datastore, + scheduler: scheduler, + saturationDetector: saturationDetector, + preRequestPlugins: config.preRequestPlugins, + postResponseRecievedPlugins: config.postResponseRecievedPlugins, + postResponseStreamingPlugins: config.postResponseStreamingPlugins, + postResponseCompletePlugins: config.postResponseCompletePlugins, + defaultPriority: 0, // define default priority explicitly } } // Director orchestrates the request handling flow, including scheduling. type Director struct { - datastore Datastore - scheduler Scheduler - saturationDetector SaturationDetector - preRequestPlugins []PreRequest - postResponsePlugins []PostResponse + datastore Datastore + scheduler Scheduler + saturationDetector SaturationDetector + preRequestPlugins []PreRequest + postResponseRecievedPlugins []PostResponseRecieved + postResponseStreamingPlugins []PostResponseStreaming + postResponseCompletePlugins []PostResponseComplete // we just need a pointer to an int variable since priority is a pointer in InferenceObjective // no need to set this in the constructor, since the value we want is the default int val // and value types cannot be nil @@ -278,7 +282,8 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch return pm } -func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { +// HandleResponseRecieved is called when the first chunk of the response arrives. 
+func (d *Director) HandleResponseRecieved(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { response := &Response{ RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], Headers: reqCtx.Response.Headers, @@ -286,11 +291,40 @@ func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestC // TODO: to extend fallback functionality, handle cases where target pod is unavailable // https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224 - d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) + d.runPostResponseRecievedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) return reqCtx, nil } +// HandleResponseBodyStreaming is called every time a chunk of the response body is received. +func (d *Director) HandleResponseBodyStreaming(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { + logger := log.FromContext(ctx).WithValues("stage", "bodyChunk") + logger.V(logutil.TRACE).Info("Entering HandleResponseBodyChunk") + response := &Response{ + RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], + Headers: reqCtx.Response.Headers, + } + + d.runPostResponseStreamingPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) + logger.V(logutil.TRACE).Info("Exiting HandleResponseBodyChunk") + return reqCtx, nil +} + +// HandleResponseBodyComplete is called when the response body is fully received. 
+func (d *Director) HandleResponseBodyComplete(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { + logger := log.FromContext(ctx).WithValues("stage", "bodyChunk") + logger.V(logutil.DEBUG).Info("Entering HandleResponseBodyComplete") + response := &Response{ + RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], + Headers: reqCtx.Response.Headers, + } + + d.runPostResponseCompletePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) + + logger.V(logutil.DEBUG).Info("Exiting HandleResponseBodyComplete") + return reqCtx, nil +} + func (d *Director) GetRandomPod() *backend.Pod { pods := d.datastore.PodList(backendmetrics.AllPodsPredicate) if len(pods) == 0 { @@ -313,13 +347,34 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } } -func (d *Director) runPostResponsePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (d *Director) runPostResponseRecievedPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) - for _, plugin := range d.postResponsePlugins { + for _, plugin := range d.postResponseRecievedPlugins { loggerDebug.Info("Running post-response plugin", "plugin", plugin.TypedName()) before := time.Now() - plugin.PostResponse(ctx, request, response, targetPod) - metrics.RecordPluginProcessingLatency(PostResponseExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) + plugin.PostResponseRecieved(ctx, request, response, targetPod) + metrics.RecordPluginProcessingLatency(PostResponseRecievedExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) loggerDebug.Info("Completed running post-response plugin successfully", "plugin", plugin.TypedName()) } } + +func (d *Director) runPostResponseStreamingPlugins(ctx context.Context, request 
*schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { + loggerTrace := log.FromContext(ctx).V(logutil.TRACE) + for _, plugin := range d.postResponseStreamingPlugins { + loggerTrace.Info("Running post-response chunk plugin", "plugin", plugin.TypedName().Type) + before := time.Now() + plugin.PostResponseStreaming(ctx, request, response, targetPod) + metrics.RecordPluginProcessingLatency(PostResponseStreamingExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) + } +} + +func (d *Director) runPostResponseCompletePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { + loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) + for _, plugin := range d.postResponseCompletePlugins { + loggerDebug.Info("Running post-response complete plugin", "plugin", plugin.TypedName().Type) + before := time.Now() + plugin.PostResponseComplete(ctx, request, response, targetPod) + metrics.RecordPluginProcessingLatency(PostResponseCompleteExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) + loggerDebug.Info("Completed running post-response complete plugin successfully", "plugin", plugin.TypedName()) + } +} diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index a0cb7c325..61e3578cb 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -592,13 +592,13 @@ func TestGetRandomPod(t *testing.T) { } } -func TestDirector_HandleResponse(t *testing.T) { - pr1 := newTestPostResponse("pr1") +func TestDirector_HandleResponseRecieved(t *testing.T) { + pr1 := newTestPostResponseRecieved("pr1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponsePlugins(pr1)) + director := NewDirectorWithConfig(ds, 
mockSched, nil, NewConfig().WithPostResponseRecievedPlugins(pr1)) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -613,7 +613,7 @@ func TestDirector_HandleResponse(t *testing.T) { TargetPod: &backend.Pod{NamespacedName: types.NamespacedName{Namespace: "namespace1", Name: "test-pod-name"}}, } - _, err := director.HandleResponse(ctx, reqCtx) + _, err := director.HandleResponseRecieved(ctx, reqCtx) if err != nil { t.Fatalf("HandleResponse() returned unexpected error: %v", err) } @@ -629,27 +629,143 @@ func TestDirector_HandleResponse(t *testing.T) { } } +func TestDirector_HandleResponseStreaming(t *testing.T) { + ps1 := newTestPostResponseStreaming("ps1") + + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + ds := datastore.NewDatastore(t.Context(), nil) + mockSched := &mockScheduler{} + director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponseStreamingPlugins(ps1)) + + reqCtx := &handlers.RequestContext{ + Request: &handlers.Request{ + Headers: map[string]string{ + requtil.RequestIdHeaderKey: "test-req-id-for-streaming", + }, + }, + Response: &handlers.Response{ + Headers: map[string]string{"X-Test-Streaming-Header": "StreamValue"}, + }, + TargetPod: &backend.Pod{NamespacedName: types.NamespacedName{Namespace: "namespace1", Name: "test-pod-name"}}, + } + + _, err := director.HandleResponseBodyStreaming(ctx, reqCtx) + if err != nil { + t.Fatalf("HandleResponseBodyStreaming() returned unexpected error: %v", err) + } + + if diff := cmp.Diff("test-req-id-for-streaming", ps1.lastRespOnStreaming.RequestId); diff != "" { + t.Errorf("Scheduler.OnStreaming RequestId mismatch (-want +got):\n%s", diff) + } + if diff := cmp.Diff(reqCtx.Response.Headers, ps1.lastRespOnStreaming.Headers); diff != "" { + t.Errorf("Scheduler.OnStreaming Headers mismatch (-want +got):\n%s", diff) + } + if diff := cmp.Diff("namespace1/test-pod-name", ps1.lastTargetPodOnStreaming); diff != "" { + t.Errorf("Scheduler.OnStreaming TargetPodName 
mismatch (-want +got):\n%s", diff) + } +} + +func TestDirector_HandleResponseComplete(t *testing.T) { + pc1 := newTestPostResponseComplete("pc1") + + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + ds := datastore.NewDatastore(t.Context(), nil) + mockSched := &mockScheduler{} + director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponseCompletePlugins(pc1)) + + reqCtx := &handlers.RequestContext{ + Request: &handlers.Request{ + Headers: map[string]string{ + requtil.RequestIdHeaderKey: "test-req-id-for-complete", + }, + }, + Response: &handlers.Response{ + Headers: map[string]string{"X-Test-Complete-Header": "CompleteValue"}, + }, + TargetPod: &backend.Pod{NamespacedName: types.NamespacedName{Namespace: "namespace1", Name: "test-pod-name"}}, + } + + _, err := director.HandleResponseBodyComplete(ctx, reqCtx) + if err != nil { + t.Fatalf("HandleResponseBodyComplete() returned unexpected error: %v", err) + } + + if diff := cmp.Diff("test-req-id-for-complete", pc1.lastRespOnComplete.RequestId); diff != "" { + t.Errorf("Scheduler.OnComplete RequestId mismatch (-want +got):\n%s", diff) + } + if diff := cmp.Diff(reqCtx.Response.Headers, pc1.lastRespOnComplete.Headers); diff != "" { + t.Errorf("Scheduler.OnComplete Headers mismatch (-want +got):\n%s", diff) + } + if diff := cmp.Diff("namespace1/test-pod-name", pc1.lastTargetPodOnComplete); diff != "" { + t.Errorf("Scheduler.OnComplete TargetPodName mismatch (-want +got):\n%s", diff) + } +} + const ( - testPostResponseType = "test-post-response" + testPostResponseRecievedType = "test-post-response" + testPostStreamingType = "test-post-streaming" + testPostCompleteType = "test-post-complete" ) -type testPostResponse struct { +type testPostResponseRecieved struct { tn plugins.TypedName lastRespOnResponse *Response lastTargetPodOnResponse string } -func newTestPostResponse(name string) *testPostResponse { - return &testPostResponse{ - tn: plugins.TypedName{Type: testPostResponseType, Name: 
name}, +type testPostResponseStreaming struct { + tn plugins.TypedName + lastRespOnStreaming *Response + lastTargetPodOnStreaming string +} + +type testPostResponseComplete struct { + tn plugins.TypedName + lastRespOnComplete *Response + lastTargetPodOnComplete string +} + +func newTestPostResponseRecieved(name string) *testPostResponseRecieved { + return &testPostResponseRecieved{ + tn: plugins.TypedName{Type: testPostResponseRecievedType, Name: name}, } } -func (p *testPostResponse) TypedName() plugins.TypedName { +func newTestPostResponseStreaming(name string) *testPostResponseStreaming { + return &testPostResponseStreaming{ + tn: plugins.TypedName{Type: testPostStreamingType, Name: name}, + } +} + +func newTestPostResponseComplete(name string) *testPostResponseComplete { + return &testPostResponseComplete{ + tn: plugins.TypedName{Type: testPostCompleteType, Name: name}, + } +} + +func (p *testPostResponseRecieved) TypedName() plugins.TypedName { + return p.tn +} + +func (p *testPostResponseStreaming) TypedName() plugins.TypedName { + return p.tn +} + +func (p *testPostResponseComplete) TypedName() plugins.TypedName { return p.tn } -func (p *testPostResponse) PostResponse(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (p *testPostResponseRecieved) PostResponseRecieved(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { p.lastRespOnResponse = response p.lastTargetPodOnResponse = targetPod.NamespacedName.String() } + +func (p *testPostResponseStreaming) PostResponseStreaming(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { + p.lastRespOnStreaming = response + p.lastTargetPodOnStreaming = targetPod.NamespacedName.String() +} + +func (p *testPostResponseComplete) PostResponseComplete(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { + p.lastRespOnComplete = response + 
p.lastTargetPodOnComplete = targetPod.NamespacedName.String() +} diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index ca823a670..71e21522c 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -25,8 +25,10 @@ import ( ) const ( - PreRequestExtensionPoint = "PreRequest" - PostResponseExtensionPoint = "PostResponse" + PreRequestExtensionPoint = "PreRequest" + PostResponseRecievedExtensionPoint = "PostResponseRecieved" + PostResponseStreamingExtensionPoint = "PostResponseStreaming" + PostResponseCompleteExtensionPoint = "PostResponseComplete" ) // PreRequest is called by the director after a getting result from scheduling layer and @@ -36,9 +38,21 @@ type PreRequest interface { PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult, targetPort int) } -// PostResponse is called by the director after a successful response was sent. +// PostResponseRecieved is called by the director after a successful response is sent. // The given pod argument is the pod that served the request. -type PostResponse interface { +type PostResponseRecieved interface { plugins.Plugin - PostResponse(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) + PostResponseRecieved(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) +} + +// PostResponseStreaming is called by the director after each chunk of streaming response is sent. +type PostResponseStreaming interface { + plugins.Plugin + PostResponseStreaming(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) +} + +// PostResponseComplete is called by the director after the complete response is sent. 
+type PostResponseComplete interface { + plugins.Plugin + PostResponseComplete(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) } diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index 2d6dc95e7..27108ca47 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -23,15 +23,19 @@ import ( // NewConfig creates a new Config object and returns its pointer. func NewConfig() *Config { return &Config{ - preRequestPlugins: []PreRequest{}, - postResponsePlugins: []PostResponse{}, + preRequestPlugins: []PreRequest{}, + postResponseRecievedPlugins: []PostResponseRecieved{}, + postResponseStreamingPlugins: []PostResponseStreaming{}, + postResponseCompletePlugins: []PostResponseComplete{}, } } // Config provides a configuration for the requestcontrol plugins. type Config struct { - preRequestPlugins []PreRequest - postResponsePlugins []PostResponse + preRequestPlugins []PreRequest + postResponseRecievedPlugins []PostResponseRecieved + postResponseStreamingPlugins []PostResponseStreaming + postResponseCompletePlugins []PostResponseComplete } // WithPreRequestPlugins sets the given plugins as the PreRequest plugins. @@ -43,18 +47,42 @@ func (c *Config) WithPreRequestPlugins(plugins ...PreRequest) *Config { // WithPostResponsePlugins sets the given plugins as the PostResponse plugins. // If the Config has PostResponse plugins already, this call replaces the existing plugins with the given ones. -func (c *Config) WithPostResponsePlugins(plugins ...PostResponse) *Config { - c.postResponsePlugins = plugins +func (c *Config) WithPostResponseRecievedPlugins(plugins ...PostResponseRecieved) *Config { + c.postResponseRecievedPlugins = plugins return c } +// WithPostResponseStreamingPlugins sets the given plugins as the PostResponseStreaming plugins. 
+// If the Config has PostResponseStreaming plugins already, this call replaces the existing plugins with the given ones. +func (c *Config) WithPostResponseStreamingPlugins(plugins ...PostResponseStreaming) *Config { + c.postResponseStreamingPlugins = plugins + return c +} + +// WithPostResponseCompletePlugins sets the given plugins as the PostResponseComplete plugins. +// If the Config has PostResponseComplete plugins already, this call replaces the existing plugins with the given ones. +func (c *Config) WithPostResponseCompletePlugins(plugins ...PostResponseComplete) *Config { + c.postResponseCompletePlugins = plugins + return c +} + +// AddPlugins adds the given plugins to the Config. +// The type of each plugin is checked and added to the corresponding list of plugins in the Config. +// If a plugin implements multiple plugin interfaces, it will be added to each corresponding list. + func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) { for _, plugin := range pluginObjects { if preRequestPlugin, ok := plugin.(PreRequest); ok { c.preRequestPlugins = append(c.preRequestPlugins, preRequestPlugin) } - if postResponsePlugin, ok := plugin.(PostResponse); ok { - c.postResponsePlugins = append(c.postResponsePlugins, postResponsePlugin) + if postResponseRecievedPlugin, ok := plugin.(PostResponseRecieved); ok { + c.postResponseRecievedPlugins = append(c.postResponseRecievedPlugins, postResponseRecievedPlugin) + } + if postResponseStreamingPlugin, ok := plugin.(PostResponseStreaming); ok { + c.postResponseStreamingPlugins = append(c.postResponseStreamingPlugins, postResponseStreamingPlugin) + } + if postResponseCompletePlugin, ok := plugin.(PostResponseComplete); ok { + c.postResponseCompletePlugins = append(c.postResponseCompletePlugins, postResponseCompletePlugin) } } } diff --git a/pkg/epp/server/server_test.go b/pkg/epp/server/server_test.go index aff6d4644..2c2e7b051 100644 --- a/pkg/epp/server/server_test.go +++ b/pkg/epp/server/server_test.go @@ -181,7 
+181,15 @@ func (ts *testDirector) HandleRequest(ctx context.Context, reqCtx *handlers.Requ return reqCtx, nil } -func (ts *testDirector) HandleResponse(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { +func (ts *testDirector) HandleResponseRecieved(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { + return reqCtx, nil +} + +func (ts *testDirector) HandleResponseBodyStreaming(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { + return reqCtx, nil +} + +func (ts *testDirector) HandleResponseBodyComplete(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { return reqCtx, nil } From 0174f0b8793715d9a4187513a765dc72c5109095 Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Sat, 27 Sep 2025 00:47:55 +0000 Subject: [PATCH 2/9] Fix typo in variable names --- pkg/epp/handlers/response.go | 2 +- pkg/epp/handlers/server.go | 8 +++---- pkg/epp/requestcontrol/director.go | 18 +++++++-------- pkg/epp/requestcontrol/director_test.go | 22 +++++++++---------- pkg/epp/requestcontrol/plugins.go | 8 +++---- .../requestcontrol/request_control_config.go | 12 +++++----- pkg/epp/server/server_test.go | 2 +- 7 files changed, 36 insertions(+), 36 deletions(-) diff --git a/pkg/epp/handlers/response.go b/pkg/epp/handlers/response.go index fa01b3bc4..d33a984f0 100644 --- a/pkg/epp/handlers/response.go +++ b/pkg/epp/handlers/response.go @@ -93,7 +93,7 @@ func (s *StreamingServer) HandleResponseHeaders(ctx context.Context, reqCtx *Req } } - reqCtx, err := s.director.HandleResponseRecieved(ctx, reqCtx) + reqCtx, err := s.director.HandleResponseReceived(ctx, reqCtx) return reqCtx, err } diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 213ff78e8..001fdc344 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -54,7 +54,7 @@ func NewStreamingServer(datastore Datastore, director Director) 
*StreamingServer type Director interface { HandleRequest(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) - HandleResponseRecieved(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) + HandleResponseReceived(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) HandleResponseBodyStreaming(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) HandleResponseBodyComplete(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) GetRandomPod() *backend.Pod @@ -123,7 +123,7 @@ const ( HeaderRequestResponseComplete StreamRequestState = 1 BodyRequestResponsesComplete StreamRequestState = 2 TrailerRequestResponsesComplete StreamRequestState = 3 - ResponseRecieved StreamRequestState = 4 + ResponseReceived StreamRequestState = 4 HeaderResponseResponseComplete StreamRequestState = 5 BodyResponseResponsesComplete StreamRequestState = 6 TrailerResponseResponsesComplete StreamRequestState = 7 @@ -253,7 +253,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) loggerTrace.Info("model server is streaming response") } } - reqCtx.RequestState = ResponseRecieved + reqCtx.RequestState = ResponseReceived var responseErr error reqCtx, responseErr = s.HandleResponseHeaders(ctx, reqCtx, v) @@ -379,7 +379,7 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) } } - if r.RequestState == ResponseRecieved && r.respHeaderResp != nil { + if r.RequestState == ResponseReceived && r.respHeaderResp != nil { loggerTrace.Info("Sending response header response", "obj", r.respHeaderResp) if err := srv.Send(r.respHeaderResp); err != nil { return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 53a1d064b..0bb6f001c 100644 --- 
a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -66,7 +66,7 @@ func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationD scheduler: scheduler, saturationDetector: saturationDetector, preRequestPlugins: config.preRequestPlugins, - postResponseRecievedPlugins: config.postResponseRecievedPlugins, + postResponseReceivedPlugins: config.postResponseReceivedPlugins, postResponseStreamingPlugins: config.postResponseStreamingPlugins, postResponseCompletePlugins: config.postResponseCompletePlugins, defaultPriority: 0, // define default priority explicitly @@ -79,7 +79,7 @@ type Director struct { scheduler Scheduler saturationDetector SaturationDetector preRequestPlugins []PreRequest - postResponseRecievedPlugins []PostResponseRecieved + postResponseReceivedPlugins []PostResponseReceived postResponseStreamingPlugins []PostResponseStreaming postResponseCompletePlugins []PostResponseComplete // we just need a pointer to an int variable since priority is a pointer in InferenceObjective @@ -282,8 +282,8 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch return pm } -// HandleResponseRecieved is called when the first chunk of the response arrives. -func (d *Director) HandleResponseRecieved(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { +// HandleResponseReceived is called when the first chunk of the response arrives. +func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { response := &Response{ RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], Headers: reqCtx.Response.Headers, @@ -291,7 +291,7 @@ func (d *Director) HandleResponseRecieved(ctx context.Context, reqCtx *handlers. 
// TODO: to extend fallback functionality, handle cases where target pod is unavailable // https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224 - d.runPostResponseRecievedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) + d.runPostResponseReceivedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) return reqCtx, nil } @@ -347,13 +347,13 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } } -func (d *Director) runPostResponseRecievedPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (d *Director) runPostResponseReceivedPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) - for _, plugin := range d.postResponseRecievedPlugins { + for _, plugin := range d.postResponseReceivedPlugins { loggerDebug.Info("Running post-response plugin", "plugin", plugin.TypedName()) before := time.Now() - plugin.PostResponseRecieved(ctx, request, response, targetPod) - metrics.RecordPluginProcessingLatency(PostResponseRecievedExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) + plugin.PostResponseReceived(ctx, request, response, targetPod) + metrics.RecordPluginProcessingLatency(PostResponseReceivedExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) loggerDebug.Info("Completed running post-response plugin successfully", "plugin", plugin.TypedName()) } } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 61e3578cb..4b2c11eb1 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -592,13 +592,13 @@ func TestGetRandomPod(t *testing.T) { } } -func TestDirector_HandleResponseRecieved(t *testing.T) { - pr1 := newTestPostResponseRecieved("pr1") +func 
TestDirector_HandleResponseReceived(t *testing.T) { + pr1 := newTestPostResponseReceived("pr1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponseRecievedPlugins(pr1)) + director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponseReceivedPlugins(pr1)) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -613,7 +613,7 @@ func TestDirector_HandleResponseRecieved(t *testing.T) { TargetPod: &backend.Pod{NamespacedName: types.NamespacedName{Namespace: "namespace1", Name: "test-pod-name"}}, } - _, err := director.HandleResponseRecieved(ctx, reqCtx) + _, err := director.HandleResponseReceived(ctx, reqCtx) if err != nil { t.Fatalf("HandleResponse() returned unexpected error: %v", err) } @@ -702,12 +702,12 @@ func TestDirector_HandleResponseComplete(t *testing.T) { } const ( - testPostResponseRecievedType = "test-post-response" + testPostResponseReceivedType = "test-post-response" testPostStreamingType = "test-post-streaming" testPostCompleteType = "test-post-complete" ) -type testPostResponseRecieved struct { +type testPostResponseReceived struct { tn plugins.TypedName lastRespOnResponse *Response lastTargetPodOnResponse string @@ -725,9 +725,9 @@ type testPostResponseComplete struct { lastTargetPodOnComplete string } -func newTestPostResponseRecieved(name string) *testPostResponseRecieved { - return &testPostResponseRecieved{ - tn: plugins.TypedName{Type: testPostResponseRecievedType, Name: name}, +func newTestPostResponseReceived(name string) *testPostResponseReceived { + return &testPostResponseReceived{ + tn: plugins.TypedName{Type: testPostResponseReceivedType, Name: name}, } } @@ -743,7 +743,7 @@ func newTestPostResponseComplete(name string) *testPostResponseComplete { } } -func (p *testPostResponseRecieved) TypedName() plugins.TypedName { +func (p 
*testPostResponseReceived) TypedName() plugins.TypedName { return p.tn } @@ -755,7 +755,7 @@ func (p *testPostResponseComplete) TypedName() plugins.TypedName { return p.tn } -func (p *testPostResponseRecieved) PostResponseRecieved(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (p *testPostResponseReceived) PostResponseReceived(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { p.lastRespOnResponse = response p.lastTargetPodOnResponse = targetPod.NamespacedName.String() } diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index 71e21522c..9f423a847 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -26,7 +26,7 @@ import ( const ( PreRequestExtensionPoint = "PreRequest" - PostResponseRecievedExtensionPoint = "PostResponseRecieved" + PostResponseReceivedExtensionPoint = "PostResponseReceived" PostResponseStreamingExtensionPoint = "PostResponseStreaming" PostResponseCompleteExtensionPoint = "PostResponseComplete" ) @@ -38,11 +38,11 @@ type PreRequest interface { PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult, targetPort int) } -// PostResponseRecieved is called by the director after a successful response is sent. +// PostResponseReceived is called by the director after a successful response is sent. // The given pod argument is the pod that served the request. -type PostResponseRecieved interface { +type PostResponseReceived interface { plugins.Plugin - PostResponseRecieved(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) + PostResponseReceived(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) } // PostResponseStreaming is called by the director after each chunk of streaming response is sent. 
diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index 27108ca47..80284822b 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -24,7 +24,7 @@ import ( func NewConfig() *Config { return &Config{ preRequestPlugins: []PreRequest{}, - postResponseRecievedPlugins: []PostResponseRecieved{}, + postResponseReceivedPlugins: []PostResponseReceived{}, postResponseStreamingPlugins: []PostResponseStreaming{}, postResponseCompletePlugins: []PostResponseComplete{}, } @@ -33,7 +33,7 @@ func NewConfig() *Config { // Config provides a configuration for the requestcontrol plugins. type Config struct { preRequestPlugins []PreRequest - postResponseRecievedPlugins []PostResponseRecieved + postResponseReceivedPlugins []PostResponseReceived postResponseStreamingPlugins []PostResponseStreaming postResponseCompletePlugins []PostResponseComplete } @@ -47,8 +47,8 @@ func (c *Config) WithPreRequestPlugins(plugins ...PreRequest) *Config { // WithPostResponsePlugins sets the given plugins as the PostResponse plugins. // If the Config has PostResponse plugins already, this call replaces the existing plugins with the given ones. 
-func (c *Config) WithPostResponseRecievedPlugins(plugins ...PostResponseRecieved) *Config { - c.postResponseRecievedPlugins = plugins +func (c *Config) WithPostResponseReceivedPlugins(plugins ...PostResponseReceived) *Config { + c.postResponseReceivedPlugins = plugins return c } @@ -75,8 +75,8 @@ func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) { if preRequestPlugin, ok := plugin.(PreRequest); ok { c.preRequestPlugins = append(c.preRequestPlugins, preRequestPlugin) } - if postResponseRecievedPlugin, ok := plugin.(PostResponseRecieved); ok { - c.postResponseRecievedPlugins = append(c.postResponseRecievedPlugins, postResponseRecievedPlugin) + if postResponseReceivedPlugin, ok := plugin.(PostResponseReceived); ok { + c.postResponseReceivedPlugins = append(c.postResponseReceivedPlugins, postResponseReceivedPlugin) } if postResponseStreamingPlugin, ok := plugin.(PostResponseStreaming); ok { c.postResponseStreamingPlugins = append(c.postResponseStreamingPlugins, postResponseStreamingPlugin) diff --git a/pkg/epp/server/server_test.go b/pkg/epp/server/server_test.go index 2c2e7b051..7220c04b4 100644 --- a/pkg/epp/server/server_test.go +++ b/pkg/epp/server/server_test.go @@ -181,7 +181,7 @@ func (ts *testDirector) HandleRequest(ctx context.Context, reqCtx *handlers.Requ return reqCtx, nil } -func (ts *testDirector) HandleResponseRecieved(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { +func (ts *testDirector) HandleResponseReceived(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { return reqCtx, nil } From bbeb5b6047a5aa8878fb2c2815496addbc4baee5 Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Tue, 30 Sep 2025 20:01:29 +0000 Subject: [PATCH 3/9] Log typed name in director.go and remove redundant director nil check in response.go --- pkg/epp/handlers/response.go | 17 +++++++++-------- pkg/epp/handlers/response_test.go | 24 ++++++++++++++++++++++++ 
pkg/epp/requestcontrol/director.go | 4 ++-- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/pkg/epp/handlers/response.go b/pkg/epp/handlers/response.go index d33a984f0..1cbacbae3 100644 --- a/pkg/epp/handlers/response.go +++ b/pkg/epp/handlers/response.go @@ -61,16 +61,16 @@ func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *Reques reqCtx.ResponseComplete = true reqCtx.respBodyResp = generateResponseBodyResponses(responseBytes, true) - if s.director != nil { - s.director.HandleResponseBodyComplete(ctx, reqCtx) - } - return reqCtx, nil + + return s.director.HandleResponseBodyComplete(ctx, reqCtx) } // The function is to handle streaming response if the modelServer is streaming. func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context, reqCtx *RequestContext, responseText string) { - if s.director != nil { - s.director.HandleResponseBodyStreaming(ctx, reqCtx) + logger := log.FromContext(ctx) + _, err := s.director.HandleResponseBodyStreaming(ctx, reqCtx) + if err != nil { + logger.Error(err, "error in HandleResponseBodyStreaming") } if strings.Contains(responseText, streamingEndMsg) { reqCtx.ResponseComplete = true @@ -78,8 +78,9 @@ func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context, reqCtx.Usage = resp.Usage metrics.RecordInputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.PromptTokens) metrics.RecordOutputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.CompletionTokens) - if s.director != nil { - s.director.HandleResponseBodyComplete(ctx, reqCtx) + _, err := s.director.HandleResponseBodyComplete(ctx, reqCtx) + if err != nil { + logger.Error(err, "error in HandleResponseBodyComplete") } } } diff --git a/pkg/epp/handlers/response_test.go b/pkg/epp/handlers/response_test.go index 6eb7734e4..63b2de0da 100644 --- a/pkg/epp/handlers/response_test.go +++ b/pkg/epp/handlers/response_test.go @@ -23,6 +23,7 @@ import ( 
"github.com/google/go-cmp/cmp" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -59,6 +60,27 @@ data: [DONE] ` ) +type mockDirector struct{} + +func (m *mockDirector) HandleResponseBodyStreaming(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) { + return reqCtx, nil +} +func (m *mockDirector) HandleResponseBodyComplete(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) { + return reqCtx, nil +} +func (m *mockDirector) HandleResponseReceived(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) { + return reqCtx, nil +} +func (m *mockDirector) HandlePreRequest(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) { + return reqCtx, nil +} +func (m *mockDirector) GetRandomPod() *backend.Pod { + return &backend.Pod{} +} +func (m *mockDirector) HandleRequest(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) { + return reqCtx, nil +} + func TestHandleResponseBody(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) @@ -83,6 +105,7 @@ func TestHandleResponseBody(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { server := &StreamingServer{} + server.director = &mockDirector{} reqCtx := test.reqCtx if reqCtx == nil { reqCtx = &RequestContext{} @@ -143,6 +166,7 @@ func TestHandleStreamedResponseBody(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { server := &StreamingServer{} + server.director = &mockDirector{} reqCtx := test.reqCtx if reqCtx == nil { reqCtx = &RequestContext{} diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 0bb6f001c..aa1c39424 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -361,7 +361,7 @@ func (d *Director) runPostResponseReceivedPlugins(ctx context.Context, request * func (d *Director) 
runPostResponseStreamingPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) for _, plugin := range d.postResponseStreamingPlugins { - loggerTrace.Info("Running post-response chunk plugin", "plugin", plugin.TypedName().Type) + loggerTrace.Info("Running post-response chunk plugin", "plugin", plugin.TypedName()) before := time.Now() plugin.PostResponseStreaming(ctx, request, response, targetPod) metrics.RecordPluginProcessingLatency(PostResponseStreamingExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) @@ -371,7 +371,7 @@ func (d *Director) runPostResponseStreamingPlugins(ctx context.Context, request func (d *Director) runPostResponseCompletePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) for _, plugin := range d.postResponseCompletePlugins { - loggerDebug.Info("Running post-response complete plugin", "plugin", plugin.TypedName().Type) + loggerDebug.Info("Running post-response complete plugin", "plugin", plugin.TypedName()) before := time.Now() plugin.PostResponseComplete(ctx, request, response, targetPod) metrics.RecordPluginProcessingLatency(PostResponseCompleteExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) From 35658022bdfccd932f27d320cf6689303418b53c Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Wed, 1 Oct 2025 20:13:11 +0000 Subject: [PATCH 4/9] Renamed the post response plugins to not include the word post. 
--- pkg/epp/requestcontrol/director.go | 70 +++++++++++-------- pkg/epp/requestcontrol/director_test.go | 50 ++++++------- pkg/epp/requestcontrol/plugins.go | 26 +++---- .../requestcontrol/request_control_config.go | 52 +++++++------- pkg/epp/requestcontrol/types.go | 2 +- 5 files changed, 104 insertions(+), 96 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index aa1c39424..02b3af4ec 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -59,29 +59,37 @@ type SaturationDetector interface { IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool } +type RequestControlPlugins struct { + preRequestPlugins []PreRequest + responseReceivedPlugins []ResponseReceived + responseStreamingPlugins []ResponseStreaming + responseCompletePlugins []ResponseComplete +} + // NewDirectorWithConfig creates a new Director instance with all dependencies. func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director { + RCPlugins := RequestControlPlugins{ + preRequestPlugins: config.preRequestPlugins, + responseReceivedPlugins: config.responseReceivedPlugins, + responseStreamingPlugins: config.responseStreamingPlugins, + responseCompletePlugins: config.responseCompletePlugins, + } + return &Director{ - datastore: datastore, - scheduler: scheduler, - saturationDetector: saturationDetector, - preRequestPlugins: config.preRequestPlugins, - postResponseReceivedPlugins: config.postResponseReceivedPlugins, - postResponseStreamingPlugins: config.postResponseStreamingPlugins, - postResponseCompletePlugins: config.postResponseCompletePlugins, - defaultPriority: 0, // define default priority explicitly + datastore: datastore, + scheduler: scheduler, + saturationDetector: saturationDetector, + requestControlPlugins: RCPlugins, + defaultPriority: 0, // define default priority explicitly } } // Director orchestrates 
the request handling flow, including scheduling. type Director struct { - datastore Datastore - scheduler Scheduler - saturationDetector SaturationDetector - preRequestPlugins []PreRequest - postResponseReceivedPlugins []PostResponseReceived - postResponseStreamingPlugins []PostResponseStreaming - postResponseCompletePlugins []PostResponseComplete + datastore Datastore + scheduler Scheduler + saturationDetector SaturationDetector + requestControlPlugins RequestControlPlugins // we just need a pointer to an int variable since priority is a pointer in InferenceObjective // no need to set this in the constructor, since the value we want is the default int val // and value types cannot be nil @@ -291,7 +299,7 @@ func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers. // TODO: to extend fallback functionality, handle cases where target pod is unavailable // https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224 - d.runPostResponseReceivedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) + d.runResponseReceivedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) return reqCtx, nil } @@ -305,7 +313,7 @@ func (d *Director) HandleResponseBodyStreaming(ctx context.Context, reqCtx *hand Headers: reqCtx.Response.Headers, } - d.runPostResponseStreamingPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) + d.runResponseStreamingPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) logger.V(logutil.TRACE).Info("Exiting HandleResponseBodyChunk") return reqCtx, nil } @@ -319,7 +327,7 @@ func (d *Director) HandleResponseBodyComplete(ctx context.Context, reqCtx *handl Headers: reqCtx.Response.Headers, } - d.runPostResponseCompletePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) + d.runResponseCompletePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) logger.V(logutil.DEBUG).Info("Exiting HandleResponseBodyComplete") return reqCtx, nil @@ -338,7 
+346,7 @@ func (d *Director) GetRandomPod() *backend.Pod { func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, schedulingResult *schedulingtypes.SchedulingResult, targetPort int) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) - for _, plugin := range d.preRequestPlugins { + for _, plugin := range d.requestControlPlugins.preRequestPlugins { loggerDebug.Info("Running pre-request plugin", "plugin", plugin.TypedName()) before := time.Now() plugin.PreRequest(ctx, request, schedulingResult, targetPort) @@ -347,34 +355,34 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } } -func (d *Director) runPostResponseReceivedPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (d *Director) runResponseReceivedPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) - for _, plugin := range d.postResponseReceivedPlugins { + for _, plugin := range d.requestControlPlugins.responseReceivedPlugins { loggerDebug.Info("Running post-response plugin", "plugin", plugin.TypedName()) before := time.Now() - plugin.PostResponseReceived(ctx, request, response, targetPod) - metrics.RecordPluginProcessingLatency(PostResponseReceivedExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) + plugin.ResponseReceived(ctx, request, response, targetPod) + metrics.RecordPluginProcessingLatency(ResponseReceivedExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) loggerDebug.Info("Completed running post-response plugin successfully", "plugin", plugin.TypedName()) } } -func (d *Director) runPostResponseStreamingPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (d *Director) runResponseStreamingPlugins(ctx context.Context, 
request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) - for _, plugin := range d.postResponseStreamingPlugins { + for _, plugin := range d.requestControlPlugins.responseStreamingPlugins { loggerTrace.Info("Running post-response chunk plugin", "plugin", plugin.TypedName()) before := time.Now() - plugin.PostResponseStreaming(ctx, request, response, targetPod) - metrics.RecordPluginProcessingLatency(PostResponseStreamingExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) + plugin.ResponseStreaming(ctx, request, response, targetPod) + metrics.RecordPluginProcessingLatency(ResponseStreamingExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) } } -func (d *Director) runPostResponseCompletePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (d *Director) runResponseCompletePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) - for _, plugin := range d.postResponseCompletePlugins { + for _, plugin := range d.requestControlPlugins.responseCompletePlugins { loggerDebug.Info("Running post-response complete plugin", "plugin", plugin.TypedName()) before := time.Now() - plugin.PostResponseComplete(ctx, request, response, targetPod) - metrics.RecordPluginProcessingLatency(PostResponseCompleteExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) + plugin.ResponseComplete(ctx, request, response, targetPod) + metrics.RecordPluginProcessingLatency(ResponseCompleteExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) loggerDebug.Info("Completed running post-response complete plugin successfully", "plugin", plugin.TypedName()) } } diff --git a/pkg/epp/requestcontrol/director_test.go 
b/pkg/epp/requestcontrol/director_test.go index 4b2c11eb1..947d168cf 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -593,12 +593,12 @@ func TestGetRandomPod(t *testing.T) { } func TestDirector_HandleResponseReceived(t *testing.T) { - pr1 := newTestPostResponseReceived("pr1") + pr1 := newTestResponseReceived("pr1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponseReceivedPlugins(pr1)) + director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseReceivedPlugins(pr1)) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -630,12 +630,12 @@ func TestDirector_HandleResponseReceived(t *testing.T) { } func TestDirector_HandleResponseStreaming(t *testing.T) { - ps1 := newTestPostResponseStreaming("ps1") + ps1 := newTestResponseStreaming("ps1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponseStreamingPlugins(ps1)) + director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1)) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -666,12 +666,12 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { } func TestDirector_HandleResponseComplete(t *testing.T) { - pc1 := newTestPostResponseComplete("pc1") + pc1 := newTestResponseComplete("pc1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponseCompletePlugins(pc1)) + director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseCompletePlugins(pc1)) reqCtx := 
&handlers.RequestContext{ Request: &handlers.Request{ @@ -702,70 +702,70 @@ func TestDirector_HandleResponseComplete(t *testing.T) { } const ( - testPostResponseReceivedType = "test-post-response" - testPostStreamingType = "test-post-streaming" - testPostCompleteType = "test-post-complete" + testResponseReceivedType = "test-post-response" + testPostStreamingType = "test-post-streaming" + testPostCompleteType = "test-post-complete" ) -type testPostResponseReceived struct { +type testResponseReceived struct { tn plugins.TypedName lastRespOnResponse *Response lastTargetPodOnResponse string } -type testPostResponseStreaming struct { +type testResponseStreaming struct { tn plugins.TypedName lastRespOnStreaming *Response lastTargetPodOnStreaming string } -type testPostResponseComplete struct { +type testResponseComplete struct { tn plugins.TypedName lastRespOnComplete *Response lastTargetPodOnComplete string } -func newTestPostResponseReceived(name string) *testPostResponseReceived { - return &testPostResponseReceived{ - tn: plugins.TypedName{Type: testPostResponseReceivedType, Name: name}, +func newTestResponseReceived(name string) *testResponseReceived { + return &testResponseReceived{ + tn: plugins.TypedName{Type: testResponseReceivedType, Name: name}, } } -func newTestPostResponseStreaming(name string) *testPostResponseStreaming { - return &testPostResponseStreaming{ +func newTestResponseStreaming(name string) *testResponseStreaming { + return &testResponseStreaming{ tn: plugins.TypedName{Type: testPostStreamingType, Name: name}, } } -func newTestPostResponseComplete(name string) *testPostResponseComplete { - return &testPostResponseComplete{ +func newTestResponseComplete(name string) *testResponseComplete { + return &testResponseComplete{ tn: plugins.TypedName{Type: testPostCompleteType, Name: name}, } } -func (p *testPostResponseReceived) TypedName() plugins.TypedName { +func (p *testResponseReceived) TypedName() plugins.TypedName { return p.tn } -func (p 
*testPostResponseStreaming) TypedName() plugins.TypedName { +func (p *testResponseStreaming) TypedName() plugins.TypedName { return p.tn } -func (p *testPostResponseComplete) TypedName() plugins.TypedName { +func (p *testResponseComplete) TypedName() plugins.TypedName { return p.tn } -func (p *testPostResponseReceived) PostResponseReceived(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (p *testResponseReceived) ResponseReceived(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { p.lastRespOnResponse = response p.lastTargetPodOnResponse = targetPod.NamespacedName.String() } -func (p *testPostResponseStreaming) PostResponseStreaming(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (p *testResponseStreaming) ResponseStreaming(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { p.lastRespOnStreaming = response p.lastTargetPodOnStreaming = targetPod.NamespacedName.String() } -func (p *testPostResponseComplete) PostResponseComplete(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { +func (p *testResponseComplete) ResponseComplete(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { p.lastRespOnComplete = response p.lastTargetPodOnComplete = targetPod.NamespacedName.String() } diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index 9f423a847..cad6d0627 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -25,10 +25,10 @@ import ( ) const ( - PreRequestExtensionPoint = "PreRequest" - PostResponseReceivedExtensionPoint = "PostResponseReceived" - PostResponseStreamingExtensionPoint = "PostResponseStreaming" - PostResponseCompleteExtensionPoint = "PostResponseComplete" + PreRequestExtensionPoint = "PreRequest" + 
ResponseReceivedExtensionPoint = "ResponseReceived" + ResponseStreamingExtensionPoint = "ResponseStreaming" + ResponseCompleteExtensionPoint = "ResponseComplete" ) // PreRequest is called by the director after a getting result from scheduling layer and @@ -38,21 +38,21 @@ type PreRequest interface { PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult, targetPort int) } -// PostResponseReceived is called by the director after a successful response is sent. +// ResponseReceived is called by the director after a successful response is sent. // The given pod argument is the pod that served the request. -type PostResponseReceived interface { +type ResponseReceived interface { plugins.Plugin - PostResponseReceived(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) + ResponseReceived(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) } -// PostResponseStreaming is called by the director after each chunk of streaming response is sent. -type PostResponseStreaming interface { +// ResponseStreaming is called by the director after each chunk of streaming response is sent. +type ResponseStreaming interface { plugins.Plugin - PostResponseStreaming(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) + ResponseStreaming(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) } -// PostResponseComplete is called by the director after the complete response is sent. -type PostResponseComplete interface { +// ResponseComplete is called by the director after the complete response is sent. 
+type ResponseComplete interface { plugins.Plugin - PostResponseComplete(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) + ResponseComplete(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) } diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index 80284822b..cec197ef7 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -23,19 +23,19 @@ import ( // NewConfig creates a new Config object and returns its pointer. func NewConfig() *Config { return &Config{ - preRequestPlugins: []PreRequest{}, - postResponseReceivedPlugins: []PostResponseReceived{}, - postResponseStreamingPlugins: []PostResponseStreaming{}, - postResponseCompletePlugins: []PostResponseComplete{}, + preRequestPlugins: []PreRequest{}, + responseReceivedPlugins: []ResponseReceived{}, + responseStreamingPlugins: []ResponseStreaming{}, + responseCompletePlugins: []ResponseComplete{}, } } // Config provides a configuration for the requestcontrol plugins. type Config struct { - preRequestPlugins []PreRequest - postResponseReceivedPlugins []PostResponseReceived - postResponseStreamingPlugins []PostResponseStreaming - postResponseCompletePlugins []PostResponseComplete + preRequestPlugins []PreRequest + responseReceivedPlugins []ResponseReceived + responseStreamingPlugins []ResponseStreaming + responseCompletePlugins []ResponseComplete } // WithPreRequestPlugins sets the given plugins as the PreRequest plugins. @@ -45,24 +45,24 @@ func (c *Config) WithPreRequestPlugins(plugins ...PreRequest) *Config { return c } -// WithPostResponsePlugins sets the given plugins as the PostResponse plugins. -// If the Config has PostResponse plugins already, this call replaces the existing plugins with the given ones. 
-func (c *Config) WithPostResponseReceivedPlugins(plugins ...PostResponseReceived) *Config { - c.postResponseReceivedPlugins = plugins +// WithResponsePlugins sets the given plugins as the Response plugins. +// If the Config has Response plugins already, this call replaces the existing plugins with the given ones. +func (c *Config) WithResponseReceivedPlugins(plugins ...ResponseReceived) *Config { + c.responseReceivedPlugins = plugins return c } -// WithPostResponseStreamingPlugins sets the given plugins as the PostResponseStreaming plugins. -// If the Config has PostResponseStreaming plugins already, this call replaces the existing plugins with the given ones. -func (c *Config) WithPostResponseStreamingPlugins(plugins ...PostResponseStreaming) *Config { - c.postResponseStreamingPlugins = plugins +// WithResponseStreamingPlugins sets the given plugins as the ResponseStreaming plugins. +// If the Config has ResponseStreaming plugins already, this call replaces the existing plugins with the given ones. +func (c *Config) WithResponseStreamingPlugins(plugins ...ResponseStreaming) *Config { + c.responseStreamingPlugins = plugins return c } -// WithPostResponseCompletePlugins sets the given plugins as the PostResponseComplete plugins. -// If the Config has PostResponseComplete plugins already, this call replaces the existing plugins with the given ones. -func (c *Config) WithPostResponseCompletePlugins(plugins ...PostResponseComplete) *Config { - c.postResponseCompletePlugins = plugins +// WithResponseCompletePlugins sets the given plugins as the ResponseComplete plugins. +// If the Config has ResponseComplete plugins already, this call replaces the existing plugins with the given ones. 
+func (c *Config) WithResponseCompletePlugins(plugins ...ResponseComplete) *Config { + c.responseCompletePlugins = plugins return c } @@ -75,14 +75,14 @@ func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) { if preRequestPlugin, ok := plugin.(PreRequest); ok { c.preRequestPlugins = append(c.preRequestPlugins, preRequestPlugin) } - if postResponseReceivedPlugin, ok := plugin.(PostResponseReceived); ok { - c.postResponseReceivedPlugins = append(c.postResponseReceivedPlugins, postResponseReceivedPlugin) + if responseReceivedPlugin, ok := plugin.(ResponseReceived); ok { + c.responseReceivedPlugins = append(c.responseReceivedPlugins, responseReceivedPlugin) } - if postResponseStreamingPlugin, ok := plugin.(PostResponseStreaming); ok { - c.postResponseStreamingPlugins = append(c.postResponseStreamingPlugins, postResponseStreamingPlugin) + if responseStreamingPlugin, ok := plugin.(ResponseStreaming); ok { + c.responseStreamingPlugins = append(c.responseStreamingPlugins, responseStreamingPlugin) } - if postResponseCompletePlugin, ok := plugin.(PostResponseComplete); ok { - c.postResponseCompletePlugins = append(c.postResponseCompletePlugins, postResponseCompletePlugin) + if responseCompletePlugin, ok := plugin.(ResponseComplete); ok { + c.responseCompletePlugins = append(c.responseCompletePlugins, responseCompletePlugin) } } } diff --git a/pkg/epp/requestcontrol/types.go b/pkg/epp/requestcontrol/types.go index 8604e1dda..c881ed713 100644 --- a/pkg/epp/requestcontrol/types.go +++ b/pkg/epp/requestcontrol/types.go @@ -16,7 +16,7 @@ limitations under the License. 
package requestcontrol -// Response contains information from the response received to be passed to PostResponse plugins +// Response contains information from the response received to be passed to the Response requestcontrol plugins type Response struct { // RequestId is the Envoy generated Id for the request being processed RequestId string From f53bf6403cfa0fc49201ceaa6d158ae47820f9b7 Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Fri, 3 Oct 2025 23:22:30 +0000 Subject: [PATCH 5/9] Fix function comment and pass existing logger into HandleResponseBodyStreaming --- pkg/epp/handlers/response.go | 2 +- pkg/epp/handlers/response_test.go | 3 ++- pkg/epp/handlers/server.go | 2 +- pkg/epp/requestcontrol/director.go | 4 ++-- pkg/epp/requestcontrol/director_test.go | 4 +++- pkg/epp/requestcontrol/request_control_config.go | 2 +- pkg/epp/server/server_test.go | 3 ++- 7 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/epp/handlers/response.go b/pkg/epp/handlers/response.go index 1cbacbae3..5760cbfc6 100644 --- a/pkg/epp/handlers/response.go +++ b/pkg/epp/handlers/response.go @@ -68,7 +68,7 @@ func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *Reques // The function is to handle streaming response if the modelServer is streaming. 
func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context, reqCtx *RequestContext, responseText string) { logger := log.FromContext(ctx) - _, err := s.director.HandleResponseBodyStreaming(ctx, reqCtx) + _, err := s.director.HandleResponseBodyStreaming(ctx, reqCtx, logger) if err != nil { logger.Error(err, "error in HandleResponseBodyStreaming") } diff --git a/pkg/epp/handlers/response_test.go b/pkg/epp/handlers/response_test.go index 63b2de0da..290161167 100644 --- a/pkg/epp/handlers/response_test.go +++ b/pkg/epp/handlers/response_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "testing" + "github.com/go-logr/logr" "github.com/google/go-cmp/cmp" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" @@ -62,7 +63,7 @@ data: [DONE] type mockDirector struct{} -func (m *mockDirector) HandleResponseBodyStreaming(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) { +func (m *mockDirector) HandleResponseBodyStreaming(ctx context.Context, reqCtx *RequestContext, logger logr.Logger) (*RequestContext, error) { return reqCtx, nil } func (m *mockDirector) HandleResponseBodyComplete(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) { diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 001fdc344..59cde8949 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -55,7 +55,7 @@ func NewStreamingServer(datastore Datastore, director Director) *StreamingServer type Director interface { HandleRequest(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) HandleResponseReceived(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) - HandleResponseBodyStreaming(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) + HandleResponseBodyStreaming(ctx context.Context, reqCtx *RequestContext, logger logr.Logger) (*RequestContext, error) HandleResponseBodyComplete(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) 
GetRandomPod() *backend.Pod } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 02b3af4ec..fe4f891e8 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -27,6 +27,7 @@ import ( "strings" "time" + "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/log" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" @@ -305,8 +306,7 @@ func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers. } // HandleResponseBodyStreaming is called every time a chunk of the response body is received. -func (d *Director) HandleResponseBodyStreaming(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { - logger := log.FromContext(ctx).WithValues("stage", "bodyChunk") +func (d *Director) HandleResponseBodyStreaming(ctx context.Context, reqCtx *handlers.RequestContext, logger logr.Logger) (*handlers.RequestContext, error) { logger.V(logutil.TRACE).Info("Entering HandleResponseBodyChunk") response := &Response{ RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 947d168cf..93cea6349 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" @@ -636,6 +637,7 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1)) + logger := log.FromContext(ctx) reqCtx := &handlers.RequestContext{ Request: 
&handlers.Request{ @@ -649,7 +651,7 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { TargetPod: &backend.Pod{NamespacedName: types.NamespacedName{Namespace: "namespace1", Name: "test-pod-name"}}, } - _, err := director.HandleResponseBodyStreaming(ctx, reqCtx) + _, err := director.HandleResponseBodyStreaming(ctx, reqCtx, logger) if err != nil { t.Fatalf("HandleResponseBodyStreaming() returned unexpected error: %v", err) } diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index cec197ef7..89841e1e3 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -45,7 +45,7 @@ func (c *Config) WithPreRequestPlugins(plugins ...PreRequest) *Config { return c } -// WithResponsePlugins sets the given plugins as the Response plugins. +// WithResponseReceivedPlugins sets the given plugins as the Response plugins. // If the Config has Response plugins already, this call replaces the existing plugins with the given ones. 
func (c *Config) WithResponseReceivedPlugins(plugins ...ResponseReceived) *Config { c.responseReceivedPlugins = plugins diff --git a/pkg/epp/server/server_test.go b/pkg/epp/server/server_test.go index 7220c04b4..f3fa16bfd 100644 --- a/pkg/epp/server/server_test.go +++ b/pkg/epp/server/server_test.go @@ -22,6 +22,7 @@ import ( "testing" pb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -185,7 +186,7 @@ func (ts *testDirector) HandleResponseReceived(ctx context.Context, reqCtx *hand return reqCtx, nil } -func (ts *testDirector) HandleResponseBodyStreaming(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { +func (ts *testDirector) HandleResponseBodyStreaming(ctx context.Context, reqCtx *handlers.RequestContext, logger logr.Logger) (*handlers.RequestContext, error) { return reqCtx, nil } From 26d53d3255242ee2e85eb35f7aedff0ad1a19896 Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Mon, 6 Oct 2025 12:19:51 -0700 Subject: [PATCH 6/9] Update pkg/epp/requestcontrol/plugins.go Co-authored-by: Nir Rozenbaum --- pkg/epp/requestcontrol/plugins.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index cad6d0627..422e85c5f 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -38,7 +38,7 @@ type PreRequest interface { PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult, targetPort int) } -// ResponseReceived is called by the director after a successful response is sent. +// ResponseReceived is called by the director after the response headers are successfully received which indicates the beginning of the response handling by the model server. // The given pod argument is the pod that served the request. 
type ResponseReceived interface { plugins.Plugin From 91e7c6396267fa81da295c17295aeccde8b87e76 Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Mon, 6 Oct 2025 12:20:15 -0700 Subject: [PATCH 7/9] Update pkg/epp/requestcontrol/request_control_config.go Co-authored-by: Nir Rozenbaum --- pkg/epp/requestcontrol/request_control_config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index 89841e1e3..ffa6c6609 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -45,8 +45,8 @@ func (c *Config) WithPreRequestPlugins(plugins ...PreRequest) *Config { return c } -// WithResponseReceivedPlugins sets the given plugins as the Response plugins. -// If the Config has Response plugins already, this call replaces the existing plugins with the given ones. +// WithResponseReceivedPlugins sets the given plugins as the ResponseReceived plugins. +// If the Config has ResponseReceived plugins already, this call replaces the existing plugins with the given ones. 
func (c *Config) WithResponseReceivedPlugins(plugins ...ResponseReceived) *Config { c.responseReceivedPlugins = plugins return c From ff896862678f4c694549565ef0e721ab191f55cc Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Mon, 6 Oct 2025 12:21:42 -0700 Subject: [PATCH 8/9] Update pkg/epp/requestcontrol/director.go Co-authored-by: Nir Rozenbaum --- pkg/epp/requestcontrol/director.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index fe4f891e8..e83bbc1ae 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -369,7 +369,7 @@ func (d *Director) runResponseReceivedPlugins(ctx context.Context, request *sche func (d *Director) runResponseStreamingPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) for _, plugin := range d.requestControlPlugins.responseStreamingPlugins { - loggerTrace.Info("Running post-response chunk plugin", "plugin", plugin.TypedName()) + loggerTrace.Info("Running ResponseStreaming plugin", "plugin", plugin.TypedName()) before := time.Now() plugin.ResponseStreaming(ctx, request, response, targetPod) metrics.RecordPluginProcessingLatency(ResponseStreamingExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) From f73ef27c5845d0e4fa5cac5219c3647c275ae567 Mon Sep 17 00:00:00 2001 From: BenjaminBraunDev Date: Mon, 6 Oct 2025 19:40:38 +0000 Subject: [PATCH 9/9] Fix comments and logs, simplify Director definition to take in config --- pkg/epp/requestcontrol/director.go | 33 ++++++++----------------- pkg/epp/requestcontrol/director_test.go | 6 ++--- pkg/epp/requestcontrol/plugins.go | 3 ++- 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index e83bbc1ae..56b4b1870 100644 ---
a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -60,27 +60,13 @@ type SaturationDetector interface { IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool } -type RequestControlPlugins struct { - preRequestPlugins []PreRequest - responseReceivedPlugins []ResponseReceived - responseStreamingPlugins []ResponseStreaming - responseCompletePlugins []ResponseComplete -} - // NewDirectorWithConfig creates a new Director instance with all dependencies. func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director { - RCPlugins := RequestControlPlugins{ - preRequestPlugins: config.preRequestPlugins, - responseReceivedPlugins: config.responseReceivedPlugins, - responseStreamingPlugins: config.responseStreamingPlugins, - responseCompletePlugins: config.responseCompletePlugins, - } - return &Director{ datastore: datastore, scheduler: scheduler, saturationDetector: saturationDetector, - requestControlPlugins: RCPlugins, + requestControlPlugins: *config, defaultPriority: 0, // define default priority explicitly } } @@ -90,7 +76,7 @@ type Director struct { datastore Datastore scheduler Scheduler saturationDetector SaturationDetector - requestControlPlugins RequestControlPlugins + requestControlPlugins Config // we just need a pointer to an int variable since priority is a pointer in InferenceObjective // no need to set this in the constructor, since the value we want is the default int val // and value types cannot be nil @@ -291,7 +277,7 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch return pm } -// HandleResponseReceived is called when the first chunk of the response arrives. +// HandleResponseReceived is called when the response headers are received. 
func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { response := &Response{ RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], @@ -347,22 +333,22 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling schedulingResult *schedulingtypes.SchedulingResult, targetPort int) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) for _, plugin := range d.requestControlPlugins.preRequestPlugins { - loggerDebug.Info("Running pre-request plugin", "plugin", plugin.TypedName()) + loggerDebug.Info("Running PreRequest plugin", "plugin", plugin.TypedName()) before := time.Now() plugin.PreRequest(ctx, request, schedulingResult, targetPort) metrics.RecordPluginProcessingLatency(PreRequestExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) - loggerDebug.Info("Completed running pre-request plugin successfully", "plugin", plugin.TypedName()) + loggerDebug.Info("Completed running PreRequest plugin successfully", "plugin", plugin.TypedName()) } } func (d *Director) runResponseReceivedPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) for _, plugin := range d.requestControlPlugins.responseReceivedPlugins { - loggerDebug.Info("Running post-response plugin", "plugin", plugin.TypedName()) + loggerDebug.Info("Running ResponseReceived plugin", "plugin", plugin.TypedName()) before := time.Now() plugin.ResponseReceived(ctx, request, response, targetPod) metrics.RecordPluginProcessingLatency(ResponseReceivedExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) - loggerDebug.Info("Completed running post-response plugin successfully", "plugin", plugin.TypedName()) + loggerDebug.Info("Completed running ResponseReceived plugin successfully", "plugin", plugin.TypedName()) } } @@ -373,16 +359,17 @@ func (d 
*Director) runResponseStreamingPlugins(ctx context.Context, request *sch before := time.Now() plugin.ResponseStreaming(ctx, request, response, targetPod) metrics.RecordPluginProcessingLatency(ResponseStreamingExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) + loggerTrace.Info("Completed running ResponseStreaming plugin successfully", "plugin", plugin.TypedName()) } } func (d *Director) runResponseCompletePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) for _, plugin := range d.requestControlPlugins.responseCompletePlugins { - loggerDebug.Info("Running post-response complete plugin", "plugin", plugin.TypedName()) + loggerDebug.Info("Running ResponseComplete plugin", "plugin", plugin.TypedName()) before := time.Now() plugin.ResponseComplete(ctx, request, response, targetPod) metrics.RecordPluginProcessingLatency(ResponseCompleteExtensionPoint, plugin.TypedName().Type, plugin.TypedName().Name, time.Since(before)) - loggerDebug.Info("Completed running post-response complete plugin successfully", "plugin", plugin.TypedName()) + loggerDebug.Info("Completed running ResponseComplete plugin successfully", "plugin", plugin.TypedName()) } } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 93cea6349..ea486d3c4 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -704,9 +704,9 @@ func TestDirector_HandleResponseComplete(t *testing.T) { } const ( - testResponseReceivedType = "test-post-response" - testPostStreamingType = "test-post-streaming" - testPostCompleteType = "test-post-complete" + testResponseReceivedType = "test-response-received" + testPostStreamingType = "test-response-streaming" + testPostCompleteType = "test-response-complete" ) type testResponseReceived struct { diff --git a/pkg/epp/requestcontrol/plugins.go 
b/pkg/epp/requestcontrol/plugins.go index 422e85c5f..44334c68f 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -38,7 +38,8 @@ type PreRequest interface { PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult, targetPort int) } -// ResponseReceived is called by the director after the response headers are successfully received which indicates the beginning of the response handling by the model server. +// ResponseReceived is called by the director after the response headers are successfully received +// which indicates the beginning of the response handling by the model server. // The given pod argument is the pod that served the request. type ResponseReceived interface { plugins.Plugin