@@ -18,7 +18,6 @@ import (
1818 "github.com/cloudnativelabs/kube-router/utils"
1919 "github.com/coreos/go-iptables/iptables"
2020 "github.com/golang/glog"
21- "github.com/janeczku/go-ipset/ipset"
2221 "k8s.io/client-go/kubernetes"
2322 api "k8s.io/client-go/pkg/api/v1"
2423 apiv1 "k8s.io/client-go/pkg/api/v1"
@@ -46,6 +45,7 @@ type NetworkPolicyController struct {
4645
4746 // list of all active network policies expressed as networkPolicyInfo
4847 networkPoliciesInfo * []networkPolicyInfo
48+ ipset * utils.IPSet
4949}
5050
5151// internal structure to represent a network policy
@@ -192,7 +192,7 @@ func (npc *NetworkPolicyController) Sync() error {
192192
193193 }
194194
195- activePolicyChains , err := npc .syncNetworkPolicyChains ()
195+ activePolicyChains , activePolicyIpSets , err := npc .syncNetworkPolicyChains ()
196196 if err != nil {
197197 return errors .New ("Aborting sync. Failed to sync network policy chains: " + err .Error ())
198198 }
@@ -202,7 +202,7 @@ func (npc *NetworkPolicyController) Sync() error {
202202 return errors .New ("Aborting sync. Failed to sync pod firewalls: " + err .Error ())
203203 }
204204
205- err = cleanupStaleRules (activePolicyChains , activePodFwChains )
205+ err = cleanupStaleRules (activePolicyChains , activePodFwChains , activePolicyIpSets )
206206 if err != nil {
207207 return errors .New ("Aborting sync. Failed to cleanup stale iptable rules: " + err .Error ())
208208 }
@@ -215,9 +215,10 @@ func (npc *NetworkPolicyController) Sync() error {
215215// is used for matching destination ip address. Each ingress rule in the network
216216// policyspec is evaluated to set of matching pods, which are grouped in to a
217217// ipset used for source ip addr matching.
218- func (npc * NetworkPolicyController ) syncNetworkPolicyChains () (map [string ]bool , error ) {
218+ func (npc * NetworkPolicyController ) syncNetworkPolicyChains () (map [string ]bool , map [ string ] bool , error ) {
219219
220220 activePolicyChains := make (map [string ]bool )
221+ activePolicyIpSets := make (map [string ]bool )
221222
222223 iptablesCmdHandler , err := iptables .New ()
223224 if err != nil {
@@ -231,31 +232,34 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
231232 policyChainName := networkPolicyChainName (policy .namespace , policy .name )
232233 err := iptablesCmdHandler .NewChain ("filter" , policyChainName )
233234 if err != nil && err .(* iptables.Error ).ExitStatus () != 1 {
234- return nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
235+ return nil , nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
235236 }
236237
237238 activePolicyChains [policyChainName ] = true
238239
239240 // create a ipset for all destination pod ip's matched by the policy spec PodSelector
240241 destPodIpSetName := policyDestinationPodIpSetName (policy .namespace , policy .name )
241- destPodIpSet , err := ipset .New (destPodIpSetName , "hash:ip" , & ipset. Params {} )
242+ destPodIpSet , err := npc . ipset .Create (destPodIpSetName , utils . TypeHashIP , utils . OptionTimeout , "0" )
242243 if err != nil {
243- return nil , fmt .Errorf ("failed to create ipset: %s" , err .Error ())
244+ return nil , nil , fmt .Errorf ("failed to create ipset: %s" , err .Error ())
244245 }
245246
246247 // flush all entries in the set
247248 if destPodIpSet .Flush () != nil {
248- return nil , fmt .Errorf ("failed to flush ipset while syncing iptables: %s" , err .Error ())
249+ return nil , nil , fmt .Errorf ("failed to flush ipset while syncing iptables: %s" , err .Error ())
249250 }
251+
252+ activePolicyIpSets [destPodIpSet .Name ] = true
253+
250254 for k := range policy .destPods {
251255 // TODO restrict ipset to ip's of pods running on the node
252- destPodIpSet .Add (k , 0 )
256+ destPodIpSet .Add (k , utils . OptionTimeout , "0" )
253257 }
254258
255259 // TODO use iptables-restore to better implement the logic, than flush and add rules
256260 err = iptablesCmdHandler .ClearChain ("filter" , policyChainName )
257261 if err != nil && err .(* iptables.Error ).ExitStatus () != 1 {
258- return nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
262+ return nil , nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
259263 }
260264
261265 // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic "
@@ -270,17 +274,19 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
270274
271275 if len (ingressRule .srcPods ) != 0 {
272276 srcPodIpSetName := policySourcePodIpSetName (policy .namespace , policy .name , i )
273- srcPodIpSet , err := ipset .New (srcPodIpSetName , "hash:ip" , & ipset. Params {} )
277+ srcPodIpSet , err := npc . ipset .Create (srcPodIpSetName , utils . TypeHashIP , utils . OptionTimeout , "0" )
274278 if err != nil {
275- return nil , fmt .Errorf ("failed to create ipset: %s" , err .Error ())
279+ return nil , nil , fmt .Errorf ("failed to create ipset: %s" , err .Error ())
276280 }
277281 // flush all entries in the set
278282 if srcPodIpSet .Flush () != nil {
279- return nil , fmt .Errorf ("failed to flush ipset while syncing iptables: %s" , err .Error ())
283+ return nil , nil , fmt .Errorf ("failed to flush ipset while syncing iptables: %s" , err .Error ())
280284 }
281285
286+ activePolicyIpSets [srcPodIpSet .Name ] = true
287+
282288 for _ , pod := range ingressRule .srcPods {
283- srcPodIpSet .Add (pod .ip , 0 )
289+ srcPodIpSet .Add (pod .ip , utils . OptionTimeout , "0" )
284290 }
285291
286292 if len (ingressRule .ports ) != 0 {
@@ -297,7 +303,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
297303 "-j" , "ACCEPT" }
298304 err := iptablesCmdHandler .AppendUnique ("filter" , policyChainName , args ... )
299305 if err != nil {
300- return nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
306+ return nil , nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
301307 }
302308 }
303309 } else {
@@ -311,7 +317,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
311317 "-j" , "ACCEPT" }
312318 err := iptablesCmdHandler .AppendUnique ("filter" , policyChainName , args ... )
313319 if err != nil {
314- return nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
320+ return nil , nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
315321 }
316322 }
317323 }
@@ -329,7 +335,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
329335 "-j" , "ACCEPT" }
330336 err := iptablesCmdHandler .AppendUnique ("filter" , policyChainName , args ... )
331337 if err != nil {
332- return nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
338+ return nil , nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
333339 }
334340 }
335341 }
@@ -344,15 +350,15 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
344350 "-j" , "ACCEPT" }
345351 err := iptablesCmdHandler .AppendUnique ("filter" , policyChainName , args ... )
346352 if err != nil {
347- return nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
353+ return nil , nil , fmt .Errorf ("Failed to run iptables command: %s" , err .Error ())
348354 }
349355 }
350356 }
351357 }
352358
353359 glog .Infof ("Iptables chains in the filter table are synchronized with the network policies." )
354360
355- return activePolicyChains , nil
361+ return activePolicyChains , activePolicyIpSets , nil
356362}
357363
358364func (npc * NetworkPolicyController ) syncPodFirewallChains () (map [string ]bool , error ) {
@@ -478,15 +484,24 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
478484 return activePodFwChains , nil
479485}
480486
481- func cleanupStaleRules (activePolicyChains , activePodFwChains map [string ]bool ) error {
487+ func cleanupStaleRules (activePolicyChains , activePodFwChains , activePolicyIPSets map [string ]bool ) error {
482488
483489 cleanupPodFwChains := make ([]string , 0 )
484490 cleanupPolicyChains := make ([]string , 0 )
491+ cleanupPolicyIPSets := make ([]* utils.Set , 0 )
485492
486493 iptablesCmdHandler , err := iptables .New ()
487494 if err != nil {
488495 glog .Fatalf ("failed to initialize iptables command executor due to %s" , err .Error ())
489496 }
497+ ipsets , err := utils .NewIPSet ()
498+ if err != nil {
499+ glog .Fatalf ("failed to create ipsets command executor due to %s" , err .Error ())
500+ }
501+ err = ipsets .Save ()
502+ if err != nil {
503+ glog .Fatalf ("failed to initialize ipsets command executor due to %s" , err .Error ())
504+ }
490505
491506 // get the list of chains created for pod firewall and network policies
492507 chains , err := iptablesCmdHandler .ListChains ("filter" )
@@ -502,6 +517,14 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er
502517 }
503518 }
504519 }
520+ for _ , set := range ipsets .Sets {
521+ if strings .HasPrefix (set .Name , "KUBE-SRC-" ) ||
522+ strings .HasPrefix (set .Name , "KUBE-DST-" ) {
523+ if _ , ok := activePolicyIPSets [set .Name ]; ! ok {
524+ cleanupPolicyIPSets = append (cleanupPolicyIPSets , set )
525+ }
526+ }
527+ }
505528
506529 // cleanup FORWARD chain rules to jump to pod firewall
507530 for _ , chain := range cleanupPodFwChains {
@@ -584,7 +607,13 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er
584607 glog .Infof ("Deleted network policy chain: %s from the filter table" , policyChain )
585608 }
586609
587- // TODO delete unused ipsets
610+ // cleanup network policy ipsets
611+ for _ , set := range cleanupPolicyIPSets {
612+ err = set .Destroy ()
613+ if err != nil {
614+ return fmt .Errorf ("Failed to delete ipset %s due to %s" , set .Name , err )
615+ }
616+ }
588617 return nil
589618}
590619
@@ -932,7 +961,7 @@ func (npc *NetworkPolicyController) Cleanup() {
932961 }
933962
934963 // delete all ipsets
935- err = ipset .DestroyAll ()
964+ err = npc . ipset .Destroy ()
936965 if err != nil {
937966 glog .Errorf ("Failed to clean up ipsets: " + err .Error ())
938967 }
@@ -966,6 +995,16 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options
966995 }
967996 npc .nodeIP = nodeIP
968997
998+ ipset , err := utils .NewIPSet ()
999+ if err != nil {
1000+ return nil , err
1001+ }
1002+ err = ipset .Save ()
1003+ if err != nil {
1004+ return nil , err
1005+ }
1006+ npc .ipset = ipset
1007+
9691008 watchers .PodWatcher .RegisterHandler (& npc )
9701009 watchers .NetworkPolicyWatcher .RegisterHandler (& npc )
9711010 watchers .NamespaceWatcher .RegisterHandler (& npc )
0 commit comments