Skip to content

Commit 9484739

Browse files
authored
occm: implement a support for atomic routes update (#2134)
* occm: implement an optional atomic routes update * Fix IPv6 detection logic in routes
1 parent 2be12b3 commit 9484739

File tree

6 files changed

+242
-65
lines changed

6 files changed

+242
-65
lines changed

docs/openstack-cloud-controller-manager/using-openstack-cloud-controller-manager.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ The options in `Global` section are used for openstack-cloud-controller-manager
174174
For example, this option can be useful when having multiple or dual-stack interfaces attached to a node and needing a user-controlled, deterministic way of sorting the addresses.
175175
Default: ""
176176
177+
### Router
178+
179+
* `router-id`
180+
Specifies the Neutron router ID to manage Kubernetes cluster routes, e.g. for load balancers or compute instances that are not part of the Kubernetes cluster.
181+
177182
### Load Balancer
178183
179184
Although the openstack-cloud-controller-manager was initially implemented with Neutron-LBaaS support, Octavia is recommended now because Neutron-LBaaS has been deprecated since Queens OpenStack release cycle and no longer accepted new feature enhancements. As a result, lots of advanced features in openstack-cloud-controller-manager rely on Octavia, even the CI is running based on Octavia enabled OpenStack environment. Functionalities are not guaranteed if using Neutron-LBaaS.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ replace github.com/onsi/ginkgo/v2 => github.com/onsi/ginkgo/v2 v2.4.0
77
require (
88
github.com/container-storage-interface/spec v1.7.0
99
github.com/go-chi/chi/v5 v5.0.8
10-
github.com/gophercloud/gophercloud v1.2.0
10+
github.com/gophercloud/gophercloud v1.2.1-0.20230227135528-e7de1a394a6e
1111
github.com/gophercloud/utils v0.0.0-20230301065655-769528992f29
1212
github.com/hashicorp/go-version v1.6.0
1313
github.com/kubernetes-csi/csi-lib-utils v0.13.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
218218
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
219219
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
220220
github.com/gophercloud/gophercloud v1.1.1/go.mod h1:aAVqcocTSXh2vYFZ1JTvx4EQmfgzxRcNupUfxZbBNDM=
221-
github.com/gophercloud/gophercloud v1.2.0 h1:1oXyj4g54KBg/kFtCdMM6jtxSzeIyg8wv4z1HoGPp1E=
222-
github.com/gophercloud/gophercloud v1.2.0/go.mod h1:aAVqcocTSXh2vYFZ1JTvx4EQmfgzxRcNupUfxZbBNDM=
221+
github.com/gophercloud/gophercloud v1.2.1-0.20230227135528-e7de1a394a6e h1:tzpcnvGylThnTal3tFmmG8zwcmeP1Wicbt698WfUhss=
222+
github.com/gophercloud/gophercloud v1.2.1-0.20230227135528-e7de1a394a6e/go.mod h1:aAVqcocTSXh2vYFZ1JTvx4EQmfgzxRcNupUfxZbBNDM=
223223
github.com/gophercloud/utils v0.0.0-20230301065655-769528992f29 h1:A2dT0wlliiBwnkWVPtSLXH1m36486l7sOwPRrjEm3hE=
224224
github.com/gophercloud/utils v0.0.0-20230301065655-769528992f29/go.mod h1:z4Dey7xsTUXgcB1C8elMvGRKTjV1ez0eoYQlMrduG1g=
225225
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=

pkg/openstack/openstack.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ type NetworkingOpts struct {
131131

132132
// RouterOpts is used for Neutron routes
133133
type RouterOpts struct {
134-
RouterID string `gcfg:"router-id"` // required
134+
RouterID string `gcfg:"router-id"`
135135
}
136136

137137
type ServerAttributesExt struct {
@@ -458,18 +458,23 @@ func (os *OpenStack) Routes() (cloudprovider.Routes, bool) {
458458
return nil, false
459459
}
460460

461-
if !netExts["extraroute"] {
461+
if !netExts["extraroute"] && !netExts["extraroute-atomic"] {
462462
klog.V(3).Info("Neutron extraroute extension not found, required for Routes support")
463463
return nil, false
464464
}
465465

466-
r, err := NewRoutes(os, network)
466+
r, err := NewRoutes(os, network, netExts["extraroute-atomic"])
467467
if err != nil {
468468
klog.Warningf("Error initialising Routes support: %v", err)
469469
return nil, false
470470
}
471471

472-
klog.V(1).Info("Claiming to support Routes")
472+
if netExts["extraroute-atomic"] {
473+
klog.V(1).Info("Claiming to support Routes with atomic updates")
474+
} else {
475+
klog.V(1).Info("Claiming to support Routes")
476+
}
477+
473478
return r, true
474479
}
475480

pkg/openstack/routes.go

Lines changed: 147 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sync"
2323

2424
"github.com/gophercloud/gophercloud"
25+
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/extraroutes"
2526
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/routers"
2627
"github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
2728

@@ -40,21 +41,25 @@ type Routes struct {
4041
os *OpenStack
4142
// router's private network IDs
4243
networkIDs []string
43-
// OpenStack can modify only one route at once
44+
// whether Neutron supports "extraroute-atomic" extension
45+
atomicRoutes bool
46+
// Neutron with no "extraroute-atomic" extension can modify only one route at
47+
// once
4448
sync.Mutex
4549
}
4650

4751
var _ cloudprovider.Routes = &Routes{}
4852

4953
// NewRoutes creates a new instance of Routes
50-
func NewRoutes(os *OpenStack, network *gophercloud.ServiceClient) (cloudprovider.Routes, error) {
54+
func NewRoutes(os *OpenStack, network *gophercloud.ServiceClient, atomicRoutes bool) (cloudprovider.Routes, error) {
5155
if os.routeOpts.RouterID == "" {
5256
return nil, errors.ErrNoRouterID
5357
}
5458

5559
return &Routes{
56-
network: network,
57-
os: os,
60+
network: network,
61+
os: os,
62+
atomicRoutes: atomicRoutes,
5863
}, nil
5964
}
6065

@@ -90,9 +95,7 @@ func (r *Routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpr
9095
}
9196

9297
// detect router's private network ID for further VM ports filtering
93-
r.Lock()
9498
r.networkIDs, err = getRouterNetworkIDs(r.network, r.os.routeOpts.RouterID)
95-
r.Unlock()
9699
if err != nil {
97100
return nil, err
98101
}
@@ -146,10 +149,16 @@ func getAddrByNodeName(name types.NodeName, needIPv6 bool, nodes []*v1.Node) str
146149
if ip == nil {
147150
continue
148151
}
149-
if needIPv6 && ip.To4() == nil {
152+
isIPv6 := ip.To4() == nil
153+
if needIPv6 {
154+
if isIPv6 {
155+
return v.Address
156+
}
157+
continue
158+
}
159+
if !isIPv6 {
150160
return v.Address
151161
}
152-
return v.Address
153162
}
154163
}
155164
}
@@ -184,6 +193,52 @@ func updateRoutes(network *gophercloud.ServiceClient, router *routers.Router, ne
184193
return unwinder, nil
185194
}
186195

196+
func addRoute(network *gophercloud.ServiceClient, routerID string, newRoute []routers.Route) (func(), error) {
197+
mc := metrics.NewMetricContext("router", "update")
198+
_, err := extraroutes.Add(network, routerID, extraroutes.Opts{
199+
Routes: &newRoute,
200+
}).Extract()
201+
if mc.ObserveRequest(err) != nil {
202+
return nil, err
203+
}
204+
205+
unwinder := func() {
206+
klog.V(4).Infof("Reverting routes change to router %v", routerID)
207+
mc := metrics.NewMetricContext("router", "update")
208+
_, err := extraroutes.Remove(network, routerID, extraroutes.Opts{
209+
Routes: &newRoute,
210+
}).Extract()
211+
if mc.ObserveRequest(err) != nil {
212+
klog.Warningf("Unable to reset routes during error unwind: %v", err)
213+
}
214+
}
215+
216+
return unwinder, nil
217+
}
218+
219+
func removeRoute(network *gophercloud.ServiceClient, routerID string, oldRoute []routers.Route) (func(), error) {
220+
mc := metrics.NewMetricContext("router", "update")
221+
_, err := extraroutes.Remove(network, routerID, extraroutes.Opts{
222+
Routes: &oldRoute,
223+
}).Extract()
224+
if mc.ObserveRequest(err) != nil {
225+
return nil, err
226+
}
227+
228+
unwinder := func() {
229+
klog.V(4).Infof("Reverting routes change to router %v", routerID)
230+
mc := metrics.NewMetricContext("router", "update")
231+
_, err := extraroutes.Add(network, routerID, extraroutes.Opts{
232+
Routes: &oldRoute,
233+
}).Extract()
234+
if mc.ObserveRequest(err) != nil {
235+
klog.Warningf("Unable to reset routes during error unwind: %v", err)
236+
}
237+
}
238+
239+
return unwinder, nil
240+
}
241+
187242
func updateAllowedAddressPairs(network *gophercloud.ServiceClient, port *ports.Port, newPairs []ports.AddressPair) (func(), error) {
188243
origPairs := port.AllowedAddressPairs // shallow copy
189244

@@ -211,9 +266,6 @@ func updateAllowedAddressPairs(network *gophercloud.ServiceClient, port *ports.P
211266

212267
// CreateRoute creates the described managed route
213268
func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error {
214-
r.Lock()
215-
defer r.Unlock()
216-
217269
ip, _, _ := net.ParseCIDR(route.DestinationCIDR)
218270
isCIDRv6 := ip.To4() == nil
219271

@@ -232,31 +284,50 @@ func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint s
232284

233285
klog.V(4).Infof("Using nexthop %v for node %v", addr, route.TargetNode)
234286

235-
mc := metrics.NewMetricContext("router", "get")
236-
router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract()
237-
if mc.ObserveRequest(err) != nil {
238-
return err
239-
}
287+
if !r.atomicRoutes {
288+
// classical logic
289+
r.Lock()
290+
defer r.Unlock()
240291

241-
routes := router.Routes
292+
mc := metrics.NewMetricContext("router", "get")
293+
router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract()
294+
if mc.ObserveRequest(err) != nil {
295+
return err
296+
}
242297

243-
for _, item := range routes {
244-
if item.DestinationCIDR == route.DestinationCIDR && item.NextHop == addr {
245-
klog.V(4).Infof("Skipping existing route: %v", route)
246-
return nil
298+
routes := router.Routes
299+
300+
for _, item := range routes {
301+
if item.DestinationCIDR == route.DestinationCIDR && item.NextHop == addr {
302+
klog.V(4).Infof("Skipping existing route: %v", route)
303+
return nil
304+
}
247305
}
248-
}
249306

250-
routes = append(routes, routers.Route{
251-
DestinationCIDR: route.DestinationCIDR,
252-
NextHop: addr,
253-
})
307+
routes = append(routes, routers.Route{
308+
DestinationCIDR: route.DestinationCIDR,
309+
NextHop: addr,
310+
})
254311

255-
unwind, err := updateRoutes(r.network, router, routes)
256-
if err != nil {
257-
return err
312+
unwind, err := updateRoutes(r.network, router, routes)
313+
if err != nil {
314+
return err
315+
}
316+
317+
defer onFailure.call(unwind)
318+
} else {
319+
// atomic route update
320+
route := []routers.Route{{
321+
DestinationCIDR: route.DestinationCIDR,
322+
NextHop: addr,
323+
}}
324+
unwind, err := addRoute(r.network, r.os.routeOpts.RouterID, route)
325+
if err != nil {
326+
return err
327+
}
328+
329+
defer onFailure.call(unwind)
258330
}
259-
defer onFailure.call(unwind)
260331

261332
// get the port of addr on target node.
262333
port, err := getPortByIP(r.network, addr, r.networkIDs)
@@ -291,9 +362,6 @@ func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint s
291362

292363
// DeleteRoute deletes the specified managed route
293364
func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *cloudprovider.Route) error {
294-
r.Lock()
295-
defer r.Unlock()
296-
297365
klog.V(4).Infof("DeleteRoute(%v, %v)", clusterName, route)
298366

299367
onFailure := newCaller()
@@ -314,36 +382,57 @@ func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *clo
314382
}
315383
}
316384

317-
mc := metrics.NewMetricContext("router", "get")
318-
router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract()
319-
if mc.ObserveRequest(err) != nil {
320-
return err
321-
}
385+
if !r.atomicRoutes {
386+
// classical logic
387+
r.Lock()
388+
defer r.Unlock()
322389

323-
routes := router.Routes
324-
index := -1
325-
for i, item := range routes {
326-
if item.DestinationCIDR == route.DestinationCIDR && (item.NextHop == addr || route.Blackhole && item.NextHop == string(route.TargetNode)) {
327-
index = i
328-
break
390+
mc := metrics.NewMetricContext("router", "get")
391+
router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract()
392+
if mc.ObserveRequest(err) != nil {
393+
return err
329394
}
330-
}
331395

332-
if index == -1 {
333-
klog.V(4).Infof("Skipping non-existent route: %v", route)
334-
return nil
335-
}
396+
routes := router.Routes
397+
index := -1
398+
for i, item := range routes {
399+
if item.DestinationCIDR == route.DestinationCIDR && (item.NextHop == addr || route.Blackhole && item.NextHop == string(route.TargetNode)) {
400+
index = i
401+
break
402+
}
403+
}
336404

337-
// Delete element `index`
338-
routes[index] = routes[len(routes)-1]
339-
routes = routes[:len(routes)-1]
405+
if index == -1 {
406+
klog.V(4).Infof("Skipping non-existent route: %v", route)
407+
return nil
408+
}
340409

341-
unwind, err := updateRoutes(r.network, router, routes)
342-
// If this was a blackhole route we are done, there are no ports to update
343-
if err != nil || route.Blackhole {
344-
return err
410+
// Delete element `index`
411+
routes[index] = routes[len(routes)-1]
412+
routes = routes[:len(routes)-1]
413+
414+
unwind, err := updateRoutes(r.network, router, routes)
415+
// If this was a blackhole route we are done, there are no ports to update
416+
if err != nil || route.Blackhole {
417+
return err
418+
}
419+
420+
defer onFailure.call(unwind)
421+
} else {
422+
// atomic route update
423+
blackhole := route.Blackhole
424+
route := []routers.Route{{
425+
DestinationCIDR: route.DestinationCIDR,
426+
NextHop: addr,
427+
}}
428+
unwind, err := removeRoute(r.network, r.os.routeOpts.RouterID, route)
429+
// If this was a blackhole route we are done, there are no ports to update
430+
if err != nil || blackhole {
431+
return err
432+
}
433+
434+
defer onFailure.call(unwind)
345435
}
346-
defer onFailure.call(unwind)
347436

348437
// get the port of addr on target node.
349438
port, err := getPortByIP(r.network, addr, r.networkIDs)
@@ -352,7 +441,7 @@ func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *clo
352441
}
353442

354443
addrPairs := port.AllowedAddressPairs
355-
index = -1
444+
index := -1
356445
for i, item := range addrPairs {
357446
if item.IPAddress == route.DestinationCIDR {
358447
index = i

0 commit comments

Comments
 (0)