Skip to content

Commit 4836171

Browse files
committed
add the storageversion.Manager interface
1 parent 00a3db0 commit 4836171

File tree

3 files changed

+173
-142
lines changed

3 files changed

+173
-142
lines changed

staging/src/k8s.io/apiserver/pkg/storageversion/manager.go

Lines changed: 127 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -17,178 +17,203 @@ limitations under the License.
1717
package storageversion
1818

1919
import (
20+
"fmt"
2021
"sync"
2122
"sync/atomic"
2223

23-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2424
"k8s.io/apimachinery/pkg/runtime"
2525
"k8s.io/apimachinery/pkg/runtime/schema"
26-
apiserverclientset "k8s.io/apiserver/pkg/client/clientset_generated/clientset"
26+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27+
"k8s.io/client-go/kubernetes"
2728
"k8s.io/client-go/rest"
2829
_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
29-
"k8s.io/klog"
30+
"k8s.io/klog/v2"
3031
)
3132

3233
// ResourceInfo contains the information to register the resource to the
3334
// storage version API.
3435
type ResourceInfo struct {
35-
Resource metav1.APIResource
36-
// We use a standalone Group instead of reusing the Resource.Group
37-
// because Resource.Group is often omitted, see the comment on
38-
// Resource.Group for why it's omitted.
39-
Group string
40-
EncodingVersion string
41-
DecodableVersions []string
36+
GroupResource schema.GroupResource
37+
38+
EncodingVersion string
39+
// Used to calculate decodable versions. Can only be used after all
40+
// equivalent versions are registered by InstallREST.
4241
EquivalentResourceMapper runtime.EquivalentResourceRegistry
4342
}
4443

4544
// Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions.
4645
type Manager interface {
47-
// AddResourceInfo adds ResourceInfo to the manager.
46+
// AddResourceInfo records resources whose StorageVersions need updates
4847
AddResourceInfo(resources ...*ResourceInfo)
49-
// RemoveResourceInfo removes ResourceInfo from the manager.
50-
RemoveResourceInfo(r *ResourceInfo)
51-
// UpdatesPending returns if the StorageVersion of a resource is still wait to be updated.
52-
UpdatesPending(group, resource string) bool
53-
54-
// UpdateStorageVersions updates the StorageVersions.
55-
UpdateStorageVersions(loopbackClientConfig *rest.Config, apiserverID string)
56-
// Completed returns if updating StorageVersions has completed.
48+
// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
49+
UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, apiserverID string)
50+
// PendingUpdate returns true if the StorageVersion of the given resource is still pending update.
51+
PendingUpdate(gr schema.GroupResource) bool
52+
// LastUpdateError returns the last error hit when updating the storage version of the given resource.
53+
LastUpdateError(gr schema.GroupResource) error
54+
// Completed returns true if updating StorageVersions of all recorded resources has completed.
5755
Completed() bool
5856
}
5957

60-
var _ Manager = &DefaultManager{}
58+
var _ Manager = &defaultManager{}
59+
60+
// defaultManager indicates if an apiserver has completed reporting its storage versions.
61+
type defaultManager struct {
62+
completed atomic.Value
63+
64+
mu sync.RWMutex
65+
// managedResourceInfos records the ResourceInfos whose StorageVersions will get updated in the next
66+
// UpdateStorageVersions call
67+
managedResourceInfos map[*ResourceInfo]struct{}
68+
// managedStatus records the update status of StorageVersion for each GroupResource. Since one
69+
// ResourceInfo may expand into multiple GroupResource (e.g. ingresses.networking.k8s.io and ingresses.extensions),
70+
// this map allows quick status lookup for a GroupResource, during API request handling.
71+
managedStatus map[schema.GroupResource]*updateStatus
72+
}
73+
74+
type updateStatus struct {
75+
done bool
76+
lastErr error
77+
}
6178

62-
// NewDefaultManager creates a new DefaultManager.
63-
func NewDefaultManager() *DefaultManager {
64-
s := &DefaultManager{}
79+
// NewDefaultManager creates a new defaultManager.
80+
func NewDefaultManager() Manager {
81+
s := &defaultManager{}
6582
s.completed.Store(false)
66-
s.groupResources = make(map[string]map[string]struct{})
67-
s.resources = make(map[*ResourceInfo]struct{})
83+
s.managedResourceInfos = make(map[*ResourceInfo]struct{})
84+
s.managedStatus = make(map[schema.GroupResource]*updateStatus)
6885
return s
6986
}
7087

7188
// AddResourceInfo adds ResourceInfo to the manager.
72-
// This is not thread-safe. It is expected to be called when the apiserver is installing the endpoints, which is done serially.
73-
func (s *DefaultManager) AddResourceInfo(resources ...*ResourceInfo) {
89+
func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) {
90+
s.mu.Lock()
91+
defer s.mu.Unlock()
7492
for _, r := range resources {
75-
s.resources[r] = struct{}{}
76-
s.addGroupResourceFor(r)
93+
s.managedResourceInfos[r] = struct{}{}
94+
s.addPendingManagedStatusLocked(r)
7795
}
7896
}
7997

80-
func (s *DefaultManager) addGroupResourceFor(r *ResourceInfo) {
81-
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(schema.GroupVersionResource{
82-
Group: r.Group,
83-
Resource: r.Resource.Name,
84-
}, "")
98+
func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) {
99+
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
85100
for _, gvr := range gvrs {
86-
s.addGroupResource(gvr.Group, gvr.Resource)
101+
s.managedStatus[gvr.GroupResource()] = &updateStatus{}
87102
}
88103
}
89104

90-
func (s *DefaultManager) addGroupResource(group, resource string) {
91-
if _, ok := s.groupResources[group]; !ok {
92-
s.groupResources[group] = make(map[string]struct{})
105+
// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
106+
func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, serverID string) {
107+
clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
108+
if err != nil {
109+
utilruntime.HandleError(fmt.Errorf("failed to get clientset: %v", err))
110+
return
111+
}
112+
sc := clientset.InternalV1alpha1().StorageVersions()
113+
114+
s.mu.RLock()
115+
resources := make([]*ResourceInfo, len(s.managedResourceInfos))
116+
for resource := range s.managedResourceInfos {
117+
resources = append(resources, resource)
93118
}
94-
s.groupResources[group][resource] = struct{}{}
119+
s.mu.RUnlock()
120+
hasFailure := false
121+
for _, r := range resources {
122+
dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource)
123+
if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil {
124+
utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err))
125+
s.recordStatusFailure(r, err)
126+
hasFailure = true
127+
continue
128+
}
129+
klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource)
130+
s.recordStatusSuccess(r)
131+
}
132+
if hasFailure {
133+
return
134+
}
135+
klog.V(2).Infof("storage version updates complete")
136+
s.setComplete()
95137
}
96138

97-
// RemoveResourceInfo removes ResourceInfo from the manager.
98-
// It is not safe to call this function concurrently with AddResourceInfo.
99-
func (s *DefaultManager) RemoveResourceInfo(r *ResourceInfo) {
139+
// recordStatusSuccess marks updated ResourceInfo as completed.
140+
func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) {
100141
s.mu.Lock()
101142
defer s.mu.Unlock()
102-
delete(s.resources, r)
103-
s.removeGroupResourceFor(r)
143+
s.recordStatusSuccessLocked(r)
104144
}
105145

106-
func (s *DefaultManager) removeGroupResourceFor(r *ResourceInfo) {
107-
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(schema.GroupVersionResource{
108-
Group: r.Group,
109-
Resource: r.Resource.Name,
110-
}, "")
146+
func (s *defaultManager) recordStatusSuccessLocked(r *ResourceInfo) {
147+
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
111148
for _, gvr := range gvrs {
112-
s.removeGroupResource(gvr.Group, gvr.Version)
149+
s.recordSuccessGroupResourceLocked(gvr.GroupResource())
113150
}
114151
}
115152

116-
func (s *DefaultManager) removeGroupResource(group, resource string) {
117-
if _, ok := s.groupResources[group]; !ok {
153+
func (s *defaultManager) recordSuccessGroupResourceLocked(gr schema.GroupResource) {
154+
if _, ok := s.managedStatus[gr]; !ok {
118155
return
119156
}
120-
delete(s.groupResources[group], resource)
121-
if len(s.groupResources[group]) == 0 {
122-
delete(s.groupResources, group)
157+
s.managedStatus[gr].done = true
158+
s.managedStatus[gr].lastErr = nil
159+
}
160+
161+
// recordStatusFailure records latest error updating ResourceInfo.
162+
func (s *defaultManager) recordStatusFailure(r *ResourceInfo, err error) {
163+
s.mu.Lock()
164+
defer s.mu.Unlock()
165+
s.recordStatusFailureLocked(r, err)
166+
}
167+
168+
func (s *defaultManager) recordStatusFailureLocked(r *ResourceInfo, err error) {
169+
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
170+
for _, gvr := range gvrs {
171+
s.recordErrorGroupResourceLocked(gvr.GroupResource(), err)
172+
}
173+
}
174+
175+
func (s *defaultManager) recordErrorGroupResourceLocked(gr schema.GroupResource, err error) {
176+
if _, ok := s.managedStatus[gr]; !ok {
177+
return
123178
}
179+
s.managedStatus[gr].lastErr = err
124180
}
125181

126-
// UpdatesPending returns if the StorageVersion of a resource is still wait to be updated.
127-
func (s *DefaultManager) UpdatesPending(group, resource string) bool {
182+
// PendingUpdate returns if the StorageVersion of a resource is still wait to be updated.
183+
func (s *defaultManager) PendingUpdate(gr schema.GroupResource) bool {
128184
s.mu.RLock()
129185
defer s.mu.RUnlock()
130-
if _, ok := s.groupResources[group]; !ok {
186+
if _, ok := s.managedStatus[gr]; !ok {
131187
return false
132188
}
133-
_, ok := s.groupResources[group][resource]
134-
return ok
189+
return !s.managedStatus[gr].done
135190
}
136191

137-
// DefaultManager indicates if the aggregator, kube-apiserver, and the
138-
// apiextensions-apiserver have completed reporting their storage versions.
139-
type DefaultManager struct {
140-
completed atomic.Value
141-
142-
mu sync.RWMutex
143-
resources map[*ResourceInfo]struct{}
144-
groupResources map[string]map[string]struct{}
192+
// LastUpdateError returns the last error hit when updating the storage version of the given resource.
193+
func (s *defaultManager) LastUpdateError(gr schema.GroupResource) error {
194+
s.mu.RLock()
195+
defer s.mu.RUnlock()
196+
if _, ok := s.managedStatus[gr]; !ok {
197+
return fmt.Errorf("couldn't find managed status for %v", gr)
198+
}
199+
return s.managedStatus[gr].lastErr
145200
}
146201

147202
// setComplete marks the completion of updating StorageVersions. No write requests need to be blocked anymore.
148-
func (s *DefaultManager) setComplete() {
203+
func (s *defaultManager) setComplete() {
149204
s.completed.Store(true)
150205
}
151206

152207
// Completed returns if updating StorageVersions has completed.
153-
func (s *DefaultManager) Completed() bool {
208+
func (s *defaultManager) Completed() bool {
154209
return s.completed.Load().(bool)
155210
}
156211

157-
func decodableVersions(e runtime.EquivalentResourceRegistry, group string, resource string) []string {
212+
func decodableVersions(e runtime.EquivalentResourceRegistry, gr schema.GroupResource) []string {
158213
var versions []string
159-
decodingGVRs := e.EquivalentResourcesFor(schema.GroupVersionResource{
160-
Group: group,
161-
Resource: resource,
162-
}, "")
214+
decodingGVRs := e.EquivalentResourcesFor(gr.WithVersion(""), "")
163215
for _, v := range decodingGVRs {
164216
versions = append(versions, v.GroupVersion().String())
165217
}
166218
return versions
167219
}
168-
169-
// UpdateStorageVersions updates the StorageVersions. If the updates are
170-
// successful, following calls to Completed() returns true.
171-
func (s *DefaultManager) UpdateStorageVersions(loopbackClientConfig *rest.Config, serverID string) {
172-
cfg := rest.AddUserAgent(loopbackClientConfig, "system:kube-apiserver")
173-
clientset, err := apiserverclientset.NewForConfig(cfg)
174-
if err != nil {
175-
klog.Fatalf("failed to get clientset: %v", err)
176-
return
177-
}
178-
sc := clientset.InternalV1alpha1().StorageVersions()
179-
180-
s.mu.RLock()
181-
resources := s.resources
182-
s.mu.RUnlock()
183-
for r := range resources {
184-
r.DecodableVersions = decodableVersions(r.EquivalentResourceMapper, r.Group, r.Resource.Name)
185-
if err := updateStorageVersionFor(sc, serverID, r.Group+"."+r.Resource.Name, r.EncodingVersion, r.DecodableVersions); err != nil {
186-
klog.Fatalf("failed to update storage version for %v", r.Resource.Name)
187-
return
188-
}
189-
klog.V(2).Infof("successfully updated storage version for %v", r.Resource.Name)
190-
s.RemoveResourceInfo(r)
191-
}
192-
klog.V(2).Infof("storage version updates complete")
193-
s.setComplete()
194-
}

0 commit comments

Comments
 (0)