11package main
22
33import (
4- "context"
54 "flag"
65 "fmt"
76 "net"
8- "os"
9- "os/signal"
10- "syscall"
117 "time"
128
139 extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1410 "google.golang.org/grpc"
15- "google.golang.org/grpc/codes"
1611 healthPb "google.golang.org/grpc/health/grpc_health_v1"
17- "google.golang.org/grpc/status"
1812 "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
1913 "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
2014 "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
@@ -29,10 +23,14 @@ import (
2923)
3024
3125var (
32- port = flag .Int (
33- "port " ,
26+ grpcPort = flag .Int (
27+ "grpcPort " ,
3428 9002 ,
35- "gRPC port" )
29+ "The gRPC port used for communicating with Envoy proxy" )
30+ grpcHealthPort = flag .Int (
31+ "grpcHealthPort" ,
32+ 9003 ,
33+ "The port used for gRPC liveness and readiness probes" )
3634 targetPodHeader = flag .String (
3735 "targetPodHeader" ,
3836 "target-pod" ,
@@ -65,55 +63,39 @@ var (
6563 scheme = runtime .NewScheme ()
6664)
6765
68- type healthServer struct {}
69-
70- func (s * healthServer ) Check (
71- ctx context.Context ,
72- in * healthPb.HealthCheckRequest ,
73- ) (* healthPb.HealthCheckResponse , error ) {
74- klog .Infof ("Handling grpc Check request + %s" , in .String ())
75- return & healthPb.HealthCheckResponse {Status : healthPb .HealthCheckResponse_SERVING }, nil
76- }
77-
78- func (s * healthServer ) Watch (in * healthPb.HealthCheckRequest , srv healthPb.Health_WatchServer ) error {
79- return status .Error (codes .Unimplemented , "Watch is not implemented" )
80- }
81-
8266func init () {
8367 utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
8468 utilruntime .Must (v1alpha1 .AddToScheme (scheme ))
8569}
8670
8771func main () {
88-
8972 klog .InitFlags (nil )
9073 flag .Parse ()
9174
9275 ctrl .SetLogger (klog .TODO ())
9376
77+ // Validate flags
78+ if err := validateFlags (); err != nil {
79+ klog .Fatalf ("Failed to validate flags: %v" , err )
80+ }
81+
9482 // Print all flag values
9583 flags := "Flags: "
9684 flag .VisitAll (func (f * flag.Flag ) {
9785 flags += fmt .Sprintf ("%s=%v; " , f .Name , f .Value )
9886 })
9987 klog .Info (flags )
10088
101- klog . Infof ( "Listening on %q" , fmt . Sprintf ( ":%d" , * port ))
102- lis , err := net . Listen ( "tcp" , fmt . Sprintf ( ":%d" , * port ) )
89+ // Create a new manager to manage controllers
90+ mgr , err := ctrl . NewManager ( ctrl . GetConfigOrDie (), ctrl. Options { Scheme : scheme } )
10391 if err != nil {
104- klog .Fatalf ("failed to listen : %v" , err )
92+ klog .Fatalf ("Failed to create controller manager : %v" , err )
10593 }
10694
95+ // Create the data store used to cache watched resources
10796 datastore := backend .NewK8sDataStore ()
10897
109- mgr , err := ctrl .NewManager (ctrl .GetConfigOrDie (), ctrl.Options {
110- Scheme : scheme ,
111- })
112- if err != nil {
113- klog .Error (err , "unable to start manager" )
114- os .Exit (1 )
115- }
116-
98+ // Create the controllers and register them with the manager
11799 if err := (& backend.InferencePoolReconciler {
118100 Datastore : datastore ,
119101 Scheme : mgr .GetScheme (),
@@ -124,7 +106,7 @@ func main() {
124106 },
125107 Record : mgr .GetEventRecorderFor ("InferencePool" ),
126108 }).SetupWithManager (mgr ); err != nil {
127- klog .Error ( err , "Error setting up InferencePoolReconciler" )
109+ klog .Fatalf ( "Failed setting up InferencePoolReconciler: %v" , err )
128110 }
129111
130112 if err := (& backend.InferenceModelReconciler {
@@ -137,7 +119,7 @@ func main() {
137119 },
138120 Record : mgr .GetEventRecorderFor ("InferenceModel" ),
139121 }).SetupWithManager (mgr ); err != nil {
140- klog .Error ( err , "Error setting up InferenceModelReconciler" )
122+ klog .Fatalf ( "Failed setting up InferenceModelReconciler: %v" , err )
141123 }
142124
143125 if err := (& backend.EndpointSliceReconciler {
@@ -148,53 +130,105 @@ func main() {
148130 ServiceName : * serviceName ,
149131 Zone : * zone ,
150132 }).SetupWithManager (mgr ); err != nil {
151- klog .Error (err , "Error setting up EndpointSliceReconciler" )
133+ klog .Fatalf ("Failed setting up EndpointSliceReconciler: %v" , err )
134+ }
135+
136+ // Start health and ext-proc servers in goroutines
137+ healthSvr := startHealthServer (datastore , * grpcHealthPort )
138+ extProcSvr := startExternalProcessorServer (
139+ datastore ,
140+ * grpcPort ,
141+ * refreshPodsInterval ,
142+ * refreshMetricsInterval ,
143+ * targetPodHeader ,
144+ )
145+
146+ // Start the controller manager. Blocking and will return when shutdown is complete.
147+ klog .Infof ("Starting controller manager" )
148+ if err := mgr .Start (ctrl .SetupSignalHandler ()); err != nil {
149+ klog .Fatalf ("Error starting controller manager: %v" , err )
150+ }
151+ klog .Info ("Controller manager shutting down" )
152+
153+ // Gracefully shutdown servers
154+ if healthSvr != nil {
155+ klog .Info ("Health server shutting down" )
156+ healthSvr .GracefulStop ()
152157 }
158+ if extProcSvr != nil {
159+ klog .Info ("Ext-proc server shutting down" )
160+ extProcSvr .GracefulStop ()
161+ }
162+
163+ klog .Info ("All components shutdown" )
164+ }
165+
166+ // startHealthServer starts the gRPC health probe server in a goroutine.
167+ func startHealthServer (ds * backend.K8sDatastore , port int ) * grpc.Server {
168+ svr := grpc .NewServer ()
169+ healthPb .RegisterHealthServer (svr , & healthServer {datastore : ds })
153170
154- errChan := make (chan error )
155171 go func () {
156- if err := mgr . Start ( ctrl . SetupSignalHandler ()); err != nil {
157- klog . Error ( err , "Error running manager" )
158- errChan <- err
172+ lis , err := net . Listen ( "tcp" , fmt . Sprintf ( ":%d" , port ))
173+ if err != nil {
174+ klog . Fatalf ( "Health server failed to listen: %v" , err )
159175 }
176+ klog .Infof ("Health server listening on port: %d" , port )
177+
178+ // Blocking and will return when shutdown is complete.
179+ if err := svr .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
180+ klog .Fatalf ("Health server failed: %v" , err )
181+ }
182+ klog .Info ("Health server shutting down" )
160183 }()
184+ return svr
185+ }
161186
162- s := grpc .NewServer ()
187+ // startExternalProcessorServer starts the Envoy external processor server in a goroutine.
188+ func startExternalProcessorServer (
189+ datastore * backend.K8sDatastore ,
190+ port int ,
191+ refreshPodsInterval , refreshMetricsInterval time.Duration ,
192+ targetPodHeader string ,
193+ ) * grpc.Server {
194+ svr := grpc .NewServer ()
163195
164- pp := backend .NewProvider (& vllm.PodMetricsClientImpl {}, datastore )
165- if err := pp .Init (* refreshPodsInterval , * refreshMetricsInterval ); err != nil {
166- klog .Fatalf ("failed to initialize: %v" , err )
167- }
168- extProcPb .RegisterExternalProcessorServer (
169- s ,
170- handlers .NewServer (
171- pp ,
172- scheduling .NewScheduler (pp ),
173- * targetPodHeader ,
174- datastore ))
175- healthPb .RegisterHealthServer (s , & healthServer {})
176-
177- klog .Infof ("Starting gRPC server on port :%v" , * port )
178-
179- // shutdown
180- var gracefulStop = make (chan os.Signal , 1 )
181- signal .Notify (gracefulStop , syscall .SIGTERM )
182- signal .Notify (gracefulStop , syscall .SIGINT )
183196 go func () {
184- select {
185- case sig := <- gracefulStop :
186- klog .Infof ("caught sig: %+v" , sig )
187- os .Exit (0 )
188- case err := <- errChan :
189- klog .Infof ("caught error in controller: %+v" , err )
190- os .Exit (0 )
197+ lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , port ))
198+ if err != nil {
199+ klog .Fatalf ("Ext-proc server failed to listen: %v" , err )
191200 }
201+ klog .Infof ("Ext-proc server listening on port: %d" , port )
192202
203+ // Initialize backend provider
204+ pp := backend .NewProvider (& vllm.PodMetricsClientImpl {}, datastore )
205+ if err := pp .Init (refreshPodsInterval , refreshMetricsInterval ); err != nil {
206+ klog .Fatalf ("Failed to initialize backend provider: %v" , err )
207+ }
208+
209+ // Register ext_proc handlers
210+ extProcPb .RegisterExternalProcessorServer (
211+ svr ,
212+ handlers .NewServer (pp , scheduling .NewScheduler (pp ), targetPodHeader , datastore ),
213+ )
214+
215+ // Blocking and will return when shutdown is complete.
216+ if err := svr .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
217+ klog .Fatalf ("Ext-proc server failed: %v" , err )
218+ }
219+ klog .Info ("Ext-proc server shutting down" )
193220 }()
221+ return svr
222+ }
194223
195- err = s .Serve (lis )
196- if err != nil {
197- klog .Fatalf ("Ext-proc failed with the err: %v" , err )
224+ func validateFlags () error {
225+ if * poolName == "" {
226+ return fmt .Errorf ("required %q flag not set" , "poolName" )
227+ }
228+
229+ if * serviceName == "" {
230+ return fmt .Errorf ("required %q flag not set" , "serviceName" )
198231 }
199232
233+ return nil
200234}
0 commit comments