Skip to content

Commit fe123cb

Browse files
committed
Add status watch label & field selector filters
Using a label filter significantly cuts down on watch events and memory used by the informer's watch cache. But you'll need to set the labels on the objects yourself before providing them to the applier.
1 parent 227a03f commit fe123cb

File tree

6 files changed

+857
-68
lines changed

6 files changed

+857
-68
lines changed

pkg/apply/applier_builder.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,8 @@ func (b *ApplierBuilder) WithStatusWatcher(statusWatcher watcher.StatusWatcher)
8686
b.statusWatcher = statusWatcher
8787
return b
8888
}
89+
90+
func (b *ApplierBuilder) WithStatusWatcherFilters(filters *watcher.Filters) *ApplierBuilder {
91+
b.statusWatcherFilters = filters
92+
return b
93+
}

pkg/apply/builder.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type commonBuilder struct {
2727
restConfig *rest.Config
2828
unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)
2929
statusWatcher watcher.StatusWatcher
30+
statusWatcherFilters *watcher.Filters
3031
}
3132

3233
func (cb *commonBuilder) finalize() (*commonBuilder, error) {
@@ -78,7 +79,15 @@ func (cb *commonBuilder) finalize() (*commonBuilder, error) {
7879
cx.unstructuredClientForMapping = cx.factory.UnstructuredClientForMapping
7980
}
8081
if cx.statusWatcher == nil {
81-
cx.statusWatcher = watcher.NewDefaultStatusWatcher(cx.client, cx.mapper)
82+
statusWatcher := watcher.NewDefaultStatusWatcher(cx.client, cx.mapper)
83+
if cx.statusWatcherFilters != nil {
84+
statusWatcher.Filters = cx.statusWatcherFilters
85+
}
86+
cx.statusWatcher = statusWatcher
87+
} else if cx.statusWatcherFilters != nil {
88+
// If you want to use a custom status watcher with a label selector,
89+
// configure it before injecting the status watcher.
90+
return nil, errors.New("status watcher and status watcher filters must not both be provided")
8291
}
8392
return &cx, nil
8493
}

pkg/kstatus/watcher/default_status_watcher.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"k8s.io/apimachinery/pkg/api/meta"
1212
"k8s.io/apimachinery/pkg/runtime/schema"
1313
"k8s.io/client-go/dynamic"
14+
"k8s.io/client-go/tools/cache"
1415
"k8s.io/klog/v2"
1516
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
1617
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
@@ -43,6 +44,15 @@ type DefaultStatusWatcher struct {
4344
// required for computing parent object status, to compensate for
4445
// controllers that aren't following status conventions.
4546
ClusterReader engine.ClusterReader
47+
48+
// Indexers control how the watch cache is indexed, allowing namespace
49+
// filtering and field selectors. If you watch at namespace scope, you must
50+
// provide the namespace indexer. If you specify a field selector filter,
51+
// you must also provide an indexer for that field.
52+
Indexers cache.Indexers
53+
54+
// Filters allows filtering the objects being watched.
55+
Filters *Filters
4656
}
4757

4858
var _ StatusWatcher = &DefaultStatusWatcher{}
@@ -60,6 +70,7 @@ func NewDefaultStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMa
6070
DynamicClient: dynamicClient,
6171
Mapper: mapper,
6272
},
73+
Indexers: DefaultIndexers(),
6374
}
6475
}
6576

@@ -88,13 +99,18 @@ func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadata
8899
}
89100

90101
informer := &ObjectStatusReporter{
91-
InformerFactory: NewDynamicInformerFactory(w.DynamicClient, w.ResyncPeriod),
92-
Mapper: w.Mapper,
93-
StatusReader: w.StatusReader,
94-
ClusterReader: w.ClusterReader,
95-
Targets: targets,
96-
ObjectFilter: &AllowListObjectFilter{AllowList: ids},
97-
RESTScope: scope,
102+
InformerFactory: &DynamicInformerFactory{
103+
Client: w.DynamicClient,
104+
ResyncPeriod: w.ResyncPeriod,
105+
Indexers: w.Indexers,
106+
Filters: w.Filters,
107+
},
108+
Mapper: w.Mapper,
109+
StatusReader: w.StatusReader,
110+
ClusterReader: w.ClusterReader,
111+
Targets: targets,
112+
ObjectFilter: &AllowListObjectFilter{AllowList: ids},
113+
RESTScope: scope,
98114
}
99115
return informer.Start(ctx)
100116
}

0 commit comments

Comments
 (0)