Skip to content

Commit 70e65ee

Browse files
committed
Add FeatureSupportChecker for etcd storage
1 parent 17854f0 commit 70e65ee

File tree

3 files changed

+433
-0
lines changed

3 files changed

+433
-0
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package feature
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
24+
clientv3 "go.etcd.io/etcd/client/v3"
25+
"k8s.io/apimachinery/pkg/util/version"
26+
"k8s.io/apiserver/pkg/storage"
27+
"k8s.io/klog/v2"
28+
"k8s.io/utils/ptr"
29+
)
30+
31+
var (
32+
// Define these static versions to use for checking version of etcd, issue on kubernetes #123192
33+
v3_4_31 = version.MustParseSemantic("3.4.31")
34+
v3_5_0 = version.MustParseSemantic("3.5.0")
35+
v3_5_13 = version.MustParseSemantic("3.5.13")
36+
37+
// DefaultFeatureSupportChecker is a shared global etcd FeatureSupportChecker.
38+
DefaultFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
39+
)
40+
41+
// FeatureSupportChecker to define Supports functions.
42+
type FeatureSupportChecker interface {
43+
// Supports check if the feature is supported or not by checking internal cache.
44+
// By default all calls to this function before calling CheckClient returns false.
45+
// Returns true if all endpoints in etcd clients are supporting the feature.
46+
Supports(feature storage.Feature) (bool, error)
47+
// CheckClient works with etcd client to recalcualte feature support and cache it internally.
48+
// All etcd clients should support feature to cause `Supports` return true.
49+
// If client A supports and client B doesn't support the feature, the `Supports` will
50+
// first return true at client A initializtion and then return false on client B
51+
// initialzation, it can flip the support at runtime.
52+
CheckClient(ctx context.Context, c client, feature storage.Feature) error
53+
}
54+
55+
type defaultFeatureSupportChecker struct {
56+
lock sync.Mutex
57+
progressNotifySupported *bool
58+
progresNotifyEndpointCache map[string]bool
59+
}
60+
61+
func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker {
62+
return &defaultFeatureSupportChecker{
63+
progresNotifyEndpointCache: make(map[string]bool),
64+
}
65+
}
66+
67+
// Supports can check the featue from anywhere without storage if it was cached before.
68+
func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) (bool, error) {
69+
switch feature {
70+
case storage.RequestWatchProgress:
71+
f.lock.Lock()
72+
defer f.lock.Unlock()
73+
74+
return ptr.Deref(f.progressNotifySupported, false), nil
75+
default:
76+
return false, fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)
77+
}
78+
}
79+
80+
// CheckClient accepts client and calculate the support per endpoint and caches it.
81+
// It will return at any point if error happens or one endpoint is not supported.
82+
func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) error {
83+
switch feature {
84+
case storage.RequestWatchProgress:
85+
return f.clientSupportsRequestWatchProgress(ctx, c)
86+
default:
87+
return fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)
88+
89+
}
90+
}
91+
92+
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client) error {
93+
f.lock.Lock()
94+
defer f.lock.Unlock()
95+
96+
for _, ep := range c.Endpoints() {
97+
supported, err := f.supportsProgressNotifyEndpointLocked(ctx, c, ep)
98+
if err != nil {
99+
return err
100+
}
101+
if !supported {
102+
f.progressNotifySupported = ptr.To(false)
103+
return nil
104+
}
105+
}
106+
if f.progressNotifySupported == nil && len(c.Endpoints()) > 0 {
107+
f.progressNotifySupported = ptr.To(true)
108+
}
109+
return nil
110+
}
111+
112+
func (f *defaultFeatureSupportChecker) supportsProgressNotifyEndpointLocked(ctx context.Context, c client, ep string) (bool, error) {
113+
if supported, ok := f.progresNotifyEndpointCache[ep]; ok {
114+
return supported, nil
115+
}
116+
117+
supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep)
118+
if err != nil {
119+
return false, err
120+
}
121+
122+
f.progresNotifyEndpointCache[ep] = supported
123+
return supported, nil
124+
}
125+
126+
// Sub interface of etcd client.
127+
type client interface {
128+
// Endpoints returns list of endpoints in etcd client.
129+
Endpoints() []string
130+
// Status retrieves the status information from the etcd client connected to the specified endpoint.
131+
// It takes a context.Context parameter for cancellation or timeout control.
132+
// It returns a clientv3.StatusResponse containing the status information or an error if the operation fails.
133+
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
134+
}
135+
136+
// endpointSupportsRequestWatchProgress evaluates whether RequestWatchProgress supported by current version of etcd endpoint.
137+
// Based on this issues:
138+
// - https://github.com/etcd-io/etcd/issues/15220 - Fixed in etcd v3.4.25+ and v3.5.8+
139+
// - https://github.com/etcd-io/etcd/issues/17507 - Fixed in etcd v3.4.31+ and v3.5.13+
140+
func endpointSupportsRequestWatchProgress(ctx context.Context, c client, endpoint string) (bool, error) {
141+
resp, err := c.Status(ctx, endpoint)
142+
if err != nil {
143+
return false, fmt.Errorf("failed checking etcd version, endpoint: %q: %w", endpoint, err)
144+
}
145+
ver, err := version.ParseSemantic(resp.Version)
146+
if err != nil {
147+
// Assume feature is not supported if etcd version cannot be parsed.
148+
klog.ErrorS(err, "Failed to parse etcd version", "version", resp.Version)
149+
return false, nil
150+
}
151+
if ver.LessThan(v3_4_31) || ver.AtLeast(v3_5_0) && ver.LessThan(v3_5_13) {
152+
return false, nil
153+
}
154+
return true, nil
155+
}

0 commit comments

Comments
 (0)