@@ -14,6 +14,8 @@ import (
1414 "net"
1515 "os"
1616 "path/filepath"
17+ "strings"
18+ "time"
1719
1820 egextension "github.com/envoyproxy/gateway/proto/extension"
1921 "go.uber.org/zap/zapcore"
@@ -22,7 +24,9 @@ import (
2224 admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
2325 corev1 "k8s.io/api/core/v1"
2426 ctrl "sigs.k8s.io/controller-runtime"
27+ "sigs.k8s.io/controller-runtime/pkg/cache"
2528 "sigs.k8s.io/controller-runtime/pkg/client"
29+ "sigs.k8s.io/controller-runtime/pkg/config"
2630 "sigs.k8s.io/controller-runtime/pkg/log/zap"
2731 "sigs.k8s.io/controller-runtime/pkg/webhook"
2832
@@ -51,7 +55,9 @@ type flags struct {
5155 // extProcMaxRecvMsgSize is the maximum message size in bytes that the gRPC server can receive.
5256 extProcMaxRecvMsgSize int
5357 // maxRecvMsgSize is the maximum message size in bytes that the gRPC extension server can receive.
54- maxRecvMsgSize int
58+ maxRecvMsgSize int
59+ watchNamespaces []string
60+ cacheSyncTimeout time.Duration
5561}
5662
5763// parsePullPolicy parses string into a k8s PullPolicy.
@@ -64,6 +70,18 @@ func parsePullPolicy(s string) (corev1.PullPolicy, error) {
6470 }
6571}
6672
73+ // parseWatchNamespaces parses a comma-separated list of namespaces into a slice of strings.
74+ func parseWatchNamespaces (s string ) []string {
75+ var namespaces []string
76+ for _ , n := range strings .Split (s , "," ) {
77+ ns := strings .TrimSpace (n )
78+ if ns != "" {
79+ namespaces = append (namespaces , ns )
80+ }
81+ }
82+ return namespaces
83+ }
84+
6785// parseAndValidateFlags parses the command-line arguments provided in args,
6886// validates them, and returns the parsed configuration.
6987func parseAndValidateFlags (args []string ) (flags , error ) {
@@ -159,6 +177,16 @@ func parseAndValidateFlags(args []string) (flags, error) {
159177 4 * 1024 * 1024 ,
160178 "Maximum message size in bytes that the gRPC extension server can receive. Default is 4MB." ,
161179 )
180+ watchNamespaces := fs .String (
181+ "watchNamespaces" ,
182+ "" ,
183+ "Comma-separated list of namespaces to watch. If not set, the controller watches all namespaces." ,
184+ )
185+ cacheSyncTimeout := fs .Duration (
186+ "cacheSyncTimeout" ,
187+ 2 * time .Minute , // This is the controller-runtime default
188+ "Maximum time to wait for k8s caches to sync" ,
189+ )
162190
163191 if err := fs .Parse (args ); err != nil {
164192 err = fmt .Errorf ("failed to parse flags: %w" , err )
@@ -238,41 +266,47 @@ func parseAndValidateFlags(args []string) (flags, error) {
238266 extProcImagePullSecrets : * extProcImagePullSecrets ,
239267 extProcMaxRecvMsgSize : * extProcMaxRecvMsgSize ,
240268 maxRecvMsgSize : * maxRecvMsgSize ,
269+ watchNamespaces : parseWatchNamespaces (* watchNamespaces ),
270+ cacheSyncTimeout : * cacheSyncTimeout ,
241271 }, nil
242272}
243273
244274func main () {
245275 setupLog := ctrl .Log .WithName ("setup" )
246276
247- flags , err := parseAndValidateFlags (os .Args [1 :])
277+ parsedFlags , err := parseAndValidateFlags (os .Args [1 :])
248278 if err != nil {
249279 setupLog .Error (err , "failed to parse and validate flags" )
250280 os .Exit (1 )
251281 }
252282
253283 // Warn if deprecated flag is being used.
254- if flags .metricsRequestHeaderLabels != "" {
284+ if parsedFlags .metricsRequestHeaderLabels != "" {
255285 setupLog .Info ("The --metricsRequestHeaderLabels flag is deprecated and will be removed in a future release. Please use --metricsRequestHeaderAttributes instead." )
256286 }
257287
258- ctrl .SetLogger (zap .New (zap .UseFlagOptions (& zap.Options {Development : true , Level : flags .logLevel })))
288+ ctrl .SetLogger (zap .New (zap .UseFlagOptions (& zap.Options {Development : true , Level : parsedFlags .logLevel })))
259289 k8sConfig := ctrl .GetConfigOrDie ()
260290
261- lis , err := net .Listen ("tcp" , flags .extensionServerPort )
291+ lis , err := net .Listen ("tcp" , parsedFlags .extensionServerPort )
262292 if err != nil {
263- setupLog .Error (err , "failed to listen" , "port" , flags .extensionServerPort )
293+ setupLog .Error (err , "failed to listen" , "port" , parsedFlags .extensionServerPort )
264294 os .Exit (1 )
265295 }
266296
297+ setupLog .Info ("configuring kubernetes cache" , "watch-namespaces" , parsedFlags .watchNamespaces , "sync-timeout" , parsedFlags .cacheSyncTimeout )
298+
267299 ctx := ctrl .SetupSignalHandler ()
268300 mgrOpts := ctrl.Options {
301+ Cache : setupCache (parsedFlags ),
302+ Controller : config.Controller {CacheSyncTimeout : parsedFlags .cacheSyncTimeout },
269303 Scheme : controller .Scheme ,
270- LeaderElection : flags .enableLeaderElection ,
304+ LeaderElection : parsedFlags .enableLeaderElection ,
271305 LeaderElectionID : "envoy-ai-gateway-controller" ,
272306 WebhookServer : webhook .NewServer (webhook.Options {
273- CertDir : flags .tlsCertDir ,
274- CertName : flags .tlsCertName ,
275- KeyName : flags .tlsKeyName ,
307+ CertDir : parsedFlags .tlsCertDir ,
308+ CertName : parsedFlags .tlsCertName ,
309+ KeyName : parsedFlags .tlsKeyName ,
276310 Port : 9443 ,
277311 }),
278312 }
@@ -287,14 +321,14 @@ func main() {
287321 setupLog .Error (err , "failed to create client" )
288322 os .Exit (1 )
289323 }
290- if err := maybePatchAdmissionWebhook (ctx , cli , filepath .Join (flags .tlsCertDir , flags .caBundleName )); err != nil {
324+ if err := maybePatchAdmissionWebhook (ctx , cli , filepath .Join (parsedFlags .tlsCertDir , parsedFlags .caBundleName )); err != nil {
291325 setupLog .Error (err , "failed to patch admission webhook" )
292326 os .Exit (1 )
293327 }
294328
295329 // Start the extension server running alongside the controller.
296330 const extProcUDSPath = "/etc/ai-gateway-extproc-uds/run.sock"
297- s := grpc .NewServer (grpc .MaxRecvMsgSize (flags .maxRecvMsgSize ))
331+ s := grpc .NewServer (grpc .MaxRecvMsgSize (parsedFlags .maxRecvMsgSize ))
298332 extSrv := extensionserver .New (mgr .GetClient (), ctrl .Log , extProcUDSPath , false )
299333 egextension .RegisterEnvoyGatewayExtensionServer (s , extSrv )
300334 grpc_health_v1 .RegisterHealthServer (s , extSrv )
@@ -310,17 +344,17 @@ func main() {
310344
311345 // Start the controller.
312346 if err := controller .StartControllers (ctx , mgr , k8sConfig , ctrl .Log .WithName ("controller" ), controller.Options {
313- ExtProcImage : flags .extProcImage ,
314- ExtProcImagePullPolicy : flags .extProcImagePullPolicy ,
315- ExtProcLogLevel : flags .extProcLogLevel ,
316- EnableLeaderElection : flags .enableLeaderElection ,
347+ ExtProcImage : parsedFlags .extProcImage ,
348+ ExtProcImagePullPolicy : parsedFlags .extProcImagePullPolicy ,
349+ ExtProcLogLevel : parsedFlags .extProcLogLevel ,
350+ EnableLeaderElection : parsedFlags .enableLeaderElection ,
317351 UDSPath : extProcUDSPath ,
318- MetricsRequestHeaderAttributes : flags .metricsRequestHeaderAttributes ,
319- TracingRequestHeaderAttributes : flags .spanRequestHeaderAttributes ,
320- RootPrefix : flags .rootPrefix ,
321- ExtProcExtraEnvVars : flags .extProcExtraEnvVars ,
322- ExtProcImagePullSecrets : flags .extProcImagePullSecrets ,
323- ExtProcMaxRecvMsgSize : flags .extProcMaxRecvMsgSize ,
352+ MetricsRequestHeaderAttributes : parsedFlags .metricsRequestHeaderAttributes ,
353+ TracingRequestHeaderAttributes : parsedFlags .spanRequestHeaderAttributes ,
354+ RootPrefix : parsedFlags .rootPrefix ,
355+ ExtProcExtraEnvVars : parsedFlags .extProcExtraEnvVars ,
356+ ExtProcImagePullSecrets : parsedFlags .extProcImagePullSecrets ,
357+ ExtProcMaxRecvMsgSize : parsedFlags .extProcMaxRecvMsgSize ,
324358 }); err != nil {
325359 setupLog .Error (err , "failed to start controller" )
326360 }
@@ -356,3 +390,19 @@ func maybePatchAdmissionWebhook(ctx context.Context, cli client.Client, bundlePa
356390 }
357391 return nil
358392}
393+
394+ // setupCache sets up the cache options based on the provided flags.
395+ func setupCache (f flags ) cache.Options {
396+ var namespaceCacheConfig map [string ]cache.Config
397+ if len (f .watchNamespaces ) > 0 {
398+ namespaceCacheConfig = make (map [string ]cache.Config , len (f .watchNamespaces ))
399+ for _ , ns := range f .watchNamespaces {
400+ namespaceCacheConfig [ns ] = cache.Config {}
401+ }
402+ }
403+
404+ return cache.Options {
405+ DefaultNamespaces : namespaceCacheConfig ,
406+ DefaultTransform : cache .TransformStripManagedFields (),
407+ }
408+ }
0 commit comments