Skip to content

Commit 69eee1c

Browse files
authored
Merge pull request kubernetes#126149 from sttts/sttts-aggregator-availability-controller-split
Step 11 - Split aggregator availability controller into local and remote part
2 parents 815efa2 + b271428 commit 69eee1c

File tree

9 files changed

+575
-125
lines changed

9 files changed

+575
-125
lines changed

staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"net/http"
23+
"sync"
2324
"time"
2425

2526
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -37,6 +38,7 @@ import (
3738
utilfeature "k8s.io/apiserver/pkg/util/feature"
3839
"k8s.io/client-go/kubernetes"
3940
"k8s.io/client-go/transport"
41+
"k8s.io/component-base/metrics/legacyregistry"
4042
"k8s.io/component-base/tracing"
4143
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
4244
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
@@ -49,11 +51,16 @@ import (
4951
openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
5052
openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3"
5153
openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
52-
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
54+
localavailability "k8s.io/kube-aggregator/pkg/controllers/status/local"
55+
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
56+
remoteavailability "k8s.io/kube-aggregator/pkg/controllers/status/remote"
5357
apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
5458
openapicommon "k8s.io/kube-openapi/pkg/common"
5559
)
5660

61+
// making sure we only register metrics once into legacy registry
62+
var registerIntoLegacyRegistryOnce sync.Once
63+
5764
func init() {
5865
// we need to add the options (like ListOptions) to empty v1
5966
metav1.AddToGroupVersion(aggregatorscheme.Scheme, schema.GroupVersion{Group: "", Version: "v1"})
@@ -96,12 +103,11 @@ type ExtraConfig struct {
96103

97104
RejectForwardingRedirects bool
98105

99-
// DisableAvailableConditionController disables the controller that updates the Available conditions for
100-
// APIServices, Endpoints and Services. This controller runs in kube-aggregator and can interfere with
101-
// Generic Control Plane components when certain apis are not available.
102-
// TODO: We should find a better way to handle this. For now it will be for Generic Control Plane authors to
103-
// disable this controller if they see issues.
104-
DisableAvailableConditionController bool
106+
// DisableRemoteAvailableConditionController disables the controller that updates the Available conditions for
107+
// remote APIServices via querying endpoints of the referenced services. In generic controlplane use-cases,
108+
// the concept of services and endpoints might differ, and might require another implementation of this
109+
// controller. Local APIService are reconciled nevertheless.
110+
DisableRemoteAvailableConditionController bool
105111
}
106112

107113
// Config represents the configuration needed to create an APIAggregator.
@@ -314,31 +320,54 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
314320
})
315321
}
316322

317-
// If the AvailableConditionController is disabled, we don't need to start the informers
318-
// and the controller.
319-
if !c.ExtraConfig.DisableAvailableConditionController {
320-
availableController, err := statuscontrollers.NewAvailableConditionController(
323+
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
324+
informerFactory.Start(context.Done())
325+
c.GenericConfig.SharedInformerFactory.Start(context.Done())
326+
return nil
327+
})
328+
329+
// create shared (remote and local) availability metrics
330+
// TODO: decouple from legacyregistry
331+
metrics := availabilitymetrics.New()
332+
registerIntoLegacyRegistryOnce.Do(func() { err = metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister) })
333+
if err != nil {
334+
return nil, err
335+
}
336+
337+
// always run local availability controller
338+
local, err := localavailability.New(
339+
informerFactory.Apiregistration().V1().APIServices(),
340+
apiregistrationClient.ApiregistrationV1(),
341+
metrics,
342+
)
343+
if err != nil {
344+
return nil, err
345+
}
346+
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-local-available-controller", func(context genericapiserver.PostStartHookContext) error {
347+
// if we end up blocking for long periods of time, we may need to increase workers.
348+
go local.Run(5, context.Done())
349+
return nil
350+
})
351+
352+
// conditionally run remote availability controller. This could be replaced in certain
353+
// generic controlplane use-cases where there is another concept of services and/or endpoints.
354+
if !c.ExtraConfig.DisableRemoteAvailableConditionController {
355+
remote, err := remoteavailability.New(
321356
informerFactory.Apiregistration().V1().APIServices(),
322357
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
323358
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
324359
apiregistrationClient.ApiregistrationV1(),
325360
proxyTransportDial,
326361
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
327362
s.serviceResolver,
363+
metrics,
328364
)
329365
if err != nil {
330366
return nil, err
331367
}
332-
333-
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
334-
informerFactory.Start(context.Done())
335-
c.GenericConfig.SharedInformerFactory.Start(context.Done())
336-
return nil
337-
})
338-
339-
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
368+
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-remote-available-controller", func(context genericapiserver.PostStartHookContext) error {
340369
// if we end up blocking for long periods of time, we may need to increase workers.
341-
go availableController.Run(5, context.Done())
370+
go remote.Run(5, context.Done())
342371
return nil
343372
})
344373
}
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package external
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"k8s.io/apimachinery/pkg/api/equality"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
"k8s.io/client-go/tools/cache"
30+
"k8s.io/client-go/util/workqueue"
31+
"k8s.io/klog/v2"
32+
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
33+
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
34+
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
35+
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
36+
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
37+
"k8s.io/kube-aggregator/pkg/controllers"
38+
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
39+
)
40+
41+
// AvailableConditionController handles checking the availability of registered local API services.
42+
type AvailableConditionController struct {
43+
apiServiceClient apiregistrationclient.APIServicesGetter
44+
45+
apiServiceLister listers.APIServiceLister
46+
apiServiceSynced cache.InformerSynced
47+
48+
// To allow injection for testing.
49+
syncFn func(key string) error
50+
51+
queue workqueue.TypedRateLimitingInterface[string]
52+
53+
// metrics registered into legacy registry
54+
metrics *availabilitymetrics.Metrics
55+
}
56+
57+
// New returns a new local availability AvailableConditionController.
58+
func New(
59+
apiServiceInformer informers.APIServiceInformer,
60+
apiServiceClient apiregistrationclient.APIServicesGetter,
61+
metrics *availabilitymetrics.Metrics,
62+
) (*AvailableConditionController, error) {
63+
c := &AvailableConditionController{
64+
apiServiceClient: apiServiceClient,
65+
apiServiceLister: apiServiceInformer.Lister(),
66+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
67+
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
68+
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
69+
// the maximum disruption time to a minimum, but it does prevent hot loops.
70+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second),
71+
workqueue.TypedRateLimitingQueueConfig[string]{Name: "LocalAvailabilityController"},
72+
),
73+
metrics: metrics,
74+
}
75+
76+
// resync on this one because it is low cardinality and rechecking the actual discovery
77+
// allows us to detect health in a more timely fashion when network connectivity to
78+
// nodes is snipped, but the network still attempts to route there. See
79+
// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
80+
apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
81+
cache.ResourceEventHandlerFuncs{
82+
AddFunc: c.addAPIService,
83+
UpdateFunc: c.updateAPIService,
84+
DeleteFunc: c.deleteAPIService,
85+
},
86+
30*time.Second)
87+
c.apiServiceSynced = apiServiceHandler.HasSynced
88+
89+
c.syncFn = c.sync
90+
91+
return c, nil
92+
}
93+
94+
func (c *AvailableConditionController) sync(key string) error {
95+
originalAPIService, err := c.apiServiceLister.Get(key)
96+
if apierrors.IsNotFound(err) {
97+
c.metrics.ForgetAPIService(key)
98+
return nil
99+
}
100+
if err != nil {
101+
return err
102+
}
103+
104+
if originalAPIService.Spec.Service != nil {
105+
// this controller only handles local APIServices
106+
return nil
107+
}
108+
109+
// local API services are always considered available
110+
apiService := originalAPIService.DeepCopy()
111+
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
112+
_, err = c.updateAPIServiceStatus(originalAPIService, apiService)
113+
return err
114+
}
115+
116+
// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
117+
// apiservices. Doing that means we don't want to quickly issue no-op updates.
118+
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
119+
// update this metric on every sync operation to reflect the actual state
120+
c.metrics.SetUnavailableGauge(newAPIService)
121+
122+
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
123+
return newAPIService, nil
124+
}
125+
126+
orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available)
127+
now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available)
128+
unknown := apiregistrationv1.APIServiceCondition{
129+
Type: apiregistrationv1.Available,
130+
Status: apiregistrationv1.ConditionUnknown,
131+
}
132+
if orig == nil {
133+
orig = &unknown
134+
}
135+
if now == nil {
136+
now = &unknown
137+
}
138+
if *orig != *now {
139+
klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason)
140+
}
141+
142+
newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
143+
if err != nil {
144+
return nil, err
145+
}
146+
147+
c.metrics.SetUnavailableCounter(originalAPIService, newAPIService)
148+
return newAPIService, nil
149+
}
150+
151+
// Run starts the AvailableConditionController loop which manages the availability condition of API services.
152+
func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) {
153+
defer utilruntime.HandleCrash()
154+
defer c.queue.ShutDown()
155+
156+
klog.Info("Starting LocalAvailability controller")
157+
defer klog.Info("Shutting down LocalAvailability controller")
158+
159+
// This waits not just for the informers to sync, but for our handlers
160+
// to be called; since the handlers are three different ways of
161+
// enqueueing the same thing, waiting for this permits the queue to
162+
// maximally de-duplicate the entries.
163+
if !controllers.WaitForCacheSync("LocalAvailability", stopCh, c.apiServiceSynced) {
164+
return
165+
}
166+
167+
for i := 0; i < workers; i++ {
168+
go wait.Until(c.runWorker, time.Second, stopCh)
169+
}
170+
171+
<-stopCh
172+
}
173+
174+
func (c *AvailableConditionController) runWorker() {
175+
for c.processNextWorkItem() {
176+
}
177+
}
178+
179+
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
180+
func (c *AvailableConditionController) processNextWorkItem() bool {
181+
key, quit := c.queue.Get()
182+
if quit {
183+
return false
184+
}
185+
defer c.queue.Done(key)
186+
187+
err := c.syncFn(key)
188+
if err == nil {
189+
c.queue.Forget(key)
190+
return true
191+
}
192+
193+
utilruntime.HandleError(fmt.Errorf("%v failed with: %w", key, err))
194+
c.queue.AddRateLimited(key)
195+
196+
return true
197+
}
198+
199+
func (c *AvailableConditionController) addAPIService(obj interface{}) {
200+
castObj := obj.(*apiregistrationv1.APIService)
201+
klog.V(4).Infof("Adding %s", castObj.Name)
202+
c.queue.Add(castObj.Name)
203+
}
204+
205+
func (c *AvailableConditionController) updateAPIService(oldObj, _ interface{}) {
206+
oldCastObj := oldObj.(*apiregistrationv1.APIService)
207+
klog.V(4).Infof("Updating %s", oldCastObj.Name)
208+
c.queue.Add(oldCastObj.Name)
209+
}
210+
211+
func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
212+
castObj, ok := obj.(*apiregistrationv1.APIService)
213+
if !ok {
214+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
215+
if !ok {
216+
klog.Errorf("Couldn't get object from tombstone %#v", obj)
217+
return
218+
}
219+
castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService)
220+
if !ok {
221+
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
222+
return
223+
}
224+
}
225+
klog.V(4).Infof("Deleting %q", castObj.Name)
226+
c.queue.Add(castObj.Name)
227+
}

0 commit comments

Comments
 (0)