Skip to content

Commit e2d7153

Browse files
committed
set nil cache entry based on old cache
fix tests update old entry logic
1 parent e92a7cf commit e2d7153

File tree

4 files changed

+107
-30
lines changed

4 files changed

+107
-30
lines changed

staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ const (
3838
// active/expired. If entry doesn't exist in cache, then data is fetched
3939
// using getter, saved in cache and returned
4040
cacheReadTypeUnsafe
41+
// cacheReadTypeForceRefresh force refreshes the cache even if the cache entry
42+
// is not expired
43+
cacheReadTypeForceRefresh
4144
)
4245

4346
// getFunc defines a getter function for timedCache.
@@ -120,20 +123,20 @@ func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) {
120123
entry.lock.Lock()
121124
defer entry.lock.Unlock()
122125

123-
// entry exists
124-
if entry.data != nil {
126+
// entry exists and if cache is not force refreshed
127+
if entry.data != nil && crt != cacheReadTypeForceRefresh {
125128
// allow unsafe read, so return data even if expired
126129
if crt == cacheReadTypeUnsafe {
127130
return entry.data, nil
128131
}
129132
// if cached data is not expired, return cached data
130-
if time.Since(entry.createdOn) < t.ttl {
133+
if crt == cacheReadTypeDefault && time.Since(entry.createdOn) < t.ttl {
131134
return entry.data, nil
132135
}
133136
}
134-
// Data is not cached yet or cache data is expired, cache it by getter.
135-
// entry is locked before getting to ensure concurrent gets don't result in
136-
// multiple ARM calls.
137+
// Data is not cached yet, cache data is expired or requested force refresh
138+
// cache it by getter. entry is locked before getting to ensure concurrent
139+
// gets don't result in multiple ARM calls.
137140
data, err := t.getter(key)
138141
if err != nil {
139142
return nil, err

staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,23 @@ func TestCacheNoConcurrentGet(t *testing.T) {
204204
assert.Equal(t, 1, dataSource.called)
205205
assert.Equal(t, val, v, "cache should get correct data")
206206
}
207+
208+
func TestCacheForceRefresh(t *testing.T) {
209+
key := "key1"
210+
val := &fakeDataObj{}
211+
data := map[string]*fakeDataObj{
212+
key: val,
213+
}
214+
dataSource, cache := newFakeCache(t)
215+
dataSource.set(data)
216+
217+
v, err := cache.Get(key, cacheReadTypeDefault)
218+
assert.NoError(t, err)
219+
assert.Equal(t, 1, dataSource.called)
220+
assert.Equal(t, val, v, "cache should get correct data")
221+
222+
v, err = cache.Get(key, cacheReadTypeForceRefresh)
223+
assert.NoError(t, err)
224+
assert.Equal(t, 2, dataSource.called)
225+
assert.Equal(t, val, v, "should refetch unexpired data as forced refresh")
226+
}

staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -130,42 +130,46 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.Virtua
130130
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
131131
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
132132
func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
133-
getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
133+
getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
134+
var found bool
134135
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
135136
if err != nil {
136-
return "", "", nil, err
137+
return "", "", nil, found, err
137138
}
138139

139140
virtualMachines := cached.(*sync.Map)
140141
if vm, ok := virtualMachines.Load(nodeName); ok {
141142
result := vm.(*vmssVirtualMachinesEntry)
142-
return result.vmssName, result.instanceID, result.virtualMachine, nil
143+
found = true
144+
return result.vmssName, result.instanceID, result.virtualMachine, found, nil
143145
}
144146

145-
return "", "", nil, nil
147+
return "", "", nil, found, nil
146148
}
147149

148150
_, err := getScaleSetVMInstanceID(nodeName)
149151
if err != nil {
150152
return "", "", nil, err
151153
}
152154

153-
vmssName, instanceID, vm, err := getter(nodeName)
155+
vmssName, instanceID, vm, found, err := getter(nodeName, crt)
154156
if err != nil {
155157
return "", "", nil, err
156158
}
157-
if vm != nil {
158-
return vmssName, instanceID, vm, nil
159+
160+
if !found {
161+
klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
162+
vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh)
163+
if err != nil {
164+
return "", "", nil, err
165+
}
159166
}
160167

161-
klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
162-
ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
163-
vmssName, instanceID, vm, err = getter(nodeName)
164-
if err != nil {
165-
return "", "", nil, err
168+
if found && vm != nil {
169+
return vmssName, instanceID, vm, nil
166170
}
167171

168-
if vm == nil {
172+
if !found || vm == nil {
169173
return "", "", nil, cloudprovider.InstanceNotFound
170174
}
171175
return vmssName, instanceID, vm, nil
@@ -196,7 +200,7 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
196200
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
197201
// The node must belong to one of scale sets.
198202
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
199-
getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
203+
getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
200204
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
201205
if err != nil {
202206
return nil, false, err
@@ -219,21 +223,21 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
219223
return vm, found, nil
220224
}
221225

222-
vm, found, err := getter()
226+
vm, found, err := getter(crt)
223227
if err != nil {
224228
return nil, err
225229
}
226-
if found {
227-
return vm, nil
230+
if !found {
231+
klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
232+
vm, found, err = getter(cacheReadTypeForceRefresh)
233+
if err != nil {
234+
return nil, err
235+
}
228236
}
229-
230-
klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
231-
ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
232-
vm, found, err = getter()
233-
if err != nil {
234-
return nil, err
237+
if found && vm != nil {
238+
return vm, nil
235239
}
236-
if !found {
240+
if !found || vm == nil {
237241
return nil, cloudprovider.InstanceNotFound
238242
}
239243

staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,26 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
111111
getter := func(key string) (interface{}, error) {
112112
localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
113113

114+
oldCache := make(map[string]vmssVirtualMachinesEntry)
115+
116+
if ss.vmssVMCache != nil {
117+
// get old cache before refreshing the cache
118+
entry, exists, err := ss.vmssVMCache.store.GetByKey(vmssVirtualMachinesKey)
119+
if err != nil {
120+
return nil, err
121+
}
122+
if exists {
123+
cached := entry.(*cacheEntry).data
124+
if cached != nil {
125+
virtualMachines := cached.(*sync.Map)
126+
virtualMachines.Range(func(key, value interface{}) bool {
127+
oldCache[key.(string)] = *value.(*vmssVirtualMachinesEntry)
128+
return true
129+
})
130+
}
131+
}
132+
}
133+
114134
allResourceGroups, err := ss.GetResourceGroups()
115135
if err != nil {
116136
return nil, err
@@ -143,8 +163,38 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
143163
virtualMachine: &vm,
144164
lastUpdate: time.Now().UTC(),
145165
})
166+
167+
if _, exists := oldCache[computerName]; exists {
168+
delete(oldCache, computerName)
169+
}
146170
}
147171
}
172+
173+
// add old missing cache data with nil entries to prevent aggressive
174+
// ARM calls during cache invalidation
175+
for name, vmEntry := range oldCache {
176+
// if the nil cache entry has existed for 15 minutes in the cache
177+
// then it should not be added back to the cache
178+
if vmEntry.virtualMachine == nil || time.Since(vmEntry.lastUpdate) > 15*time.Minute {
179+
klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
180+
continue
181+
}
182+
lastUpdate := time.Now().UTC()
183+
if vmEntry.virtualMachine == nil {
184+
// if this is already a nil entry then keep the time the nil
185+
// entry was first created, so we can cleanup unwanted entries
186+
lastUpdate = vmEntry.lastUpdate
187+
}
188+
189+
klog.V(5).Infof("adding old entries to new cache for %s", name)
190+
localCache.Store(name, &vmssVirtualMachinesEntry{
191+
resourceGroup: vmEntry.resourceGroup,
192+
vmssName: vmEntry.vmssName,
193+
instanceID: vmEntry.instanceID,
194+
virtualMachine: nil,
195+
lastUpdate: lastUpdate,
196+
})
197+
}
148198
}
149199

150200
return localCache, nil

0 commit comments

Comments
 (0)