Commit 4c26165

Merge pull request #375 from werf/feat/optimize-kube-api-load
feat: greatly decrease Kubernetes apiserver load
2 parents: f394e96 + 40930a9

27 files changed: +1204 -736 lines
go.mod

Lines changed: 1 addition & 3 deletions

@@ -1,8 +1,6 @@
 module github.com/werf/kubedog
 
-go 1.21
-
-toolchain go1.21.6
+go 1.23.0
 
 require (
     github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d

pkg/informer/factory.go

Lines changed: 65 additions & 0 deletions (new file)

@@ -0,0 +1,65 @@
package informer

import (
    "fmt"
    "sync"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"

    "github.com/werf/kubedog/pkg/trackers/dyntracker/util"
)

type ConcurrentInformerFactoryOptions struct{}

func NewConcurrentInformerFactory(stopCh <-chan struct{}, watchErrCh chan<- error, dynamicClient dynamic.Interface, opts ConcurrentInformerFactoryOptions) *util.Concurrent[*InformerFactory] {
    lock := &sync.RWMutex{}
    return util.NewConcurrentWithLock(&InformerFactory{
        dynamicClient:       dynamicClient,
        namespacedFactories: make(map[string]dynamicinformer.DynamicSharedInformerFactory),
        informersLock:       lock,
        stopCh:              stopCh,
        watchErrCh:          watchErrCh,
    }, lock)
}

type InformerFactory struct {
    clusteredFactory    dynamicinformer.DynamicSharedInformerFactory
    dynamicClient       dynamic.Interface
    informersLock       *sync.RWMutex
    namespacedFactories map[string]dynamicinformer.DynamicSharedInformerFactory
    stopCh              <-chan struct{}
    watchErrCh          chan<- error
}

func (f *InformerFactory) ForNamespace(gvr schema.GroupVersionResource, namespace string) (*util.Concurrent[*Informer], error) {
    factory, found := f.namespacedFactories[namespace]
    if !found {
        factory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.dynamicClient, 0, namespace, nil)
        f.namespacedFactories[namespace] = factory
    }

    informer, err := newInformerFromFactory(gvr, factory, f.stopCh, f.watchErrCh, informerFromFactoryOptions{
        Namespace: namespace,
    })
    if err != nil {
        return nil, fmt.Errorf("construct informer: %w", err)
    }

    return util.NewConcurrentWithLock(informer, f.informersLock), nil
}

func (f *InformerFactory) Clustered(gvr schema.GroupVersionResource) (*util.Concurrent[*Informer], error) {
    if f.clusteredFactory == nil {
        f.clusteredFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.dynamicClient, 0, metav1.NamespaceAll, nil)
    }

    informer, err := newInformerFromFactory(gvr, f.clusteredFactory, f.stopCh, f.watchErrCh, informerFromFactoryOptions{})
    if err != nil {
        return nil, fmt.Errorf("construct informer: %w", err)
    }

    return util.NewConcurrentWithLock(informer, f.informersLock), nil
}
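
For orientation, a minimal usage sketch (not part of the commit; the function name buildDeploymentInformer, the apps/v1 "deployments" GVR, and the "default" namespace are illustrative assumptions): it wires a dynamic client into the new concurrent factory and asks it for a namespaced informer, the same pattern the reworked canary tracker uses further down.

// Usage sketch only, not from this commit.
package example

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/rest"

    "github.com/werf/kubedog/pkg/informer"
    "github.com/werf/kubedog/pkg/trackers/dyntracker/util"
)

func buildDeploymentInformer(cfg *rest.Config, stopCh <-chan struct{}, watchErrCh chan error) (*util.Concurrent[*informer.Informer], error) {
    dynamicClient, err := dynamic.NewForConfig(cfg)
    if err != nil {
        return nil, err
    }

    // One factory per process: informers for the same namespace share a
    // DynamicSharedInformerFactory, so repeated requests reuse one LIST/WATCH.
    factory := informer.NewConcurrentInformerFactory(stopCh, watchErrCh, dynamicClient, informer.ConcurrentInformerFactoryOptions{})

    var inf *util.Concurrent[*informer.Informer]
    if err := factory.RWTransactionErr(func(f *informer.InformerFactory) error {
        var ferr error
        inf, ferr = f.ForNamespace(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, "default")
        return ferr
    }); err != nil {
        return nil, err
    }

    return inf, nil
}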

pkg/informer/informer.go

Lines changed: 109 additions & 0 deletions (new file)

@@ -0,0 +1,109 @@
package informer

import (
    "fmt"
    "io"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/tools/cache"

    "github.com/werf/kubedog/pkg/tracker/debug"
)

type informerFromFactoryOptions struct {
    Namespace string
}

func newInformerFromFactory(gvr schema.GroupVersionResource, factory dynamicinformer.DynamicSharedInformerFactory, stopCh <-chan struct{}, watchErrCh chan<- error, opts informerFromFactoryOptions) (*Informer, error) {
    informer := factory.ForResource(gvr)

    if err := setWatchErrorHandler(informer.Informer().SetWatchErrorHandler, watchErrCh, gvr); err != nil {
        return nil, fmt.Errorf("set watch error handler for resource %s: %w", gvr.String(), err)
    }

    return &Informer{
        factory:   factory,
        informer:  informer.Informer(),
        lister:    informer.Lister(),
        namespace: opts.Namespace,
        stopCh:    stopCh,
    }, nil
}

type Informer struct {
    factory   dynamicinformer.DynamicSharedInformerFactory
    informer  cache.SharedIndexInformer
    lister    cache.GenericLister
    namespace string
    stopCh    <-chan struct{}
}

func (i *Informer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
    return i.informer.AddEventHandler(handler)
}

func (i *Informer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error {
    return i.informer.RemoveEventHandler(handle)
}

func (i *Informer) Run() {
    i.factory.Start(i.stopCh)
}

func (i *Informer) List(selector labels.Selector) (ret []runtime.Object, err error) {
    if i.namespace != "" {
        return i.lister.ByNamespace(i.namespace).List(selector)
    }

    return i.lister.List(selector)
}

func (i *Informer) Get(name string) (runtime.Object, error) {
    if i.namespace != "" {
        return i.lister.ByNamespace(i.namespace).Get(name)
    }

    return i.lister.Get(name)
}

func setWatchErrorHandler(setWatchErrorHandler func(handler cache.WatchErrorHandler) error, watchErrCh chan<- error, gvr schema.GroupVersionResource) error {
    if err := setWatchErrorHandler(
        func(r *cache.Reflector, err error) {
            isExpiredError := func(err error) bool {
                return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
            }

            // Based on: k8s.io/client-go@v0.30.11/tools/cache/reflector.go
            switch {
            case isExpiredError(err):
                if debug.Debug() {
                    fmt.Printf("[SetWatchErrorHandler] %s watch closed with expired error: %s\n", gvr.String(), err)
                }
            case err == io.EOF:
                // watch closed normally
            case err == io.ErrUnexpectedEOF:
                if debug.Debug() {
                    fmt.Printf("[SetWatchErrorHandler] %s watch closed with unexpected EOF error: %s\n", gvr.String(), err)
                }
            default:
                if debug.Debug() {
                    fmt.Printf("[SetWatchErrorHandler] %s watch closed with an error: %s\n", gvr.String(), err)
                }

                watchErrCh <- fmt.Errorf("unrecoverable watch error for %s: %w", gvr.String(), err)
            }
        },
    ); err != nil {
        if err.Error() == "informer has already started" {
            return nil
        }

        return err
    }

    return nil
}
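
A consumer-side sketch (illustrative, not part of the commit; watchAdds and onAdd are hypothetical names): it shows the call sequence the wrapper above is built for, register an event handler, keep the registration so it can be removed later, then Run() to start the shared factory. The reworked canary tracker below follows the same sequence.

// Illustrative consumer of the Informer wrapper; not from this commit.
package example

import (
    "k8s.io/client-go/tools/cache"

    "github.com/werf/kubedog/pkg/informer"
    "github.com/werf/kubedog/pkg/trackers/dyntracker/util"
)

// watchAdds registers an AddFunc handler on a concurrent Informer and returns
// a cleanup function that detaches that handler again.
func watchAdds(inf *util.Concurrent[*informer.Informer], onAdd func(obj interface{})) (cleanup func(), err error) {
    if err := inf.RWTransactionErr(func(i *informer.Informer) error {
        handle, err := i.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: onAdd, // also delivered for objects already present in the shared cache
        })
        if err != nil {
            return err
        }

        cleanup = func() { _ = i.RemoveEventHandler(handle) }

        // Run starts the wrapped shared factory; client-go's Start only launches
        // informers that are not running yet, so calling it per consumer is safe.
        i.Run()

        return nil
    }); err != nil {
        return nil, err
    }

    return cleanup, nil
}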

pkg/tracker/canary/feed.go

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@ func (f *feed) Track(name, namespace string, kube kubernetes.Interface, opts tra
     ctx, cancel := watchtools.ContextWithOptionalTimeout(parentContext, opts.Timeout)
     defer cancel()
 
-    canary := NewTracker(name, namespace, kube, nil, opts)
+    canary := NewTracker(name, namespace, kube, nil, nil, opts)
 
     go func() {
         err := canary.Track(ctx)

pkg/tracker/canary/tracker.go

Lines changed: 66 additions & 77 deletions

@@ -6,22 +6,26 @@ import (
     "time"
 
     "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
-    flaggerv1beta1 "github.com/fluxcd/flagger/pkg/client/clientset/versioned/typed/flagger/v1beta1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/fields"
+    flaggerscheme "github.com/fluxcd/flagger/pkg/client/clientset/versioned/scheme"
+    "github.com/samber/lo"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
-    "k8s.io/apimachinery/pkg/watch"
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/kubernetes/scheme"
     "k8s.io/client-go/tools/cache"
-    watchtools "k8s.io/client-go/tools/watch"
 
-    "github.com/werf/kubedog/pkg/kube"
+    "github.com/werf/kubedog/pkg/informer"
     "github.com/werf/kubedog/pkg/tracker"
     "github.com/werf/kubedog/pkg/tracker/debug"
+    "github.com/werf/kubedog/pkg/trackers/dyntracker/util"
 )
 
+func init() {
+    flaggerscheme.AddToScheme(scheme.Scheme)
+}
+
 type FailedReport struct {
     FailedReason string
     CanaryStatus CanaryStatus
@@ -54,14 +58,15 @@ type Tracker struct {
     dynamicClient dynamic.Interface
 }
 
-func NewTracker(name, namespace string, kube kubernetes.Interface, dynamicClient dynamic.Interface, opts tracker.Options) *Tracker {
+func NewTracker(name, namespace string, kube kubernetes.Interface, dynamicClient dynamic.Interface, informerFactory *util.Concurrent[*informer.InformerFactory], opts tracker.Options) *Tracker {
     return &Tracker{
         Tracker: tracker.Tracker{
             Kube:             kube,
             Namespace:        namespace,
             FullResourceName: fmt.Sprintf("canary/%s", name),
             ResourceName:     name,
             LogsFromTime:     opts.LogsFromTime,
+            InformerFactory:  informerFactory,
         },
 
         Added: make(chan CanaryStatus, 1),
@@ -84,10 +89,12 @@ func NewTracker(name, namespace string, kube kubernetes.Interface, dynamicClient
 }
 
 func (canary *Tracker) Track(ctx context.Context) error {
-    err := canary.runInformer(ctx)
+    canaryInformerCleanupFn, err := canary.runInformer(ctx)
     if err != nil {
         return err
     }
+    defer canaryInformerCleanupFn()
+
     for {
         select {
         case object := <-canary.objectAdded:
@@ -123,85 +130,67 @@ func (canary *Tracker) Track(ctx context.Context) error {
     }
 }
 
-func (canary *Tracker) runInformer(ctx context.Context) error {
-    tweakListOptions := func(options metav1.ListOptions) metav1.ListOptions {
-        options.FieldSelector = fields.OneTermEqualSelector("metadata.name", canary.ResourceName).String()
-        return options
-    }
-
-    var lw *cache.ListWatch
-    if canary.dynamicClient != nil {
-        lw = &cache.ListWatch{
-            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-                return canary.dynamicClient.Resource(schema.GroupVersionResource{
-                    Group:    "flagger.app",
-                    Version:  "v1beta1",
-                    Resource: "canaries",
-                }).Namespace(canary.Namespace).List(ctx, tweakListOptions(options))
-            },
-            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-                return canary.dynamicClient.Resource(schema.GroupVersionResource{
-                    Group:    "flagger.app",
-                    Version:  "v1beta1",
-                    Resource: "canaries",
-                }).Namespace(canary.Namespace).Watch(ctx, tweakListOptions(options))
-            },
-        }
-    } else {
-        config, err := kube.GetKubeConfig(kube.KubeConfigOptions{})
+func (canary *Tracker) runInformer(ctx context.Context) (cleanupFn func(), err error) {
+    var inform *util.Concurrent[*informer.Informer]
+    if err := canary.InformerFactory.RWTransactionErr(func(factory *informer.InformerFactory) error {
+        inform, err = factory.ForNamespace(schema.GroupVersionResource{
+            Group:    "flagger.app",
+            Version:  "v1beta1",
+            Resource: "canaries",
+        }, canary.Namespace)
         if err != nil {
-            fmt.Print(err)
+            return fmt.Errorf("get informer from factory: %w", err)
        }
 
-        flagger, err := flaggerv1beta1.NewForConfig(config.Config)
-        if err != nil {
-            fmt.Print(err)
-        }
+        return nil
+    }); err != nil {
+        return nil, err
+    }
 
-        lw = &cache.ListWatch{
-            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-                return flagger.Canaries(canary.Namespace).List(ctx, tweakListOptions(options))
-            },
-            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-                return flagger.Canaries(canary.Namespace).Watch(ctx, tweakListOptions(options))
+    if err := inform.RWTransactionErr(func(inf *informer.Informer) error {
+        handler, err := inf.AddEventHandler(
+            cache.FilteringResourceEventHandler{
+                FilterFunc: func(obj interface{}) bool {
+                    canaryObj := &v1beta1.Canary{}
+                    lo.Must0(runtime.DefaultUnstructuredConverter.FromUnstructured(obj.(*unstructured.Unstructured).Object, canaryObj))
+                    return canaryObj.Name == canary.ResourceName &&
+                        canaryObj.Namespace == canary.Namespace
+                },
+                Handler: cache.ResourceEventHandlerFuncs{
+                    AddFunc: func(obj interface{}) {
+                        canaryObj := &v1beta1.Canary{}
+                        lo.Must0(runtime.DefaultUnstructuredConverter.FromUnstructured(obj.(*unstructured.Unstructured).Object, canaryObj))
+                        canary.objectAdded <- canaryObj
+                    },
+                    UpdateFunc: func(oldObj, newObj interface{}) {
+                        canaryObj := &v1beta1.Canary{}
+                        lo.Must0(runtime.DefaultUnstructuredConverter.FromUnstructured(newObj.(*unstructured.Unstructured).Object, canaryObj))
+                        canary.objectModified <- canaryObj
+                    },
+                    DeleteFunc: func(obj interface{}) {
+                        canaryObj := &v1beta1.Canary{}
+                        lo.Must0(runtime.DefaultUnstructuredConverter.FromUnstructured(obj.(*unstructured.Unstructured).Object, canaryObj))
+                        canary.objectDeleted <- canaryObj
+                    },
+                },
             },
+        )
+        if err != nil {
+            return fmt.Errorf("add event handler: %w", err)
        }
-    }
-
-    go func() {
-        _, err := watchtools.UntilWithSync(ctx, lw, &v1beta1.Canary{}, nil, func(e watch.Event) (bool, error) {
-            if debug.Debug() {
-                fmt.Printf("Canary `%s` informer event: %#v\n", canary.ResourceName, e.Type)
-            }
-
-            var object *v1beta1.Canary
-
-            if e.Type != watch.Error {
-                var ok bool
-                object, ok = e.Object.(*v1beta1.Canary)
-                if !ok {
-                    return true, fmt.Errorf("expected %s to be a *v1beta1.Canary, got %T", canary.ResourceName, e.Object)
-                }
-            }
 
-            switch e.Type {
-            case watch.Added:
-                canary.objectAdded <- object
-            case watch.Modified:
-                canary.objectModified <- object
-            case watch.Deleted:
-                canary.objectDeleted <- object
-            }
+        cleanupFn = func() {
+            inf.RemoveEventHandler(handler)
+        }
 
-            return false, nil
-        })
+        inf.Run()
 
-        if err != tracker.AdaptInformerError(err) {
-            canary.errors <- fmt.Errorf("canary informer error: %w", err)
-        }
-    }()
+        return nil
+    }); err != nil {
+        return nil, err
+    }
 
-    return nil
+    return cleanupFn, nil
 }
 
 func (canary *Tracker) handleCanaryState(ctx context.Context, object *v1beta1.Canary) error {
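
Taken together, the rewrite replaces the per-tracker field-selector ListWatch driven by watchtools.UntilWithSync with a shared dynamic informer per namespace plus a client-side filter, which is where the apiserver load reduction in the commit title comes from. The filtering step in isolation looks roughly like this (sketch only; canaryFilter is a hypothetical helper, the commit inlines the same logic in cache.FilteringResourceEventHandler):

// Sketch of the client-side filter pattern used above; not a new API.
package example

import (
    "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime"
)

// canaryFilter keeps only events for one named Canary in one namespace; the
// shared informer itself watches every Canary in that namespace.
func canaryFilter(resourceName, namespace string) func(obj interface{}) bool {
    return func(obj interface{}) bool {
        u, ok := obj.(*unstructured.Unstructured)
        if !ok {
            return false
        }

        canaryObj := &v1beta1.Canary{}
        if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, canaryObj); err != nil {
            return false
        }

        return canaryObj.Name == resourceName && canaryObj.Namespace == namespace
    }
}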

0 commit comments