Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type ProxyRunOptions struct {
AuthenticationAudience string
// Path to kubeconfig (used by kubernetes client)
KubeconfigPath string
// Use in cluster config or not (used by kubernetes client)
UseInCluster bool
// Client maximum QPS.
KubeconfigQPS float32
// Client maximum burst for throttle.
Expand Down Expand Up @@ -142,6 +144,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.AgentNamespace, "agent-namespace", o.AgentNamespace, "Expected agent's namespace during agent authentication (used with agent-service-account, authentication-audience, kubeconfig).")
flags.StringVar(&o.AgentServiceAccount, "agent-service-account", o.AgentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).")
flags.BoolVar(&o.UseInCluster, "use-in-cluster", o.UseInCluster, "Use in-cluster config to setup kubernetes client.")
flags.Float32Var(&o.KubeconfigQPS, "kubeconfig-qps", o.KubeconfigQPS, "Maximum client QPS (proxy server uses this client to authenticate agent tokens).")
flags.IntVar(&o.KubeconfigBurst, "kubeconfig-burst", o.KubeconfigBurst, "Maximum client burst (proxy server uses this client to authenticate agent tokens).")
flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.")
Expand Down Expand Up @@ -289,8 +292,8 @@ func (o *ProxyRunOptions) Validate() error {
}

// validate agent authentication params
// all 4 parameters must be empty or must have value (except KubeconfigPath that might be empty)
if o.AgentNamespace != "" || o.AgentServiceAccount != "" || o.AuthenticationAudience != "" || o.KubeconfigPath != "" {
// all 3 parameters must be empty or must have value
if o.AgentNamespace != "" || o.AgentServiceAccount != "" || o.AuthenticationAudience != "" {
if o.ClusterCaCert != "" {
return fmt.Errorf("ClusterCaCert can not be used when service account authentication is enabled")
}
Expand All @@ -303,13 +306,19 @@ func (o *ProxyRunOptions) Validate() error {
if o.AuthenticationAudience == "" {
return fmt.Errorf("AuthenticationAudience cannot be empty when agent authentication is enabled")
}
if o.KubeconfigPath != "" {
if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
}
}
}

// either use kubeconfig path or incluster config for kubernetes client
if util.IsRunningInKubernetes() {
if (len(o.KubeconfigPath) > 0) == o.UseInCluster {
return fmt.Errorf("set only one of --use-in-cluster or --kubeconfig")
}

if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
return fmt.Errorf("checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
}

}
// validate the proxy strategies
if len(o.ProxyStrategies) == 0 {
return fmt.Errorf("ProxyStrategies cannot be empty")
Expand Down
50 changes: 24 additions & 26 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
Expand Down Expand Up @@ -97,42 +96,41 @@ type Proxy struct {
type StopFunc func()

func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
var err error
var k8sClient *kubernetes.Clientset
var authOpt *server.AgentTokenAuthenticationOptions
o.Print()
if err := o.Validate(); err != nil {
if err = o.Validate(); err != nil {
return fmt.Errorf("failed to validate server options with %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var k8sClient *kubernetes.Clientset
if o.AgentNamespace != "" {
config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
if err != nil {
return fmt.Errorf("failed to load kubernetes client config: %v", err)
if util.IsRunningInKubernetes() {
clientsetcfg := &util.ClientsetConfig{
Kubeconfig: o.KubeconfigPath,
InClusterConfig: o.UseInCluster,
ClientConfig: &util.ClientConfig{
Burst: o.KubeconfigBurst,
QPS: o.KubeconfigQPS,
APIContentType: o.APIContentType,
},
}

if o.KubeconfigQPS != 0 {
klog.V(1).Infof("Setting k8s client QPS: %v", o.KubeconfigQPS)
config.QPS = o.KubeconfigQPS
}
if o.KubeconfigBurst != 0 {
klog.V(1).Infof("Setting k8s client Burst: %v", o.KubeconfigBurst)
config.Burst = o.KubeconfigBurst
}
config.ContentType = o.APIContentType
k8sClient, err = kubernetes.NewForConfig(config)
k8sClient, err = clientsetcfg.NewClientset()
if err != nil {
return fmt.Errorf("failed to create kubernetes clientset: %v", err)
return err
}
}

authOpt := &server.AgentTokenAuthenticationOptions{
Enabled: o.AgentNamespace != "",
AgentNamespace: o.AgentNamespace,
AgentServiceAccount: o.AgentServiceAccount,
KubernetesClient: k8sClient,
AuthenticationAudience: o.AuthenticationAudience,
authOpt = &server.AgentTokenAuthenticationOptions{
Enabled: o.AgentNamespace != "",
AgentNamespace: o.AgentNamespace,
AgentServiceAccount: o.AgentServiceAccount,
KubernetesClient: k8sClient,
AuthenticationAudience: o.AuthenticationAudience,
}
}

klog.V(1).Infoln("Starting frontend server for client connections.")
ps, err := server.ParseProxyStrategies(o.ProxyStrategies)
if err != nil {
Expand Down Expand Up @@ -160,7 +158,7 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
return err
}

if o.EnableLeaseController {
if util.IsRunningInKubernetes() && o.EnableLeaseController {
leaseController := leases.NewController(
k8sClient,
o.ServerID,
Expand Down
73 changes: 73 additions & 0 deletions pkg/util/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package util

import (
"fmt"
"os"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)

type ClientsetConfig struct {
Kubeconfig string
InClusterConfig bool
ClientConfig *ClientConfig
}

type ClientConfig struct {
QPS float32
Burst int
APIContentType string
}

func IsRunningInKubernetes() bool {
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
return true
}

return false
}

func (cfg *ClientsetConfig) NewClientset() (*kubernetes.Clientset, error) {
var config *rest.Config
var err error

switch {
case cfg.Kubeconfig != "":
config, err = clientcmd.BuildConfigFromFlags("", cfg.Kubeconfig)
if err != nil {
return nil, fmt.Errorf("failed to load kubernetes client config: %v", err)
}
case cfg.InClusterConfig:
config, err = rest.InClusterConfig()
default:
return nil, fmt.Errorf("no valid configuration option provided; either provide kubeconfig path or set --use-in-cluster=true with necessary RBAC permissions")
}

cfg.setClientOptions(config)

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create Kubernetes clientset: %w", err)
}

return clientset, nil
}

func (cfg *ClientsetConfig) setClientOptions(config *rest.Config) {

if cfg.ClientConfig.QPS != 0 {
klog.V(1).Infof("Setting k8s client QPS: %v", cfg.ClientConfig.QPS)
config.QPS = cfg.ClientConfig.QPS
}
if cfg.ClientConfig.Burst != 0 {
klog.V(1).Infof("Setting k8s client Burst: %v", cfg.ClientConfig.Burst)
config.Burst = cfg.ClientConfig.Burst
}
if len(cfg.ClientConfig.APIContentType) != 0 {
klog.V(1).Infof("Setting k8s client Content type: %v", cfg.ClientConfig.APIContentType)
config.ContentType = cfg.ClientConfig.APIContentType
}
}
Loading