@@ -46,8 +46,9 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
46
46
return
47
47
}
48
48
klog .Warningf ("data consistency check for %s is enabled, this will result in an additional call to the API server." , identity )
49
- listOptions .ResourceVersion = lastSyncedResourceVersion
50
- listOptions .ResourceVersionMatch = metav1 .ResourceVersionMatchExact
49
+
50
+ retrievedItems := toMetaObjectSliceOrDie (retrieveItemsFn ())
51
+ listOptions = prepareListCallOptions (lastSyncedResourceVersion , listOptions , len (retrievedItems ))
51
52
var list runtime.Object
52
53
err := wait .PollUntilContextCancel (ctx , time .Second , true , func (_ context.Context ) (done bool , err error ) {
53
54
list , err = listFn (ctx , listOptions )
@@ -69,9 +70,7 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
69
70
if err != nil {
70
71
panic (err ) // this should never happen
71
72
}
72
-
73
73
listItems := toMetaObjectSliceOrDie (rawListItems )
74
- retrievedItems := toMetaObjectSliceOrDie (retrieveItemsFn ())
75
74
76
75
sort .Sort (byUID (listItems ))
77
76
sort .Sort (byUID (retrievedItems ))
@@ -85,24 +84,49 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
85
84
86
85
// canFormAdditionalListCall ensures that we can form a valid LIST requests
87
86
// for checking data consistency.
88
- func canFormAdditionalListCall (resourceVersion string , options metav1.ListOptions ) bool {
87
+ func canFormAdditionalListCall (lastSyncedResourceVersion string , listOptions metav1.ListOptions ) bool {
89
88
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
90
89
// we need to make sure that the continuation hasn't been set
91
90
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L38
92
- if len (options .Continue ) > 0 {
91
+ if len (listOptions .Continue ) > 0 {
93
92
return false
94
93
}
95
94
96
95
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
97
96
// we need to make sure that the RV is valid because the validation code forbids RV == "0"
98
97
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L44
99
- if resourceVersion == "0" {
98
+ if lastSyncedResourceVersion == "0" {
100
99
return false
101
100
}
102
101
103
102
return true
104
103
}
105
104
105
+ // prepareListCallOptions changes the input list options so that
106
+ // the list call goes directly to etcd
107
+ func prepareListCallOptions (lastSyncedResourceVersion string , listOptions metav1.ListOptions , retrievedItemsCount int ) metav1.ListOptions {
108
+ // this is our legacy case:
109
+ //
110
+ // the watch cache skips the Limit if the ResourceVersion was set to "0"
111
+ // thus, to compare with data retrieved directly from etcd
112
+ // we need to skip the limit to for the list call as well.
113
+ //
114
+ // note that when the number of retrieved items is less than the request limit,
115
+ // it means either the watch cache is disabled, or there is not enough data.
116
+ // in both cases, we can use the limit because we will be able to compare
117
+ // the data with the items retrieved from etcd.
118
+ if listOptions .ResourceVersion == "0" && listOptions .Limit > 0 && int64 (retrievedItemsCount ) > listOptions .Limit {
119
+ listOptions .Limit = 0
120
+ }
121
+
122
+ // set the RV and RVM so that we get the snapshot of data
123
+ // directly from etcd.
124
+ listOptions .ResourceVersion = lastSyncedResourceVersion
125
+ listOptions .ResourceVersionMatch = metav1 .ResourceVersionMatchExact
126
+
127
+ return listOptions
128
+ }
129
+
106
130
type byUID []metav1.Object
107
131
108
132
func (a byUID ) Len () int { return len (a ) }
0 commit comments