Skip to content

Commit 61aae03

Browse files
perf: [WIN-NPM] add all cached NetworkPolicies to a Pod at once (#1893)
* cherry-picking stuff from apply in background POC * add all policies poc * add debug prints * fix deadlock * fix other GetPolicy deadlock * update whitespace in yamls * properly merge * properly merge 2 * add ACLs in batches * cleanup errors * lint and log * persist state as we add * refactor into function so we can do UTs on batching * fix lint * batch struct * successful policies * reduce batch limit to 30
1 parent dcd55b3 commit 61aae03

File tree

7 files changed

+176
-52
lines changed

7 files changed

+176
-52
lines changed

npm/azure-npm.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,19 @@ metadata:
148148
data:
149149
azure-npm.json: |
150150
{
151-
"ResyncPeriodInMinutes": 15,
152-
"ListeningPort": 10091,
153-
"ListeningAddress": "0.0.0.0",
154-
"ApplyMaxBatches": 100,
151+
"ResyncPeriodInMinutes": 15,
152+
"ListeningPort": 10091,
153+
"ListeningAddress": "0.0.0.0",
154+
"ApplyMaxBatches": 100,
155155
"ApplyIntervalInMilliseconds": 500,
156+
"MaxBatchedACLsPerPod": 30,
156157
"Toggles": {
157158
"EnablePrometheusMetrics": true,
158159
"EnablePprof": true,
159160
"EnableHTTPDebugAPI": true,
160161
"EnableV2NPM": true,
161162
"PlaceAzureChainFirst": true,
162163
"ApplyIPSetsOnNeed": false,
163-
"ApplyInBackground": true
164+
"ApplyInBackground": true
164165
}
165166
}

npm/cmd/start.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
123123
stopChannel := wait.NeverStop
124124
if config.Toggles.EnableV2NPM {
125125
// update the dataplane config
126+
npmV2DataplaneCfg.MaxBatchedACLsPerPod = config.MaxBatchedACLsPerPod
127+
126128
npmV2DataplaneCfg.ApplyInBackground = config.Toggles.ApplyInBackground
127129
if config.ApplyMaxBatches > 0 {
128130
npmV2DataplaneCfg.ApplyMaxBatches = config.ApplyMaxBatches

npm/config/config.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ package npmconfig
33
import "github.com/Azure/azure-container-networking/npm/util"
44

55
const (
6-
defaultResyncPeriod = 15
7-
defaultApplyMaxBatches = 100
8-
defaultApplyInterval = 500
9-
defaultListeningPort = 10091
10-
defaultGrpcPort = 10092
11-
defaultGrpcServicePort = 9002
6+
defaultResyncPeriod = 15
7+
defaultApplyMaxBatches = 100
8+
defaultApplyInterval = 500
9+
defaultMaxBatchedACLsPerPod = 30
10+
defaultListeningPort = 10091
11+
defaultGrpcPort = 10092
12+
defaultGrpcServicePort = 9002
1213
// ConfigEnvPath is what's used by viper to load config path
1314
ConfigEnvPath = "NPM_CONFIG"
1415

@@ -32,6 +33,7 @@ var DefaultConfig = Config{
3233
WindowsNetworkName: util.AzureNetworkName,
3334
ApplyMaxBatches: defaultApplyMaxBatches,
3435
ApplyIntervalInMilliseconds: defaultApplyInterval,
36+
MaxBatchedACLsPerPod: defaultMaxBatchedACLsPerPod,
3537

3638
Toggles: Toggles{
3739
EnablePrometheusMetrics: true,
@@ -62,9 +64,13 @@ type Config struct {
6264
// It can also be the empty string, which results in the default value of 'azure'.
6365
WindowsNetworkName string `json:"WindowsNetworkName,omitempty"`
6466
// Apply options for Windows only. Relevant when ApplyInBackground is true.
65-
ApplyMaxBatches int `json:"ApplyDataPlaneMaxBatches,omitempty"`
66-
ApplyIntervalInMilliseconds int `json:"ApplyDataPlaneMaxWaitInMilliseconds,omitempty"`
67-
Toggles Toggles `json:"Toggles,omitempty"`
67+
ApplyMaxBatches int `json:"ApplyDataPlaneMaxBatches,omitempty"`
68+
ApplyIntervalInMilliseconds int `json:"ApplyDataPlaneMaxWaitInMilliseconds,omitempty"`
69+
// MaxBatchedACLsPerPod is the maximum number of ACLs that can be added to a Pod at once in Windows.
70+
// The zero value is valid.
71+
// A NetworkPolicy's ACLs are always in the same batch, and there will be at least one NetworkPolicy per batch.
72+
MaxBatchedACLsPerPod int `json:"MaxBatchedACLsPerPod,omitempty"`
73+
Toggles Toggles `json:"Toggles,omitempty"`
6874
}
6975

7076
type Toggles struct {

npm/examples/windows/azure-npm.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,19 @@ metadata:
140140
data:
141141
azure-npm.json: |
142142
{
143-
"ResyncPeriodInMinutes": 15,
144-
"ListeningPort": 10091,
145-
"ListeningAddress": "0.0.0.0",
146-
"ApplyMaxBatches": 100,
143+
"ResyncPeriodInMinutes": 15,
144+
"ListeningPort": 10091,
145+
"ListeningAddress": "0.0.0.0",
146+
"ApplyMaxBatches": 100,
147147
"ApplyIntervalInMilliseconds": 500,
148+
"MaxBatchedACLsPerPod": 30,
148149
"Toggles": {
149150
"EnablePrometheusMetrics": true,
150151
"EnablePprof": true,
151152
"EnableHTTPDebugAPI": true,
152153
"EnableV2NPM": true,
153154
"PlaceAzureChainFirst": true,
154155
"ApplyIPSetsOnNeed": false,
155-
"ApplyInBackground": true
156+
"ApplyInBackground": true
156157
}
157158
}

npm/pkg/dataplane/dataplane_windows.go

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,9 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
217217
}
218218

219219
// for every ipset we're adding to the endpoint, consider adding to the endpoint every policy that the set touches
220+
// add policy if:
221+
// 1. it's not already there
222+
// 2. the pod IP is part of every set that the policy requires (every set in the pod selector)
220223
toAddPolicies := make(map[string]struct{})
221224
for _, setName := range pod.IPSetsToAdd {
222225
/*
@@ -239,48 +242,42 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
239242
}
240243

241244
for policyKey := range selectorReference {
242-
if dp.policyMgr.PolicyExists(policyKey) {
243-
toAddPolicies[policyKey] = struct{}{}
244-
} else {
245+
if _, ok := endpoint.netPolReference[policyKey]; ok {
246+
continue
247+
}
248+
249+
policy, ok := dp.policyMgr.GetPolicy(policyKey)
250+
if !ok {
245251
klog.Infof("[DataPlane] while updating pod, policy is referenced but does not exist. pod: [%s], policy: [%s], set [%s]", pod.PodKey, policyKey, setName)
252+
continue
246253
}
247-
}
248-
}
249254

250-
// for all of these policies, add the policy to the endpoint if:
251-
// 1. it's not already there
252-
// 2. the pod IP is part of every set that the policy requires (every set in the pod selector)
253-
for policyKey := range toAddPolicies {
254-
if _, ok := endpoint.netPolReference[policyKey]; ok {
255-
continue
256-
}
255+
selectorIPSets := dp.getSelectorIPSets(policy)
256+
ok, err := dp.ipsetMgr.DoesIPSatisfySelectorIPSets(pod.PodIP, pod.PodKey, selectorIPSets)
257+
if err != nil {
258+
return fmt.Errorf("[DataPlane] error getting IPs satisfying selector ipsets: %w", err)
259+
}
260+
if !ok {
261+
continue
262+
}
257263

258-
// TODO Also check if the endpoint reference in policy for this Ip is right
259-
policy, ok := dp.policyMgr.GetPolicy(policyKey)
260-
if !ok {
261-
return fmt.Errorf("policy with name %s does not exist", policyKey)
262-
}
263-
264-
selectorIPSets := dp.getSelectorIPSets(policy)
265-
ok, err := dp.ipsetMgr.DoesIPSatisfySelectorIPSets(pod.PodIP, pod.PodKey, selectorIPSets)
266-
if err != nil {
267-
return err
268-
}
269-
if !ok {
270-
continue
264+
toAddPolicies[policyKey] = struct{}{}
271265
}
266+
}
272267

273-
// Apply the network policy
274-
endpointList := map[string]string{
275-
endpoint.ip: endpoint.id,
276-
}
277-
err = dp.policyMgr.AddPolicy(policy, endpointList)
278-
if err != nil {
279-
return err
280-
}
268+
if len(toAddPolicies) == 0 {
269+
return nil
270+
}
281271

272+
successfulPolicies, err := dp.policyMgr.AddAllPolicies(toAddPolicies, endpoint.id, endpoint.ip)
273+
for policyKey := range successfulPolicies {
282274
endpoint.netPolReference[policyKey] = struct{}{}
283275
}
276+
if err != nil {
277+
return fmt.Errorf("failed to add all policies while updating pod. endpoint: %+v. policies: %+v. err: %w", endpoint, toAddPolicies, err)
278+
}
279+
280+
klog.Infof("[DataPlane] updatedPod complete. podKey: %s. endpoint: %+v", pod.PodKey, endpoint)
284281

285282
return nil
286283
}

npm/pkg/dataplane/policies/policymanager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type PolicyManagerCfg struct {
3535
PolicyMode PolicyManagerMode
3636
// PlaceAzureChainFirst only affects Linux
3737
PlaceAzureChainFirst bool
38+
// MaxBatchedACLsPerPod is the maximum number of ACLs that can be added to a Pod at once in Windows.
39+
// The zero value is valid.
40+
// A NetworkPolicy's ACLs are always in the same batch, and there will be at least one NetworkPolicy per batch.
41+
MaxBatchedACLsPerPod int
3842
}
3943

4044
type PolicyMap struct {

npm/pkg/dataplane/policies/policymanager_windows.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ var baseACLsForCalicoCNI = []*NPMACLPolSettings{
8787
},
8888
}
8989

90+
type aclBatch struct {
91+
rules []*NPMACLPolSettings
92+
policies []string
93+
}
94+
9095
type staleChains struct{} // unused in Windows
9196

9297
type shouldResetAllACLs bool
@@ -125,6 +130,114 @@ func (pMgr *PolicyManager) reconcile() {
125130
// not implemented
126131
}
127132

133+
// AddAllPolicies is used in Windows to add all NetworkPolicies to an endpoint.
134+
// Will make a series of sequential HNS ADD calls based on MaxBatchedACLsPerPod.
135+
// A NetworkPolicy's ACLs are always in the same batch, and there will be at least one NetworkPolicy per batch.
136+
func (pMgr *PolicyManager) AddAllPolicies(policyKeys map[string]struct{}, epToModifyID, epToModifyIP string) (map[string]struct{}, error) {
137+
pMgr.policyMap.Lock()
138+
defer pMgr.policyMap.Unlock()
139+
140+
klog.Infof("[PolicyManagerWindows] adding all policies. epID: %s. epIP: %s. policyKeys: %+v", epToModifyID, epToModifyIP, policyKeys)
141+
142+
batches, err := pMgr.batchPolicies(policyKeys, epToModifyID, epToModifyIP)
143+
if err != nil {
144+
return nil, fmt.Errorf("error while batching policies for endpoint. err: %w", err)
145+
}
146+
147+
successfulPolicies := make(map[string]struct{})
148+
149+
for i, batch := range batches {
150+
klog.Infof("[PolicyManagerWindows] processing batch %d out of %d for adding all policies to endpoint. endpoint ID: %s. policyBatch: %+v", i+1, len(batches), epToModifyID, batch.policies)
151+
152+
epPolicyRequest, err := getEPPolicyReqFromACLSettings(batch.rules)
153+
if err != nil {
154+
return successfulPolicies, fmt.Errorf("error while applying all policies for batch %d out of %d. ruleBatch: %+v. err: %w", i+1, len(batches), batch, err)
155+
}
156+
157+
klog.Infof("[PolicyManager] applying all rules to endpoint for batch %d out of %d. endpoint ID: %s", i+1, len(batches), epToModifyID)
158+
err = pMgr.applyPoliciesToEndpointID(epToModifyID, epPolicyRequest)
159+
if err != nil {
160+
return successfulPolicies, fmt.Errorf("failed to add all policies on endpoint for batch %d out of %d. ruleBatch: %+v. err: %w", i+1, len(batches), batch, err)
161+
}
162+
163+
klog.Infof("[PolicyManager] finished applying all rules to endpoint for batch %d out of %d. endpoint ID: %s, policyBatch: %+v", i+1, len(batches), epToModifyID, batch.policies)
164+
for _, policyKey := range batch.policies {
165+
policy, ok := pMgr.policyMap.cache[policyKey]
166+
if ok {
167+
policy.PodEndpoints[epToModifyIP] = epToModifyID
168+
successfulPolicies[policyKey] = struct{}{}
169+
} else {
170+
klog.Errorf("[PolicyManagerWindows] unexpected error: policy not found after adding all policies for batch %d out of %d. policyKey: %s. epID: %s",
171+
i+1, len(batches), policyKey, epToModifyID)
172+
metrics.SendErrorLogAndMetric(util.IptmID, "[PolicyManagerWindows] unexpected error: policy not found after adding all policies for batch %d out of %d. policyKey: %s. epID: %s",
173+
i+1, len(batches), policyKey, epToModifyID)
174+
}
175+
}
176+
}
177+
178+
return successfulPolicies, nil
179+
}
180+
181+
// batchPolicies returns a list of batches
182+
func (pMgr *PolicyManager) batchPolicies(policyKeys map[string]struct{}, epToModifyID, epToModifyIP string) ([]*aclBatch, error) {
183+
batches := make([]*aclBatch, 0)
184+
for policyKey := range policyKeys {
185+
policy, ok := pMgr.policyMap.cache[policyKey]
186+
if !ok {
187+
klog.Infof("[PolicyManagerWindows] policy not found while adding all policies. policyKey: %s. epID: %s", policyKey, epToModifyID)
188+
delete(policyKeys, policyKey)
189+
continue
190+
}
191+
192+
// 1. remove stale endpoints from policy.PodEndpoints and skip adding to endpoints that already have the policy
193+
if policy.PodEndpoints == nil {
194+
policy.PodEndpoints = make(map[string]string)
195+
}
196+
197+
epID, ok := policy.PodEndpoints[epToModifyIP]
198+
if ok {
199+
if epID == epToModifyID {
200+
klog.Infof("[PolicyManagerWindows] while adding all policies, will not add policy %s to endpoint since it already exists there. endpoint IP: %s, endpoint ID: %s",
201+
policy.PolicyKey, epToModifyIP, epToModifyID)
202+
delete(policyKeys, policyKey)
203+
continue
204+
}
205+
206+
// If the expected ID is not same as epID, there is a chance that old pod got deleted
207+
// and same IP is used by new pod with new endpoint.
208+
// so we should delete the non-existent endpoint from policy reference
209+
klog.Infof("[PolicyManagerWindows] while adding all policies, removing deleted endpoint from policy's current endpoints. policy: %s, endpoint IP: %s, new ID: %s, previous ID: %s",
210+
policy.PolicyKey, epToModifyIP, epToModifyID, epID)
211+
delete(policy.PodEndpoints, epToModifyIP)
212+
}
213+
214+
// 2. add this policy's rules to a batch
215+
policyRules, err := pMgr.getSettingsFromACL(policy)
216+
if err != nil {
217+
return batches, fmt.Errorf("error while getting settings while applying all policies. err: %w", err)
218+
}
219+
220+
if len(batches) > 0 {
221+
batch := batches[len(batches)-1]
222+
if len(batch.rules)+len(policyRules) <= pMgr.MaxBatchedACLsPerPod {
223+
batch.rules = append(batch.rules, policyRules...)
224+
batch.policies = append(batch.policies, policy.PolicyKey)
225+
continue
226+
}
227+
}
228+
229+
// create a new batch
230+
// either this is the first NetPol we've seen, or adding this NetPol's rules to the previous batch would exceed the max rules per batch
231+
batch := &aclBatch{
232+
rules: policyRules,
233+
policies: []string{policy.PolicyKey},
234+
}
235+
batches = append(batches, batch)
236+
}
237+
238+
return batches, nil
239+
}
240+
128241
// AddBaseACLsForCalicoCNI attempts to add base ACLs for Calico CNI.
129242
func (pMgr *PolicyManager) AddBaseACLsForCalicoCNI(epID string) {
130243
epPolicyRequest, err := getEPPolicyReqFromACLSettings(baseACLsForCalicoCNI)

0 commit comments

Comments
 (0)