Skip to content

Commit c6b6163

Browse files
authored
Merge pull request kubernetes#124576 from sttts/sttts-peer-proxy-generic-move
controlplane/apiserver: move peer proxy code to allow generic aggregator construction
2 parents 97332c1 + c252ebe commit c6b6163

File tree

6 files changed

+122
-71
lines changed

6 files changed

+122
-71
lines changed

cmd/kube-apiserver/app/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
8484
}
8585
c.ApiExtensions = apiExtensions
8686

87-
aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.Extra.PeerProxy, pluginInitializer)
87+
aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer)
8888
if err != nil {
8989
return nil, err
9090
}

cmd/kube-apiserver/app/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,14 +266,14 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
266266
}
267267

268268
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
269-
config.Extra.PeerEndpointLeaseReconciler, err = controlplane.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
269+
config.ControlPlane.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
270270
if err != nil {
271271
return nil, nil, nil, err
272272
}
273273
// build peer proxy config only if peer ca file exists
274274
if opts.PeerCAFile != "" {
275-
config.Extra.PeerProxy, err = controlplane.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
276-
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer)
275+
config.ControlPlane.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
276+
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ControlPlane.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer)
277277
if err != nil {
278278
return nil, nil, nil, err
279279
}

pkg/controlplane/apiserver/completion.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,9 @@ func (c *Config) Complete() CompletedConfig {
4040
discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.Generic.ExternalAddress}
4141
cfg.Generic.DiscoveryAddresses = discoveryAddresses
4242

43+
if cfg.Extra.PeerEndpointReconcileInterval == 0 {
44+
cfg.Extra.PeerEndpointReconcileInterval = DefaultPeerEndpointReconcileInterval
45+
}
46+
4347
return CompletedConfig{&cfg}
4448
}

pkg/controlplane/apiserver/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ import (
3131
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
3232
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
3333
genericfeatures "k8s.io/apiserver/pkg/features"
34+
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
3435
genericapiserver "k8s.io/apiserver/pkg/server"
3536
"k8s.io/apiserver/pkg/server/egressselector"
3637
"k8s.io/apiserver/pkg/server/filters"
3738
serverstorage "k8s.io/apiserver/pkg/server/storage"
3839
utilfeature "k8s.io/apiserver/pkg/util/feature"
3940
"k8s.io/apiserver/pkg/util/openapi"
41+
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
4042
clientgoinformers "k8s.io/client-go/informers"
4143
clientgoclientset "k8s.io/client-go/kubernetes"
4244
"k8s.io/component-base/version"
@@ -67,6 +69,18 @@ type Extra struct {
6769
EnableLogsSupport bool
6870
ProxyTransport *http.Transport
6971

72+
// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
73+
// that can not be served locally
74+
PeerProxy utilpeerproxy.Interface
75+
// PeerEndpointReconcileInterval defines how often the endpoint leases are reconciled in etcd.
76+
PeerEndpointReconcileInterval time.Duration
77+
// PeerEndpointLeaseReconciler updates the peer endpoint leases
78+
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
79+
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
80+
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
81+
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
82+
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
83+
7084
ServiceAccountIssuer serviceaccount.TokenGenerator
7185
ServiceAccountMaxExpiration time.Duration
7286
ExtendExpiration bool

pkg/controlplane/apiserver/peer.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package apiserver
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
"k8s.io/apimachinery/pkg/runtime"
24+
"k8s.io/apiserver/pkg/reconcilers"
25+
genericapiserver "k8s.io/apiserver/pkg/server"
26+
serverstorage "k8s.io/apiserver/pkg/server/storage"
27+
"k8s.io/apiserver/pkg/storageversion"
28+
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
29+
clientgoinformers "k8s.io/client-go/informers"
30+
"k8s.io/client-go/transport"
31+
"k8s.io/klog/v2"
32+
api "k8s.io/kubernetes/pkg/apis/core"
33+
)
34+
35+
const (
36+
// DefaultPeerEndpointReconcileInterval is the default amount of time for how often
37+
// the peer endpoint leases are reconciled.
38+
DefaultPeerEndpointReconcileInterval = 10 * time.Second
39+
// DefaultPeerEndpointReconcilerTTL is the default TTL timeout for peer endpoint
40+
// leases on the storage layer
41+
DefaultPeerEndpointReconcilerTTL = 15 * time.Second
42+
)
43+
44+
func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
45+
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
46+
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
47+
if proxyClientCertFile == "" {
48+
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
49+
}
50+
if proxyClientKeyFile == "" {
51+
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
52+
}
53+
// create proxy client config
54+
clientConfig := &transport.Config{
55+
TLS: transport.TLSConfig{
56+
Insecure: false,
57+
CertFile: proxyClientCertFile,
58+
KeyFile: proxyClientKeyFile,
59+
CAFile: peerCAFile,
60+
ServerName: "kubernetes.default.svc",
61+
}}
62+
63+
// build proxy transport
64+
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
65+
if transportBuildingError != nil {
66+
klog.Error(transportBuildingError.Error())
67+
return nil, transportBuildingError
68+
}
69+
return utilpeerproxy.NewPeerProxyHandler(
70+
versionedInformer,
71+
svm,
72+
proxyRoundTripper,
73+
apiServerID,
74+
reconciler,
75+
serializer,
76+
), nil
77+
}
78+
79+
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
80+
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
81+
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {
82+
ttl := DefaultPeerEndpointReconcilerTTL
83+
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
84+
if err != nil {
85+
return nil, fmt.Errorf("error creating storage factory config: %w", err)
86+
}
87+
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
88+
return reconciler, err
89+
}

pkg/controlplane/instance.go

Lines changed: 11 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ import (
5454
storageapiv1beta1 "k8s.io/api/storage/v1beta1"
5555
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
5656
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
57-
kruntime "k8s.io/apimachinery/pkg/runtime"
5857
"k8s.io/apimachinery/pkg/runtime/schema"
5958
utilnet "k8s.io/apimachinery/pkg/util/net"
6059
"k8s.io/apimachinery/pkg/util/runtime"
@@ -67,14 +66,10 @@ import (
6766
genericapiserver "k8s.io/apiserver/pkg/server"
6867
"k8s.io/apiserver/pkg/server/dynamiccertificates"
6968
serverstorage "k8s.io/apiserver/pkg/server/storage"
70-
"k8s.io/apiserver/pkg/storageversion"
7169
utilfeature "k8s.io/apiserver/pkg/util/feature"
72-
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
73-
clientgoinformers "k8s.io/client-go/informers"
7470
"k8s.io/client-go/kubernetes"
7571
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
7672
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
77-
"k8s.io/client-go/transport"
7873
"k8s.io/component-helpers/apimachinery/lease"
7974
"k8s.io/klog/v2"
8075
api "k8s.io/kubernetes/pkg/apis/core"
@@ -157,16 +152,6 @@ type Extra struct {
157152
EndpointReconcilerConfig EndpointReconcilerConfig
158153
KubeletClientConfig kubeletclient.KubeletClientConfig
159154

160-
// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
161-
// that can not be served locally
162-
PeerProxy utilpeerproxy.Interface
163-
// PeerEndpointLeaseReconciler updates the peer endpoint leases
164-
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
165-
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
166-
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
167-
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
168-
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
169-
170155
// Values to build the IP addresses used by discovery
171156
// The range of IPs to be assigned to services with type=ClusterIP or greater
172157
ServiceIPRange net.IPNet
@@ -290,6 +275,12 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
290275

291276
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
292277
func (c *Config) Complete() CompletedConfig {
278+
if c.ControlPlane.PeerEndpointReconcileInterval == 0 && c.EndpointReconcilerConfig.Interval != 0 {
279+
// default this to the endpoint reconciler value before the generic
280+
// controlplane completion can kick in
281+
c.ControlPlane.PeerEndpointReconcileInterval = c.EndpointReconcilerConfig.Interval
282+
}
283+
293284
cfg := completedConfig{
294285
c.ControlPlane.Complete(),
295286
&c.Extra,
@@ -508,11 +499,11 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
508499
}
509500

510501
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
511-
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
502+
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
512503
peerEndpointCtrl := peerreconcilers.New(
513504
c.ControlPlane.Generic.APIServerID,
514505
peeraddress,
515-
c.Extra.PeerEndpointLeaseReconciler,
506+
c.ControlPlane.Extra.PeerEndpointLeaseReconciler,
516507
c.Extra.EndpointReconcilerConfig.Interval,
517508
client)
518509
if err != nil {
@@ -529,9 +520,9 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
529520
return nil
530521
})
531522
// Add PostStartHooks for Unknown Version Proxy filter.
532-
if c.Extra.PeerProxy != nil {
523+
if c.ControlPlane.Extra.PeerProxy != nil {
533524
m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
534-
err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
525+
err := c.ControlPlane.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
535526
return err
536527
})
537528
}
@@ -579,7 +570,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
579570
leaseName := m.GenericAPIServer.APIServerID
580571
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
581572

582-
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
573+
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
583574
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
584575
controller := lease.NewController(
585576
clock.RealClock{},
@@ -782,53 +773,6 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
782773
return ret
783774
}
784775

785-
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
786-
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
787-
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (peerreconcilers.PeerEndpointLeaseReconciler, error) {
788-
ttl := DefaultEndpointReconcilerTTL
789-
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
790-
if err != nil {
791-
return nil, fmt.Errorf("error creating storage factory config: %w", err)
792-
}
793-
reconciler, err := peerreconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
794-
return reconciler, err
795-
}
796-
797-
func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
798-
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress,
799-
apiServerID string, reconciler peerreconcilers.PeerEndpointLeaseReconciler, serializer kruntime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
800-
if proxyClientCertFile == "" {
801-
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
802-
}
803-
if proxyClientKeyFile == "" {
804-
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
805-
}
806-
// create proxy client config
807-
clientConfig := &transport.Config{
808-
TLS: transport.TLSConfig{
809-
Insecure: false,
810-
CertFile: proxyClientCertFile,
811-
KeyFile: proxyClientKeyFile,
812-
CAFile: peerCAFile,
813-
ServerName: "kubernetes.default.svc",
814-
}}
815-
816-
// build proxy transport
817-
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
818-
if transportBuildingError != nil {
819-
klog.Error(transportBuildingError.Error())
820-
return nil, transportBuildingError
821-
}
822-
return utilpeerproxy.NewPeerProxyHandler(
823-
versionedInformer,
824-
svm,
825-
proxyRoundTripper,
826-
apiServerID,
827-
reconciler,
828-
serializer,
829-
), nil
830-
}
831-
832776
// utility function to get the apiserver address that is used by peer apiservers to proxy
833777
// requests to this apiserver in case the peer is incapable of serving the request
834778
func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {

0 commit comments

Comments
 (0)