Skip to content

Commit 1a8d8c9

Browse files
committed
client-go watch: NewIndexerInformerWatcherWithContext -> WithLogger
The ability to automatically stop on context cancellation was new functionality that adds complexity and wasn't really used in Kubernetes. If someone wants this, they can add it outside of the function. A *WithLogger variant avoids the complexity and is consistent with NewStreamWatcherWithLogger over in apimachinery.
1 parent 8cc74e8 commit 1a8d8c9

File tree

3 files changed

+6
-51
lines changed

3 files changed

+6
-51
lines changed

staging/src/k8s.io/client-go/tools/watch/informerwatcher.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package watch
1818

1919
import (
20-
"context"
2120
"sync"
2221

2322
"k8s.io/apimachinery/pkg/runtime"
@@ -107,18 +106,15 @@ func (e *eventProcessor) stop() {
107106
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
108107
// it also returns a channel you can use to wait for the informers to fully shutdown.
109108
//
110-
// Contextual logging: NewIndexerInformerWatcherWithContext should be used instead of NewIndexerInformerWatcher in code which supports contextual logging.
109+
// Contextual logging: NewIndexerInformerWatcherWithLogger should be used instead of NewIndexerInformerWatcher in code which supports contextual logging.
111110
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
112-
return NewIndexerInformerWatcherWithContext(context.Background(), lw, objType)
111+
return NewIndexerInformerWatcherWithLogger(klog.Background(), lw, objType)
113112
}
114113

115-
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
114+
// NewIndexerInformerWatcherWithLogger will create an IndexerInformer and wrap it into watch.Interface
116115
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
117116
// it also returns a channel you can use to wait for the informers to fully shutdown.
118-
//
119-
// Cancellation of the context has the same effect as calling [watch.Interface.Stop]. One or
120-
// the other can be used.
121-
func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
117+
func NewIndexerInformerWatcherWithLogger(logger klog.Logger, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
122118
ch := make(chan watch.Event)
123119
w := watch.NewProxyWatcher(ch)
124120
e := newEventProcessor(ch)
@@ -155,24 +151,12 @@ func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWa
155151
// This will get stopped, but without waiting for it.
156152
go e.run()
157153

158-
logger := klog.FromContext(ctx)
159-
if ctx.Done() != nil {
160-
go func() {
161-
select {
162-
case <-ctx.Done():
163-
// Map cancellation to Stop. The informer below only waits for that.
164-
w.Stop()
165-
case <-w.StopChan():
166-
}
167-
}()
168-
}
169-
170154
doneCh := make(chan struct{})
171155
go func() {
172156
defer close(doneCh)
173157
defer e.stop()
174158
// Waiting for w.StopChan() is the traditional behavior which gets
175-
// preserved here. Context cancellation is handled above.
159+
// preserved here, with the logger added to support contextual logging.
176160
ctx := wait.ContextForChannel(w.StopChan())
177161
ctx = klog.NewContext(ctx, logger)
178162
informer.RunWithContext(ctx)

staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package watch
1818

1919
import (
2020
"context"
21-
"errors"
2221
"reflect"
2322
goruntime "runtime"
2423
"sort"
@@ -40,8 +39,6 @@ import (
4039
fakeclientset "k8s.io/client-go/kubernetes/fake"
4140
testcore "k8s.io/client-go/testing"
4241
"k8s.io/client-go/tools/cache"
43-
"k8s.io/klog/v2"
44-
"k8s.io/klog/v2/ktesting"
4542
)
4643

4744
// TestEventProcessorExit is expected to timeout if the event processor fails
@@ -467,29 +464,3 @@ func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) {
467464
t.Fatalf("expected at least 1 watch call, got %d", watchCalls)
468465
}
469466
}
470-
471-
func TestInformerContext(t *testing.T) {
472-
logger, ctx := ktesting.NewTestContext(t)
473-
ctx, cancel := context.WithCancel(ctx)
474-
defer cancel()
475-
476-
// Whatever gets called first will stop.
477-
validateContext := func(ctx context.Context) error {
478-
if reflect.TypeOf(logger.GetSink()) != reflect.TypeOf(klog.FromContext(ctx).GetSink()) {
479-
t.Errorf("Expected logger %+v from context, got %+v", logger, klog.FromContext(ctx))
480-
}
481-
cancel()
482-
return errors.New("not implemented by text")
483-
}
484-
lw := &cache.ListWatch{
485-
ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
486-
return nil, validateContext(ctx)
487-
},
488-
WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
489-
return nil, validateContext(ctx)
490-
},
491-
}
492-
493-
_, _, _, done := NewIndexerInformerWatcherWithContext(ctx, lw, &corev1.Secret{})
494-
<-done
495-
}

staging/src/k8s.io/client-go/tools/watch/until.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func Until(ctx context.Context, initialResourceVersion string, watcherClient cac
126126
// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like:
127127
// waiting for object reaching a state, "small" controllers, ...
128128
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
129-
indexer, informer, watcher, done := NewIndexerInformerWatcherWithContext(ctx, lw, objType)
129+
indexer, informer, watcher, done := NewIndexerInformerWatcherWithLogger(klog.FromContext(ctx), lw, objType)
130130
// We need to wait for the internal informers to fully stop so it's easier to reason about
131131
// and it works with non-thread safe clients.
132132
defer func() { <-done }()

0 commit comments

Comments
 (0)