Skip to content

Commit e2f529a

Browse files
authored
Merge pull request kubernetes#87531 from aramase/set-nil-cache
azure: set nil cache entry based on old cache
2 parents 6ecdd22 + e2d7153 commit e2f529a

File tree

4 files changed

+107
-30
lines changed

4 files changed

+107
-30
lines changed

staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ const (
3838
// active/expired. If entry doesn't exist in cache, then data is fetched
3939
// using getter, saved in cache and returned
4040
cacheReadTypeUnsafe
41+
// cacheReadTypeForceRefresh force refreshes the cache even if the cache entry
42+
// is not expired
43+
cacheReadTypeForceRefresh
4144
)
4245

4346
// getFunc defines a getter function for timedCache.
@@ -120,20 +123,20 @@ func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) {
120123
entry.lock.Lock()
121124
defer entry.lock.Unlock()
122125

123-
// entry exists
124-
if entry.data != nil {
126+
// entry exists and the caller did not request a force refresh
127+
if entry.data != nil && crt != cacheReadTypeForceRefresh {
125128
// allow unsafe read, so return data even if expired
126129
if crt == cacheReadTypeUnsafe {
127130
return entry.data, nil
128131
}
129132
// if cached data is not expired, return cached data
130-
if time.Since(entry.createdOn) < t.ttl {
133+
if crt == cacheReadTypeDefault && time.Since(entry.createdOn) < t.ttl {
131134
return entry.data, nil
132135
}
133136
}
134-
// Data is not cached yet or cache data is expired, cache it by getter.
135-
// entry is locked before getting to ensure concurrent gets don't result in
136-
// multiple ARM calls.
137+
// Data is not cached yet, the cached data is expired, or a force refresh was
138+
// requested: fetch it with the getter. The entry is locked before getting to
139+
// ensure concurrent gets don't result in multiple ARM calls.
137140
data, err := t.getter(key)
138141
if err != nil {
139142
return nil, err

staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,23 @@ func TestCacheNoConcurrentGet(t *testing.T) {
204204
assert.Equal(t, 1, dataSource.called)
205205
assert.Equal(t, val, v, "cache should get correct data")
206206
}
207+
208+
// TestCacheForceRefresh checks that a Get with cacheReadTypeForceRefresh calls
// the data source again right after a cacheReadTypeDefault Get for the same key
// has already populated the cache, and that both reads return the same value.
func TestCacheForceRefresh(t *testing.T) {
209+
key := "key1"
210+
val := &fakeDataObj{}
211+
data := map[string]*fakeDataObj{
212+
key: val,
213+
}
214+
dataSource, cache := newFakeCache(t)
215+
dataSource.set(data)
216+
217+
// First read: the data source is expected to have been called exactly once.
v, err := cache.Get(key, cacheReadTypeDefault)
218+
assert.NoError(t, err)
219+
assert.Equal(t, 1, dataSource.called)
220+
assert.Equal(t, val, v, "cache should get correct data")
221+
222+
// Forced read of the same key: the call counter must advance to 2, showing
// the cached value was not reused.
v, err = cache.Get(key, cacheReadTypeForceRefresh)
223+
assert.NoError(t, err)
224+
assert.Equal(t, 2, dataSource.called)
225+
assert.Equal(t, val, v, "should refetch unexpired data as forced refresh")
226+
}

staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -133,42 +133,46 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.Virtua
133133
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
134134
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
135135
func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
136-
getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
136+
getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
137+
var found bool
137138
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
138139
if err != nil {
139-
return "", "", nil, err
140+
return "", "", nil, found, err
140141
}
141142

142143
virtualMachines := cached.(*sync.Map)
143144
if vm, ok := virtualMachines.Load(nodeName); ok {
144145
result := vm.(*vmssVirtualMachinesEntry)
145-
return result.vmssName, result.instanceID, result.virtualMachine, nil
146+
found = true
147+
return result.vmssName, result.instanceID, result.virtualMachine, found, nil
146148
}
147149

148-
return "", "", nil, nil
150+
return "", "", nil, found, nil
149151
}
150152

151153
_, err := getScaleSetVMInstanceID(nodeName)
152154
if err != nil {
153155
return "", "", nil, err
154156
}
155157

156-
vmssName, instanceID, vm, err := getter(nodeName)
158+
vmssName, instanceID, vm, found, err := getter(nodeName, crt)
157159
if err != nil {
158160
return "", "", nil, err
159161
}
160-
if vm != nil {
161-
return vmssName, instanceID, vm, nil
162+
163+
if !found {
164+
klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
165+
vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh)
166+
if err != nil {
167+
return "", "", nil, err
168+
}
162169
}
163170

164-
klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
165-
ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
166-
vmssName, instanceID, vm, err = getter(nodeName)
167-
if err != nil {
168-
return "", "", nil, err
171+
if found && vm != nil {
172+
return vmssName, instanceID, vm, nil
169173
}
170174

171-
if vm == nil {
175+
if !found || vm == nil {
172176
return "", "", nil, cloudprovider.InstanceNotFound
173177
}
174178
return vmssName, instanceID, vm, nil
@@ -199,7 +203,7 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
199203
// getVmssVMByInstanceID gets the VirtualMachineScaleSetVM from cache.
200204
// The node must belong to one of scale sets.
201205
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
202-
getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
206+
getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
203207
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
204208
if err != nil {
205209
return nil, false, err
@@ -222,21 +226,21 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
222226
return vm, found, nil
223227
}
224228

225-
vm, found, err := getter()
229+
vm, found, err := getter(crt)
226230
if err != nil {
227231
return nil, err
228232
}
229-
if found {
230-
return vm, nil
233+
if !found {
234+
klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
235+
vm, found, err = getter(cacheReadTypeForceRefresh)
236+
if err != nil {
237+
return nil, err
238+
}
231239
}
232-
233-
klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
234-
ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
235-
vm, found, err = getter()
236-
if err != nil {
237-
return nil, err
240+
if found && vm != nil {
241+
return vm, nil
238242
}
239-
if !found {
243+
if !found || vm == nil {
240244
return nil, cloudprovider.InstanceNotFound
241245
}
242246

staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,26 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
111111
getter := func(key string) (interface{}, error) {
112112
localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
113113

114+
oldCache := make(map[string]vmssVirtualMachinesEntry)
115+
116+
if ss.vmssVMCache != nil {
117+
// get old cache before refreshing the cache
118+
entry, exists, err := ss.vmssVMCache.store.GetByKey(vmssVirtualMachinesKey)
119+
if err != nil {
120+
return nil, err
121+
}
122+
if exists {
123+
cached := entry.(*cacheEntry).data
124+
if cached != nil {
125+
virtualMachines := cached.(*sync.Map)
126+
virtualMachines.Range(func(key, value interface{}) bool {
127+
oldCache[key.(string)] = *value.(*vmssVirtualMachinesEntry)
128+
return true
129+
})
130+
}
131+
}
132+
}
133+
114134
allResourceGroups, err := ss.GetResourceGroups()
115135
if err != nil {
116136
return nil, err
@@ -143,8 +163,38 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
143163
virtualMachine: &vm,
144164
lastUpdate: time.Now().UTC(),
145165
})
166+
167+
if _, exists := oldCache[computerName]; exists {
168+
delete(oldCache, computerName)
169+
}
146170
}
147171
}
172+
173+
// Entries left in oldCache were not seen in the fresh listing; re-add them with
174+
// a nil virtualMachine to prevent aggressive ARM calls during cache invalidation.
175+
for name, vmEntry := range oldCache {
176+
// Drop an entry only when it is already a nil entry AND has been nil for more
177+
// than 15 minutes; an entry that was live in the old cache becomes a nil entry.
178+
if vmEntry.virtualMachine == nil && time.Since(vmEntry.lastUpdate) > 15*time.Minute {
179+
klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
180+
continue
181+
}
182+
lastUpdate := time.Now().UTC()
183+
if vmEntry.virtualMachine == nil {
184+
// if this is already a nil entry then keep the time the nil
185+
// entry was first created, so we can cleanup unwanted entries
186+
lastUpdate = vmEntry.lastUpdate
187+
}
188+
189+
klog.V(5).Infof("adding old entries to new cache for %s", name)
190+
localCache.Store(name, &vmssVirtualMachinesEntry{
191+
resourceGroup: vmEntry.resourceGroup,
192+
vmssName: vmEntry.vmssName,
193+
instanceID: vmEntry.instanceID,
194+
virtualMachine: nil,
195+
lastUpdate: lastUpdate,
196+
})
197+
}
148198
}
149199

150200
return localCache, nil

0 commit comments

Comments
 (0)