@@ -20,9 +20,12 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"sync"
23
+ "time"
23
24
24
25
clientv3 "go.etcd.io/etcd/client/v3"
26
+ "k8s.io/apimachinery/pkg/util/runtime"
25
27
"k8s.io/apimachinery/pkg/util/version"
28
+ "k8s.io/apimachinery/pkg/util/wait"
26
29
"k8s.io/apiserver/pkg/storage"
27
30
"k8s.io/klog/v2"
28
31
"k8s.io/utils/ptr"
@@ -43,84 +46,98 @@ type FeatureSupportChecker interface {
43
46
// Supports check if the feature is supported or not by checking internal cache.
44
47
// By default all calls to this function before calling CheckClient returns false.
45
48
// Returns true if all endpoints in etcd clients are supporting the feature.
46
- Supports (feature storage.Feature ) (bool , error )
49
+ // If client A supports and client B doesn't support the feature, the `Supports` will
50
+ // first return true at client A initializtion and then return false on client B
51
+ // initialzation, it can flip the support at runtime.
52
+ Supports (feature storage.Feature ) bool
47
53
// CheckClient works with etcd client to recalcualte feature support and cache it internally.
48
54
// All etcd clients should support feature to cause `Supports` return true.
49
55
// If client A supports and client B doesn't support the feature, the `Supports` will
50
56
// first return true at client A initializtion and then return false on client B
51
57
// initialzation, it can flip the support at runtime.
52
- CheckClient (ctx context.Context , c client , feature storage.Feature ) error
58
+ CheckClient (ctx context.Context , c client , feature storage.Feature )
53
59
}
54
60
55
61
type defaultFeatureSupportChecker struct {
56
- lock sync.Mutex
57
- progressNotifySupported * bool
58
- progresNotifyEndpointCache map [string ]bool
62
+ lock sync.Mutex
63
+ progressNotifySupported * bool
64
+ checkingEndpoint map [string ]struct {}
59
65
}
60
66
61
67
func newDefaultFeatureSupportChecker () * defaultFeatureSupportChecker {
62
68
return & defaultFeatureSupportChecker {
63
- progresNotifyEndpointCache : make (map [string ]bool ),
69
+ checkingEndpoint : make (map [string ]struct {} ),
64
70
}
65
71
}
66
72
67
73
// Supports can check the featue from anywhere without storage if it was cached before.
68
- func (f * defaultFeatureSupportChecker ) Supports (feature storage.Feature ) ( bool , error ) {
74
+ func (f * defaultFeatureSupportChecker ) Supports (feature storage.Feature ) bool {
69
75
switch feature {
70
76
case storage .RequestWatchProgress :
71
77
f .lock .Lock ()
72
78
defer f .lock .Unlock ()
73
79
74
- return ptr .Deref (f .progressNotifySupported , false ), nil
80
+ return ptr .Deref (f .progressNotifySupported , false )
75
81
default :
76
- return false , fmt .Errorf ("feature %q is not implemented in DefaultFeatureSupportChecker" , feature )
82
+ runtime .HandleError (fmt .Errorf ("feature %q is not implemented in DefaultFeatureSupportChecker" , feature ))
83
+ return false
77
84
}
78
85
}
79
86
80
87
// CheckClient accepts client and calculate the support per endpoint and caches it.
81
- // It will return at any point if error happens or one endpoint is not supported.
82
- func (f * defaultFeatureSupportChecker ) CheckClient (ctx context.Context , c client , feature storage.Feature ) error {
88
+ func (f * defaultFeatureSupportChecker ) CheckClient (ctx context.Context , c client , feature storage.Feature ) {
83
89
switch feature {
84
90
case storage .RequestWatchProgress :
85
- return f . clientSupportsRequestWatchProgress (ctx , c )
91
+ f . checkClient (ctx , c )
86
92
default :
87
- return fmt .Errorf ("feature %q is not implemented in DefaultFeatureSupportChecker" , feature )
88
-
93
+ runtime .HandleError (fmt .Errorf ("feature %q is not implemented in DefaultFeatureSupportChecker" , feature ))
89
94
}
90
95
}
91
96
92
- func (f * defaultFeatureSupportChecker ) clientSupportsRequestWatchProgress (ctx context.Context , c client ) error {
97
+ func (f * defaultFeatureSupportChecker ) checkClient (ctx context.Context , c client ) {
98
+ // start with 10 ms, multiply by 2 each step, until 15 s and stays on 15 seconds.
99
+ delayFunc := wait.Backoff {
100
+ Duration : 10 * time .Millisecond ,
101
+ Cap : 15 * time .Second ,
102
+ Factor : 2.0 ,
103
+ Steps : 11 }.DelayFunc ()
93
104
f .lock .Lock ()
94
105
defer f .lock .Unlock ()
95
-
96
106
for _ , ep := range c .Endpoints () {
97
- supported , err := f .supportsProgressNotifyEndpointLocked (ctx , c , ep )
98
- if err != nil {
99
- return err
107
+ if _ , found := f .checkingEndpoint [ep ]; found {
108
+ continue
100
109
}
101
- if ! supported {
102
- f .progressNotifySupported = ptr .To (false )
103
- return nil
104
- }
105
- }
106
- if f .progressNotifySupported == nil && len (c .Endpoints ()) > 0 {
107
- f .progressNotifySupported = ptr .To (true )
110
+ f .checkingEndpoint [ep ] = struct {}{}
111
+ go func (ep string ) {
112
+ defer runtime .HandleCrash ()
113
+ err := delayFunc .Until (ctx , true , true , func (ctx context.Context ) (done bool , err error ) {
114
+ internalErr := f .clientSupportsRequestWatchProgress (ctx , c , ep )
115
+ return internalErr == nil , nil
116
+ })
117
+ if err != nil {
118
+ klog .ErrorS (err , "Failed to check if RequestWatchProgress is supported by etcd after retrying" )
119
+ }
120
+ }(ep )
108
121
}
109
- return nil
110
122
}
111
123
112
- func (f * defaultFeatureSupportChecker ) supportsProgressNotifyEndpointLocked (ctx context.Context , c client , ep string ) (bool , error ) {
113
- if supported , ok := f .progresNotifyEndpointCache [ep ]; ok {
114
- return supported , nil
115
- }
116
-
124
+ func (f * defaultFeatureSupportChecker ) clientSupportsRequestWatchProgress (ctx context.Context , c client , ep string ) error {
117
125
supported , err := endpointSupportsRequestWatchProgress (ctx , c , ep )
118
126
if err != nil {
119
- return false , err
127
+ return err
120
128
}
129
+ f .lock .Lock ()
130
+ defer f .lock .Unlock ()
121
131
122
- f .progresNotifyEndpointCache [ep ] = supported
123
- return supported , nil
132
+ if ! supported {
133
+ klog .Infof ("RequestWatchProgress feature is not supported by %q endpoint" , ep )
134
+ f .progressNotifySupported = ptr .To (false )
135
+ return nil
136
+ }
137
+ if f .progressNotifySupported == nil {
138
+ f .progressNotifySupported = ptr .To (true )
139
+ }
140
+ return nil
124
141
}
125
142
126
143
// Sub interface of etcd client.
0 commit comments