Skip to content

Commit f5d62f7

Browse files
authored
Merge pull request kubernetes#124446 from p0lyn0mial/watch-list-consistency-detector-more-generic
client-go/consistency-detector: change the signature of checkWatchListConsistencyIfRequested
2 parents 4bb4345 + 83c7542 commit f5d62f7

File tree

3 files changed

+62
-39
lines changed

3 files changed

+62
-39
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
695695
// we utilize the temporaryStore to ensure independence from the current store implementation.
696696
// as of today, the store is implemented as a queue and will be drained by the higher-level
697697
// component as soon as it finishes replacing the content.
698-
checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore)
698+
checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List)
699699

700700
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
701701
return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
@@ -933,6 +933,13 @@ func isWatchErrorRetriable(err error) bool {
933933
return false
934934
}
935935

936+
// wrapListFuncWithContext wraps a ListFunc in a function that accepts a context and ignores it.
937+
func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
938+
return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) {
939+
return listFn(options)
940+
}
941+
}
942+
936943
// initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event
937944
// which marks the end of the watch stream, has not been received within the defined tick interval.
938945
//

staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector.go

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cache
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"os"
2223
"sort"
2324
"strconv"
@@ -32,42 +33,46 @@ import (
3233
"k8s.io/klog/v2"
3334
)
3435

35-
var dataConsistencyDetectionEnabled = false
36+
var dataConsistencyDetectionForWatchListEnabled = false
3637

3738
func init() {
38-
dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
39+
dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
3940
}
4041

41-
// checkWatchListConsistencyIfRequested performs a data consistency check only when
42+
type retrieveItemsFunc[U any] func() []U
43+
44+
type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
45+
46+
// checkWatchListDataConsistencyIfRequested performs a data consistency check only when
4247
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during binary startup.
4348
//
4449
// The consistency check is meant to be enforced only in the CI, not in production.
4550
// The check ensures that data retrieved by the watch-list api call
46-
// is exactly the same as data received by the standard list api call.
51+
// is exactly the same as data received by the standard list api call against etcd.
4752
//
4853
// Note that this function will panic when data inconsistency is detected.
4954
// This is intentional because we want to catch it in the CI.
50-
func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
51-
if !dataConsistencyDetectionEnabled {
55+
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], retrieveItemsFn retrieveItemsFunc[U]) {
56+
if !dataConsistencyDetectionForWatchListEnabled {
5257
return
5358
}
54-
checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store)
59+
// for informers we pass an empty ListOptions because
60+
// listFn might be wrapped for filtering during informer construction.
61+
checkDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
5562
}
5663

57-
// checkWatchListConsistency exists solely for testing purposes.
58-
// we cannot use checkWatchListConsistencyIfRequested because
64+
// checkDataConsistency exists solely for testing purposes.
65+
// we cannot use checkWatchListDataConsistencyIfRequested because
5966
// it is guarded by an environment variable.
6067
// we cannot manipulate the environment variable because
6168
// it will affect other tests in this package.
62-
func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
63-
klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity)
64-
opts := metav1.ListOptions{
65-
ResourceVersion: lastSyncedResourceVersion,
66-
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
67-
}
69+
func checkDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], listOptions metav1.ListOptions, retrieveItemsFn retrieveItemsFunc[U]) {
70+
klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
71+
listOptions.ResourceVersion = lastSyncedResourceVersion
72+
listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
6873
var list runtime.Object
69-
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) {
70-
list, err = listerWatcher.List(opts)
74+
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
75+
list, err = listFn(ctx, listOptions)
7176
if err != nil {
7277
// the consistency check will only be enabled in the CI
7378
// and LIST calls in general will be retried by the client-go library
@@ -78,7 +83,7 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync
7883
return true, nil
7984
})
8085
if err != nil {
81-
klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err)
86+
klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err)
8287
return
8388
}
8489

@@ -88,14 +93,14 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync
8893
}
8994

9095
listItems := toMetaObjectSliceOrDie(rawListItems)
91-
storeItems := toMetaObjectSliceOrDie(store.List())
96+
retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
9297

9398
sort.Sort(byUID(listItems))
94-
sort.Sort(byUID(storeItems))
99+
sort.Sort(byUID(retrievedItems))
95100

96-
if !cmp.Equal(listItems, storeItems) {
97-
klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems))
98-
msg := "data inconsistency detected for the watch-list feature, panicking!"
101+
if !cmp.Equal(listItems, retrievedItems) {
102+
klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems))
103+
msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity)
99104
panic(msg)
100105
}
101106
}

staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package cache
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"testing"
2223

@@ -25,62 +26,71 @@ import (
2526
v1 "k8s.io/api/core/v1"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/runtime"
29+
"k8s.io/apimachinery/pkg/util/wait"
2830
"k8s.io/apimachinery/pkg/watch"
31+
"k8s.io/utils/ptr"
2932
)
3033

31-
func TestWatchListConsistency(t *testing.T) {
34+
func TestDataConsistencyChecker(t *testing.T) {
3235
scenarios := []struct {
3336
name string
3437

35-
podList *v1.PodList
36-
storeContent []*v1.Pod
38+
podList *v1.PodList
39+
storeContent []*v1.Pod
40+
requestOptions metav1.ListOptions
3741

3842
expectedRequestOptions []metav1.ListOptions
3943
expectedListRequests int
4044
expectPanic bool
4145
}{
4246
{
43-
name: "watchlist consistency check won't panic when data is consistent",
47+
name: "data consistency check won't panic when data is consistent",
4448
podList: &v1.PodList{
4549
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
4650
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
4751
},
52+
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
4853
storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
4954
expectedListRequests: 1,
5055
expectedRequestOptions: []metav1.ListOptions{
5156
{
5257
ResourceVersion: "2",
5358
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
59+
TimeoutSeconds: ptr.To(int64(39)),
5460
},
5561
},
5662
},
5763

5864
{
59-
name: "watchlist consistency check won't panic when there is no data",
65+
name: "data consistency check won't panic when there is no data",
6066
podList: &v1.PodList{
6167
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
6268
},
69+
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
6370
expectedListRequests: 1,
6471
expectedRequestOptions: []metav1.ListOptions{
6572
{
6673
ResourceVersion: "2",
6774
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
75+
TimeoutSeconds: ptr.To(int64(39)),
6876
},
6977
},
7078
},
7179

7280
{
73-
name: "watchlist consistency panics when data is inconsistent",
81+
name: "data consistency panics when data is inconsistent",
7482
podList: &v1.PodList{
7583
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
7684
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")},
7785
},
86+
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
7887
storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
7988
expectedListRequests: 1,
8089
expectedRequestOptions: []metav1.ListOptions{
8190
{
8291
ResourceVersion: "2",
8392
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
93+
TimeoutSeconds: ptr.To(int64(39)),
8494
},
8595
},
8696
expectPanic: true,
@@ -90,15 +100,18 @@ func TestWatchListConsistency(t *testing.T) {
90100
for _, scenario := range scenarios {
91101
t.Run(scenario.name, func(t *testing.T) {
92102
listWatcher, store, _, stopCh := testData()
103+
ctx := wait.ContextForChannel(stopCh)
93104
for _, obj := range scenario.storeContent {
94105
require.NoError(t, store.Add(obj))
95106
}
96107
listWatcher.customListResponse = scenario.podList
97108

98109
if scenario.expectPanic {
99-
require.Panics(t, func() { checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) })
110+
require.Panics(t, func() {
111+
checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List)
112+
})
100113
} else {
101-
checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store)
114+
checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List)
102115
}
103116

104117
verifyListCounter(t, listWatcher, scenario.expectedListRequests)
@@ -108,20 +121,18 @@ func TestWatchListConsistency(t *testing.T) {
108121
}
109122

110123
func TestDriveWatchLisConsistencyIfRequired(t *testing.T) {
111-
stopCh := make(chan struct{})
112-
defer close(stopCh)
113-
checkWatchListConsistencyIfRequested(stopCh, "", "", nil, nil)
124+
ctx := context.TODO()
125+
checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil)
114126
}
115127

116-
func TestWatchListConsistencyRetry(t *testing.T) {
128+
func TestDataConsistencyCheckerRetry(t *testing.T) {
117129
store := NewStore(MetaNamespaceKeyFunc)
118-
stopCh := make(chan struct{})
119-
defer close(stopCh)
130+
ctx := context.TODO()
120131

121132
stopListErrorAfter := 5
122133
errLister := &errorLister{stopErrorAfter: stopListErrorAfter}
123134

124-
checkWatchListConsistency(stopCh, "", "", errLister, store)
135+
checkDataConsistency(ctx, "", "", wrapListFuncWithContext(errLister.List), metav1.ListOptions{}, store.List)
125136
require.Equal(t, errLister.listCounter, errLister.stopErrorAfter)
126137
}
127138

0 commit comments

Comments
 (0)