Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kube-apiserver/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apiserver:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions cmd/kube-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func NewServerRunOptions() *ServerRunOptions {
}
// Overwrite the default for storage data format.
s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
// Set the default for admission plugins names
s.Admission.PluginNames = []string{"AlwaysAdmit"}
return &s
}

Expand Down
126 changes: 71 additions & 55 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"

clientgoinformers "k8s.io/client-go/informers"
clientgo "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/cmd/kube-apiserver/app/preflight"
"k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -99,11 +101,11 @@ cluster's shared state through which all other components interact.`,

// Run runs the specified APIServer. This should never exit.
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions)
kubeAPIServerConfig, internalSharedInformers, externalSharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions)
if err != nil {
return err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers, stopCh)
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, internalSharedInformers, externalSharedInformers, stopCh)
if err != nil {
return err
}
Expand All @@ -129,7 +131,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, stopCh)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, internalSharedInformers, stopCh)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return err
Expand All @@ -138,38 +140,39 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
}

// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*master.Master, error) {
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, internalSharedInformers informers.SharedInformerFactory, externalSharedInformers clientgoinformers.SharedInformerFactory, stopCh <-chan struct{}) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New()
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
sharedInformers.Start(stopCh)
internalSharedInformers.Start(stopCh)
externalSharedInformers.Start(stopCh)
return nil
})

return kubeAPIServer, nil
}

// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
// set defaults in the options before trying to create the generic config
if err := defaultOptions(s); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// validate options
if errs := s.Validate(); len(errs) != 0 {
return nil, nil, nil, utilerrors.NewAggregate(errs)
return nil, nil, nil, nil, utilerrors.NewAggregate(errs)
}

genericConfig, sharedInformers, insecureServingOptions, err := BuildGenericConfig(s)
genericConfig, internalSharedInformerFactory, externalSharedInformerFactory, insecureServingOptions, err := BuildGenericConfig(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil {
return nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
}

capabilities.Initialize(capabilities.Capabilities{
Expand All @@ -191,18 +194,18 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, inf
var installSSHKey tunneler.InstallSSHKey
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
if err != nil {
return nil, nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
return nil, nil, nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
}
if cloud != nil {
if instances, supported := cloud.Instances(); supported {
installSSHKey = instances.AddSSHKeyToAllInstances
}
}
if s.KubeletConfig.Port == 0 {
return nil, nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
return nil, nil, nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
}
if s.KubeletConfig.ReadOnlyPort == 0 {
return nil, nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
return nil, nil, nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
}
// Set up the nodeTunneler
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
Expand All @@ -228,21 +231,21 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, inf

serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

storageFactory, err := BuildStorageFactory(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

config := &master.Config{
Expand Down Expand Up @@ -278,30 +281,30 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, inf
MasterCount: s.MasterCount,
}

return config, sharedInformers, insecureServingOptions, nil
return config, internalSharedInformerFactory, externalSharedInformerFactory, insecureServingOptions, nil
}

// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
genericConfig := genericapiserver.NewConfig(api.Codecs)
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
insecureServingOptions, err := s.InsecureServing.ApplyTo(genericConfig)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Audit.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Features.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme)
Expand All @@ -319,10 +322,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,

storageFactory, err := BuildStorageFactory(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// Use protobufs for self-communication.
Expand All @@ -331,67 +334,80 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"

client, err := internalclientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
if len(kubeAPIVersions) == 0 {
return nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err)
}
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
internalClient, intErr := internalclientset.NewForConfig(genericConfig.LoopbackClientConfig)
if intErr != nil && len(kubeAPIVersions) == 0 {
return nil, nil, nil, nil, fmt.Errorf("failed to create internal clientset: %v", intErr)
}

externalClient, extErr := clientgo.NewForConfig(genericConfig.LoopbackClientConfig)
if extErr != nil && len(kubeAPIVersions) == 0 {
return nil, nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", extErr)
}

if intErr != nil || extErr != nil {
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
// groups. This leads to a nil client above and undefined behaviour further down.
//
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
glog.Errorf("Failed to create client with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)

genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
// create shared informers
internalSharedInformersFactory := informers.NewSharedInformerFactory(internalClient, 10*time.Minute)
externalSharedInformerFactory := clientgoinformers.NewSharedInformerFactory(externalClient, genericConfig.LoopbackClientConfig.Timeout)

genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, internalClient, internalSharedInformersFactory)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err)
return nil, nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err)
}

genericConfig.Authorizer, err = BuildAuthorizer(s, sharedInformers)
genericConfig.Authorizer, err = BuildAuthorizer(s, internalSharedInformersFactory)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err)
return nil, nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err)
}
if !sets.NewString(s.Authorization.Modes()...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}

genericConfig.AdmissionControl, err = BuildAdmission(s,
s.Admission.Plugins,
client,
sharedInformers,
pluginInitializer, err := BuildAdmissionPluginInitializer(
s,
internalClient,
internalSharedInformersFactory,
genericConfig.Authorizer,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
}

return genericConfig, sharedInformers, insecureServingOptions, nil
err = s.Admission.ApplyTo(
genericConfig.Authorizer,
genericConfig.LoopbackClientConfig,
genericConfig,
externalSharedInformerFactory,
pluginInitializer)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
}
return genericConfig, internalSharedInformersFactory, externalSharedInformerFactory, insecureServingOptions, nil
}

// BuildAdmission constructs the admission chain
func BuildAdmission(s *options.ServerRunOptions, plugins *admission.Plugins, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.Interface, error) {
admissionControlPluginNames := strings.Split(s.Admission.Control, ",")
// BuildAdmissionPluginInitializer constructs the admission plugin initializer
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
var cloudConfig []byte
var err error

if s.CloudProvider.CloudConfigFile != "" {
var err error
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}

// TODO: use a dynamic restmapper. See https://github.com/kubernetes/kubernetes/pull/42615.
restMapper := api.Registry.RESTMapper()
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.Admission.ControlConfigFile)
if err != nil {
return nil, fmt.Errorf("failed to read plugin config: %v", err)
}
return plugins.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
return pluginInitializer, nil
}

// BuildAuthenticator constructs the authenticator
Expand Down
3 changes: 2 additions & 1 deletion federation/cmd/federation-apiserver/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)

Expand Down
2 changes: 2 additions & 0 deletions federation/cmd/federation-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func NewServerRunOptions() *ServerRunOptions {
}
// Overwrite the default for storage data format.
s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
// Set the default for admission plugins names
s.Admission.PluginNames = []string{"AlwaysAdmit"}
return &s
}

Expand Down
Loading