Skip to content

Commit 23d4362

Browse files
authored
split routing controllers to smaller modules by function (#406)
* split routing controllers to smaller modules by function * review comments
1 parent 05bec8b commit 23d4362

File tree

9 files changed

+1216
-1125
lines changed

9 files changed

+1216
-1125
lines changed

pkg/controllers/routing/aws.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package routing
2+
3+
import (
4+
"net/url"
5+
"strings"
6+
"time"
7+
8+
"github.com/aws/aws-sdk-go/aws"
9+
"github.com/aws/aws-sdk-go/aws/awserr"
10+
"github.com/aws/aws-sdk-go/aws/ec2metadata"
11+
"github.com/aws/aws-sdk-go/aws/session"
12+
"github.com/aws/aws-sdk-go/service/ec2"
13+
"github.com/golang/glog"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
)
16+
17+
// disableSourceDestinationCheck disables src-dst check of all the VM's when cluster
18+
// is provisioned on AWS. EC2 by default drops any packets originating or destination
19+
// to a VM with IP other than that of VM's ip. This check needs to be disabled so that
20+
// cross node pod-to-pod traffic can be sent and recived by a VM.
21+
func (nrc *NetworkRoutingController) disableSourceDestinationCheck() {
22+
nodes, err := nrc.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
23+
if err != nil {
24+
glog.Errorf("Failed to list nodes from API server due to: %s. Can not perform BGP peer sync", err.Error())
25+
return
26+
}
27+
28+
for _, node := range nodes.Items {
29+
if node.Spec.ProviderID == "" || !strings.HasPrefix(node.Spec.ProviderID, "aws") {
30+
return
31+
}
32+
providerID := strings.Replace(node.Spec.ProviderID, "///", "//", 1)
33+
URL, err := url.Parse(providerID)
34+
instanceID := URL.Path
35+
instanceID = strings.Trim(instanceID, "/")
36+
37+
sess, _ := session.NewSession(aws.NewConfig().WithMaxRetries(5))
38+
metadataClient := ec2metadata.New(sess)
39+
region, err := metadataClient.Region()
40+
if err != nil {
41+
glog.Errorf("Failed to disable source destination check due to: " + err.Error())
42+
return
43+
}
44+
sess.Config.Region = aws.String(region)
45+
ec2Client := ec2.New(sess)
46+
_, err = ec2Client.ModifyInstanceAttribute(
47+
&ec2.ModifyInstanceAttributeInput{
48+
InstanceId: aws.String(instanceID),
49+
SourceDestCheck: &ec2.AttributeBooleanValue{
50+
Value: aws.Bool(false),
51+
},
52+
},
53+
)
54+
if err != nil {
55+
awserr := err.(awserr.Error)
56+
if awserr.Code() == "UnauthorizedOperation" {
57+
nrc.ec2IamAuthorized = false
58+
glog.Errorf("Node does not have necessary IAM creds to modify instance attribute. So skipping disabling src-dst check.")
59+
return
60+
}
61+
glog.Errorf("Failed to disable source destination check due to: %v", err.Error())
62+
} else {
63+
glog.Infof("Disabled source destination check for the instance: " + instanceID)
64+
}
65+
66+
// to prevent EC2 rejecting API call due to API throttling give a delay between the calls
67+
time.Sleep(1000 * time.Millisecond)
68+
}
69+
}
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
package routing
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"net"
7+
"strconv"
8+
"strings"
9+
"time"
10+
11+
"github.com/cloudnativelabs/kube-router/pkg/metrics"
12+
"github.com/cloudnativelabs/kube-router/pkg/utils"
13+
"github.com/golang/glog"
14+
"github.com/osrg/gobgp/config"
15+
gobgp "github.com/osrg/gobgp/server"
16+
v1core "k8s.io/api/core/v1"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"k8s.io/client-go/tools/cache"
19+
)
20+
21+
// Refresh the peer relationship with rest of the nodes in the cluster (iBGP peers). Node add/remove
22+
// events should ensure peer relationship with only currently active nodes. In case
23+
// we miss any events from API server this method which is called periodically
24+
// ensures peer relationship with removed nodes is deleted.
25+
func (nrc *NetworkRoutingController) syncInternalPeers() {
26+
nrc.mu.Lock()
27+
defer nrc.mu.Unlock()
28+
29+
start := time.Now()
30+
defer func() {
31+
endTime := time.Since(start)
32+
metrics.ControllerBGPInternalPeersSyncTime.WithLabelValues().Set(float64(endTime))
33+
glog.V(2).Infof("Syncing BGP peers for the node took %v", endTime)
34+
}()
35+
36+
// get the current list of the nodes from API server
37+
nodes, err := nrc.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
38+
if err != nil {
39+
glog.Errorf("Failed to list nodes from API server due to: %s. Can not perform BGP peer sync", err.Error())
40+
return
41+
}
42+
43+
metrics.ControllerBPGpeers.WithLabelValues().Set(float64(len(nodes.Items)))
44+
// establish peer and add Pod CIDRs with current set of nodes
45+
currentNodes := make([]string, 0)
46+
for _, node := range nodes.Items {
47+
nodeIP, _ := utils.GetNodeIP(&node)
48+
49+
// skip self
50+
if nodeIP.String() == nrc.nodeIP.String() {
51+
continue
52+
}
53+
54+
// we are rr-client peer only with rr-server
55+
if nrc.bgpRRClient {
56+
if _, ok := node.ObjectMeta.Annotations[rrServerAnnotation]; !ok {
57+
continue
58+
}
59+
}
60+
61+
// if node full mesh is not requested then just peer with nodes with same ASN
62+
// (run iBGP among same ASN peers)
63+
if !nrc.bgpFullMeshMode {
64+
nodeasn, ok := node.ObjectMeta.Annotations[nodeASNAnnotation]
65+
if !ok {
66+
glog.Infof("Not peering with the Node %s as ASN number of the node is unknown.",
67+
nodeIP.String())
68+
continue
69+
}
70+
71+
asnNo, err := strconv.ParseUint(nodeasn, 0, 32)
72+
if err != nil {
73+
glog.Infof("Not peering with the Node %s as ASN number of the node is invalid.",
74+
nodeIP.String())
75+
continue
76+
}
77+
78+
// if the nodes ASN number is different from ASN number of current node skip peering
79+
if nrc.nodeAsnNumber != uint32(asnNo) {
80+
glog.Infof("Not peering with the Node %s as ASN number of the node is different.",
81+
nodeIP.String())
82+
continue
83+
}
84+
}
85+
86+
currentNodes = append(currentNodes, nodeIP.String())
87+
nrc.activeNodes[nodeIP.String()] = true
88+
n := &config.Neighbor{
89+
Config: config.NeighborConfig{
90+
NeighborAddress: nodeIP.String(),
91+
PeerAs: nrc.nodeAsnNumber,
92+
},
93+
}
94+
95+
if nrc.bgpGracefulRestart {
96+
n.GracefulRestart = config.GracefulRestart{
97+
Config: config.GracefulRestartConfig{
98+
Enabled: true,
99+
},
100+
State: config.GracefulRestartState{
101+
LocalRestarting: true,
102+
},
103+
}
104+
105+
n.AfiSafis = []config.AfiSafi{
106+
{
107+
Config: config.AfiSafiConfig{
108+
AfiSafiName: config.AFI_SAFI_TYPE_IPV4_UNICAST,
109+
Enabled: true,
110+
},
111+
MpGracefulRestart: config.MpGracefulRestart{
112+
Config: config.MpGracefulRestartConfig{
113+
Enabled: true,
114+
},
115+
},
116+
},
117+
}
118+
}
119+
120+
// we are rr-server peer with other rr-client with reflection enabled
121+
if nrc.bgpRRServer {
122+
if _, ok := node.ObjectMeta.Annotations[rrClientAnnotation]; ok {
123+
//add rr options with clusterId
124+
n.RouteReflector = config.RouteReflector{
125+
Config: config.RouteReflectorConfig{
126+
RouteReflectorClient: true,
127+
RouteReflectorClusterId: config.RrClusterIdType(nrc.bgpClusterID),
128+
},
129+
State: config.RouteReflectorState{
130+
RouteReflectorClient: true,
131+
RouteReflectorClusterId: config.RrClusterIdType(nrc.bgpClusterID),
132+
},
133+
}
134+
}
135+
}
136+
137+
// TODO: check if a node is alredy added as nieighbour in a better way than add and catch error
138+
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
139+
if !strings.Contains(err.Error(), "Can't overwrite the existing peer") {
140+
glog.Errorf("Failed to add node %s as peer due to %s", nodeIP.String(), err)
141+
}
142+
}
143+
}
144+
145+
// find the list of the node removed, from the last known list of active nodes
146+
removedNodes := make([]string, 0)
147+
for ip := range nrc.activeNodes {
148+
stillActive := false
149+
for _, node := range currentNodes {
150+
if ip == node {
151+
stillActive = true
152+
break
153+
}
154+
}
155+
if !stillActive {
156+
removedNodes = append(removedNodes, ip)
157+
}
158+
}
159+
160+
// delete the neighbor for the nodes that are removed
161+
for _, ip := range removedNodes {
162+
n := &config.Neighbor{
163+
Config: config.NeighborConfig{
164+
NeighborAddress: ip,
165+
PeerAs: nrc.defaultNodeAsnNumber,
166+
},
167+
}
168+
if err := nrc.bgpServer.DeleteNeighbor(n); err != nil {
169+
glog.Errorf("Failed to remove node %s as peer due to %s", ip, err)
170+
}
171+
delete(nrc.activeNodes, ip)
172+
}
173+
}
174+
175+
// connectToExternalBGPPeers adds all the configured eBGP peers (global or node specific) as neighbours
176+
func connectToExternalBGPPeers(server *gobgp.BgpServer, peerConfigs []*config.NeighborConfig, bgpGracefulRestart bool, peerMultihopTtl uint8) error {
177+
for _, peerConfig := range peerConfigs {
178+
n := &config.Neighbor{
179+
Config: *peerConfig,
180+
}
181+
182+
if bgpGracefulRestart {
183+
n.GracefulRestart = config.GracefulRestart{
184+
Config: config.GracefulRestartConfig{
185+
Enabled: true,
186+
},
187+
State: config.GracefulRestartState{
188+
LocalRestarting: true,
189+
},
190+
}
191+
192+
n.AfiSafis = []config.AfiSafi{
193+
{
194+
Config: config.AfiSafiConfig{
195+
AfiSafiName: config.AFI_SAFI_TYPE_IPV4_UNICAST,
196+
Enabled: true,
197+
},
198+
MpGracefulRestart: config.MpGracefulRestart{
199+
Config: config.MpGracefulRestartConfig{
200+
Enabled: true,
201+
},
202+
},
203+
},
204+
}
205+
}
206+
if peerMultihopTtl > 1 {
207+
n.EbgpMultihop = config.EbgpMultihop{
208+
Config: config.EbgpMultihopConfig{
209+
Enabled: true,
210+
MultihopTtl: peerMultihopTtl,
211+
},
212+
State: config.EbgpMultihopState{
213+
Enabled: true,
214+
MultihopTtl: peerMultihopTtl,
215+
},
216+
}
217+
}
218+
err := server.AddNeighbor(n)
219+
if err != nil {
220+
return fmt.Errorf("Error peering with peer router "+
221+
"%q due to: %s", peerConfig.NeighborAddress, err)
222+
}
223+
glog.V(2).Infof("Successfully configured %s in ASN %v as BGP peer to the node",
224+
peerConfig.NeighborAddress, peerConfig.PeerAs)
225+
}
226+
return nil
227+
}
228+
229+
// Does validation and returns neighbor configs
230+
func newGlobalPeers(ips []net.IP, asns []uint32, passwords []string) (
231+
[]*config.NeighborConfig, error) {
232+
peers := make([]*config.NeighborConfig, 0)
233+
234+
// Validations
235+
if len(ips) != len(asns) {
236+
return nil, errors.New("Invalid peer router config. " +
237+
"The number of IPs and ASN numbers must be equal.")
238+
}
239+
240+
if len(ips) != len(passwords) && len(passwords) != 0 {
241+
return nil, errors.New("Invalid peer router config. " +
242+
"The number of passwords should either be zero, or one per peer router." +
243+
" Use blank items if a router doesn't expect a password.\n" +
244+
"Example: \"pass,,pass\" OR [\"pass\",\"\",\"pass\"].")
245+
}
246+
247+
for i := 0; i < len(ips); i++ {
248+
if !((asns[i] >= 64512 && asns[i] <= 65535) ||
249+
(asns[i] >= 4200000000 && asns[i] <= 4294967294)) {
250+
return nil, fmt.Errorf("Invalid ASN number \"%d\" for global BGP peer",
251+
asns[i])
252+
}
253+
254+
peer := &config.NeighborConfig{
255+
NeighborAddress: ips[i].String(),
256+
PeerAs: asns[i],
257+
}
258+
259+
if len(passwords) != 0 {
260+
peer.AuthPassword = passwords[i]
261+
}
262+
263+
peers = append(peers, peer)
264+
}
265+
266+
return peers, nil
267+
}
268+
269+
func (nrc *NetworkRoutingController) newNodeEventHandler() cache.ResourceEventHandler {
270+
return cache.ResourceEventHandlerFuncs{
271+
AddFunc: func(obj interface{}) {
272+
node := obj.(*v1core.Node)
273+
nodeIP, _ := utils.GetNodeIP(node)
274+
275+
glog.V(2).Infof("Received node %s added update from watch API so peer with new node", nodeIP)
276+
nrc.OnNodeUpdate(obj)
277+
},
278+
UpdateFunc: func(oldObj, newObj interface{}) {
279+
// we are interested only node add/delete, so skip update
280+
return
281+
282+
},
283+
DeleteFunc: func(obj interface{}) {
284+
node := obj.(*v1core.Node)
285+
nodeIP, _ := utils.GetNodeIP(node)
286+
287+
glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP)
288+
nrc.OnNodeUpdate(obj)
289+
},
290+
}
291+
}
292+
293+
// OnNodeUpdate Handle updates from Node watcher. Node watcher calls this method whenever there is
294+
// new node is added or old node is deleted. So peer up with new node and drop peering
295+
// from old node
296+
func (nrc *NetworkRoutingController) OnNodeUpdate(obj interface{}) {
297+
if !nrc.bgpServerStarted {
298+
return
299+
}
300+
301+
if nrc.bgpEnableInternal {
302+
nrc.syncInternalPeers()
303+
}
304+
305+
// skip if first round of disableSourceDestinationCheck() is not done yet, this is to prevent
306+
// all the nodes for all the node add update trying to perfrom disableSourceDestinationCheck
307+
if nrc.initSrcDstCheckDone && nrc.ec2IamAuthorized {
308+
nrc.disableSourceDestinationCheck()
309+
}
310+
}

0 commit comments

Comments
 (0)