Skip to content

Commit c85c0e4

Browse files
authored
Merge pull request kubernetes#77816 from liggitt/graceful-crd
Graceful custom resource storage teardown
2 parents 7446929 + ee215ba commit c85c0e4

File tree

5 files changed

+194
-22
lines changed

5 files changed

+194
-22
lines changed

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ go_library(
5353
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
5454
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
5555
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
56+
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
5657
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
58+
"//staging/src/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
5759
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
5860
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
5961
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
@@ -68,6 +70,7 @@ go_library(
6870
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
6971
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
7072
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
73+
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
7174
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
7275
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
7376
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
187187
c.ExtraConfig.AuthResolverWrapper,
188188
c.ExtraConfig.MasterCount,
189189
s.GenericAPIServer.Authorizer,
190+
c.GenericConfig.RequestTimeout,
190191
)
191192
if err != nil {
192193
return nil, err

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ import (
5252
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
5353
"k8s.io/apimachinery/pkg/types"
5454
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
55+
"k8s.io/apimachinery/pkg/util/sets"
56+
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
5557
"k8s.io/apiserver/pkg/admission"
5658
"k8s.io/apiserver/pkg/authorization/authorizer"
5759
"k8s.io/apiserver/pkg/endpoints/handlers"
@@ -62,6 +64,7 @@ import (
6264
"k8s.io/apiserver/pkg/features"
6365
"k8s.io/apiserver/pkg/registry/generic"
6466
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
67+
genericfilters "k8s.io/apiserver/pkg/server/filters"
6568
"k8s.io/apiserver/pkg/storage/storagebackend"
6669
utilfeature "k8s.io/apiserver/pkg/util/feature"
6770
"k8s.io/apiserver/pkg/util/webhook"
@@ -100,6 +103,9 @@ type crdHandler struct {
100103

101104
// so that we can do create on update.
102105
authorizer authorizer.Authorizer
106+
107+
// request timeout we should delay storage teardown for
108+
requestTimeout time.Duration
103109
}
104110

105111
// crdInfo stores enough information to serve the storage for the custom resource
@@ -123,6 +129,8 @@ type crdInfo struct {
123129

124130
// storageVersion is the CRD version used when storing the object in etcd.
125131
storageVersion string
132+
133+
waitGroup *utilwaitgroup.SafeWaitGroup
126134
}
127135

128136
// crdStorageMap goes from customresourcedefinition to its storage
@@ -139,7 +147,8 @@ func NewCustomResourceDefinitionHandler(
139147
serviceResolver webhook.ServiceResolver,
140148
authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
141149
masterCount int,
142-
authorizer authorizer.Authorizer) (*crdHandler, error) {
150+
authorizer authorizer.Authorizer,
151+
requestTimeout time.Duration) (*crdHandler, error) {
143152
ret := &crdHandler{
144153
versionDiscoveryHandler: versionDiscoveryHandler,
145154
groupDiscoveryHandler: groupDiscoveryHandler,
@@ -151,6 +160,7 @@ func NewCustomResourceDefinitionHandler(
151160
establishingController: establishingController,
152161
masterCount: masterCount,
153162
authorizer: authorizer,
163+
requestTimeout: requestTimeout,
154164
}
155165
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
156166
UpdateFunc: ret.updateCustomResourceDefinition,
@@ -169,6 +179,11 @@ func NewCustomResourceDefinitionHandler(
169179
return ret, nil
170180
}
171181

182+
// watches are expected to handle storage disruption gracefully,
183+
// both on the server-side (by terminating the watch connection)
184+
// and on the client side (by restarting the watch)
185+
var longRunningFilter = genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString())
186+
172187
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
173188
ctx := req.Context()
174189
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@@ -238,7 +253,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
238253
supportedTypes = append(supportedTypes, string(types.ApplyPatchType))
239254
}
240255

241-
var handler http.HandlerFunc
256+
var handlerFunc http.HandlerFunc
242257
subresources, err := apiextensions.GetSubresourcesForVersion(crd, requestInfo.APIVersion)
243258
if err != nil {
244259
utilruntime.HandleError(err)
@@ -247,18 +262,19 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
247262
}
248263
switch {
249264
case subresource == "status" && subresources != nil && subresources.Status != nil:
250-
handler = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
265+
handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
251266
case subresource == "scale" && subresources != nil && subresources.Scale != nil:
252-
handler = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
267+
handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
253268
case len(subresource) == 0:
254-
handler = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
269+
handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
255270
default:
256271
http.Error(w, "the server could not find the requested resource", http.StatusNotFound)
257272
}
258273

259-
if handler != nil {
260-
handler = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handler)
261-
handler(w, req)
274+
if handlerFunc != nil {
275+
handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handlerFunc)
276+
handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
277+
handler.ServeHTTP(w, req)
262278
return
263279
}
264280
}
@@ -365,18 +381,18 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})
365381

366382
klog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)
367383

368-
// Copy because we cannot write to storageMap without a race
369-
// as it is used without locking elsewhere.
370-
storageMap2 := storageMap.clone()
371-
if oldInfo, ok := storageMap2[types.UID(oldCRD.UID)]; ok {
372-
for _, storage := range oldInfo.storages {
373-
// destroy only the main storage. Those for the subresources share cacher and etcd clients.
374-
storage.CustomResource.DestroyFunc()
375-
}
384+
if oldInfo, ok := storageMap[types.UID(oldCRD.UID)]; ok {
385+
// Copy because we cannot write to storageMap without a race
386+
// as it is used without locking elsewhere.
387+
storageMap2 := storageMap.clone()
388+
389+
// Remove from the CRD info map and store the map
376390
delete(storageMap2, types.UID(oldCRD.UID))
377-
}
391+
r.customStorage.Store(storageMap2)
378392

379-
r.customStorage.Store(storageMap2)
393+
// Tear down the old storage
394+
go r.tearDown(oldInfo)
395+
}
380396
}
381397

382398
// removeDeadStorage removes REST storage that isn't being used
@@ -390,6 +406,7 @@ func (r *crdHandler) removeDeadStorage() {
390406
r.customStorageLock.Lock()
391407
defer r.customStorageLock.Unlock()
392408

409+
oldInfos := []*crdInfo{}
393410
storageMap := r.customStorage.Load().(crdStorageMap)
394411
// Copy because we cannot write to storageMap without a race
395412
// as it is used without locking elsewhere
@@ -404,14 +421,38 @@ func (r *crdHandler) removeDeadStorage() {
404421
}
405422
if !found {
406423
klog.V(4).Infof("Removing dead CRD storage for %s/%s", s.spec.Group, s.spec.Names.Kind)
407-
for _, storage := range s.storages {
408-
// destroy only the main storage. Those for the subresources share cacher and etcd clients.
409-
storage.CustomResource.DestroyFunc()
410-
}
424+
oldInfos = append(oldInfos, s)
411425
delete(storageMap2, uid)
412426
}
413427
}
414428
r.customStorage.Store(storageMap2)
429+
430+
for _, s := range oldInfos {
431+
go r.tearDown(s)
432+
}
433+
}
434+
435+
// Wait up to a minute for requests to drain, then tear down storage
436+
func (r *crdHandler) tearDown(oldInfo *crdInfo) {
437+
requestsDrained := make(chan struct{})
438+
go func() {
439+
defer close(requestsDrained)
440+
// Allow time for in-flight requests with a handle to the old info to register themselves
441+
time.Sleep(time.Second)
442+
// Wait for in-flight requests to drain
443+
oldInfo.waitGroup.Wait()
444+
}()
445+
446+
select {
447+
case <-time.After(r.requestTimeout * 2):
448+
klog.Warningf("timeout waiting for requests to drain for %s/%s, tearing down storage", oldInfo.spec.Group, oldInfo.spec.Names.Kind)
449+
case <-requestsDrained:
450+
}
451+
452+
for _, storage := range oldInfo.storages {
453+
// destroy only the main storage. Those for the subresources share cacher and etcd clients.
454+
storage.CustomResource.DestroyFunc()
455+
}
415456
}
416457

417458
// GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter of
@@ -622,6 +663,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResource
622663
scaleRequestScopes: scaleScopes,
623664
statusRequestScopes: statusScopes,
624665
storageVersion: storageVersion,
666+
waitGroup: &utilwaitgroup.SafeWaitGroup{},
625667
}
626668

627669
// Copy because we cannot write to storageMap without a race

staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_test(
1111
srcs = [
1212
"apply_test.go",
1313
"basic_test.go",
14+
"change_test.go",
1415
"finalization_test.go",
1516
"objectmeta_test.go",
1617
"registration_test.go",
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package integration
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"testing"
23+
"time"
24+
25+
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
26+
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
27+
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
28+
apierrors "k8s.io/apimachinery/pkg/api/errors"
29+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/client-go/dynamic"
31+
)
32+
33+
func TestChangeCRD(t *testing.T) {
34+
tearDown, config, _, err := fixtures.StartDefaultServer(t)
35+
if err != nil {
36+
t.Fatal(err)
37+
}
38+
defer tearDown()
39+
config.QPS = 1000
40+
config.Burst = 1000
41+
apiExtensionsClient, err := clientset.NewForConfig(config)
42+
if err != nil {
43+
t.Fatal(err)
44+
}
45+
dynamicClient, err := dynamic.NewForConfig(config)
46+
if err != nil {
47+
t.Fatal(err)
48+
}
49+
50+
noxuDefinition := fixtures.NewNoxuCustomResourceDefinition(apiextensionsv1beta1.NamespaceScoped)
51+
noxuDefinition, err = fixtures.CreateNewCustomResourceDefinition(noxuDefinition, apiExtensionsClient, dynamicClient)
52+
if err != nil {
53+
t.Fatal(err)
54+
}
55+
56+
ns := "default"
57+
noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
58+
59+
stopChan := make(chan struct{})
60+
61+
wg := &sync.WaitGroup{}
62+
63+
// Set up loop to modify CRD in the background
64+
wg.Add(1)
65+
go func() {
66+
defer wg.Done()
67+
for {
68+
select {
69+
case <-stopChan:
70+
return
71+
default:
72+
}
73+
74+
noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(noxuDefinition.Name, metav1.GetOptions{})
75+
if err != nil {
76+
t.Fatal(err)
77+
}
78+
if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
79+
v2 := noxuDefinitionToUpdate.Spec.Versions[0]
80+
v2.Name = "v2"
81+
v2.Served = true
82+
v2.Storage = false
83+
noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
84+
} else {
85+
noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
86+
}
87+
if _, err := apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(noxuDefinitionToUpdate); err != nil && !apierrors.IsConflict(err) {
88+
t.Fatal(err)
89+
}
90+
time.Sleep(10 * time.Millisecond)
91+
}
92+
}()
93+
94+
// Set up 100 loops creating and reading custom resources
95+
for i := 0; i < 100; i++ {
96+
wg.Add(1)
97+
go func(i int) {
98+
defer wg.Done()
99+
noxuInstanceToCreate := fixtures.NewNoxuInstance(ns, fmt.Sprintf("foo-%d", i))
100+
if _, err := noxuNamespacedResourceClient.Create(noxuInstanceToCreate, metav1.CreateOptions{}); err != nil {
101+
t.Fatal(err)
102+
}
103+
for {
104+
select {
105+
case <-stopChan:
106+
return
107+
default:
108+
if _, err := noxuNamespacedResourceClient.Get(noxuInstanceToCreate.GetName(), metav1.GetOptions{}); err != nil {
109+
t.Fatal(err)
110+
}
111+
}
112+
time.Sleep(10 * time.Millisecond)
113+
}
114+
}(i)
115+
}
116+
117+
// Let all the established get request loops soak
118+
time.Sleep(5 * time.Second)
119+
120+
// Tear down
121+
close(stopChan)
122+
123+
// Let loops drain
124+
wg.Wait()
125+
}

0 commit comments

Comments
 (0)