Commit c469493

feat: replace StatusPoller w/ StatusWatcher
- Add DefaultStatusWatcher that wraps DynamicClient and manages informers for a set of resource objects.
- Supports two modes: root-scoped & namespace-scoped.
- Root-scoped mode uses root-scoped informers for efficiency and performance.
- Namespace-scoped mode uses namespace-scoped informers to minimize the permissions needed to run and the size of the in-memory object cache.
- Automatic mode selects which mode to use based on whether the objects being watched are in one or multiple namespaces. This is the default mode, optimizing for performance.
- If CRDs are being watched, the creation/deletion of CRDs can cause informers for those custom resources to be created/deleted.
- In namespace-scoped mode, if namespaces are being watched, the creation/deletion of namespaces can also trigger informers to be created/deleted.
- All creates/updates/deletes to CRDs also cause a RESTMapper reset.
- Allow pods to be unschedulable for 15s before reporting the status as Failed. Any update resets the timer.
- Add BlindStatusWatcher for testing and for disabling status watching during dry-run.
- Add DynamicClusterReader that wraps DynamicClient. This is now used to look up generated resources (ex: Deployment > ReplicaSets > Pods).
- Add DefaultStatusReader which uses a DelegatingStatusReader to wrap a list of conventional and specific StatusReaders. This should make it easier to extend the list of StatusReaders.
- Move some pending WaitEvents to be optional in tests, now that StatusWatcher can resolve their status before the WaitTask starts.
- Add a new Thousand Deployments stress test (10x kind nodes).
- Add some new logs for easier debugging.
- Add internal SyncEvent so that apply/delete tasks don't start until the StatusWatcher has finished initial synchronization. This helps avoid missing events from actions that happen while synchronization is incomplete.
- Filter optional pending WaitEvents when testing.

BREAKING CHANGE: Replace StatusPoller w/ StatusWatcher

BREAKING CHANGE: Remove PollInterval (obsolete with watcher)
1 parent 02d2092 commit c469493
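For library consumers, the visible API change is the builder method and the dropped option: WithStatusPoller becomes WithStatusWatcher, and ApplierOptions.PollInterval is gone. Below is a minimal migration sketch, assuming the caller already holds a factory, inventory client, dynamic client, and RESTMapper; variable and function names here are illustrative, not part of this commit.

package example // illustrative wrapper around the cli-utils apply API

import (
    "context"
    "time"

    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/client-go/dynamic"
    cmdutil "k8s.io/kubectl/pkg/cmd/util"

    "sigs.k8s.io/cli-utils/pkg/apply"
    applyevent "sigs.k8s.io/cli-utils/pkg/apply/event"
    "sigs.k8s.io/cli-utils/pkg/inventory"
    "sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
    "sigs.k8s.io/cli-utils/pkg/object"
)

func runApply(ctx context.Context, f cmdutil.Factory, invClient inventory.Client,
    invInfo inventory.Info, objs object.UnstructuredSet,
    dynamicClient dynamic.Interface, mapper meta.RESTMapper) (<-chan applyevent.Event, error) {
    // Passing the watcher explicitly is optional; Build() falls back to
    // NewDefaultStatusWatcher when none is provided (see applier_builder.go below).
    statusWatcher := watcher.NewDefaultStatusWatcher(dynamicClient, mapper)

    applier, err := apply.NewApplierBuilder().
        WithFactory(f).
        WithInventoryClient(invClient).
        WithStatusWatcher(statusWatcher). // was WithStatusPoller
        Build()
    if err != nil {
        return nil, err
    }

    // PollInterval no longer exists on ApplierOptions; the other fields are unchanged.
    return applier.Run(ctx, invInfo, objs, apply.ApplierOptions{
        ReconcileTimeout: time.Minute,
        EmitStatusEvents: true,
    }), nil
}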


50 files changed, +3251 -204 lines changed

Makefile

Lines changed: 3 additions & 1 deletion

@@ -101,7 +101,9 @@ test-e2e-focus: "$(MYGOBIN)/ginkgo" "$(MYGOBIN)/kind"
 
 .PHONY: test-stress
 test-stress: "$(MYGOBIN)/ginkgo" "$(MYGOBIN)/kind"
-	kind delete cluster --name=cli-utils-e2e && kind create cluster --name=cli-utils-e2e --wait 5m
+	kind delete cluster --name=cli-utils-e2e && kind create cluster --name=cli-utils-e2e --wait 5m \
+		--config=./test/stress/kind-cluster.yaml
+	kubectl wait nodes --for=condition=ready --all --timeout=5m
 	"$(MYGOBIN)/ginkgo" -v ./test/stress/... -- -v 3
 
 .PHONY: vet

cmd/apply/cmdapply.go

Lines changed: 0 additions & 4 deletions

@@ -45,8 +45,6 @@ func GetRunner(factory cmdutil.Factory, invFactory inventory.ClientFactory,
 
 	cmd.Flags().StringVar(&r.output, "output", printers.DefaultPrinter(),
 		fmt.Sprintf("Output format, must be one of %s", strings.Join(printers.SupportedPrinters(), ",")))
-	cmd.Flags().DurationVar(&r.period, "poll-period", 2*time.Second,
-		"Polling period for resource statuses.")
 	cmd.Flags().DurationVar(&r.reconcileTimeout, "reconcile-timeout", time.Duration(0),
 		"Timeout threshold for waiting for all resources to reach the Current status.")
 	cmd.Flags().BoolVar(&r.noPrune, "no-prune", r.noPrune,
@@ -81,7 +79,6 @@ type Runner struct {
 
 	serverSideOptions      common.ServerSideOptions
 	output                 string
-	period                 time.Duration
 	reconcileTimeout       time.Duration
 	noPrune                bool
 	prunePropagationPolicy string
@@ -156,7 +153,6 @@ func (r *Runner) RunE(cmd *cobra.Command, args []string) error {
 
 	ch := a.Run(ctx, inv, objs, apply.ApplierOptions{
 		ServerSideOptions: r.serverSideOptions,
-		PollInterval:      r.period,
 		ReconcileTimeout:  r.reconcileTimeout,
 		// If we are not waiting for status, tell the applier to not
 		// emit the events.

pkg/apply/applier.go

Lines changed: 8 additions & 13 deletions

@@ -20,18 +20,16 @@ import (
 	"sigs.k8s.io/cli-utils/pkg/apply/filter"
 	"sigs.k8s.io/cli-utils/pkg/apply/info"
 	"sigs.k8s.io/cli-utils/pkg/apply/mutator"
-	"sigs.k8s.io/cli-utils/pkg/apply/poller"
 	"sigs.k8s.io/cli-utils/pkg/apply/prune"
 	"sigs.k8s.io/cli-utils/pkg/apply/solver"
 	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
 	"sigs.k8s.io/cli-utils/pkg/common"
 	"sigs.k8s.io/cli-utils/pkg/inventory"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
 	"sigs.k8s.io/cli-utils/pkg/object"
 	"sigs.k8s.io/cli-utils/pkg/object/validation"
 )
 
-const defaultPollInterval = 2 * time.Second
-
 // Applier performs the step of applying a set of resources into a cluster,
 // conditionally waits for all of them to be fully reconciled and finally
 // performs prune to clean up any resources that has been deleted.
@@ -44,7 +42,7 @@ const defaultPollInterval = 2 * time.Second
 // cluster, different sets of tasks might be needed.
 type Applier struct {
 	pruner        *prune.Pruner
-	statusPoller  poller.Poller
+	statusWatcher watcher.StatusWatcher
 	invClient     inventory.Client
 	client        dynamic.Interface
 	openAPIGetter discovery.OpenAPISchemaInterface
@@ -236,10 +234,14 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
 	// Create a new TaskStatusRunner to execute the taskQueue.
 	klog.V(4).Infoln("applier building TaskStatusRunner...")
 	allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
-	runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller)
+	statusWatcher := a.statusWatcher
+	// Disable watcher for dry runs
+	if opts.DryRunStrategy.ClientOrServerDryRun() {
+		statusWatcher = watcher.BlindStatusWatcher{}
+	}
+	runner := taskrunner.NewTaskStatusRunner(allIds, statusWatcher)
 	klog.V(4).Infoln("applier running TaskStatusRunner...")
 	err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
-		PollInterval:     options.PollInterval,
 		EmitStatusEvents: options.EmitStatusEvents,
 	})
 	if err != nil {
@@ -259,10 +261,6 @@ type ApplierOptions struct {
 	// how long to wait.
 	ReconcileTimeout time.Duration
 
-	// PollInterval defines how often we should poll for the status
-	// of resources.
-	PollInterval time.Duration
-
 	// EmitStatusEvents defines whether status events should be
 	// emitted on the eventChannel to the caller.
 	EmitStatusEvents bool
@@ -295,9 +293,6 @@ type ApplierOptions struct {
 // setDefaults set the options to the default values if they
 // have not been provided.
 func setDefaults(o *ApplierOptions) {
-	if o.PollInterval == 0 {
-		o.PollInterval = defaultPollInterval
-	}
 	if o.PrunePropagationPolicy == "" {
 		o.PrunePropagationPolicy = metav1.DeletePropagationBackground
 	}
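A side effect of the dry-run branch above is that BlindStatusWatcher is usable on its own: a caller that never wants live status watching (for example in unit tests or offline tooling) can inject it through the builder. A small sketch, reusing the illustrative wiring from the earlier example:

    // BlindStatusWatcher stands in for the default watcher; this commit uses it
    // to disable watching during dry runs and in tests.
    applier, err := apply.NewApplierBuilder().
        WithFactory(f).
        WithInventoryClient(invClient).
        WithStatusWatcher(watcher.BlindStatusWatcher{}).
        Build()
    if err != nil {
        return nil, err
    }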

pkg/apply/applier_builder.go

Lines changed: 7 additions & 14 deletions

@@ -13,13 +13,10 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/rest"
 	"k8s.io/kubectl/pkg/cmd/util"
-	"k8s.io/kubectl/pkg/scheme"
 	"sigs.k8s.io/cli-utils/pkg/apply/info"
-	"sigs.k8s.io/cli-utils/pkg/apply/poller"
 	"sigs.k8s.io/cli-utils/pkg/apply/prune"
 	"sigs.k8s.io/cli-utils/pkg/inventory"
-	"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
-	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
 )
 
 type ApplierBuilder struct {
@@ -31,7 +28,7 @@ type ApplierBuilder struct {
 	mapper                       meta.RESTMapper
 	restConfig                   *rest.Config
 	unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)
-	statusPoller                 poller.Poller
+	statusWatcher                watcher.StatusWatcher
 }
 
 // NewApplierBuilder returns a new ApplierBuilder.
@@ -52,7 +49,7 @@ func (b *ApplierBuilder) Build() (*Applier, error) {
 			Client: bx.client,
 			Mapper: bx.mapper,
 		},
-		statusPoller:  bx.statusPoller,
+		statusWatcher: bx.statusWatcher,
 		invClient:     bx.invClient,
 		client:        bx.client,
 		openAPIGetter: bx.discoClient,
@@ -109,12 +106,8 @@ func (b *ApplierBuilder) finalize() (*ApplierBuilder, error) {
 		}
 		bx.unstructuredClientForMapping = bx.factory.UnstructuredClientForMapping
 	}
-	if bx.statusPoller == nil {
-		c, err := client.New(bx.restConfig, client.Options{Scheme: scheme.Scheme, Mapper: bx.mapper})
-		if err != nil {
-			return nil, fmt.Errorf("error creating client: %v", err)
-		}
-		bx.statusPoller = polling.NewStatusPoller(c, bx.mapper, polling.Options{})
+	if bx.statusWatcher == nil {
+		bx.statusWatcher = watcher.NewDefaultStatusWatcher(bx.client, bx.mapper)
 	}
 	return &bx, nil
 }
@@ -154,7 +147,7 @@ func (b *ApplierBuilder) WithUnstructuredClientForMapping(unstructuredClientForM
 	return b
 }
 
-func (b *ApplierBuilder) WithStatusPoller(statusPoller poller.Poller) *ApplierBuilder {
-	b.statusPoller = statusPoller
+func (b *ApplierBuilder) WithStatusWatcher(statusWatcher watcher.StatusWatcher) *ApplierBuilder {
+	b.statusWatcher = statusWatcher
 	return b
 }
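Conversely, callers that never configured a StatusPoller should not need any code change: finalize() now defaults the watcher the same way it defaults the other dependencies. A sketch of that default path, with the same illustrative variable names as above:

    // No WithStatusWatcher call: Build() leaves statusWatcher nil, so finalize()
    // fills in watcher.NewDefaultStatusWatcher(client, mapper) derived from the factory.
    applier, err := apply.NewApplierBuilder().
        WithFactory(f).
        WithInventoryClient(invClient).
        Build()
    if err != nil {
        return nil, err
    }
    // applier.Run(...) is then used exactly as before.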

pkg/apply/applier_test.go

Lines changed: 10 additions & 9 deletions

@@ -18,6 +18,7 @@ import (
 	"sigs.k8s.io/cli-utils/pkg/inventory"
 	pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
 	"sigs.k8s.io/cli-utils/pkg/multierror"
 	"sigs.k8s.io/cli-utils/pkg/object"
 	"sigs.k8s.io/cli-utils/pkg/object/validation"
@@ -97,7 +98,7 @@ func TestApplier(t *testing.T) {
 		clusterObjs object.UnstructuredSet
 		// options input to applier.Run
 		options ApplierOptions
-		// fake input events from the status poller
+		// fake input events from the statusWatcher
 		statusEvents []pollevent.Event
 		// expected output status events (async)
 		expectedStatusEvents []testutil.ExpEvent
@@ -1401,7 +1402,7 @@ func TestApplier(t *testing.T) {
 
 	for tn, tc := range testCases {
 		t.Run(tn, func(t *testing.T) {
-			poller := newFakePoller(tc.statusEvents)
+			statusWatcher := newFakeWatcher(tc.statusEvents)
 
 			// Only feed valid objects into the TestApplier.
 			// Invalid objects should not generate API requests.
@@ -1418,7 +1419,7 @@ func TestApplier(t *testing.T) {
 				tc.invInfo,
 				validObjs,
 				tc.clusterObjs,
-				poller,
+				statusWatcher,
 			)
 
 			// Context for Applier.Run
@@ -1463,7 +1464,7 @@ func TestApplier(t *testing.T) {
 						e.ActionGroupEvent.Action == event.PruneAction {
 						once.Do(func() {
 							// start events
-							poller.Start()
+							statusWatcher.Start()
 						})
 					}
 				}
@@ -1519,7 +1520,7 @@ func TestApplierCancel(t *testing.T) {
 		runTimeout time.Duration
 		// timeout for the test
 		testTimeout time.Duration
-		// fake input events from the status poller
+		// fake input events from the statusWatcher
 		statusEvents []pollevent.Event
 		// expected output status events (async)
 		expectedStatusEvents []testutil.ExpEvent
@@ -1854,13 +1855,13 @@ func TestApplierCancel(t *testing.T) {
 
 	for tn, tc := range testCases {
 		t.Run(tn, func(t *testing.T) {
-			poller := newFakePoller(tc.statusEvents)
+			statusWatcher := newFakeWatcher(tc.statusEvents)
 
 			applier := newTestApplier(t,
 				tc.invInfo,
 				tc.resources,
 				tc.clusterObjs,
-				poller,
+				statusWatcher,
 			)
 
 			// Context for Applier.Run
@@ -1902,7 +1903,7 @@ func TestApplierCancel(t *testing.T) {
 						e.ActionGroupEvent.Action == event.PruneAction {
 						once.Do(func() {
 							// start events
-							poller.Start()
+							statusWatcher.Start()
 						})
 					}
 				}
@@ -2046,7 +2047,7 @@ func TestReadAndPrepareObjects(t *testing.T) {
 			tc.resources,
 			tc.clusterObjs,
 			// no events needed for prepareObjects
-			newFakePoller([]pollevent.Event{}),
+			watcher.BlindStatusWatcher{},
 		)
 
 		applyObjs, pruneObjs, err := applier.prepareObjects(tc.invInfo.toWrapped(), tc.resources, ApplierOptions{})

pkg/apply/cache/resource_cache_map.go

Lines changed: 3 additions & 3 deletions

@@ -53,11 +53,11 @@ func (rc *ResourceCacheMap) Get(id object.ObjMetadata) ResourceStatus {
 	defer rc.mu.RUnlock()
 
 	obj, found := rc.cache[id]
-	if klog.V(4).Enabled() {
+	if klog.V(6).Enabled() {
 		if found {
-			klog.Infof("resource cache hit: %s", id)
+			klog.V(6).Infof("resource cache hit: %s", id)
 		} else {
-			klog.Infof("resource cache miss: %s", id)
+			klog.V(6).Infof("resource cache miss: %s", id)
 		}
 	}
 	if !found {

pkg/apply/common_test.go

Lines changed: 12 additions & 11 deletions

@@ -27,12 +27,11 @@ import (
 	"k8s.io/klog/v2"
 	cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
 	"k8s.io/kubectl/pkg/scheme"
-	"sigs.k8s.io/cli-utils/pkg/apply/poller"
 	"sigs.k8s.io/cli-utils/pkg/common"
 	"sigs.k8s.io/cli-utils/pkg/inventory"
 	"sigs.k8s.io/cli-utils/pkg/jsonpath"
-	"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
 	pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
 	"sigs.k8s.io/cli-utils/pkg/object"
 )
 
@@ -74,7 +73,7 @@ func newTestApplier(
 	invInfo inventoryInfo,
 	resources object.UnstructuredSet,
 	clusterObjs object.UnstructuredSet,
-	statusPoller poller.Poller,
+	statusWatcher watcher.StatusWatcher,
 ) *Applier {
 	tf := newTestFactory(t, invInfo, resources, clusterObjs)
 	defer tf.Cleanup()
@@ -88,7 +87,7 @@ func newTestApplier(
 	applier, err := NewApplierBuilder().
 		WithFactory(tf).
 		WithInventoryClient(invClient).
-		WithStatusPoller(statusPoller).
+		WithStatusWatcher(statusWatcher).
 		Build()
 	require.NoError(t, err)
 
@@ -103,7 +102,7 @@ func newTestDestroyer(
 	t *testing.T,
 	invInfo inventoryInfo,
 	clusterObjs object.UnstructuredSet,
-	statusPoller poller.Poller,
+	statusWatcher watcher.StatusWatcher,
 ) *Destroyer {
 	tf := newTestFactory(t, invInfo, object.UnstructuredSet{}, clusterObjs)
 	defer tf.Cleanup()
@@ -112,7 +111,7 @@ func newTestDestroyer(
 
 	destroyer, err := NewDestroyer(tf, invClient)
 	require.NoError(t, err)
-	destroyer.StatusPoller = statusPoller
+	destroyer.statusWatcher = statusWatcher
 
 	return destroyer
 }
@@ -345,27 +344,29 @@ func (n *nsHandler) handle(t *testing.T, req *http.Request) (*http.Response, boo
 	return nil, false, nil
 }
 
-type fakePoller struct {
+type fakeWatcher struct {
 	start  chan struct{}
 	events []pollevent.Event
 }
 
-func newFakePoller(statusEvents []pollevent.Event) *fakePoller {
-	return &fakePoller{
+func newFakeWatcher(statusEvents []pollevent.Event) *fakeWatcher {
+	return &fakeWatcher{
 		events: statusEvents,
 		start:  make(chan struct{}),
 	}
 }
 
 // Start events being sent on the status channel
-func (f *fakePoller) Start() {
+func (f *fakeWatcher) Start() {
 	close(f.start)
 }
 
-func (f *fakePoller) Poll(ctx context.Context, _ object.ObjMetadataSet, _ polling.PollOptions) <-chan pollevent.Event {
+func (f *fakeWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ watcher.Options) <-chan pollevent.Event {
 	eventChannel := make(chan pollevent.Event)
 	go func() {
 		defer close(eventChannel)
+		// send sync event immediately
+		eventChannel <- pollevent.Event{Type: pollevent.SyncEvent}
 		// wait until started to send the events
 		<-f.start
 		for _, f := range f.events {