Skip to content

Commit 6688ada

Browse files
committed
client-go + apimachinery watch: context support
The Lister and Watcher interfaces only supported methods without context, but were typically implemented with client-go API calls which need a context. New interfaces get added using the same approach as in kubernetes#129109.
1 parent 00fa8f1 commit 6688ada

File tree

14 files changed

+373
-137
lines changed

14 files changed

+373
-137
lines changed

staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type Reporter interface {
5151
// StreamWatcher turns any stream for which you can write a Decoder interface
5252
// into a watch.Interface.
5353
type StreamWatcher struct {
54+
logger klog.Logger
5455
sync.Mutex
5556
source Decoder
5657
reporter Reporter
@@ -59,8 +60,16 @@ type StreamWatcher struct {
5960
}
6061

6162
// NewStreamWatcher creates a StreamWatcher from the given decoder.
63+
//
64+
// Contextual logging: NewStreamWatcherWithLogger should be used instead of NewStreamWatcher in code which supports contextual logging.
6265
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
66+
return NewStreamWatcherWithLogger(klog.Background(), d, r)
67+
}
68+
69+
// NewStreamWatcherWithLogger creates a StreamWatcher from the given decoder and logger.
70+
func NewStreamWatcherWithLogger(logger klog.Logger, d Decoder, r Reporter) *StreamWatcher {
6371
sw := &StreamWatcher{
72+
logger: logger,
6473
source: d,
6574
reporter: r,
6675
// It's easy for a consumer to add buffering via an extra
@@ -98,7 +107,7 @@ func (sw *StreamWatcher) Stop() {
98107

99108
// receive reads result from the decoder in a loop and sends down the result channel.
100109
func (sw *StreamWatcher) receive() {
101-
defer utilruntime.HandleCrash()
110+
defer utilruntime.HandleCrashWithLogger(sw.logger)
102111
defer close(sw.result)
103112
defer sw.Stop()
104113
for {
@@ -108,10 +117,10 @@ func (sw *StreamWatcher) receive() {
108117
case io.EOF:
109118
// watch closed normally
110119
case io.ErrUnexpectedEOF:
111-
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
120+
sw.logger.V(1).Info("Unexpected EOF during watch stream event decoding", "err", err)
112121
default:
113122
if net.IsProbableEOF(err) || net.IsTimeout(err) {
114-
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
123+
sw.logger.V(5).Info("Unable to decode an event from the watch stream", "err", err)
115124
} else {
116125
select {
117126
case <-sw.done:

staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func TestStreamWatcher(t *testing.T) {
6464
}
6565

6666
fd := fakeDecoder{items: make(chan Event, 5)}
67+
//nolint:logcheck // Intentionally uses the old API.
6768
sw := NewStreamWatcher(fd, nil)
6869

6970
for _, item := range table {
@@ -87,6 +88,7 @@ func TestStreamWatcher(t *testing.T) {
8788
func TestStreamWatcherError(t *testing.T) {
8889
fd := fakeDecoder{err: fmt.Errorf("test error")}
8990
fr := &fakeReporter{}
91+
//nolint:logcheck // Intentionally uses the old API.
9092
sw := NewStreamWatcher(fd, fr)
9193
evt, ok := <-sw.ResultChan()
9294
if !ok {
@@ -110,6 +112,7 @@ func TestStreamWatcherError(t *testing.T) {
110112
func TestStreamWatcherRace(t *testing.T) {
111113
fd := fakeDecoder{err: fmt.Errorf("test error")}
112114
fr := &fakeReporter{}
115+
//nolint:logcheck // Intentionally uses the old API.
113116
sw := NewStreamWatcher(fd, fr)
114117
time.Sleep(10 * time.Millisecond)
115118
sw.Stop()

staging/src/k8s.io/apimachinery/pkg/watch/watch.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"k8s.io/klog/v2"
2424

2525
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/utils/ptr"
2627
)
2728

2829
// Interface can be implemented by anything that knows how to watch and report changes.
@@ -103,29 +104,42 @@ func (w emptyWatch) ResultChan() <-chan Event {
103104

104105
// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
105106
type FakeWatcher struct {
107+
logger klog.Logger
106108
result chan Event
107109
stopped bool
108110
sync.Mutex
109111
}
110112

113+
var _ Interface = &FakeWatcher{}
114+
115+
// Contextual logging: NewFakeWithOptions and a logger in the FakeOptions should be used instead in code which supports contextual logging.
111116
func NewFake() *FakeWatcher {
112-
return &FakeWatcher{
113-
result: make(chan Event),
114-
}
117+
return NewFakeWithOptions(FakeOptions{})
115118
}
116119

120+
// Contextual logging: NewFakeWithOptions and a logger in the FakeOptions should be used instead in code which supports contextual logging.
117121
func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
122+
return NewFakeWithOptions(FakeOptions{ChannelSize: size})
123+
}
124+
125+
func NewFakeWithOptions(options FakeOptions) *FakeWatcher {
118126
return &FakeWatcher{
119-
result: make(chan Event, size),
127+
logger: ptr.Deref(options.Logger, klog.Background()),
128+
result: make(chan Event, options.ChannelSize),
120129
}
121130
}
122131

132+
type FakeOptions struct {
133+
Logger *klog.Logger
134+
ChannelSize int
135+
}
136+
123137
// Stop implements Interface.Stop().
124138
func (f *FakeWatcher) Stop() {
125139
f.Lock()
126140
defer f.Unlock()
127141
if !f.stopped {
128-
klog.V(4).Infof("Stopping fake watcher.")
142+
f.logger.V(4).Info("Stopping fake watcher")
129143
close(f.result)
130144
f.stopped = true
131145
}
@@ -176,13 +190,22 @@ func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
176190

177191
// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
178192
type RaceFreeFakeWatcher struct {
193+
logger klog.Logger
179194
result chan Event
180195
Stopped bool
181196
sync.Mutex
182197
}
183198

199+
var _ Interface = &RaceFreeFakeWatcher{}
200+
201+
// Contextual logging: RaceFreeFakeWatcherWithLogger should be used instead of NewRaceFreeFake in code which supports contextual logging.
184202
func NewRaceFreeFake() *RaceFreeFakeWatcher {
203+
return NewRaceFreeFakeWithLogger(klog.Background())
204+
}
205+
206+
func NewRaceFreeFakeWithLogger(logger klog.Logger) *RaceFreeFakeWatcher {
185207
return &RaceFreeFakeWatcher{
208+
logger: logger,
186209
result: make(chan Event, DefaultChanSize),
187210
}
188211
}
@@ -192,7 +215,7 @@ func (f *RaceFreeFakeWatcher) Stop() {
192215
f.Lock()
193216
defer f.Unlock()
194217
if !f.Stopped {
195-
klog.V(4).Infof("Stopping fake watcher.")
218+
f.logger.V(4).Info("Stopping fake watcher")
196219
close(f.result)
197220
f.Stopped = true
198221
}

staging/src/k8s.io/client-go/rest/request.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1008,7 +1008,8 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa
10081008
frameReader := framer.NewFrameReader(resp.Body)
10091009
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
10101010

1011-
return watch.NewStreamWatcher(
1011+
return watch.NewStreamWatcherWithLogger(
1012+
klog.FromContext(ctx),
10121013
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
10131014
// use 500 to indicate that the cause of the error is unknown - other error codes
10141015
// are more specific to HTTP interactions, and set a reason

staging/src/k8s.io/client-go/tools/cache/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ func TestUpdate(t *testing.T) {
367367
// everything we've added has been deleted.
368368
watchCh := make(chan struct{})
369369
_, controller := NewInformer(
370-
&testLW{
370+
&ListWatch{
371371
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
372372
watch, err := source.Watch(options)
373373
close(watchCh)

0 commit comments

Comments
 (0)