Skip to content

Commit 6d01c5a

Browse files
authored
Merge pull request kubernetes#92459 from roycaihw/storage-version/manager
Add storageversion manager interface
2 parents 8647eec + 184b460 commit 6d01c5a

File tree

6 files changed

+497
-0
lines changed

6 files changed

+497
-0
lines changed

staging/src/k8s.io/apiserver/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ filegroup(
4141
"//staging/src/k8s.io/apiserver/pkg/registry:all-srcs",
4242
"//staging/src/k8s.io/apiserver/pkg/server:all-srcs",
4343
"//staging/src/k8s.io/apiserver/pkg/storage:all-srcs",
44+
"//staging/src/k8s.io/apiserver/pkg/storageversion:all-srcs",
4445
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs",
4546
"//staging/src/k8s.io/apiserver/pkg/util/cache:all-srcs",
4647
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs",
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = [
6+
"manager.go",
7+
"updater.go",
8+
],
9+
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storageversion",
10+
importpath = "k8s.io/apiserver/pkg/storageversion",
11+
visibility = ["//visibility:public"],
12+
deps = [
13+
"//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library",
14+
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
16+
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
17+
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
18+
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
19+
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
20+
"//staging/src/k8s.io/client-go/rest:go_default_library",
21+
"//staging/src/k8s.io/component-base/metrics/prometheus/workqueue:go_default_library",
22+
"//vendor/k8s.io/klog/v2:go_default_library",
23+
],
24+
)
25+
26+
go_test(
27+
name = "go_default_test",
28+
srcs = ["updater_test.go"],
29+
embed = [":go_default_library"],
30+
deps = [
31+
"//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library",
32+
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
33+
],
34+
)
35+
36+
filegroup(
37+
name = "package-srcs",
38+
srcs = glob(["**"]),
39+
tags = ["automanaged"],
40+
visibility = ["//visibility:private"],
41+
)
42+
43+
filegroup(
44+
name = "all-srcs",
45+
srcs = [":package-srcs"],
46+
tags = ["automanaged"],
47+
visibility = ["//visibility:public"],
48+
)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# See the OWNERS docs at https://go.k8s.io/owners
2+
3+
approvers:
4+
- caesarxuchao
5+
- roycaihw
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package storageversion
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"sync/atomic"
23+
24+
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/runtime/schema"
26+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27+
"k8s.io/client-go/kubernetes"
28+
"k8s.io/client-go/rest"
29+
_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
30+
"k8s.io/klog/v2"
31+
)
32+
33+
// ResourceInfo contains the information to register the resource to the
34+
// storage version API.
35+
type ResourceInfo struct {
36+
GroupResource schema.GroupResource
37+
38+
EncodingVersion string
39+
// Used to calculate decodable versions. Can only be used after all
40+
// equivalent versions are registered by InstallREST.
41+
EquivalentResourceMapper runtime.EquivalentResourceRegistry
42+
}
43+
44+
// Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions.
45+
type Manager interface {
46+
// AddResourceInfo records resources whose StorageVersions need updates
47+
AddResourceInfo(resources ...*ResourceInfo)
48+
// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
49+
UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, apiserverID string)
50+
// PendingUpdate returns true if the StorageVersion of the given resource is still pending update.
51+
PendingUpdate(gr schema.GroupResource) bool
52+
// LastUpdateError returns the last error hit when updating the storage version of the given resource.
53+
LastUpdateError(gr schema.GroupResource) error
54+
// Completed returns true if updating StorageVersions of all recorded resources has completed.
55+
Completed() bool
56+
}
57+
58+
var _ Manager = &defaultManager{}
59+
60+
// defaultManager indicates if an apiserver has completed reporting its storage versions.
61+
type defaultManager struct {
62+
completed atomic.Value
63+
64+
mu sync.RWMutex
65+
// managedResourceInfos records the ResourceInfos whose StorageVersions will get updated in the next
66+
// UpdateStorageVersions call
67+
managedResourceInfos map[*ResourceInfo]struct{}
68+
// managedStatus records the update status of StorageVersion for each GroupResource. Since one
69+
// ResourceInfo may expand into multiple GroupResource (e.g. ingresses.networking.k8s.io and ingresses.extensions),
70+
// this map allows quick status lookup for a GroupResource, during API request handling.
71+
managedStatus map[schema.GroupResource]*updateStatus
72+
}
73+
74+
type updateStatus struct {
75+
done bool
76+
lastErr error
77+
}
78+
79+
// NewDefaultManager creates a new defaultManager.
80+
func NewDefaultManager() Manager {
81+
s := &defaultManager{}
82+
s.completed.Store(false)
83+
s.managedResourceInfos = make(map[*ResourceInfo]struct{})
84+
s.managedStatus = make(map[schema.GroupResource]*updateStatus)
85+
return s
86+
}
87+
88+
// AddResourceInfo adds ResourceInfo to the manager.
89+
func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) {
90+
s.mu.Lock()
91+
defer s.mu.Unlock()
92+
for _, r := range resources {
93+
s.managedResourceInfos[r] = struct{}{}
94+
s.addPendingManagedStatusLocked(r)
95+
}
96+
}
97+
98+
func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) {
99+
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
100+
for _, gvr := range gvrs {
101+
s.managedStatus[gvr.GroupResource()] = &updateStatus{}
102+
}
103+
}
104+
105+
// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
106+
func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, serverID string) {
107+
clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
108+
if err != nil {
109+
utilruntime.HandleError(fmt.Errorf("failed to get clientset: %v", err))
110+
return
111+
}
112+
sc := clientset.InternalV1alpha1().StorageVersions()
113+
114+
s.mu.RLock()
115+
resources := make([]*ResourceInfo, len(s.managedResourceInfos))
116+
for resource := range s.managedResourceInfos {
117+
resources = append(resources, resource)
118+
}
119+
s.mu.RUnlock()
120+
hasFailure := false
121+
for _, r := range resources {
122+
dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource)
123+
if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil {
124+
utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err))
125+
s.recordStatusFailure(r, err)
126+
hasFailure = true
127+
continue
128+
}
129+
klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource)
130+
s.recordStatusSuccess(r)
131+
}
132+
if hasFailure {
133+
return
134+
}
135+
klog.V(2).Infof("storage version updates complete")
136+
s.setComplete()
137+
}
138+
139+
// recordStatusSuccess marks updated ResourceInfo as completed.
140+
func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) {
141+
s.mu.Lock()
142+
defer s.mu.Unlock()
143+
s.recordStatusSuccessLocked(r)
144+
}
145+
146+
func (s *defaultManager) recordStatusSuccessLocked(r *ResourceInfo) {
147+
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
148+
for _, gvr := range gvrs {
149+
s.recordSuccessGroupResourceLocked(gvr.GroupResource())
150+
}
151+
}
152+
153+
func (s *defaultManager) recordSuccessGroupResourceLocked(gr schema.GroupResource) {
154+
if _, ok := s.managedStatus[gr]; !ok {
155+
return
156+
}
157+
s.managedStatus[gr].done = true
158+
s.managedStatus[gr].lastErr = nil
159+
}
160+
161+
// recordStatusFailure records latest error updating ResourceInfo.
162+
func (s *defaultManager) recordStatusFailure(r *ResourceInfo, err error) {
163+
s.mu.Lock()
164+
defer s.mu.Unlock()
165+
s.recordStatusFailureLocked(r, err)
166+
}
167+
168+
func (s *defaultManager) recordStatusFailureLocked(r *ResourceInfo, err error) {
169+
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
170+
for _, gvr := range gvrs {
171+
s.recordErrorGroupResourceLocked(gvr.GroupResource(), err)
172+
}
173+
}
174+
175+
func (s *defaultManager) recordErrorGroupResourceLocked(gr schema.GroupResource, err error) {
176+
if _, ok := s.managedStatus[gr]; !ok {
177+
return
178+
}
179+
s.managedStatus[gr].lastErr = err
180+
}
181+
182+
// PendingUpdate returns if the StorageVersion of a resource is still wait to be updated.
183+
func (s *defaultManager) PendingUpdate(gr schema.GroupResource) bool {
184+
s.mu.RLock()
185+
defer s.mu.RUnlock()
186+
if _, ok := s.managedStatus[gr]; !ok {
187+
return false
188+
}
189+
return !s.managedStatus[gr].done
190+
}
191+
192+
// LastUpdateError returns the last error hit when updating the storage version of the given resource.
193+
func (s *defaultManager) LastUpdateError(gr schema.GroupResource) error {
194+
s.mu.RLock()
195+
defer s.mu.RUnlock()
196+
if _, ok := s.managedStatus[gr]; !ok {
197+
return fmt.Errorf("couldn't find managed status for %v", gr)
198+
}
199+
return s.managedStatus[gr].lastErr
200+
}
201+
202+
// setComplete marks the completion of updating StorageVersions. No write requests need to be blocked anymore.
203+
func (s *defaultManager) setComplete() {
204+
s.completed.Store(true)
205+
}
206+
207+
// Completed returns if updating StorageVersions has completed.
208+
func (s *defaultManager) Completed() bool {
209+
return s.completed.Load().(bool)
210+
}
211+
212+
func decodableVersions(e runtime.EquivalentResourceRegistry, gr schema.GroupResource) []string {
213+
var versions []string
214+
decodingGVRs := e.EquivalentResourcesFor(gr.WithVersion(""), "")
215+
for _, v := range decodingGVRs {
216+
versions = append(versions, v.GroupVersion().String())
217+
}
218+
return versions
219+
}

0 commit comments

Comments
 (0)