Skip to content

Commit 3763b20

Browse files
authored
refactor to use shared informers (#373)
* delete package app/watchers since we're now using shared informers * used shared informers for events and listing resources * install moq in travis test script
1 parent ed0dc39 commit 3763b20

14 files changed

+340
-940
lines changed

app/controllers/metrics_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (mc *MetricsController) Run(healthChan chan<- *ControllerHeartbeat, stopCh
154154
}
155155

156156
// NewMetricsController returns new MetricController object
157-
func NewMetricsController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*MetricsController, error) {
157+
func NewMetricsController(clientset kubernetes.Interface, config *options.KubeRouterConfig) (*MetricsController, error) {
158158
mc := MetricsController{}
159159
mc.MetricsPath = config.MetricsPath
160160
mc.MetricsPort = config.MetricsPort

app/controllers/network_policy_controller.go

Lines changed: 134 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package controllers
33
import (
44
"crypto/sha256"
55
"encoding/base32"
6-
"encoding/json"
76
"errors"
87
"fmt"
98
"net"
@@ -13,15 +12,18 @@ import (
1312
"time"
1413

1514
"github.com/cloudnativelabs/kube-router/app/options"
16-
"github.com/cloudnativelabs/kube-router/app/watchers"
1715
"github.com/cloudnativelabs/kube-router/utils"
1816
"github.com/coreos/go-iptables/iptables"
1917
"github.com/golang/glog"
2018
"github.com/prometheus/client_golang/prometheus"
19+
2120
api "k8s.io/api/core/v1"
2221
apiextensions "k8s.io/api/extensions/v1beta1"
2322
networking "k8s.io/api/networking/v1"
23+
"k8s.io/apimachinery/pkg/labels"
2424
"k8s.io/client-go/kubernetes"
25+
listers "k8s.io/client-go/listers/core/v1"
26+
"k8s.io/client-go/tools/cache"
2527
)
2628

2729
const (
@@ -51,6 +53,14 @@ type NetworkPolicyController struct {
5153
// list of all active network policies expressed as networkPolicyInfo
5254
networkPoliciesInfo *[]networkPolicyInfo
5355
ipSetHandler *utils.IPSet
56+
57+
podLister cache.Indexer
58+
npLister cache.Indexer
59+
nsLister cache.Indexer
60+
61+
PodEventHandler cache.ResourceEventHandler
62+
NamespaceEventHandler cache.ResourceEventHandler
63+
NetworkPolicyEventHandler cache.ResourceEventHandler
5464
}
5565

5666
// internal structure to represent a network policy
@@ -120,16 +130,12 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
120130
default:
121131
}
122132

123-
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
124-
glog.V(1).Info("Performing periodic sync of iptables to reflect network policies")
125-
err := npc.Sync()
126-
if err != nil {
127-
glog.Errorf("Error during periodic sync: " + err.Error())
128-
} else {
129-
sendHeartBeat(healthChan, "NPC")
130-
}
133+
glog.V(1).Info("Performing periodic sync of iptables to reflect network policies")
134+
err := npc.Sync()
135+
if err != nil {
136+
glog.Errorf("Error during periodic sync: " + err.Error())
131137
} else {
132-
continue
138+
sendHeartBeat(healthChan, "NPC")
133139
}
134140

135141
select {
@@ -142,46 +148,37 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
142148
}
143149

144150
// OnPodUpdate handles updates to pods from the Kubernetes api server
145-
func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
146-
glog.V(2).Infof("Received pod update namespace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name)
147-
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
148-
err := npc.Sync()
149-
if err != nil {
150-
glog.Errorf("Error syncing on pod update: %s", err)
151-
}
152-
} else {
153-
glog.V(2).Infof("Received pod update, but controller not in sync")
151+
func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
152+
pod := obj.(*api.Pod)
153+
glog.V(2).Infof("Received pod update namespace:%s pod name:%s", pod.Namespace, pod.Name)
154+
155+
err := npc.Sync()
156+
if err != nil {
157+
glog.Errorf("Error syncing on pod update: %s", err)
154158
}
155159
}
156160

157161
// OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server
158-
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
159-
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
160-
err := npc.Sync()
161-
if err != nil {
162-
glog.Errorf("Error syncing on network policy update: %s", err)
163-
}
164-
} else {
165-
glog.V(2).Info("Received network policy update, but controller not in sync")
162+
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
163+
err := npc.Sync()
164+
if err != nil {
165+
glog.Errorf("Error syncing on network policy update: %s", err)
166166
}
167167
}
168168

169169
// OnNamespaceUpdate handles updates to namespace from kubernetes api server
170-
func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {
171-
170+
func (npc *NetworkPolicyController) OnNamespaceUpdate(obj interface{}) {
172171
// namespace (and annotations on it) has no significance in GA ver of network policy
173172
if npc.v1NetworkPolicy {
174173
return
175174
}
176175

177-
glog.V(2).Infof("Received namespace update namespace:%s", namespaceUpdate.Namespace.Name)
178-
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
179-
err := npc.Sync()
180-
if err != nil {
181-
glog.Errorf("Error syncing on namespace update: %s", err)
182-
}
183-
} else {
184-
glog.V(2).Info("Received namespace update, but controller not in sync")
176+
namespace := obj.(*api.Namespace)
177+
glog.V(2).Infof("Received update for namespace: %s", namespace.Name)
178+
179+
err := npc.Sync()
180+
if err != nil {
181+
glog.Errorf("Error syncing on namespace update: %s", err)
185182
}
186183
}
187184

@@ -204,13 +201,13 @@ func (npc *NetworkPolicyController) Sync() error {
204201
glog.V(1).Info("Starting periodic sync of iptables")
205202

206203
if npc.v1NetworkPolicy {
207-
npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
204+
npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
208205
if err != nil {
209206
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
210207
}
211208
} else {
212209
// TODO remove the Beta support
213-
npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo()
210+
npc.networkPoliciesInfo, err = npc.buildBetaNetworkPoliciesInfo()
214211
if err != nil {
215212
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
216213
}
@@ -948,7 +945,9 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
948945
func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(nodeIp string) (*map[string]podInfo, error) {
949946
nodePods := make(map[string]podInfo)
950947

951-
for _, pod := range watchers.PodWatcher.List() {
948+
for _, obj := range npc.podLister.List() {
949+
pod := obj.(*api.Pod)
950+
952951
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
953952
continue
954953
}
@@ -975,7 +974,9 @@ func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp str
975974

976975
nodePods := make(map[string]podInfo)
977976

978-
for _, pod := range watchers.PodWatcher.List() {
977+
for _, obj := range npc.podLister.List() {
978+
pod := obj.(*api.Pod)
979+
979980
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
980981
continue
981982
}
@@ -997,11 +998,11 @@ func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp str
997998
return &nodePods, nil
998999
}
9991000

1000-
func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
1001+
func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
10011002

10021003
NetworkPolicies := make([]networkPolicyInfo, 0)
10031004

1004-
for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
1005+
for _, policyObj := range npc.npLister.List() {
10051006

10061007
policy, ok := policyObj.(*networking.NetworkPolicy)
10071008
if !ok {
@@ -1042,7 +1043,7 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
10421043
}
10431044
}
10441045

1045-
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
1046+
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
10461047
newPolicy.targetPods = make(map[string]podInfo)
10471048
if err == nil {
10481049
for _, matchingPod := range matchingPods {
@@ -1094,15 +1095,15 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
10941095
for _, peer := range specIngressRule.From {
10951096
// spec must have either of PodSelector or NamespaceSelector
10961097
if peer.PodSelector != nil {
1097-
matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
1098+
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace,
10981099
peer.PodSelector.MatchLabels)
10991100
} else if peer.NamespaceSelector != nil {
1100-
namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels)
1101+
namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels)
11011102
if err != nil {
11021103
return nil, errors.New("Failed to build network policies info due to " + err.Error())
11031104
}
11041105
for _, namespace := range namespaces {
1105-
namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil)
1106+
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, nil)
11061107
if err != nil {
11071108
return nil, errors.New("Failed to build network policies info due to " + err.Error())
11081109
}
@@ -1155,15 +1156,15 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
11551156
for _, peer := range specEgressRule.To {
11561157
// spec must have either of PodSelector or NamespaceSelector
11571158
if peer.PodSelector != nil {
1158-
matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
1159+
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace,
11591160
peer.PodSelector.MatchLabels)
11601161
} else if peer.NamespaceSelector != nil {
1161-
namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels)
1162+
namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels)
11621163
if err != nil {
11631164
return nil, errors.New("Failed to build network policies info due to " + err.Error())
11641165
}
11651166
for _, namespace := range namespaces {
1166-
namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil)
1167+
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, nil)
11671168
if err != nil {
11681169
return nil, errors.New("Failed to build network policies info due to " + err.Error())
11691170
}
@@ -1192,19 +1193,37 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
11921193
return &NetworkPolicies, nil
11931194
}
11941195

1195-
func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
1196+
func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) {
1197+
podLister := listers.NewPodLister(npc.podLister)
1198+
allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector())
1199+
if err != nil {
1200+
return nil, err
1201+
}
1202+
return allMatchedNameSpacePods, nil
1203+
}
1204+
1205+
func (npc *NetworkPolicyController) ListNamespaceByLabels(set labels.Set) ([]*api.Namespace, error) {
1206+
namespaceLister := listers.NewNamespaceLister(npc.npLister)
1207+
matchedNamespaces, err := namespaceLister.List(set.AsSelector())
1208+
if err != nil {
1209+
return nil, err
1210+
}
1211+
return matchedNamespaces, nil
1212+
}
1213+
1214+
func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
11961215

11971216
NetworkPolicies := make([]networkPolicyInfo, 0)
11981217

1199-
for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
1218+
for _, policyObj := range npc.npLister.List() {
12001219

12011220
policy, _ := policyObj.(*apiextensions.NetworkPolicy)
12021221
newPolicy := networkPolicyInfo{
12031222
name: policy.Name,
12041223
namespace: policy.Namespace,
12051224
labels: policy.Spec.PodSelector.MatchLabels,
12061225
}
1207-
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
1226+
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
12081227
newPolicy.targetPods = make(map[string]podInfo)
12091228
newPolicy.ingressRules = make([]ingressRule, 0)
12101229
if err == nil {
@@ -1227,7 +1246,7 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
12271246

12281247
ingressRule.srcPods = make([]podInfo, 0)
12291248
for _, peer := range specIngressRule.From {
1230-
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
1249+
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
12311250
if err == nil {
12321251
for _, matchingPod := range matchingPods {
12331252
ingressRule.srcPods = append(ingressRule.srcPods,
@@ -1246,25 +1265,6 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
12461265
return &NetworkPolicies, nil
12471266
}
12481267

1249-
func getNameSpaceDefaultPolicy(namespace string) (string, error) {
1250-
for _, nspw := range watchers.NamespaceWatcher.List() {
1251-
if strings.Compare(namespace, nspw.Name) == 0 {
1252-
networkPolicy, ok := nspw.ObjectMeta.Annotations[networkPolicyAnnotation]
1253-
var annot map[string]map[string]string
1254-
if ok {
1255-
err := json.Unmarshal([]byte(networkPolicy), &annot)
1256-
if err == nil {
1257-
return annot["ingress"]["isolation"], nil
1258-
}
1259-
glog.Errorf("Skipping invalid network-policy for namespace \"%s\": %s", namespace, err)
1260-
return "DefaultAllow", errors.New("Invalid NetworkPolicy")
1261-
}
1262-
return "DefaultAllow", nil
1263-
}
1264-
}
1265-
return "", errors.New("Failed to get the default ingress policy for the namespace: " + namespace)
1266-
}
1267-
12681268
func podFirewallChainName(namespace, podName string) string {
12691269
hash := sha256.Sum256([]byte(namespace + podName))
12701270
encoded := base32.StdEncoding.EncodeToString(hash[:])
@@ -1385,8 +1385,59 @@ func (npc *NetworkPolicyController) Cleanup() {
13851385
glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
13861386
}
13871387

1388+
func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {
1389+
return cache.ResourceEventHandlerFuncs{
1390+
AddFunc: func(obj interface{}) {
1391+
npc.OnPodUpdate(obj)
1392+
1393+
},
1394+
UpdateFunc: func(oldObj, newObj interface{}) {
1395+
npc.OnPodUpdate(newObj)
1396+
1397+
},
1398+
DeleteFunc: func(obj interface{}) {
1399+
npc.OnPodUpdate(obj)
1400+
},
1401+
}
1402+
}
1403+
1404+
func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
1405+
return cache.ResourceEventHandlerFuncs{
1406+
AddFunc: func(obj interface{}) {
1407+
npc.OnNamespaceUpdate(obj)
1408+
1409+
},
1410+
UpdateFunc: func(oldObj, newObj interface{}) {
1411+
npc.OnNamespaceUpdate(newObj)
1412+
1413+
},
1414+
DeleteFunc: func(obj interface{}) {
1415+
npc.OnNamespaceUpdate(obj)
1416+
1417+
},
1418+
}
1419+
}
1420+
1421+
func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
1422+
return cache.ResourceEventHandlerFuncs{
1423+
AddFunc: func(obj interface{}) {
1424+
npc.OnNetworkPolicyUpdate(obj)
1425+
1426+
},
1427+
UpdateFunc: func(oldObj, newObj interface{}) {
1428+
npc.OnNetworkPolicyUpdate(newObj)
1429+
},
1430+
DeleteFunc: func(obj interface{}) {
1431+
npc.OnNetworkPolicyUpdate(obj)
1432+
1433+
},
1434+
}
1435+
}
1436+
13881437
// NewNetworkPolicyController returns new NetworkPolicyController object
1389-
func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkPolicyController, error) {
1438+
func NewNetworkPolicyController(clientset kubernetes.Interface,
1439+
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
1440+
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
13901441
npc := NetworkPolicyController{}
13911442

13921443
if config.MetricsEnabled {
@@ -1427,9 +1478,14 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options
14271478
}
14281479
npc.ipSetHandler = ipset
14291480

1430-
watchers.PodWatcher.RegisterHandler(&npc)
1431-
watchers.NetworkPolicyWatcher.RegisterHandler(&npc)
1432-
watchers.NamespaceWatcher.RegisterHandler(&npc)
1481+
npc.podLister = podInformer.GetIndexer()
1482+
npc.PodEventHandler = npc.newPodEventHandler()
1483+
1484+
npc.nsLister = nsInformer.GetIndexer()
1485+
npc.NamespaceEventHandler = npc.newNamespaceEventHandler()
1486+
1487+
npc.npLister = npInformer.GetIndexer()
1488+
npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler()
14331489

14341490
return &npc, nil
14351491
}

0 commit comments

Comments
 (0)