Skip to content

Commit a622769

Browse files
authored
Merge pull request kubernetes#128402 from richabanker/mvp-agg-discovery
KEP 4020: Replace StorageVersionAPI with aggregated discovery to fetch served resources by a peer apiserver
2 parents 4dfed14 + 8b2cee8 commit a622769

File tree

16 files changed

+1021
-483
lines changed

16 files changed

+1021
-483
lines changed

pkg/controller/storageversiongc/gc_controller.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"k8s.io/client-go/tools/cache"
3737
"k8s.io/client-go/util/workqueue"
3838
"k8s.io/kubernetes/pkg/controlplane"
39+
"k8s.io/kubernetes/pkg/controlplane/apiserver"
3940

4041
"k8s.io/klog/v2"
4142
)
@@ -214,7 +215,7 @@ func (c *Controller) syncStorageVersion(ctx context.Context, name string) error
214215
for _, v := range sv.Status.StorageVersions {
215216
lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, v.APIServerID, metav1.GetOptions{})
216217
if err != nil || lease == nil || lease.Labels == nil ||
217-
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
218+
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != apiserver.KubeAPIServer {
218219
// We cannot find a corresponding identity lease from apiserver as well.
219220
// We need to clean up this storage version.
220221
hasInvalidID = true
@@ -243,7 +244,7 @@ func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverint
243244
for _, sv := range obj.Status.StorageVersions {
244245
lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
245246
if err != nil || lease == nil || lease.Labels == nil ||
246-
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
247+
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != apiserver.KubeAPIServer {
247248
// we cannot find a corresponding identity lease in cache, enqueue the storageversion
248249
logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name)
249250
c.storageVersionQueue.Add(obj.Name)
@@ -269,7 +270,7 @@ func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) {
269270

270271
if castObj.Namespace == metav1.NamespaceSystem &&
271272
castObj.Labels != nil &&
272-
castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
273+
castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == apiserver.KubeAPIServer {
273274
logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name)
274275
c.enqueueLease(castObj)
275276
}

pkg/controlplane/apiserver/config.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,10 +311,17 @@ func CreateConfig(
311311
if err != nil {
312312
return nil, nil, err
313313
}
314-
// build peer proxy config only if peer ca file exists
315314
if opts.PeerCAFile != "" {
316-
config.PeerProxy, err = BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
317-
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.Extra.PeerEndpointLeaseReconciler, config.Generic.Serializer)
315+
leaseInformer := versionedInformers.Coordination().V1().Leases()
316+
config.PeerProxy, err = BuildPeerProxy(
317+
leaseInformer,
318+
genericConfig.LoopbackClientConfig,
319+
opts.ProxyClientCertFile,
320+
opts.ProxyClientKeyFile, opts.PeerCAFile,
321+
opts.PeerAdvertiseAddress,
322+
genericConfig.APIServerID,
323+
config.Extra.PeerEndpointLeaseReconciler,
324+
config.Generic.Serializer)
318325
if err != nil {
319326
return nil, nil, err
320327
}

pkg/controlplane/apiserver/options/validation.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,6 @@ func validateNodeSelectorAuthorizationFeature() []error {
7878
return nil
7979
}
8080

81-
func validateUnknownVersionInteroperabilityProxyFeature() []error {
82-
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
83-
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
84-
return nil
85-
}
86-
return []error{fmt.Errorf("UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled")}
87-
}
88-
return nil
89-
}
90-
9181
func validateUnknownVersionInteroperabilityProxyFlags(options *Options) []error {
9282
err := []error{}
9383
if !utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
@@ -142,7 +132,6 @@ func (s *Options) Validate() []error {
142132
errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...)
143133
errs = append(errs, validateTokenRequest(s)...)
144134
errs = append(errs, s.Metrics.Validate()...)
145-
errs = append(errs, validateUnknownVersionInteroperabilityProxyFeature()...)
146135
errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...)
147136
errs = append(errs, validateNodeSelectorAuthorizationFeature()...)
148137
errs = append(errs, validateServiceAccountTokenSigningConfig(s)...)

pkg/controlplane/apiserver/options/validation_test.go

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
genericoptions "k8s.io/apiserver/pkg/server/options"
2727
utilfeature "k8s.io/apiserver/pkg/util/feature"
2828
basecompatibility "k8s.io/component-base/compatibility"
29-
"k8s.io/component-base/featuregate"
3029
basemetrics "k8s.io/component-base/metrics"
3130
"k8s.io/kubernetes/pkg/features"
3231

@@ -165,34 +164,6 @@ func TestValidateUnknownVersionInteroperabilityProxy(t *testing.T) {
165164
}
166165
}
167166

168-
func TestValidateUnknownVersionInteroperabilityProxyFeature(t *testing.T) {
169-
const conflict = "UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled"
170-
tests := []struct {
171-
name string
172-
featuresEnabled []featuregate.Feature
173-
}{
174-
{
175-
name: "enabled: UnknownVersionInteroperabilityProxy, disabled: StorageVersionAPI",
176-
featuresEnabled: []featuregate.Feature{features.UnknownVersionInteroperabilityProxy},
177-
},
178-
}
179-
180-
for _, test := range tests {
181-
t.Run(test.name, func(t *testing.T) {
182-
for _, feature := range test.featuresEnabled {
183-
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, true)
184-
}
185-
var errMessageGot string
186-
if errs := validateUnknownVersionInteroperabilityProxyFeature(); len(errs) > 0 {
187-
errMessageGot = errs[0].Error()
188-
}
189-
if !strings.Contains(errMessageGot, conflict) {
190-
t.Errorf("Expected error message to contain: %q, but got: %q", conflict, errMessageGot)
191-
}
192-
})
193-
}
194-
}
195-
196167
func TestValidateOptions(t *testing.T) {
197168
testCases := []struct {
198169
name string

pkg/controlplane/apiserver/peer.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ import (
2424

2525
"k8s.io/apimachinery/pkg/runtime"
2626
"k8s.io/apiserver/pkg/reconcilers"
27+
"k8s.io/client-go/rest"
28+
"k8s.io/client-go/transport"
29+
2730
genericapiserver "k8s.io/apiserver/pkg/server"
2831
serverstorage "k8s.io/apiserver/pkg/server/storage"
29-
"k8s.io/apiserver/pkg/storageversion"
3032
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
31-
clientgoinformers "k8s.io/client-go/informers"
32-
"k8s.io/client-go/transport"
33-
"k8s.io/klog/v2"
33+
coordinationv1informers "k8s.io/client-go/informers/coordination/v1"
3434
api "k8s.io/kubernetes/pkg/apis/core"
3535
)
3636

@@ -43,17 +43,24 @@ const (
4343
DefaultPeerEndpointReconcilerTTL = 15 * time.Second
4444
)
4545

46-
func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
47-
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
48-
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
46+
func BuildPeerProxy(
47+
leaseInformer coordinationv1informers.LeaseInformer,
48+
loopbackClientConfig *rest.Config,
49+
proxyClientCertFile string,
50+
proxyClientKeyFile string,
51+
peerCAFile string,
52+
peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
53+
apiServerID string,
54+
reconciler reconcilers.PeerEndpointLeaseReconciler,
55+
serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
4956
if proxyClientCertFile == "" {
5057
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
5158
}
5259
if proxyClientKeyFile == "" {
5360
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
5461
}
55-
// create proxy client config
56-
clientConfig := &transport.Config{
62+
63+
proxyClientConfig := &transport.Config{
5764
TLS: transport.TLSConfig{
5865
Insecure: false,
5966
CertFile: proxyClientCertFile,
@@ -62,20 +69,15 @@ func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, s
6269
ServerName: "kubernetes.default.svc",
6370
}}
6471

65-
// build proxy transport
66-
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
67-
if transportBuildingError != nil {
68-
klog.Error(transportBuildingError.Error())
69-
return nil, transportBuildingError
70-
}
7172
return utilpeerproxy.NewPeerProxyHandler(
72-
versionedInformer,
73-
svm,
74-
proxyRoundTripper,
7573
apiServerID,
74+
IdentityLeaseComponentLabelKey+"="+KubeAPIServer,
75+
leaseInformer,
7676
reconciler,
7777
serializer,
78-
), nil
78+
loopbackClientConfig,
79+
proxyClientConfig,
80+
)
7981
}
8082

8183
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop

pkg/controlplane/apiserver/server.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ const (
7474
// 1. the lease is an identity lease (different from leader election leases)
7575
// 2. which component owns this lease
7676
IdentityLeaseComponentLabelKey = "apiserver.kubernetes.io/identity"
77+
// KubeAPIServer defines variable used internally when referring to kube-apiserver component
78+
KubeAPIServer = "kube-apiserver"
7779
)
7880

7981
// Server is a struct that contains a generic control plane apiserver instance
@@ -212,7 +214,20 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
212214
return nil
213215
})
214216
if c.Extra.PeerProxy != nil {
215-
s.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
217+
// Run local-discovery sync loop
218+
s.GenericAPIServer.AddPostStartHookOrDie("local-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error {
219+
err := c.Extra.PeerProxy.RunLocalDiscoveryCacheSync(context.Done())
220+
return err
221+
})
222+
223+
// Run peer-discovery sync loop.
224+
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error {
225+
go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1)
226+
return nil
227+
})
228+
229+
// Wait for handler to be ready.
230+
s.GenericAPIServer.AddPostStartHookOrDie("mixed-version-proxy-handler", func(context genericapiserver.PostStartHookContext) error {
216231
err := c.Extra.PeerProxy.WaitForCacheSync(context.Done())
217232
return err
218233
})

pkg/controlplane/instance.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ const (
117117
// 2. which component owns this lease
118118
// TODO(sttts): remove this indirection
119119
IdentityLeaseComponentLabelKey = controlplaneapiserver.IdentityLeaseComponentLabelKey
120-
// KubeAPIServer defines variable used internally when referring to kube-apiserver component
121-
KubeAPIServer = "kube-apiserver"
122120
// repairLoopInterval defines the interval used to run the Services ClusterIP and NodePort repair loops
123121
repairLoopInterval = 3 * time.Minute
124122
)
@@ -314,7 +312,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
314312
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
315313
}
316314

317-
cp, err := c.ControlPlane.New(KubeAPIServer, delegationTarget)
315+
cp, err := c.ControlPlane.New(controlplaneapiserver.KubeAPIServer, delegationTarget)
318316
if err != nil {
319317
return nil, err
320318
}

staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ func Register() {
5050
})
5151
}
5252

53+
// Only used for tests.
54+
func Reset() {
55+
legacyregistry.Reset()
56+
}
57+
5358
// IncPeerProxiedRequest increments the # of proxied requests to peer kube-apiserver
5459
func IncPeerProxiedRequest(ctx context.Context, status string) {
5560
peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1)

0 commit comments

Comments
 (0)