@@ -52,6 +52,8 @@ import (
52
52
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
53
53
"k8s.io/apimachinery/pkg/types"
54
54
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
55
+ "k8s.io/apimachinery/pkg/util/sets"
56
+ utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
55
57
"k8s.io/apiserver/pkg/admission"
56
58
"k8s.io/apiserver/pkg/authorization/authorizer"
57
59
"k8s.io/apiserver/pkg/endpoints/handlers"
@@ -62,6 +64,7 @@ import (
62
64
"k8s.io/apiserver/pkg/features"
63
65
"k8s.io/apiserver/pkg/registry/generic"
64
66
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
67
+ genericfilters "k8s.io/apiserver/pkg/server/filters"
65
68
"k8s.io/apiserver/pkg/storage/storagebackend"
66
69
utilfeature "k8s.io/apiserver/pkg/util/feature"
67
70
"k8s.io/apiserver/pkg/util/webhook"
@@ -100,6 +103,9 @@ type crdHandler struct {
100
103
101
104
// so that we can do create on update.
102
105
authorizer authorizer.Authorizer
106
+
107
+ // request timeout we should delay storage teardown for
108
+ requestTimeout time.Duration
103
109
}
104
110
105
111
// crdInfo stores enough information to serve the storage for the custom resource
@@ -123,6 +129,8 @@ type crdInfo struct {
123
129
124
130
// storageVersion is the CRD version used when storing the object in etcd.
125
131
storageVersion string
132
+
133
+ waitGroup * utilwaitgroup.SafeWaitGroup
126
134
}
127
135
128
136
// crdStorageMap goes from customresourcedefinition to its storage
@@ -139,7 +147,8 @@ func NewCustomResourceDefinitionHandler(
139
147
serviceResolver webhook.ServiceResolver ,
140
148
authResolverWrapper webhook.AuthenticationInfoResolverWrapper ,
141
149
masterCount int ,
142
- authorizer authorizer.Authorizer ) (* crdHandler , error ) {
150
+ authorizer authorizer.Authorizer ,
151
+ requestTimeout time.Duration ) (* crdHandler , error ) {
143
152
ret := & crdHandler {
144
153
versionDiscoveryHandler : versionDiscoveryHandler ,
145
154
groupDiscoveryHandler : groupDiscoveryHandler ,
@@ -151,6 +160,7 @@ func NewCustomResourceDefinitionHandler(
151
160
establishingController : establishingController ,
152
161
masterCount : masterCount ,
153
162
authorizer : authorizer ,
163
+ requestTimeout : requestTimeout ,
154
164
}
155
165
crdInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
156
166
UpdateFunc : ret .updateCustomResourceDefinition ,
@@ -169,6 +179,11 @@ func NewCustomResourceDefinitionHandler(
169
179
return ret , nil
170
180
}
171
181
182
+ // watches are expected to handle storage disruption gracefully,
183
+ // both on the server-side (by terminating the watch connection)
184
+ // and on the client side (by restarting the watch)
185
+ var longRunningFilter = genericfilters .BasicLongRunningRequestCheck (sets .NewString ("watch" ), sets .NewString ())
186
+
172
187
func (r * crdHandler ) ServeHTTP (w http.ResponseWriter , req * http.Request ) {
173
188
ctx := req .Context ()
174
189
requestInfo , ok := apirequest .RequestInfoFrom (ctx )
@@ -238,7 +253,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
238
253
supportedTypes = append (supportedTypes , string (types .ApplyPatchType ))
239
254
}
240
255
241
- var handler http.HandlerFunc
256
+ var handlerFunc http.HandlerFunc
242
257
subresources , err := apiextensions .GetSubresourcesForVersion (crd , requestInfo .APIVersion )
243
258
if err != nil {
244
259
utilruntime .HandleError (err )
@@ -247,18 +262,19 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
247
262
}
248
263
switch {
249
264
case subresource == "status" && subresources != nil && subresources .Status != nil :
250
- handler = r .serveStatus (w , req , requestInfo , crdInfo , terminating , supportedTypes )
265
+ handlerFunc = r .serveStatus (w , req , requestInfo , crdInfo , terminating , supportedTypes )
251
266
case subresource == "scale" && subresources != nil && subresources .Scale != nil :
252
- handler = r .serveScale (w , req , requestInfo , crdInfo , terminating , supportedTypes )
267
+ handlerFunc = r .serveScale (w , req , requestInfo , crdInfo , terminating , supportedTypes )
253
268
case len (subresource ) == 0 :
254
- handler = r .serveResource (w , req , requestInfo , crdInfo , terminating , supportedTypes )
269
+ handlerFunc = r .serveResource (w , req , requestInfo , crdInfo , terminating , supportedTypes )
255
270
default :
256
271
http .Error (w , "the server could not find the requested resource" , http .StatusNotFound )
257
272
}
258
273
259
- if handler != nil {
260
- handler = metrics .InstrumentHandlerFunc (verb , requestInfo .APIGroup , requestInfo .APIVersion , resource , subresource , scope , metrics .APIServerComponent , handler )
261
- handler (w , req )
274
+ if handlerFunc != nil {
275
+ handlerFunc = metrics .InstrumentHandlerFunc (verb , requestInfo .APIGroup , requestInfo .APIVersion , resource , subresource , scope , metrics .APIServerComponent , handlerFunc )
276
+ handler := genericfilters .WithWaitGroup (handlerFunc , longRunningFilter , crdInfo .waitGroup )
277
+ handler .ServeHTTP (w , req )
262
278
return
263
279
}
264
280
}
@@ -365,18 +381,18 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})
365
381
366
382
klog .V (4 ).Infof ("Updating customresourcedefinition %s" , oldCRD .Name )
367
383
368
- // Copy because we cannot write to storageMap without a race
369
- // as it is used without locking elsewhere.
370
- storageMap2 := storageMap .clone ()
371
- if oldInfo , ok := storageMap2 [types .UID (oldCRD .UID )]; ok {
372
- for _ , storage := range oldInfo .storages {
373
- // destroy only the main storage. Those for the subresources share cacher and etcd clients.
374
- storage .CustomResource .DestroyFunc ()
375
- }
384
+ if oldInfo , ok := storageMap [types .UID (oldCRD .UID )]; ok {
385
+ // Copy because we cannot write to storageMap without a race
386
+ // as it is used without locking elsewhere.
387
+ storageMap2 := storageMap .clone ()
388
+
389
+ // Remove from the CRD info map and store the map
376
390
delete (storageMap2 , types .UID (oldCRD .UID ))
377
- }
391
+ r . customStorage . Store ( storageMap2 )
378
392
379
- r .customStorage .Store (storageMap2 )
393
+ // Tear down the old storage
394
+ go r .tearDown (oldInfo )
395
+ }
380
396
}
381
397
382
398
// removeDeadStorage removes REST storage that isn't being used
@@ -390,6 +406,7 @@ func (r *crdHandler) removeDeadStorage() {
390
406
r .customStorageLock .Lock ()
391
407
defer r .customStorageLock .Unlock ()
392
408
409
+ oldInfos := []* crdInfo {}
393
410
storageMap := r .customStorage .Load ().(crdStorageMap )
394
411
// Copy because we cannot write to storageMap without a race
395
412
// as it is used without locking elsewhere
@@ -404,14 +421,38 @@ func (r *crdHandler) removeDeadStorage() {
404
421
}
405
422
if ! found {
406
423
klog .V (4 ).Infof ("Removing dead CRD storage for %s/%s" , s .spec .Group , s .spec .Names .Kind )
407
- for _ , storage := range s .storages {
408
- // destroy only the main storage. Those for the subresources share cacher and etcd clients.
409
- storage .CustomResource .DestroyFunc ()
410
- }
424
+ oldInfos = append (oldInfos , s )
411
425
delete (storageMap2 , uid )
412
426
}
413
427
}
414
428
r .customStorage .Store (storageMap2 )
429
+
430
+ for _ , s := range oldInfos {
431
+ go r .tearDown (s )
432
+ }
433
+ }
434
+
435
+ // Wait up to a minute for requests to drain, then tear down storage
436
+ func (r * crdHandler ) tearDown (oldInfo * crdInfo ) {
437
+ requestsDrained := make (chan struct {})
438
+ go func () {
439
+ defer close (requestsDrained )
440
+ // Allow time for in-flight requests with a handle to the old info to register themselves
441
+ time .Sleep (time .Second )
442
+ // Wait for in-flight requests to drain
443
+ oldInfo .waitGroup .Wait ()
444
+ }()
445
+
446
+ select {
447
+ case <- time .After (r .requestTimeout * 2 ):
448
+ klog .Warningf ("timeout waiting for requests to drain for %s/%s, tearing down storage" , oldInfo .spec .Group , oldInfo .spec .Names .Kind )
449
+ case <- requestsDrained :
450
+ }
451
+
452
+ for _ , storage := range oldInfo .storages {
453
+ // destroy only the main storage. Those for the subresources share cacher and etcd clients.
454
+ storage .CustomResource .DestroyFunc ()
455
+ }
415
456
}
416
457
417
458
// GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter of
@@ -622,6 +663,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResource
622
663
scaleRequestScopes : scaleScopes ,
623
664
statusRequestScopes : statusScopes ,
624
665
storageVersion : storageVersion ,
666
+ waitGroup : & utilwaitgroup.SafeWaitGroup {},
625
667
}
626
668
627
669
// Copy because we cannot write to storageMap without a race
0 commit comments