From cd7c550517ccba4b26bf958116d854dfa3caa9ce Mon Sep 17 00:00:00 2001 From: Imran Pochi Date: Tue, 10 Dec 2024 17:58:09 +0000 Subject: [PATCH] fix: lease based controller when using mTLS currently when the lease based server count is enabled along with mTLS between apiserver and konnectivity server, it breaks. this fixes this by setting up the k8s clientset correctly. Signed-off-by: Imran Pochi --- cmd/server/app/options/options.go | 23 +++++++--- cmd/server/app/server.go | 50 ++++++++++----------- pkg/util/k8s.go | 73 +++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 33 deletions(-) create mode 100644 pkg/util/k8s.go diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index f5a26621c..fd7311809 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -83,6 +83,8 @@ type ProxyRunOptions struct { AuthenticationAudience string // Path to kubeconfig (used by kubernetes client) KubeconfigPath string + // Use in cluster config or not (used by kubernetes client) + UseInCluster bool // Client maximum QPS. KubeconfigQPS float32 // Client maximum burst for throttle. @@ -142,6 +144,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet { flags.StringVar(&o.AgentNamespace, "agent-namespace", o.AgentNamespace, "Expected agent's namespace during agent authentication (used with agent-service-account, authentication-audience, kubeconfig).") flags.StringVar(&o.AgentServiceAccount, "agent-service-account", o.AgentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).") flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).") + flags.BoolVar(&o.UseInCluster, "use-in-cluster", o.UseInCluster, "Use in-cluster config to setup kubernetes client.") flags.Float32Var(&o.KubeconfigQPS, "kubeconfig-qps", o.KubeconfigQPS, "Maximum client QPS (proxy server uses this client to authenticate agent tokens).") flags.IntVar(&o.KubeconfigBurst, "kubeconfig-burst", o.KubeconfigBurst, "Maximum client burst (proxy server uses this client to authenticate agent tokens).") flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.") @@ -289,8 +292,8 @@ func (o *ProxyRunOptions) Validate() error { } // validate agent authentication params - // all 4 parameters must be empty or must have value (except KubeconfigPath that might be empty) - if o.AgentNamespace != "" || o.AgentServiceAccount != "" || o.AuthenticationAudience != "" || o.KubeconfigPath != "" { + // all 3 parameters must be empty or must have value + if o.AgentNamespace != "" || o.AgentServiceAccount != "" || o.AuthenticationAudience != "" { if o.ClusterCaCert != "" { return fmt.Errorf("ClusterCaCert can not be used when service account authentication is enabled") } @@ -303,13 +306,19 @@ func (o *ProxyRunOptions) Validate() error { if o.AuthenticationAudience == "" { return fmt.Errorf("AuthenticationAudience cannot be empty when agent authentication is enabled") } - if o.KubeconfigPath != "" { - if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) { - return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err) - } - } } + // either use kubeconfig path or incluster config for kubernetes client + if util.IsRunningInKubernetes() { + if (len(o.KubeconfigPath) > 0) == o.UseInCluster { + return fmt.Errorf("set only one of --use-in-cluster or --kubeconfig") + } + + if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) { + return fmt.Errorf("checking KubeconfigPath %q, got %v", o.KubeconfigPath, err) + } + + } // validate the proxy strategies if len(o.ProxyStrategies) == 0 { return fmt.Errorf("ProxyStrategies cannot be empty") diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index 8f8ddfa36..e1b379683 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -40,7 +40,6 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options" "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" @@ -97,42 +96,41 @@ type Proxy struct { type StopFunc func() func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { + var err error + var k8sClient *kubernetes.Clientset + var authOpt *server.AgentTokenAuthenticationOptions o.Print() - if err := o.Validate(); err != nil { + if err = o.Validate(); err != nil { return fmt.Errorf("failed to validate server options with %v", err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var k8sClient *kubernetes.Clientset - if o.AgentNamespace != "" { - config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath) - if err != nil { - return fmt.Errorf("failed to load kubernetes client config: %v", err) + if util.IsRunningInKubernetes() { + clientsetcfg := &util.ClientsetConfig{ + Kubeconfig: o.KubeconfigPath, + InClusterConfig: o.UseInCluster, + ClientConfig: &util.ClientConfig{ + Burst: o.KubeconfigBurst, + QPS: o.KubeconfigQPS, + APIContentType: o.APIContentType, + }, } - if o.KubeconfigQPS != 0 { - klog.V(1).Infof("Setting k8s client QPS: %v", o.KubeconfigQPS) - config.QPS = o.KubeconfigQPS - } - if o.KubeconfigBurst != 0 { - klog.V(1).Infof("Setting k8s client Burst: %v", o.KubeconfigBurst) - config.Burst = o.KubeconfigBurst - } - config.ContentType = o.APIContentType - k8sClient, err = kubernetes.NewForConfig(config) + k8sClient, err = clientsetcfg.NewClientset() if err != nil { - return fmt.Errorf("failed to create kubernetes clientset: %v", err) + return err } - } - authOpt := &server.AgentTokenAuthenticationOptions{ - Enabled: o.AgentNamespace != "", - AgentNamespace: o.AgentNamespace, - AgentServiceAccount: o.AgentServiceAccount, - KubernetesClient: k8sClient, - AuthenticationAudience: o.AuthenticationAudience, + authOpt = &server.AgentTokenAuthenticationOptions{ + Enabled: o.AgentNamespace != "", + AgentNamespace: o.AgentNamespace, + AgentServiceAccount: o.AgentServiceAccount, + KubernetesClient: k8sClient, + AuthenticationAudience: o.AuthenticationAudience, + } } + klog.V(1).Infoln("Starting frontend server for client connections.") ps, err := server.ParseProxyStrategies(o.ProxyStrategies) if err != nil { @@ -160,7 +158,7 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { return err } - if o.EnableLeaseController { + if util.IsRunningInKubernetes() && o.EnableLeaseController { leaseController := leases.NewController( k8sClient, o.ServerID, diff --git a/pkg/util/k8s.go b/pkg/util/k8s.go new file mode 100644 index 000000000..8b5c73749 --- /dev/null +++ b/pkg/util/k8s.go @@ -0,0 +1,73 @@ +package util + +import ( + "fmt" + "os" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" +) + +type ClientsetConfig struct { + Kubeconfig string + InClusterConfig bool + ClientConfig *ClientConfig +} + +type ClientConfig struct { + QPS float32 + Burst int + APIContentType string +} + +func IsRunningInKubernetes() bool { + if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" { + return true + } + + return false +} + +func (cfg *ClientsetConfig) NewClientset() (*kubernetes.Clientset, error) { + var config *rest.Config + var err error + + switch { + case cfg.Kubeconfig != "": + config, err = clientcmd.BuildConfigFromFlags("", cfg.Kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to load kubernetes client config: %v", err) + } + case cfg.InClusterConfig: + config, err = rest.InClusterConfig() + default: + return nil, fmt.Errorf("no valid configuration option provided; either provide kubeconfig path or set --use-in-cluster=true with necessary RBAC permissions") + } + + cfg.setClientOptions(config) + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes clientset: %w", err) + } + + return clientset, nil +} + +func (cfg *ClientsetConfig) setClientOptions(config *rest.Config) { + + if cfg.ClientConfig.QPS != 0 { + klog.V(1).Infof("Setting k8s client QPS: %v", cfg.ClientConfig.QPS) + config.QPS = cfg.ClientConfig.QPS + } + if cfg.ClientConfig.Burst != 0 { + klog.V(1).Infof("Setting k8s client Burst: %v", cfg.ClientConfig.Burst) + config.Burst = cfg.ClientConfig.Burst + } + if len(cfg.ClientConfig.APIContentType) != 0 { + klog.V(1).Infof("Setting k8s client Content type: %v", cfg.ClientConfig.APIContentType) + config.ContentType = cfg.ClientConfig.APIContentType + } +}