Skip to content

Commit 97d3ddc

Browse files
author
Yongli Chen
authored
Honoring xtables lock (#315)
1 parent 83945fd commit 97d3ddc

File tree

11 files changed

+160
-33
lines changed

11 files changed

+160
-33
lines changed

npm/iptm/iptm.go

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,22 @@
1+
/*
2+
3+
Part of this file is modified from iptables package from Kuberenetes.
4+
https://github.com/kubernetes/kubernetes/blob/master/pkg/util/iptables
5+
6+
*/
17
package iptm
28

39
import (
410
"os"
511
"os/exec"
612
"syscall"
13+
"time"
14+
15+
"golang.org/x/sys/unix"
716

817
"github.com/Azure/azure-container-networking/log"
918
"github.com/Azure/azure-container-networking/npm/util"
19+
"k8s.io/apimachinery/pkg/util/wait"
1020
)
1121

1222
// IptEntry represents an iptables rule.
@@ -320,7 +330,7 @@ func (iptMgr *IptablesManager) Delete(entry *IptEntry) error {
320330
// Run execute an iptables command to update iptables.
321331
func (iptMgr *IptablesManager) Run(entry *IptEntry) (int, error) {
322332
cmdName := util.Iptables
323-
cmdArgs := append([]string{iptMgr.OperationFlag, entry.Chain}, entry.Specs...)
333+
cmdArgs := append([]string{util.IptablesWaitFlag, iptMgr.OperationFlag, entry.Chain}, entry.Specs...)
324334

325335
cmdOut, err := exec.Command(cmdName, cmdArgs...).Output()
326336
log.Printf("%s\n", string(cmdOut))
@@ -343,6 +353,17 @@ func (iptMgr *IptablesManager) Save(configFile string) error {
343353
configFile = util.IptablesConfigFile
344354
}
345355

356+
l, err := grabIptablesLocks()
357+
if err != nil {
358+
return err
359+
}
360+
361+
defer func(l *os.File) {
362+
if err = l.Close(); err != nil {
363+
log.Printf("Failed to close iptables locks")
364+
}
365+
}(l)
366+
346367
// create the config file for writing
347368
f, err := os.Create(configFile)
348369
if err != nil {
@@ -354,7 +375,7 @@ func (iptMgr *IptablesManager) Save(configFile string) error {
354375
cmd := exec.Command(util.IptablesSave)
355376
cmd.Stdout = f
356377
if err := cmd.Start(); err != nil {
357-
log.Printf("Error running iptables-save.\n")
378+
log.Printf("Error running iptables-save.")
358379
return err
359380
}
360381
cmd.Wait()
@@ -368,6 +389,17 @@ func (iptMgr *IptablesManager) Restore(configFile string) error {
368389
configFile = util.IptablesConfigFile
369390
}
370391

392+
l, err := grabIptablesLocks()
393+
if err != nil {
394+
return err
395+
}
396+
397+
defer func(l *os.File) {
398+
if err = l.Close(); err != nil {
399+
log.Printf("Failed to close iptables locks")
400+
}
401+
}(l)
402+
371403
// open the config file for reading
372404
f, err := os.Open(configFile)
373405
if err != nil {
@@ -386,3 +418,41 @@ func (iptMgr *IptablesManager) Restore(configFile string) error {
386418

387419
return nil
388420
}
421+
422+
// grabs iptables v1.6 xtable lock
423+
func grabIptablesLocks() (*os.File, error) {
424+
var success bool
425+
426+
l := &os.File{}
427+
defer func(l *os.File) {
428+
// Clean up immediately on failure
429+
if !success {
430+
l.Close()
431+
}
432+
}(l)
433+
434+
// Grab 1.6.x style lock.
435+
l, err := os.OpenFile(util.IptablesLockFile, os.O_CREATE, 0600)
436+
if err != nil {
437+
log.Printf("failed to open iptables lock")
438+
return nil, err
439+
}
440+
441+
if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) {
442+
if err := grabIptablesFileLock(l); err != nil {
443+
return false, nil
444+
}
445+
446+
return true, nil
447+
}); err != nil {
448+
log.Printf("failed to acquire new iptables lock: %v", err)
449+
return nil, err
450+
}
451+
452+
success = true
453+
return l, nil
454+
}
455+
456+
func grabIptablesFileLock(f *os.File) error {
457+
return unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB)
458+
}

npm/namespace.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,6 @@ func (npMgr *NetworkPolicyManager) AddNamespace(nsObj *corev1.Namespace) error {
121121
}
122122
npMgr.nsMap[nsName] = ns
123123

124-
npMgr.clusterState.NsCount++
125-
126124
return nil
127125
}
128126

@@ -203,7 +201,5 @@ func (npMgr *NetworkPolicyManager) DeleteNamespace(nsObj *corev1.Namespace) erro
203201

204202
delete(npMgr.nsMap, nsName)
205203

206-
npMgr.clusterState.NsCount--
207-
208204
return nil
209205
}

npm/namespace_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ func TestAllNsList(t *testing.T) {
4545

4646
func TestAddNamespace(t *testing.T) {
4747
npMgr := &NetworkPolicyManager{
48-
nsMap: make(map[string]*namespace),
48+
nsMap: make(map[string]*namespace),
49+
TelemetryEnabled: false,
4950
reportManager: &telemetry.ReportManager{
5051
HostNetAgentURL: hostNetAgentURLForNpm,
5152
ContentType: contentType,
@@ -86,7 +87,8 @@ func TestAddNamespace(t *testing.T) {
8687

8788
func TestUpdateNamespace(t *testing.T) {
8889
npMgr := &NetworkPolicyManager{
89-
nsMap: make(map[string]*namespace),
90+
nsMap: make(map[string]*namespace),
91+
TelemetryEnabled: false,
9092
reportManager: &telemetry.ReportManager{
9193
HostNetAgentURL: hostNetAgentURLForNpm,
9294
ContentType: contentType,
@@ -140,7 +142,8 @@ func TestUpdateNamespace(t *testing.T) {
140142

141143
func TestDeleteNamespace(t *testing.T) {
142144
npMgr := &NetworkPolicyManager{
143-
nsMap: make(map[string]*namespace),
145+
nsMap: make(map[string]*namespace),
146+
TelemetryEnabled: false,
144147
reportManager: &telemetry.ReportManager{
145148
HostNetAgentURL: hostNetAgentURLForNpm,
146149
ContentType: contentType,

npm/npm.go

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/Azure/azure-container-networking/telemetry"
1515
corev1 "k8s.io/api/core/v1"
1616
networkingv1 "k8s.io/api/networking/v1"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1718
"k8s.io/apimachinery/pkg/version"
1819
"k8s.io/client-go/informers"
1920
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -22,9 +23,10 @@ import (
2223
"k8s.io/client-go/tools/cache"
2324
)
2425

25-
var (
26-
hostNetAgentURLForNpm = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport"
27-
contentType = "application/json"
26+
const (
27+
hostNetAgentURLForNpm = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport"
28+
contentType = "application/json"
29+
telemetryRetryWaitTimeInSeconds = 60
2830
)
2931

3032
// NetworkPolicyManager contains informers for pod, namespace and networkpolicy.
@@ -44,17 +46,41 @@ type NetworkPolicyManager struct {
4446
clusterState telemetry.ClusterState
4547
reportManager *telemetry.ReportManager
4648

47-
serverVersion *version.Info
49+
serverVersion *version.Info
50+
TelemetryEnabled bool
4851
}
4952

5053
// GetClusterState returns current cluster state.
5154
func (npMgr *NetworkPolicyManager) GetClusterState() telemetry.ClusterState {
55+
pods, err := npMgr.clientset.CoreV1().Pods("").List(metav1.ListOptions{})
56+
if err != nil {
57+
log.Printf("Error Listing pods in GetClusterState")
58+
}
59+
60+
namespaces, err := npMgr.clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
61+
if err != nil {
62+
log.Printf("Error Listing namespaces in GetClusterState")
63+
}
64+
65+
networkpolicies, err := npMgr.clientset.NetworkingV1().NetworkPolicies("").List(metav1.ListOptions{})
66+
if err != nil {
67+
log.Printf("Error Listing networkpolicies in GetClusterState")
68+
}
69+
70+
npMgr.clusterState.PodCount = len(pods.Items)
71+
npMgr.clusterState.NsCount = len(namespaces.Items)
72+
npMgr.clusterState.NwPolicyCount = len(networkpolicies.Items)
73+
5274
return npMgr.clusterState
5375
}
5476

5577
// UpdateAndSendReport updates the npm report then send it.
5678
// This function should only be called when npMgr is locked.
5779
func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg string) error {
80+
if !npMgr.TelemetryEnabled {
81+
return nil
82+
}
83+
5884
clusterState := npMgr.GetClusterState()
5985
v := reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("ClusterState")
6086
if v.CanSet() {
@@ -69,7 +95,10 @@ func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg strin
6995
reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("EventMessage").SetString(err.Error())
7096
}
7197

72-
return npMgr.reportManager.SendReport(nil)
98+
var telemetryBuffer *telemetry.TelemetryBuffer
99+
connectToTelemetryServer(telemetryBuffer)
100+
101+
return npMgr.reportManager.SendReport(telemetryBuffer)
73102
}
74103

75104
// Run starts shared informers and waits for the shared informer cache to sync.
@@ -93,8 +122,33 @@ func (npMgr *NetworkPolicyManager) Run(stopCh <-chan struct{}) error {
93122
return nil
94123
}
95124

125+
func connectToTelemetryServer(telemetryBuffer *telemetry.TelemetryBuffer) {
126+
for {
127+
telemetryBuffer = telemetry.NewTelemetryBuffer("")
128+
err := telemetryBuffer.StartServer()
129+
if err == nil || telemetryBuffer.FdExists {
130+
connErr := telemetryBuffer.Connect()
131+
if connErr == nil {
132+
break
133+
}
134+
135+
log.Printf("[NPM-Telemetry] Failed to establish telemetry manager connection.")
136+
time.Sleep(time.Second * telemetryRetryWaitTimeInSeconds)
137+
}
138+
}
139+
}
140+
96141
// RunReportManager starts NPMReportManager and send telemetry periodically.
97142
func (npMgr *NetworkPolicyManager) RunReportManager() {
143+
if !npMgr.TelemetryEnabled {
144+
return
145+
}
146+
147+
var telemetryBuffer *telemetry.TelemetryBuffer
148+
connectToTelemetryServer(telemetryBuffer)
149+
150+
go telemetryBuffer.BufferAndPushData(time.Duration(0))
151+
98152
for {
99153
clusterState := npMgr.GetClusterState()
100154
v := reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("ClusterState")
@@ -104,11 +158,12 @@ func (npMgr *NetworkPolicyManager) RunReportManager() {
104158
v.FieldByName("NwPolicyCount").SetInt(int64(clusterState.NwPolicyCount))
105159
}
106160

107-
if err := npMgr.reportManager.SendReport(nil); err != nil {
108-
log.Printf("Error sending NPM telemetry report")
161+
if err := npMgr.reportManager.SendReport(telemetryBuffer); err != nil {
162+
log.Printf("[NPM-Telemetry] Error sending NPM telemetry report")
163+
connectToTelemetryServer(telemetryBuffer)
109164
}
110165

111-
time.Sleep(1 * time.Minute)
166+
time.Sleep(5 * time.Minute)
112167
}
113168
}
114169

@@ -150,7 +205,8 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in
150205
ContentType: contentType,
151206
Report: &telemetry.NPMReport{},
152207
},
153-
serverVersion: serverVersion,
208+
serverVersion: serverVersion,
209+
TelemetryEnabled: true,
154210
}
155211

156212
clusterID := util.GetClusterID(npMgr.nodeName)

npm/nwpolicy.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ func (npMgr *NetworkPolicyManager) AddNetworkPolicy(npObj *networkingv1.NetworkP
7272

7373
allNs.npMap[npName] = npObj
7474

75-
npMgr.clusterState.NwPolicyCount++
76-
7775
ns, err := newNs(npNs)
7876
if err != nil {
7977
log.Printf("Error creating namespace %s\n", npNs)
@@ -141,8 +139,6 @@ func (npMgr *NetworkPolicyManager) DeleteNetworkPolicy(npObj *networkingv1.Netwo
141139

142140
delete(allNs.npMap, npName)
143141

144-
npMgr.clusterState.NwPolicyCount--
145-
146142
if len(allNs.npMap) == 0 {
147143
if err = iptMgr.UninitNpmChains(); err != nil {
148144
log.Printf("Error uninitialize azure-npm chains.\n")

npm/nwpolicy_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import (
1818

1919
func TestAddNetworkPolicy(t *testing.T) {
2020
npMgr := &NetworkPolicyManager{
21-
nsMap: make(map[string]*namespace),
21+
nsMap: make(map[string]*namespace),
22+
TelemetryEnabled: false,
2223
reportManager: &telemetry.ReportManager{
2324
HostNetAgentURL: hostNetAgentURLForNpm,
2425
ContentType: contentType,
@@ -97,7 +98,8 @@ func TestAddNetworkPolicy(t *testing.T) {
9798

9899
func TestUpdateNetworkPolicy(t *testing.T) {
99100
npMgr := &NetworkPolicyManager{
100-
nsMap: make(map[string]*namespace),
101+
nsMap: make(map[string]*namespace),
102+
TelemetryEnabled: false,
101103
reportManager: &telemetry.ReportManager{
102104
HostNetAgentURL: hostNetAgentURLForNpm,
103105
ContentType: contentType,
@@ -204,7 +206,8 @@ func TestUpdateNetworkPolicy(t *testing.T) {
204206

205207
func TestDeleteNetworkPolicy(t *testing.T) {
206208
npMgr := &NetworkPolicyManager{
207-
nsMap: make(map[string]*namespace),
209+
nsMap: make(map[string]*namespace),
210+
TelemetryEnabled: false,
208211
reportManager: &telemetry.ReportManager{
209212
HostNetAgentURL: hostNetAgentURLForNpm,
210213
ContentType: contentType,

npm/parse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ func parseEgress(ns string, targetSets []string, rules []networkingv1.NetworkPol
573573
util.IptablesSetFlag,
574574
util.IptablesMatchSetFlag,
575575
hashedTargetSetName,
576-
util.IptablesDstFlag,
576+
util.IptablesSrcFlag,
577577
util.IptablesJumpFlag,
578578
util.IptablesAzureEgressToNsChain,
579579
},

npm/plugin/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ func main() {
6363
panic(err.Error)
6464
}
6565

66+
// Disable Azure-NPM telemetry for now since it might throttle wireserver.
67+
npMgr.TelemetryEnabled = false
6668
go npMgr.RunReportManager()
6769

6870
select {}

npm/pod.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ func (npMgr *NetworkPolicyManager) AddPod(podObj *corev1.Pod) error {
7272
labelKeys = append(labelKeys, labelKey)
7373
}
7474

75-
npMgr.clusterState.PodCount++
76-
7775
ns, err := newNs(podNs)
7876
if err != nil {
7977
log.Printf("Error creating namespace %s\n", podNs)
@@ -172,7 +170,5 @@ func (npMgr *NetworkPolicyManager) DeletePod(podObj *corev1.Pod) error {
172170
}
173171
}
174172

175-
npMgr.clusterState.PodCount--
176-
177173
return nil
178174
}

0 commit comments

Comments
 (0)