Skip to content

Commit cfe483b

Browse files
fix: [NPM-WIN] lock the policyMap & updatePodCache (#1543)
* add policyMap lock
* add updatePodCache lock
* only lock policyMap in Windows and remove dead code
* defer unlocking and update comments
* debugging logs (remove later)
* more logs to remove
* revert test log commits
* aggregate errors while updating pods
* update log
* lock endpoint cache and remove pendingPolicies map
* refresh endpoints only before beginning all updatePod calls
* make policyMap lock for Linux too

Co-authored-by: Vamsi Kalapala <[email protected]>
1 parent 312bf12 commit cfe483b

File tree

6 files changed

+150
-67
lines changed

6 files changed

+150
-67
lines changed

npm/pkg/dataplane/dataplane.go

Lines changed: 76 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dataplane
22

33
import (
44
"fmt"
5+
"sync"
56
"time"
67

78
"github.com/Azure/azure-container-networking/common"
@@ -23,6 +24,24 @@ type Config struct {
2324
*policies.PolicyManagerCfg
2425
}
2526

27+
type updatePodCache struct {
28+
sync.Mutex
29+
cache map[string]*updateNPMPod
30+
}
31+
32+
func newUpdatePodCache() *updatePodCache {
33+
return &updatePodCache{cache: make(map[string]*updateNPMPod)}
34+
}
35+
36+
type endpointCache struct {
37+
sync.Mutex
38+
cache map[string]*npmEndpoint
39+
}
40+
41+
func newEndpointCache() *endpointCache {
42+
return &endpointCache{cache: make(map[string]*npmEndpoint)}
43+
}
44+
2645
type DataPlane struct {
2746
*Config
2847
policyMgr *policies.PolicyManager
@@ -31,13 +50,10 @@ type DataPlane struct {
3150
nodeName string
3251
// endpointCache stores all endpoints of the network (including off-node)
3352
// Key is PodIP
34-
endpointCache map[string]*npmEndpoint
53+
endpointCache *endpointCache
3554
ioShim *common.IOShim
36-
updatePodCache map[string]*updateNPMPod
37-
// pendingPolicies includes the policy keys of policies which may
38-
// be referenced by ipsets but have not been applied to the kernel yet
39-
pendingPolicies map[string]struct{}
40-
stopChannel <-chan struct{}
55+
updatePodCache *updatePodCache
56+
stopChannel <-chan struct{}
4157
}
4258

4359
func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChannel <-chan struct{}) (*DataPlane, error) {
@@ -47,15 +63,14 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
4763
cfg.IPSetManagerCfg.AddEmptySetToLists = true
4864
}
4965
dp := &DataPlane{
50-
Config: cfg,
51-
policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg),
52-
ipsetMgr: ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim),
53-
endpointCache: make(map[string]*npmEndpoint),
54-
nodeName: nodeName,
55-
ioShim: ioShim,
56-
updatePodCache: make(map[string]*updateNPMPod),
57-
pendingPolicies: make(map[string]struct{}),
58-
stopChannel: stopChannel,
66+
Config: cfg,
67+
policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg),
68+
ipsetMgr: ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim),
69+
endpointCache: newEndpointCache(),
70+
nodeName: nodeName,
71+
ioShim: ioShim,
72+
updatePodCache: newUpdatePodCache(),
73+
stopChannel: stopChannel,
5974
}
6075

6176
err := dp.BootupDataplane()
@@ -121,12 +136,19 @@ func (dp *DataPlane) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *Po
121136
}
122137
if dp.shouldUpdatePod() {
123138
klog.Infof("[DataPlane] Updating Sets to Add for pod key %s", podMetadata.PodKey)
124-
if _, ok := dp.updatePodCache[podMetadata.PodKey]; !ok {
139+
140+
// lock updatePodCache while reading/modifying or setting the updatePod in the cache
141+
dp.updatePodCache.Lock()
142+
defer dp.updatePodCache.Unlock()
143+
144+
updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey]
145+
if !ok {
125146
klog.Infof("[DataPlane] {AddToSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey)
126-
dp.updatePodCache[podMetadata.PodKey] = newUpdateNPMPod(podMetadata)
147+
updatePod = newUpdateNPMPod(podMetadata)
148+
dp.updatePodCache.cache[podMetadata.PodKey] = updatePod
127149
}
128150

129-
dp.updatePodCache[podMetadata.PodKey].updateIPSetsToAdd(setNames)
151+
updatePod.updateIPSetsToAdd(setNames)
130152
}
131153

132154
return nil
@@ -142,12 +164,19 @@ func (dp *DataPlane) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadat
142164

143165
if dp.shouldUpdatePod() {
144166
klog.Infof("[DataPlane] Updating Sets to Remove for pod key %s", podMetadata.PodKey)
145-
if _, ok := dp.updatePodCache[podMetadata.PodKey]; !ok {
167+
168+
// lock updatePodCache while reading/modifying or setting the updatePod in the cache
169+
dp.updatePodCache.Lock()
170+
defer dp.updatePodCache.Unlock()
171+
172+
updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey]
173+
if !ok {
146174
klog.Infof("[DataPlane] {RemoveFromSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey)
147-
dp.updatePodCache[podMetadata.PodKey] = newUpdateNPMPod(podMetadata)
175+
updatePod = newUpdateNPMPod(podMetadata)
176+
dp.updatePodCache.cache[podMetadata.PodKey] = updatePod
148177
}
149178

150-
dp.updatePodCache[podMetadata.PodKey].updateIPSetsToRemove(setNames)
179+
updatePod.updateIPSetsToRemove(setNames)
151180
}
152181

153182
return nil
@@ -185,13 +214,33 @@ func (dp *DataPlane) ApplyDataPlane() error {
185214
}
186215

187216
if dp.shouldUpdatePod() {
188-
for podKey, pod := range dp.updatePodCache {
217+
err := dp.refreshAllPodEndpoints()
218+
if err != nil {
219+
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to refresh endpoints while updating pods. err: [%s]", err.Error())
220+
return fmt.Errorf("[DataPlane] failed to refresh endpoints while updating pods. err: [%w]", err)
221+
}
222+
223+
// lock updatePodCache while driving goal state to kernel
224+
// prevents another ApplyDataPlane call from updating the same pods
225+
dp.updatePodCache.Lock()
226+
defer dp.updatePodCache.Unlock()
227+
228+
var aggregateErr error
229+
for podKey, pod := range dp.updatePodCache.cache {
189230
err := dp.updatePod(pod)
190231
if err != nil {
191-
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "error: failed to update pods: %s", err.Error())
192-
return fmt.Errorf("[DataPlane] error while updating pod: %w", err)
232+
if aggregateErr == nil {
233+
aggregateErr = fmt.Errorf("failed to update pod while applying the dataplane. key: [%s], err: [%w]", podKey, err)
234+
} else {
235+
aggregateErr = fmt.Errorf("failed to update pod while applying the dataplane. key: [%s], err: [%s]. previous err: [%w]", podKey, err.Error(), aggregateErr)
236+
}
237+
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", podKey, err.Error())
238+
continue
193239
}
194-
delete(dp.updatePodCache, podKey)
240+
delete(dp.updatePodCache.cache, podKey)
241+
}
242+
if aggregateErr != nil {
243+
// wrap the aggregated per-pod failures; the outer err (from refreshAllPodEndpoints) is always nil at this point
return fmt.Errorf("[DataPlane] error while updating pods: %w", aggregateErr)
195244
}
196245
}
197246
return nil
@@ -201,8 +250,6 @@ func (dp *DataPlane) ApplyDataPlane() error {
201250
func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error {
202251
klog.Infof("[DataPlane] Add Policy called for %s", policy.PolicyKey)
203252

204-
dp.pendingPolicies[policy.PolicyKey] = struct{}{}
205-
206253
// Create and add references for Selector IPSets first
207254
err := dp.createIPSetsAndReferences(policy.AllPodSelectorIPSets(), policy.PolicyKey, ipsets.SelectorType)
208255
if err != nil {
@@ -232,7 +279,6 @@ func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error {
232279
if err != nil {
233280
return fmt.Errorf("[DataPlane] error while adding policy: %w", err)
234281
}
235-
delete(dp.pendingPolicies, policy.PolicyKey)
236282
return nil
237283
}
238284

@@ -301,8 +347,9 @@ func (dp *DataPlane) GetAllIPSets() map[string]string {
301347
return dp.ipsetMgr.GetAllIPSets()
302348
}
303349

350+
// GetAllPolicies is deprecated and only used in the goalstateprocessor, which is deprecated
304351
func (dp *DataPlane) GetAllPolicies() []string {
305-
return dp.policyMgr.GetAllPolicies()
352+
return nil
306353
}
307354

308355
func (dp *DataPlane) createIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, netpolName string, referenceType ipsets.ReferenceType) error {

npm/pkg/dataplane/dataplane_linux.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import (
66
)
77

88
func (dp *DataPlane) getEndpointsToApplyPolicy(policy *policies.NPMNetworkPolicy) (map[string]string, error) {
9-
// NOOP in Linux at the moment
9+
// NOOP in Linux
1010
return nil, nil
1111
}
1212

1313
func (dp *DataPlane) shouldUpdatePod() bool {
1414
return false
1515
}
1616

17-
// updatePod is no-op in Linux
1817
func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
18+
// NOOP in Linux
1919
return nil
2020
}
2121

@@ -29,3 +29,8 @@ func (dp *DataPlane) bootupDataPlane() error {
2929
}
3030
return nil
3131
}
32+
33+
func (dp *DataPlane) refreshAllPodEndpoints() error {
34+
// NOOP in Linux
35+
return nil
36+
}

npm/pkg/dataplane/dataplane_windows.go

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ func (dp *DataPlane) initializeDataPlane() error {
4343
}
4444

4545
// reset endpoint cache so that netpol references are removed for all endpoints while refreshing pod endpoints
46-
dp.endpointCache = make(map[string]*npmEndpoint)
46+
// no need to lock endpointCache at boot up
47+
dp.endpointCache.cache = make(map[string]*npmEndpoint)
4748
err = dp.refreshAllPodEndpoints()
4849
if err != nil {
4950
return err
@@ -111,14 +112,12 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
111112
return nil
112113
}
113114

114-
err := dp.refreshAllPodEndpoints()
115-
if err != nil {
116-
klog.Infof("[DataPlane] failed to refresh endpoints in updatePod with %s", err.Error())
117-
return err
118-
}
115+
// lock the endpoint cache while we read/modify the endpoint with the pod's IP
116+
dp.endpointCache.Lock()
117+
defer dp.endpointCache.Unlock()
119118

120119
// Check if pod is already present in cache
121-
endpoint, ok := dp.endpointCache[pod.PodIP]
120+
endpoint, ok := dp.endpointCache.cache[pod.PodIP]
122121
if !ok {
123122
// ignore this err and pod endpoint will be deleted in ApplyDP
124123
// if the endpoint is not found, it means the pod is not part of this node or pod got deleted.
@@ -133,7 +132,7 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
133132
// Updates to this pod would not occur. Pod IPs are expected to change on restart though.
134133
// See: https://stackoverflow.com/questions/52362514/when-will-the-kubernetes-pod-ip-change
135134
// If a pod does restart and take up its previous IP, then the pod can be deleted/restarted to mitigate this problem.
136-
klog.Infof("ignoring pod update since pod with key %s is stale and likely was deleted", pod.PodKey)
135+
klog.Infof("[DataPlane] ignoring pod update since pod with key %s is stale and likely was deleted", pod.PodKey)
137136
return nil
138137
}
139138
endpoint.podKey = pod.PodKey
@@ -175,8 +174,10 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
175174
}
176175

177176
for policyKey := range selectorReference {
178-
if _, ok := dp.pendingPolicies[policyKey]; !ok {
177+
if dp.policyMgr.PolicyExists(policyKey) {
179178
toAddPolicies[policyKey] = struct{}{}
179+
} else {
180+
klog.Infof("[DataPlane] while updating pod, policy is referenced but does not exist. pod: [%s], policy: [%s], set [%s]", pod.PodKey, policyKey, setName)
180181
}
181182
}
182183
}
@@ -229,21 +230,19 @@ func (dp *DataPlane) getSelectorIPSets(policy *policies.NPMNetworkPolicy) map[st
229230
}
230231

231232
func (dp *DataPlane) getEndpointsToApplyPolicy(policy *policies.NPMNetworkPolicy) (map[string]string, error) {
232-
err := dp.refreshAllPodEndpoints()
233-
if err != nil {
234-
klog.Infof("[DataPlane] failed to refresh endpoints in getEndpointsToApplyPolicy with %s", err.Error())
235-
return nil, err
236-
}
237-
238233
selectorIPSets := dp.getSelectorIPSets(policy)
239234
netpolSelectorIPs, err := dp.ipsetMgr.GetIPsFromSelectorIPSets(selectorIPSets)
240235
if err != nil {
241236
return nil, err
242237
}
243238

239+
// lock the endpoint cache while we read/modify the endpoints with IPs in the policy's pod selector
240+
dp.endpointCache.Lock()
241+
defer dp.endpointCache.Unlock()
242+
244243
endpointList := make(map[string]string)
245244
for ip := range netpolSelectorIPs {
246-
endpoint, ok := dp.endpointCache[ip]
245+
endpoint, ok := dp.endpointCache.cache[ip]
247246
if !ok {
248247
klog.Infof("[DataPlane] Ignoring endpoint with IP %s since it was not found in the endpoint cache. This IP might not be in the HNS network", ip)
249248
continue
@@ -264,12 +263,32 @@ func (dp *DataPlane) getAllPodEndpoints() ([]hcn.HostComputeEndpoint, error) {
264263
}
265264

266265
// refreshAllPodEndpoints will refresh all the pod endpoints and create empty netpol references for new endpoints
266+
/*
267+
Key Assumption: a new pod event (w/ IP) cannot come before HNS knows (and can tell us) about the endpoint.
268+
From NPM logs, it seems that endpoints are updated well before (by several seconds) the pod event comes in.
269+
270+
What we learn from refreshing endpoints:
271+
- an old endpoint doesn't exist anymore
272+
- a new endpoint has come up
273+
274+
Why not refresh when adding a netpol to all required pods?
275+
- It's ok if we try to apply on an endpoint that doesn't exist anymore.
276+
- We won't know the pod associated with a new endpoint even if we refresh.
277+
278+
Why can we refresh only once before updating all pods in the updatePodCache (see ApplyDataPlane)?
279+
- Again, it's ok if we try to apply on a non-existent endpoint.
280+
- We won't miss the endpoint (see the assumption). At the time the pod event came in (when AddToSets/RemoveFromSets were called), HNS already knew about the endpoint.
281+
*/
267282
func (dp *DataPlane) refreshAllPodEndpoints() error {
268283
endpoints, err := dp.getAllPodEndpoints()
269284
if err != nil {
270285
return err
271286
}
272287

288+
// lock the endpoint cache while we reconcile with HNS goal state
289+
dp.endpointCache.Lock()
290+
defer dp.endpointCache.Unlock()
291+
273292
currentTime := time.Now().Unix()
274293
existingIPs := make(map[string]struct{})
275294
for _, endpoint := range endpoints {
@@ -285,11 +304,11 @@ func (dp *DataPlane) refreshAllPodEndpoints() error {
285304

286305
existingIPs[ip] = struct{}{}
287306

288-
oldNPMEP, ok := dp.endpointCache[ip]
307+
oldNPMEP, ok := dp.endpointCache.cache[ip]
289308
if !ok {
290309
// add the endpoint to the cache if it's not already there
291310
npmEP := newNPMEndpoint(&endpoint)
292-
dp.endpointCache[ip] = npmEP
311+
dp.endpointCache.cache[ip] = npmEP
293312
// NOTE: TSGs rely on this log line
294313
klog.Infof("updating endpoint cache to include %s: %+v", npmEP.ip, npmEP)
295314
} else if oldNPMEP.id != endpoint.Id {
@@ -299,29 +318,29 @@ func (dp *DataPlane) refreshAllPodEndpoints() error {
299318
npmEP := newNPMEndpoint(&endpoint)
300319
if oldNPMEP.podKey == unspecifiedPodKey {
301320
klog.Infof("updating endpoint cache since endpoint changed for IP which never had a pod key. new endpoint: %s, old endpoint: %s, ip: %s", npmEP.id, oldNPMEP.id, npmEP.ip)
302-
dp.endpointCache[ip] = npmEP
321+
dp.endpointCache.cache[ip] = npmEP
303322
} else {
304323
npmEP.stalePodKey = &staleKey{
305324
key: oldNPMEP.podKey,
306325
timestamp: currentTime,
307326
}
308-
dp.endpointCache[ip] = npmEP
327+
dp.endpointCache.cache[ip] = npmEP
309328
// NOTE: TSGs rely on this log line
310329
klog.Infof("updating endpoint cache for previously cached IP %s: %+v with stalePodKey %+v", npmEP.ip, npmEP, npmEP.stalePodKey)
311330
}
312331
}
313332
}
314333

315334
// garbage collection for the endpoint cache
316-
for ip, ep := range dp.endpointCache {
335+
for ip, ep := range dp.endpointCache.cache {
317336
if _, ok := existingIPs[ip]; !ok {
318337
if ep.podKey == unspecifiedPodKey {
319338
if ep.stalePodKey == nil {
320339
klog.Infof("deleting old endpoint which never had a pod key. ID: %s, IP: %s", ep.id, ip)
321-
delete(dp.endpointCache, ip)
340+
delete(dp.endpointCache.cache, ip)
322341
} else if int(currentTime-ep.stalePodKey.timestamp)/60 > minutesToKeepStalePodKey {
323342
klog.Infof("deleting old endpoint which had a stale pod key. ID: %s, IP: %s, stalePodKey: %+v", ep.id, ip, ep.stalePodKey)
324-
delete(dp.endpointCache, ip)
343+
delete(dp.endpointCache.cache, ip)
325344
}
326345
} else {
327346
ep.stalePodKey = &staleKey{
@@ -349,8 +368,9 @@ func (dp *DataPlane) setNetworkIDByName(networkName string) error {
349368
}
350369

351370
func (dp *DataPlane) getAllEndpointIDs() []string {
352-
endpointIDs := make([]string, 0, len(dp.endpointCache))
353-
for _, endpoint := range dp.endpointCache {
371+
// no need to lock endpointCache at boot up
372+
endpointIDs := make([]string, 0, len(dp.endpointCache.cache))
373+
for _, endpoint := range dp.endpointCache.cache {
354374
endpointIDs = append(endpointIDs, endpoint.id)
355375
}
356376
return endpointIDs

npm/pkg/dataplane/policies/chain-management_linux.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func isBaseChain(chain string) bool {
168168
func (pMgr *PolicyManager) bootup(_ []string) error {
169169
klog.Infof("booting up iptables Azure chains")
170170

171-
// Stop reconciling so we don't centend for iptables, and so we don't update the staleChains at the same time as reconcile()
171+
// Stop reconciling so we don't contend for iptables, and so we don't update the staleChains at the same time as reconcile()
172172
// Reconciling would only be happening if this function were called to reset iptables well into the azure-npm pod lifecycle.
173173
pMgr.reconcileManager.forceLock()
174174
defer pMgr.reconcileManager.forceUnlock()

0 commit comments

Comments
 (0)