Skip to content

Commit 55f53d5

Browse files
authored
[NPM] Periodic Reconciliation of NPM Chain ordering in FORWARD table (#787)
* merge conflict * Adding in wait to get mitigate defunct process * Adding error handling and addressing comments * Addressing some comments * Adding a testcase for GetlineNumber * Adding error message in failures * Adding error message in failures * correcting the name for Kubeservices chain
1 parent f63d8ca commit 55f53d5

File tree

4 files changed

+251
-37
lines changed

4 files changed

+251
-37
lines changed

npm/iptm/iptm.go

Lines changed: 129 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -54,44 +54,14 @@ func NewIptablesManager() *IptablesManager {
5454
func (iptMgr *IptablesManager) InitNpmChains() error {
5555
log.Logf("Initializing AZURE-NPM chains.")
5656

57-
if err := iptMgr.AddChain(util.IptablesAzureChain); err != nil {
58-
return err
59-
}
60-
61-
// Insert AZURE-NPM chain to FORWARD chain.
62-
entry := &IptEntry{
63-
Chain: util.IptablesForwardChain,
64-
Specs: []string{
65-
util.IptablesJumpFlag,
66-
util.IptablesAzureChain,
67-
},
68-
}
69-
exists, err := iptMgr.Exists(entry)
57+
err := iptMgr.CheckAndAddForwardChain()
7058
if err != nil {
71-
return err
59+
metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to add AZURE-NPM chain to FORWARD chain. %s", err.Error())
7260
}
7361

74-
if !exists {
75-
// retrieve KUBE-SERVICES index
76-
index := "1"
77-
iptFilterEntries := exec.Command(util.Iptables, "-t", "filter", "-n", "--list", "FORWARD", "--line-numbers")
78-
grep := exec.Command("grep", "KUBE-SERVICES")
79-
pipe, _ := iptFilterEntries.StdoutPipe()
80-
grep.Stdin = pipe
81-
iptFilterEntries.Start()
82-
output, err := grep.CombinedOutput()
83-
if err == nil && len(output) > 2 {
84-
tmpIndex, _ := strconv.Atoi(string(output[0]))
85-
index = strconv.Itoa(tmpIndex + 1)
86-
}
87-
pipe.Close()
88-
// position Azure-NPM chain after Kube-Forward and Kube-Service chains if it exists
89-
iptMgr.OperationFlag = util.IptablesInsertionFlag
90-
entry.Specs = append([]string{index}, entry.Specs...)
91-
if _, err = iptMgr.Run(entry); err != nil {
92-
metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to add AZURE-NPM chain to FORWARD chain.")
93-
return err
94-
}
62+
entry := &IptEntry{
63+
Chain: util.IptablesAzureChain,
64+
Specs: []string{},
9565
}
9666

9767
// Create AZURE-NPM-INGRESS-PORT chain.
@@ -102,7 +72,7 @@ func (iptMgr *IptablesManager) InitNpmChains() error {
10272
// Append AZURE-NPM-INGRESS-PORT chain to AZURE-NPM chain.
10373
entry.Chain = util.IptablesAzureChain
10474
entry.Specs = []string{util.IptablesJumpFlag, util.IptablesAzureIngressPortChain}
105-
exists, err = iptMgr.Exists(entry)
75+
exists, err := iptMgr.Exists(entry)
10676
if err != nil {
10777
return err
10878
}
@@ -437,6 +407,86 @@ func (iptMgr *IptablesManager) InitNpmChains() error {
437407
return nil
438408
}
439409

410+
// CheckAndAddForwardChain initializes and reconciles Azure-NPM chain in right order
411+
func (iptMgr *IptablesManager) CheckAndAddForwardChain() error {
412+
413+
// TODO Adding this chain is printing error messages try to clean it up
414+
if err := iptMgr.AddChain(util.IptablesAzureChain); err != nil {
415+
return err
416+
}
417+
418+
// Insert AZURE-NPM chain to FORWARD chain.
419+
entry := &IptEntry{
420+
Chain: util.IptablesForwardChain,
421+
Specs: []string{
422+
util.IptablesJumpFlag,
423+
util.IptablesAzureChain,
424+
},
425+
}
426+
427+
index := 1
428+
// retrieve KUBE-SERVICES index
429+
kubeServicesLine, err := iptMgr.GetChainLineNumber(util.IptablesKubeServicesChain, util.IptablesForwardChain)
430+
if err != nil {
431+
metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to get index of KUBE-SERVICES in FORWARD chain with error: %s", err.Error())
432+
return err
433+
}
434+
435+
index = kubeServicesLine + 1
436+
437+
exists, err := iptMgr.Exists(entry)
438+
if err != nil {
439+
return err
440+
}
441+
442+
if !exists {
443+
// position Azure-NPM chain after Kube-Forward and Kube-Service chains if it exists
444+
iptMgr.OperationFlag = util.IptablesInsertionFlag
445+
entry.Specs = append([]string{strconv.Itoa(index)}, entry.Specs...)
446+
if _, err = iptMgr.Run(entry); err != nil {
447+
metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to add AZURE-NPM chain to FORWARD chain.")
448+
return err
449+
}
450+
451+
return nil
452+
}
453+
454+
npmChainLine, err := iptMgr.GetChainLineNumber(util.IptablesAzureChain, util.IptablesForwardChain)
455+
if err != nil {
456+
metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to get index of AZURE-NPM in FORWARD chain with error: %s", err.Error())
457+
return err
458+
}
459+
460+
// Kube-services line number is less than npm chain line number then all good
461+
if kubeServicesLine < npmChainLine {
462+
return nil
463+
} else if kubeServicesLine <= 0 {
464+
return nil
465+
}
466+
467+
errCode := 0
468+
// NPM Chain number is less than KUBE-SERVICES then
469+
// delete existing NPM chain and add it in the right order
470+
iptMgr.OperationFlag = util.IptablesDeletionFlag
471+
metrics.SendErrorLogAndMetric(util.IptmID, "Info: Reconciler deleting and re-adding AZURE-NPM in FORWARD table.")
472+
if errCode, err = iptMgr.Run(entry); err != nil {
473+
metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to delete AZURE-NPM chain from FORWARD chain with error code %d.", errCode)
474+
return err
475+
}
476+
iptMgr.OperationFlag = util.IptablesInsertionFlag
477+
// Reduce index for deleted AZURE-NPM chain
478+
if index > 1 {
479+
index--
480+
}
481+
entry.Specs = append([]string{strconv.Itoa(index)}, entry.Specs...)
482+
if errCode, err = iptMgr.Run(entry); err != nil {
483+
metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to add AZURE-NPM chain to FORWARD chain with error code %d.", errCode)
484+
return err
485+
}
486+
487+
return nil
488+
}
489+
440490
// UninitNpmChains uninitializes Azure NPM chains in iptables.
441491
func (iptMgr *IptablesManager) UninitNpmChains() error {
442492
IptablesAzureChainList := []string{
@@ -518,6 +568,44 @@ func (iptMgr *IptablesManager) AddChain(chain string) error {
518568
return nil
519569
}
520570

571+
// GetChainLineNumber given a Chain and its parent chain returns line number
572+
func (iptMgr *IptablesManager) GetChainLineNumber(chain string, parentChain string) (int, error) {
573+
574+
var (
575+
output []byte
576+
err error
577+
)
578+
579+
cmdName := util.Iptables
580+
cmdArgs := []string{"-t", "filter", "-n", "--list", parentChain, "--line-numbers"}
581+
582+
iptFilterEntries := exec.Command(cmdName, cmdArgs...)
583+
grep := exec.Command("grep", chain)
584+
pipe, err := iptFilterEntries.StdoutPipe()
585+
if err != nil {
586+
return 0, err
587+
}
588+
defer pipe.Close()
589+
grep.Stdin = pipe
590+
591+
if err = iptFilterEntries.Start(); err != nil {
592+
return 0, err
593+
}
594+
// Without this wait, defunct iptable child process are created
595+
defer iptFilterEntries.Wait()
596+
597+
if output, err = grep.CombinedOutput(); err != nil {
598+
// grep returns err status 1 if not found
599+
return 0, nil
600+
}
601+
602+
if len(output) > 2 {
603+
lineNum, _ := strconv.Atoi(string(output[0]))
604+
return lineNum, nil
605+
}
606+
return 0, nil
607+
}
608+
521609
// DeleteChain deletes a chain from iptables.
522610
func (iptMgr *IptablesManager) DeleteChain(chain string) error {
523611
entry := &IptEntry{
@@ -605,7 +693,11 @@ func (iptMgr *IptablesManager) Run(entry *IptEntry) (int, error) {
605693
if msg, failed := err.(*exec.ExitError); failed {
606694
errCode := msg.Sys().(syscall.WaitStatus).ExitStatus()
607695
if errCode > 0 && iptMgr.OperationFlag != util.IptablesCheckFlag {
608-
metrics.SendErrorLogAndMetric(util.IptmID, "Error: There was an error running command: [%s %v] Stderr: [%v, %s]", cmdName, strings.Join(cmdArgs, " "), err, strings.TrimSuffix(string(msg.Stderr), "\n"))
696+
msgStr := strings.TrimSuffix(string(msg.Stderr), "\n")
697+
if strings.Contains(msgStr, "Chain already exists") && iptMgr.OperationFlag == util.IptablesChainCreationFlag {
698+
return 0, nil
699+
}
700+
metrics.SendErrorLogAndMetric(util.IptmID, "Error: There was an error running command: [%s %v] Stderr: [%v, %s]", cmdName, strings.Join(cmdArgs, " "), err, msgStr)
609701
}
610702

611703
return errCode, err

npm/iptm/iptm_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,113 @@ func TestRun(t *testing.T) {
225225
}
226226
}
227227

228+
func TestGetChainLineNumber(t *testing.T) {
229+
iptMgr := &IptablesManager{}
230+
231+
var (
232+
lineNum int
233+
err error
234+
kubeExists bool
235+
npmExists bool
236+
)
237+
238+
if err = iptMgr.Save(util.IptablesTestConfigFile); err != nil {
239+
t.Errorf("TestGetChainLineNumber failed @ iptMgr.Save")
240+
}
241+
242+
defer func() {
243+
if err = iptMgr.Restore(util.IptablesTestConfigFile); err != nil {
244+
t.Errorf("TestGetChainLineNumber failed @ iptMgr.Restore")
245+
}
246+
}()
247+
248+
if err = iptMgr.AddChain(util.IptablesKubeServicesChain); err != nil {
249+
t.Errorf("TestGetChainLineNumber failed @ kube-services chain iptMgr.AddChain error: %s", err.Error())
250+
}
251+
252+
iptMgr.OperationFlag = util.IptablesCheckFlag
253+
entry := &IptEntry{
254+
Chain: util.IptablesForwardChain,
255+
Specs: []string{
256+
util.IptablesJumpFlag,
257+
util.IptablesKubeServicesChain,
258+
},
259+
}
260+
261+
if kubeExists, err = iptMgr.Exists(entry); err != nil {
262+
t.Errorf("TestGetChainLineNumber failed @ kube-services chain iptMgr.Exists error: %s", err.Error())
263+
}
264+
265+
entry = &IptEntry{
266+
Chain: util.IptablesForwardChain,
267+
Specs: []string{
268+
util.IptablesJumpFlag,
269+
util.IptablesAzureChain,
270+
},
271+
}
272+
273+
// Ignore not exists errors
274+
npmExists, _ = iptMgr.Exists(entry)
275+
276+
lineNum, err = iptMgr.GetChainLineNumber(util.IptablesAzureChain, util.IptablesForwardChain)
277+
if err != nil {
278+
t.Errorf("TestGetChainLineNumber @ initial iptMgr.GetChainLineNumber error: %s", err.Error())
279+
}
280+
281+
switch {
282+
case (npmExists && kubeExists):
283+
if lineNum != 3 {
284+
t.Errorf("TestGetChainLineNumber @ initial line number check iptMgr.GetChainLineNumber with npmExists: %t kubeExists: %t", npmExists, kubeExists)
285+
}
286+
case npmExists:
287+
if lineNum == 0 {
288+
t.Errorf("TestGetChainLineNumber @ initial line number check iptMgr.GetChainLineNumber with npmExists: %t kubeExists: %t", npmExists, kubeExists)
289+
}
290+
default:
291+
if lineNum != 0 {
292+
t.Errorf("TestGetChainLineNumber @ initial line number check iptMgr.GetChainLineNumber with npmExists: %t kubeExists: %t", npmExists, kubeExists)
293+
}
294+
}
295+
296+
if err = iptMgr.InitNpmChains(); err != nil {
297+
t.Errorf("TestGetChainLineNumber @ iptMgr.InitNpmChains error: %s", err.Error())
298+
}
299+
300+
entry = &IptEntry{
301+
Chain: util.IptablesForwardChain,
302+
Specs: []string{
303+
util.IptablesJumpFlag,
304+
util.IptablesAzureChain,
305+
},
306+
}
307+
308+
if npmExists, err = iptMgr.Exists(entry); err != nil {
309+
t.Errorf("TestGetChainLineNumber failed @ azure-npm chain iptMgr.Exists error: %s", err.Error())
310+
}
311+
312+
lineNum, err = iptMgr.GetChainLineNumber(util.IptablesAzureChain, util.IptablesForwardChain)
313+
if err != nil {
314+
t.Errorf("TestGetChainLineNumber @ after Init chains iptMgr.GetChainLineNumber error: %s", err.Error())
315+
}
316+
317+
switch {
318+
case (npmExists && kubeExists):
319+
if lineNum < 2 {
320+
t.Errorf("TestGetChainLineNumber @ after Init chains line number check iptMgr.GetChainLineNumber with npmExists: %t kubeExists: %t", npmExists, kubeExists)
321+
}
322+
case npmExists:
323+
if lineNum == 0 {
324+
t.Errorf("TestGetChainLineNumber @ after Init chains line number check iptMgr.GetChainLineNumber with npmExists: %t kubeExists: %t", npmExists, kubeExists)
325+
}
326+
case !npmExists:
327+
t.Errorf("TestGetChainLineNumber @ after Init chains line number check iptMgr.GetChainLineNumber with failed to Add chain ")
328+
}
329+
330+
if err = iptMgr.UninitNpmChains(); err != nil {
331+
t.Errorf("TestGetChainLineNumber @ iptMgr.UninitNpmChains")
332+
}
333+
}
334+
228335
func TestMain(m *testing.M) {
229336
metrics.InitializeAll()
230337
iptMgr := NewIptablesManager()

npm/npm.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const (
3535
backupWaitTimeInSeconds = 60
3636
telemetryRetryTimeInSeconds = 60
3737
heartbeatIntervalInMinutes = 30
38+
reconcileChainTimeInMinutes = 5
3839
)
3940

4041
// NetworkPolicyManager contains informers for pod, namespace and networkpolicy.
@@ -183,6 +184,7 @@ func (npMgr *NetworkPolicyManager) Start(stopCh <-chan struct{}) error {
183184
return fmt.Errorf("Network policy informer failed to sync")
184185
}
185186

187+
go npMgr.reconcileChains()
186188
go npMgr.backup()
187189

188190
return nil
@@ -404,3 +406,15 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in
404406

405407
return npMgr
406408
}
409+
410+
// reconcileChains checks for ordering of AZURE-NPM chain in FORWARD chain periodically.
411+
func (npMgr *NetworkPolicyManager) reconcileChains() error {
412+
iptMgr := iptm.NewIptablesManager()
413+
select {
414+
case <-time.After(reconcileChainTimeInMinutes * time.Minute):
415+
if err := iptMgr.CheckAndAddForwardChain(); err != nil {
416+
return err
417+
}
418+
}
419+
return nil
420+
}

npm/util/const.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ const (
7272
IptablesAzureEgressPortChain string = "AZURE-NPM-EGRESS-PORT"
7373
IptablesAzureEgressToChain string = "AZURE-NPM-EGRESS-TO"
7474
IptablesAzureTargetSetsChain string = "AZURE-NPM-TARGET-SETS"
75+
IptablesKubeServicesChain string = "KUBE-SERVICES"
7576
IptablesForwardChain string = "FORWARD"
7677
IptablesInputChain string = "INPUT"
7778
// Below chains exists only for before Azure-NPM:v1.0.27

0 commit comments

Comments
 (0)