Skip to content

Commit 3f9673b

Browse files
authored
Merge pull request kubernetes#70994 from mborsz/cache
Refactor the memcached discovery client
2 parents ea6acb3 + c94bee0 commit 3f9673b

File tree

4 files changed

+377
-63
lines changed

4 files changed

+377
-63
lines changed

staging/src/k8s.io/client-go/discovery/cached/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_test(
1111
srcs = ["memcache_test.go"],
1212
embed = [":go_default_library"],
1313
deps = [
14+
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
1415
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
1516
"//staging/src/k8s.io/client-go/discovery/fake:go_default_library",
1617
],
@@ -22,6 +23,7 @@ go_library(
2223
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/discovery/cached",
2324
importpath = "k8s.io/client-go/discovery/cached",
2425
deps = [
26+
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
2527
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
2628
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
2729
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",

staging/src/k8s.io/client-go/discovery/cached/memcache.go

Lines changed: 96 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,51 +19,102 @@ package cached
1919
import (
2020
"errors"
2121
"fmt"
22+
"net"
23+
"net/url"
2224
"sync"
25+
"syscall"
2326

2427
"github.com/googleapis/gnostic/OpenAPIv2"
2528

29+
errorsutil "k8s.io/apimachinery/pkg/api/errors"
2630
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2731
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2832
"k8s.io/apimachinery/pkg/version"
2933
"k8s.io/client-go/discovery"
3034
restclient "k8s.io/client-go/rest"
3135
)
3236

37+
type cacheEntry struct {
38+
resourceList *metav1.APIResourceList
39+
err error
40+
}
41+
3342
// memCacheClient can Invalidate() to stay up-to-date with discovery
3443
// information.
3544
//
36-
// TODO: Switch to a watch interface. Right now it will poll anytime
37-
// Invalidate() is called.
45+
// TODO: Switch to a watch interface. Right now it will poll after each
46+
// Invalidate() call.
3847
type memCacheClient struct {
3948
delegate discovery.DiscoveryInterface
4049

4150
lock sync.RWMutex
42-
groupToServerResources map[string]*metav1.APIResourceList
51+
groupToServerResources map[string]*cacheEntry
4352
groupList *metav1.APIGroupList
4453
cacheValid bool
4554
}
4655

4756
// Error Constants
4857
var (
49-
ErrCacheEmpty = errors.New("the cache has not been filled yet")
5058
ErrCacheNotFound = errors.New("not found")
5159
)
5260

5361
var _ discovery.CachedDiscoveryInterface = &memCacheClient{}
5462

63+
// isTransientConnectionError checks whether given error is "Connection refused" or
64+
// "Connection reset" error which usually means that apiserver is temporarily
65+
// unavailable.
66+
func isTransientConnectionError(err error) bool {
67+
urlError, ok := err.(*url.Error)
68+
if !ok {
69+
return false
70+
}
71+
opError, ok := urlError.Err.(*net.OpError)
72+
if !ok {
73+
return false
74+
}
75+
errno, ok := opError.Err.(syscall.Errno)
76+
if !ok {
77+
return false
78+
}
79+
return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET
80+
}
81+
82+
func isTransientError(err error) bool {
83+
if isTransientConnectionError(err) {
84+
return true
85+
}
86+
87+
if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 {
88+
return true
89+
}
90+
91+
return errorsutil.IsTooManyRequests(err)
92+
}
93+
5594
// ServerResourcesForGroupVersion returns the supported resources for a group and version.
5695
func (d *memCacheClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
57-
d.lock.RLock()
58-
defer d.lock.RUnlock()
96+
d.lock.Lock()
97+
defer d.lock.Unlock()
5998
if !d.cacheValid {
60-
return nil, ErrCacheEmpty
99+
if err := d.refreshLocked(); err != nil {
100+
return nil, err
101+
}
61102
}
62103
cachedVal, ok := d.groupToServerResources[groupVersion]
63104
if !ok {
64105
return nil, ErrCacheNotFound
65106
}
66-
return cachedVal, nil
107+
108+
if cachedVal.err != nil && isTransientError(cachedVal.err) {
109+
r, err := d.serverResourcesForGroupVersion(groupVersion)
110+
if err != nil {
111+
utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err))
112+
}
113+
cachedVal = &cacheEntry{r, err}
114+
d.groupToServerResources[groupVersion] = cachedVal
115+
}
116+
117+
return cachedVal.resourceList, cachedVal.err
67118
}
68119

69120
// ServerResources returns the supported resources for all groups and versions.
@@ -72,10 +123,12 @@ func (d *memCacheClient) ServerResources() ([]*metav1.APIResourceList, error) {
72123
}
73124

74125
func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) {
75-
d.lock.RLock()
76-
defer d.lock.RUnlock()
77-
if d.groupList == nil {
78-
return nil, ErrCacheEmpty
126+
d.lock.Lock()
127+
defer d.lock.Unlock()
128+
if !d.cacheValid {
129+
if err := d.refreshLocked(); err != nil {
130+
return nil, err
131+
}
79132
}
80133
return d.groupList, nil
81134
}
@@ -103,49 +156,59 @@ func (d *memCacheClient) OpenAPISchema() (*openapi_v2.Document, error) {
103156
func (d *memCacheClient) Fresh() bool {
104157
d.lock.RLock()
105158
defer d.lock.RUnlock()
106-
// Fresh is supposed to tell the caller whether or not to retry if the cache
107-
// fails to find something. The idea here is that Invalidate will be called
108-
// periodically and therefore we'll always be returning the latest data. (And
109-
// in the future we can watch and stay even more up-to-date.) So we only
110-
// return false if the cache has never been filled.
159+
// Return whether the cache is populated at all. It is still possible that
160+
// a single entry is missing due to transient errors and the attempt to read
161+
// that entry will trigger retry.
111162
return d.cacheValid
112163
}
113164

114-
// Invalidate refreshes the cache, blocking calls until the cache has been
115-
// refreshed. It would be trivial to make a version that does this in the
116-
// background while continuing to respond to requests if needed.
165+
// Invalidate enforces that no cached data that is older than the current time
166+
// is used.
117167
func (d *memCacheClient) Invalidate() {
118168
d.lock.Lock()
119169
defer d.lock.Unlock()
170+
d.cacheValid = false
171+
d.groupToServerResources = nil
172+
d.groupList = nil
173+
}
120174

175+
// refreshLocked refreshes the state of cache. The caller must hold d.lock for
176+
// writing.
177+
func (d *memCacheClient) refreshLocked() error {
121178
// TODO: Could this multiplicative set of calls be replaced by a single call
122179
// to ServerResources? If it's possible for more than one resulting
123180
// APIResourceList to have the same GroupVersion, the lists would need merged.
124181
gl, err := d.delegate.ServerGroups()
125182
if err != nil || len(gl.Groups) == 0 {
126-
utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list; will keep using cached value. (%v)", err))
127-
return
183+
utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err))
184+
return err
128185
}
129186

130-
rl := map[string]*metav1.APIResourceList{}
187+
rl := map[string]*cacheEntry{}
131188
for _, g := range gl.Groups {
132189
for _, v := range g.Versions {
133-
r, err := d.delegate.ServerResourcesForGroupVersion(v.GroupVersion)
134-
if err != nil || len(r.APIResources) == 0 {
190+
r, err := d.serverResourcesForGroupVersion(v.GroupVersion)
191+
rl[v.GroupVersion] = &cacheEntry{r, err}
192+
if err != nil {
135193
utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", v.GroupVersion, err))
136-
if cur, ok := d.groupToServerResources[v.GroupVersion]; ok {
137-
// retain the existing list, if we had it.
138-
r = cur
139-
} else {
140-
continue
141-
}
142194
}
143-
rl[v.GroupVersion] = r
144195
}
145196
}
146197

147198
d.groupToServerResources, d.groupList = rl, gl
148199
d.cacheValid = true
200+
return nil
201+
}
202+
203+
func (d *memCacheClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
204+
r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
205+
if err != nil {
206+
return r, err
207+
}
208+
if len(r.APIResources) == 0 {
209+
return r, fmt.Errorf("Got empty response for: %v", groupVersion)
210+
}
211+
return r, nil
149212
}
150213

151214
// NewMemCacheClient creates a new CachedDiscoveryInterface which caches
@@ -156,6 +219,6 @@ func (d *memCacheClient) Invalidate() {
156219
func NewMemCacheClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface {
157220
return &memCacheClient{
158221
delegate: delegate,
159-
groupToServerResources: map[string]*metav1.APIResourceList{},
222+
groupToServerResources: map[string]*cacheEntry{},
160223
}
161224
}

0 commit comments

Comments
 (0)