Skip to content

Commit d8ed461

Browse files
authored
Merge pull request kubernetes#124612 from ah8ad3/add-clc-warning
Feat: warn user if etcd version is not supported for RequestWatchProgress feature.
2 parents f75d8e9 + 9f8273a commit d8ed461

File tree

6 files changed

+83
-55
lines changed

6 files changed

+83
-55
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"k8s.io/apiserver/pkg/features"
4343
"k8s.io/apiserver/pkg/storage"
4444
"k8s.io/apiserver/pkg/storage/cacher/metrics"
45+
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
4546
utilfeature "k8s.io/apiserver/pkg/util/feature"
4647
"k8s.io/client-go/tools/cache"
4748
"k8s.io/component-base/tracing"
@@ -728,9 +729,10 @@ func shouldDelegateList(opts storage.ListOptions) bool {
728729
pred := opts.Predicate
729730
match := opts.ResourceVersionMatch
730731
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
732+
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
731733

732734
// Serve consistent reads from storage if ConsistentListFromCache is disabled
733-
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
735+
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
734736
// Watch cache doesn't support continuations, so serve them from etcd.
735737
hasContinuation := len(pred.Continue) > 0
736738
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
@@ -773,7 +775,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
773775
// minimal resource version, simply forward the request to storage.
774776
return c.storage.GetList(ctx, key, opts, listObj)
775777
}
776-
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
778+
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
779+
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
777780
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
778781
if err != nil {
779782
return err

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"k8s.io/apiserver/pkg/features"
3434
"k8s.io/apiserver/pkg/storage"
3535
"k8s.io/apiserver/pkg/storage/cacher/metrics"
36+
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
3637
utilfeature "k8s.io/apiserver/pkg/util/feature"
3738
"k8s.io/client-go/tools/cache"
3839
"k8s.io/component-base/tracing"
@@ -498,7 +499,8 @@ func (s sortableStoreElements) Swap(i, j int) {
498499
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
499500
// with their ResourceVersion and the name of the index, if any, that was used.
500501
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
501-
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) {
502+
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
503+
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
502504
w.waitingUntilFresh.Add()
503505
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
504506
w.waitingUntilFresh.Remove()

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,12 @@ import (
3939
"k8s.io/apimachinery/pkg/watch"
4040
"k8s.io/apiserver/pkg/audit"
4141
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
42+
"k8s.io/apiserver/pkg/features"
4243
"k8s.io/apiserver/pkg/storage"
4344
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
45+
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
4446
"k8s.io/apiserver/pkg/storage/value"
47+
utilfeature "k8s.io/apiserver/pkg/util/feature"
4548
"k8s.io/component-base/tracing"
4649
"k8s.io/klog/v2"
4750
)
@@ -139,6 +142,9 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
139142
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
140143
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
141144
}
145+
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
146+
etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress)
147+
}
142148
return s
143149
}
144150

staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker.go renamed to staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
2324

2425
clientv3 "go.etcd.io/etcd/client/v3"
26+
"k8s.io/apimachinery/pkg/util/runtime"
2527
"k8s.io/apimachinery/pkg/util/version"
28+
"k8s.io/apimachinery/pkg/util/wait"
2629
"k8s.io/apiserver/pkg/storage"
2730
"k8s.io/klog/v2"
2831
"k8s.io/utils/ptr"
@@ -43,84 +46,98 @@ type FeatureSupportChecker interface {
4346
// Supports check if the feature is supported or not by checking internal cache.
4447
// By default all calls to this function before calling CheckClient returns false.
4548
// Returns true if all endpoints in etcd clients are supporting the feature.
46-
Supports(feature storage.Feature) (bool, error)
49+
// If client A supports and client B doesn't support the feature, the `Supports` will
50+
// first return true at client A initializtion and then return false on client B
51+
// initialzation, it can flip the support at runtime.
52+
Supports(feature storage.Feature) bool
4753
// CheckClient works with etcd client to recalcualte feature support and cache it internally.
4854
// All etcd clients should support feature to cause `Supports` return true.
4955
// If client A supports and client B doesn't support the feature, the `Supports` will
5056
// first return true at client A initializtion and then return false on client B
5157
// initialzation, it can flip the support at runtime.
52-
CheckClient(ctx context.Context, c client, feature storage.Feature) error
58+
CheckClient(ctx context.Context, c client, feature storage.Feature)
5359
}
5460

5561
type defaultFeatureSupportChecker struct {
56-
lock sync.Mutex
57-
progressNotifySupported *bool
58-
progresNotifyEndpointCache map[string]bool
62+
lock sync.Mutex
63+
progressNotifySupported *bool
64+
checkingEndpoint map[string]struct{}
5965
}
6066

6167
func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker {
6268
return &defaultFeatureSupportChecker{
63-
progresNotifyEndpointCache: make(map[string]bool),
69+
checkingEndpoint: make(map[string]struct{}),
6470
}
6571
}
6672

6773
// Supports can check the featue from anywhere without storage if it was cached before.
68-
func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) (bool, error) {
74+
func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool {
6975
switch feature {
7076
case storage.RequestWatchProgress:
7177
f.lock.Lock()
7278
defer f.lock.Unlock()
7379

74-
return ptr.Deref(f.progressNotifySupported, false), nil
80+
return ptr.Deref(f.progressNotifySupported, false)
7581
default:
76-
return false, fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)
82+
runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature))
83+
return false
7784
}
7885
}
7986

8087
// CheckClient accepts client and calculate the support per endpoint and caches it.
81-
// It will return at any point if error happens or one endpoint is not supported.
82-
func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) error {
88+
func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) {
8389
switch feature {
8490
case storage.RequestWatchProgress:
85-
return f.clientSupportsRequestWatchProgress(ctx, c)
91+
f.checkClient(ctx, c)
8692
default:
87-
return fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)
88-
93+
runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature))
8994
}
9095
}
9196

92-
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client) error {
97+
func (f *defaultFeatureSupportChecker) checkClient(ctx context.Context, c client) {
98+
// start with 10 ms, multiply by 2 each step, until 15 s and stays on 15 seconds.
99+
delayFunc := wait.Backoff{
100+
Duration: 10 * time.Millisecond,
101+
Cap: 15 * time.Second,
102+
Factor: 2.0,
103+
Steps: 11}.DelayFunc()
93104
f.lock.Lock()
94105
defer f.lock.Unlock()
95-
96106
for _, ep := range c.Endpoints() {
97-
supported, err := f.supportsProgressNotifyEndpointLocked(ctx, c, ep)
98-
if err != nil {
99-
return err
107+
if _, found := f.checkingEndpoint[ep]; found {
108+
continue
100109
}
101-
if !supported {
102-
f.progressNotifySupported = ptr.To(false)
103-
return nil
104-
}
105-
}
106-
if f.progressNotifySupported == nil && len(c.Endpoints()) > 0 {
107-
f.progressNotifySupported = ptr.To(true)
110+
f.checkingEndpoint[ep] = struct{}{}
111+
go func(ep string) {
112+
defer runtime.HandleCrash()
113+
err := delayFunc.Until(ctx, true, true, func(ctx context.Context) (done bool, err error) {
114+
internalErr := f.clientSupportsRequestWatchProgress(ctx, c, ep)
115+
return internalErr == nil, nil
116+
})
117+
if err != nil {
118+
klog.ErrorS(err, "Failed to check if RequestWatchProgress is supported by etcd after retrying")
119+
}
120+
}(ep)
108121
}
109-
return nil
110122
}
111123

112-
func (f *defaultFeatureSupportChecker) supportsProgressNotifyEndpointLocked(ctx context.Context, c client, ep string) (bool, error) {
113-
if supported, ok := f.progresNotifyEndpointCache[ep]; ok {
114-
return supported, nil
115-
}
116-
124+
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client, ep string) error {
117125
supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep)
118126
if err != nil {
119-
return false, err
127+
return err
120128
}
129+
f.lock.Lock()
130+
defer f.lock.Unlock()
121131

122-
f.progresNotifyEndpointCache[ep] = supported
123-
return supported, nil
132+
if !supported {
133+
klog.Infof("RequestWatchProgress feature is not supported by %q endpoint", ep)
134+
f.progressNotifySupported = ptr.To(false)
135+
return nil
136+
}
137+
if f.progressNotifySupported == nil {
138+
f.progressNotifySupported = ptr.To(true)
139+
}
140+
return nil
124141
}
125142

126143
// Sub interface of etcd client.
Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,17 @@ func TestSupports(t *testing.T) {
7676
testName string
7777
featureName string
7878
expectedResult bool
79-
expectedError error
8079
}{
8180
{
82-
testName: "Error with unknown feature",
83-
featureName: "some unknown feature",
84-
expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", "some unknown feature"),
81+
testName: "Disabled - with unknown feature",
82+
featureName: "some unknown feature",
8583
},
8684
{
87-
testName: "Error with empty feature",
88-
featureName: "",
89-
expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", ""),
85+
testName: "Disabled - with empty feature",
86+
featureName: "",
9087
},
9188
{
92-
testName: "No error but disabled by default",
89+
testName: "Disabled - default",
9390
featureName: storage.RequestWatchProgress,
9491
expectedResult: false,
9592
},
@@ -99,10 +96,9 @@ func TestSupports(t *testing.T) {
9996
t.Run(tt.testName, func(t *testing.T) {
10097
var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
10198

102-
supported, err := testFeatureSupportChecker.Supports(tt.featureName)
99+
supported := testFeatureSupportChecker.Supports(tt.featureName)
103100

104101
assert.Equal(t, tt.expectedResult, supported)
105-
assert.Equal(t, tt.expectedError, err)
106102
})
107103
}
108104
}
@@ -254,18 +250,19 @@ func TestSupportsRequestWatchProgress(t *testing.T) {
254250
}
255251
for _, tt := range tests {
256252
t.Run(tt.testName, func(t *testing.T) {
257-
var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
253+
var testFeatureSupportChecker = newDefaultFeatureSupportChecker()
258254
for _, round := range tt.rounds {
259255
// Mock Etcd client
260256
mockClient := &MockEtcdClient{EndpointVersion: round.endpointsVersion}
261257
ctx := context.Background()
262258

263-
err := testFeatureSupportChecker.CheckClient(ctx, mockClient, storage.RequestWatchProgress)
264-
assert.Equal(t, err, round.expectedError)
259+
for _, ep := range mockClient.Endpoints() {
260+
err := testFeatureSupportChecker.clientSupportsRequestWatchProgress(ctx, mockClient, ep)
261+
assert.Equal(t, round.expectedError, err)
262+
}
265263

266-
// Error of Supports already tested in TestSupports.
267-
supported, _ := testFeatureSupportChecker.Supports(storage.RequestWatchProgress)
268-
assert.Equal(t, supported, round.expectedResult)
264+
supported := testFeatureSupportChecker.Supports(storage.RequestWatchProgress)
265+
assert.Equal(t, round.expectedResult, supported)
269266
}
270267
})
271268
}

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"k8s.io/apimachinery/pkg/runtime/schema"
2626
apirequest "k8s.io/apiserver/pkg/endpoints/request"
2727
"k8s.io/apiserver/pkg/features"
28+
"k8s.io/apiserver/pkg/storage"
29+
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
2830
utilfeature "k8s.io/apiserver/pkg/util/feature"
2931
"k8s.io/klog/v2"
3032
)
@@ -165,9 +167,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
165167
resourceVersion := opts.ResourceVersion
166168
match := opts.ResourceVersionMatch
167169
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
170+
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
168171

169172
// Serve consistent reads from storage if ConsistentListFromCache is disabled
170-
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
173+
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
171174
// Watch cache doesn't support continuations, so serve them from etcd.
172175
hasContinuation := len(opts.Continue) > 0
173176
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.

0 commit comments

Comments
 (0)