Skip to content

Commit a7ff712

Browse files
[occm] add a node selector support for loadbalancer services (kubernetes#2601) (kubernetes#2605)
* POC of TargetNodeLabels selector on OpenStack LB * Fix type errors * Update implementation of getKeyValuePropertiesFromServiceAnnotation * gofmt -w -s ./pkg * Polish the code and add documentation --------- Co-authored-by: Ririko Nakamura <[email protected]>
1 parent cf33536 commit a7ff712

File tree

7 files changed

+278
-7
lines changed

7 files changed

+278
-7
lines changed

docs/openstack-cloud-controller-manager/expose-applications-using-loadbalancer-type-service.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,12 @@ Request Body:
236236
This annotation is automatically added and it contains the floating ip address of the load balancer service.
237237
When using `loadbalancer.openstack.org/hostname` annotation it is the only place to see the real address of the load balancer.
238238

239+
- `loadbalancer.openstack.org/node-selector`
240+
241+
A set of key=value annotations used to filter nodes for targeting by the load balancer. When defined, only nodes that match all the specified key=value annotations will be targeted. If an annotation includes only a key without a value, the filter will check only for the existence of the key on the node. If the value is not set, the `node-selector` value defined in the OCCM configuration is applied.
242+
243+
Example: To filter nodes with the labels `env=production` and `region=default`, set the `loadbalancer.openstack.org/node-selector` annotation to `env=production, region=default`
244+
239245
### Switching between Floating Subnets by using preconfigured Classes
240246

241247
If you have multiple `FloatingIPPools` and/or `FloatingIPSubnets` it might be desirable to offer the user logical meanings for `LoadBalancers` like `internetFacing` or `DMZ` instead of requiring the user to select a dedicated network or subnet ID at the service object level as an annotation.

docs/openstack-cloud-controller-manager/using-openstack-cloud-controller-manager.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,23 @@ Although the openstack-cloud-controller-manager was initially implemented with N
237237
* `internal-lb`
238238
Determines whether or not to create an internal load balancer (no floating IP) by default. Default: false.
239239
240+
* `node-selector`
241+
A comma separated list of key=value annotations used to filter nodes for targeting by the load balancer. When defined, only nodes that match all the specified key=value annotations will be targeted. If an annotation includes only a key without a value, the filter will check only for the existence of the key on the node. When node-selector is not set (default value), all nodes will be added as members to a load balancer pool.
242+
243+
Note: This configuration option can be overridden with the `loadbalancer.openstack.org/node-selector` service annotation. Refer to [Exposing applications using services of LoadBalancer type](./expose-applications-using-loadbalancer-type-service.md)
244+
245+
Example: To filter nodes with the labels `env=production` and `region=default`, set the `node-selector` as follows:
246+
247+
```
248+
node-selector="env=production, region=default"
249+
```
250+
251+
Example: To filter nodes that have the key `env` with any value and the key `region` specifically set to `default`, set the `node-selector` as follows:
252+
253+
```
254+
node-selector="env, region=default"
255+
```
256+
240257
* `cascade-delete`
241258
Determines whether or not to perform cascade deletion of load balancers. Default: true.
242259

pkg/openstack/loadbalancer.go

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ const (
6767
annotationXForwardedFor = "X-Forwarded-For"
6868

6969
ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/openstack-internal-load-balancer"
70+
ServiceAnnotationLoadBalancerNodeSelector = "loadbalancer.openstack.org/node-selector"
7071
ServiceAnnotationLoadBalancerConnLimit = "loadbalancer.openstack.org/connection-limit"
7172
ServiceAnnotationLoadBalancerFloatingNetworkID = "loadbalancer.openstack.org/floating-network-id"
7273
ServiceAnnotationLoadBalancerFloatingSubnet = "loadbalancer.openstack.org/floating-subnet"
@@ -340,6 +341,7 @@ type serviceConfig struct {
340341
lbMemberSubnetID string
341342
lbPublicNetworkID string
342343
lbPublicSubnetSpec *floatingSubnetSpec
344+
nodeSelectors map[string]string
343345
keepClientIP bool
344346
enableProxyProtocol bool
345347
timeoutClientData int
@@ -643,6 +645,14 @@ func nodeAddressForLB(node *corev1.Node, preferredIPFamily corev1.IPFamily) (str
643645
return "", cpoerrors.ErrNoAddressFound
644646
}
645647

648+
// getKeyValueFromServiceAnnotation converts a comma-separated list of key-value
649+
// pairs from the specified annotation into a map or returns the specified
650+
// defaultSetting if the annotation is empty
651+
func getKeyValueFromServiceAnnotation(service *corev1.Service, annotationKey string, defaultSetting string) map[string]string {
652+
annotationValue := getStringFromServiceAnnotation(service, annotationKey, defaultSetting)
653+
return cpoutil.StringToMap(annotationValue)
654+
}
655+
646656
// getStringFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
647657
func getStringFromServiceAnnotation(service *corev1.Service, annotationKey string, defaultSetting string) string {
648658
klog.V(4).Infof("getStringFromServiceAnnotation(%s/%s, %v, %v)", service.Namespace, service.Name, annotationKey, defaultSetting)
@@ -1560,6 +1570,16 @@ func (lbaas *LbaasV2) checkServiceUpdate(service *corev1.Service, nodes []*corev
15601570
svcConf.lbID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerID, "")
15611571
svcConf.supportLBTags = openstackutil.IsOctaviaFeatureSupported(lbaas.lb, openstackutil.OctaviaFeatureTags, lbaas.opts.LBProvider)
15621572

1573+
// Get service node-selector annotations
1574+
svcConf.nodeSelectors = getKeyValueFromServiceAnnotation(service, ServiceAnnotationLoadBalancerNodeSelector, lbaas.opts.NodeSelector)
1575+
for key, value := range svcConf.nodeSelectors {
1576+
if value == "" {
1577+
klog.V(3).InfoS("Target node label %s key is set to LoadBalancer service %s", key, serviceName)
1578+
} else {
1579+
klog.V(3).InfoS("Target node label %s=%s is set to LoadBalancer service %s", key, value, serviceName)
1580+
}
1581+
}
1582+
15631583
// Find subnet ID for creating members
15641584
memberSubnetID, err := lbaas.getMemberSubnetID(service, svcConf)
15651585
if err != nil {
@@ -1645,6 +1665,16 @@ func (lbaas *LbaasV2) checkService(service *corev1.Service, nodes []*corev1.Node
16451665
svcConf.lbID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerID, "")
16461666
svcConf.supportLBTags = openstackutil.IsOctaviaFeatureSupported(lbaas.lb, openstackutil.OctaviaFeatureTags, lbaas.opts.LBProvider)
16471667

1668+
// Get service node-selector annotations
1669+
svcConf.nodeSelectors = getKeyValueFromServiceAnnotation(service, ServiceAnnotationLoadBalancerNodeSelector, lbaas.opts.NodeSelector)
1670+
for key, value := range svcConf.nodeSelectors {
1671+
if value == "" {
1672+
klog.V(3).InfoS("Target node label %s key is set to LoadBalancer service %s", key, serviceName)
1673+
} else {
1674+
klog.V(3).InfoS("Target node label %s=%s is set to LoadBalancer service %s", key, value, serviceName)
1675+
}
1676+
}
1677+
16481678
// If in the config file internal-lb=true, user is not allowed to create external service.
16491679
if lbaas.opts.InternalLB {
16501680
if !getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerInternal, false) {
@@ -1949,6 +1979,9 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
19491979
return nil, err
19501980
}
19511981

1982+
// apply node-selector to a list of nodes
1983+
filteredNodes := filterNodes(nodes, svcConf.nodeSelectors)
1984+
19521985
// Use more meaningful name for the load balancer but still need to check the legacy name for backward compatibility.
19531986
lbName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
19541987
svcConf.lbName = lbName
@@ -2002,7 +2035,7 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
20022035
return nil, fmt.Errorf("error getting loadbalancer for Service %s: %v", serviceName, err)
20032036
}
20042037
klog.InfoS("Creating loadbalancer", "lbName", lbName, "service", klog.KObj(service))
2005-
loadbalancer, err = lbaas.createOctaviaLoadBalancer(lbName, clusterName, service, nodes, svcConf)
2038+
loadbalancer, err = lbaas.createOctaviaLoadBalancer(lbName, clusterName, service, filteredNodes, svcConf)
20062039
if err != nil {
20072040
return nil, fmt.Errorf("error creating loadbalancer %s: %v", lbName, err)
20082041
}
@@ -2045,7 +2078,7 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
20452078
return nil, err
20462079
}
20472080

2048-
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, lbName), listener, service, port, nodes, svcConf)
2081+
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, lbName), listener, service, port, filteredNodes, svcConf)
20492082
if err != nil {
20502083
return nil, err
20512084
}
@@ -2092,7 +2125,7 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
20922125
status := lbaas.createLoadBalancerStatus(service, svcConf, addr)
20932126

20942127
if lbaas.opts.ManageSecurityGroups {
2095-
err := lbaas.ensureAndUpdateOctaviaSecurityGroup(clusterName, service, nodes, svcConf)
2128+
err := lbaas.ensureAndUpdateOctaviaSecurityGroup(clusterName, service, filteredNodes, svcConf)
20962129
if err != nil {
20972130
return status, fmt.Errorf("failed when reconciling security groups for LB service %v/%v: %v", service.Namespace, service.Name, err)
20982131
}
@@ -2160,8 +2193,11 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName
21602193
return err
21612194
}
21622195

2196+
// apply node-selector to a list of nodes
2197+
filteredNodes := filterNodes(nodes, svcConf.nodeSelectors)
2198+
21632199
serviceName := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
2164-
klog.V(2).Infof("Updating %d nodes for Service %s in cluster %s", len(nodes), serviceName, clusterName)
2200+
klog.V(2).Infof("Updating %d nodes for Service %s in cluster %s", len(filteredNodes), serviceName, clusterName)
21652201

21662202
// Get load balancer
21672203
var loadbalancer *loadbalancers.LoadBalancer
@@ -2208,7 +2244,7 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName
22082244
return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", loadbalancer.ID, port.Port, port.Protocol)
22092245
}
22102246

2211-
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, loadbalancer.Name), &listener, service, port, nodes, svcConf)
2247+
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, loadbalancer.Name), &listener, service, port, filteredNodes, svcConf)
22122248
if err != nil {
22132249
return err
22142250
}
@@ -2220,7 +2256,7 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName
22202256
}
22212257

22222258
if lbaas.opts.ManageSecurityGroups {
2223-
err := lbaas.ensureAndUpdateOctaviaSecurityGroup(clusterName, service, nodes, svcConf)
2259+
err := lbaas.ensureAndUpdateOctaviaSecurityGroup(clusterName, service, filteredNodes, svcConf)
22242260
if err != nil {
22252261
return fmt.Errorf("failed to update Security Group for loadbalancer service %s: %v", serviceName, err)
22262262
}
@@ -2728,3 +2764,35 @@ func PreserveGopherError(rawError error) error {
27282764
}
27292765
return rawError
27302766
}
2767+
2768+
// filterNodes uses node labels to filter the nodes that should be targeted by the LB,
2769+
// ensuring that all the labels provided in an annotation are present on the nodes
2770+
func filterNodes(nodes []*corev1.Node, filterLabels map[string]string) []*corev1.Node {
2771+
if len(filterLabels) == 0 {
2772+
return nodes
2773+
}
2774+
2775+
filteredNodes := make([]*corev1.Node, 0, len(nodes))
2776+
for _, node := range nodes {
2777+
if matchNodeLabels(node, filterLabels) {
2778+
filteredNodes = append(filteredNodes, node)
2779+
}
2780+
}
2781+
2782+
return filteredNodes
2783+
}
2784+
2785+
// matchNodeLabels checks if a node has all the labels in filterLabels with matching values
2786+
func matchNodeLabels(node *corev1.Node, filterLabels map[string]string) bool {
2787+
if node == nil || len(node.Labels) == 0 {
2788+
return false
2789+
}
2790+
2791+
for k, v := range filterLabels {
2792+
if nodeLabelValue, ok := node.Labels[k]; !ok || (v != "" && nodeLabelValue != v) {
2793+
return false
2794+
}
2795+
}
2796+
2797+
return true
2798+
}

pkg/openstack/loadbalancer_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
"github.com/gophercloud/gophercloud/openstack/loadbalancer/v2/listeners"
1010
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules"
11+
corev1 "k8s.io/api/core/v1"
12+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1113
)
1214

1315
type testPopListener struct {
@@ -460,3 +462,104 @@ func TestGetRulesToCreateAndDelete(t *testing.T) {
460462
})
461463
}
462464
}
465+
466+
func TestFilterNodes(t *testing.T) {
467+
tests := []struct {
468+
name string
469+
nodeLabels map[string]string
470+
service *corev1.Service
471+
annotationKey string
472+
defaultSetting map[string]string
473+
nodeFiltered bool
474+
}{
475+
{
476+
name: "when no filter is provided, node should be filtered",
477+
nodeLabels: map[string]string{"k1": "v1"},
478+
service: &corev1.Service{
479+
ObjectMeta: v1.ObjectMeta{},
480+
},
481+
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
482+
defaultSetting: make(map[string]string),
483+
nodeFiltered: true,
484+
},
485+
{
486+
name: "when all key-value filters match, node should be filtered",
487+
nodeLabels: map[string]string{"k1": "v1", "k2": "v2"},
488+
service: &corev1.Service{
489+
ObjectMeta: v1.ObjectMeta{
490+
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: "k1=v1,k2=v2"},
491+
},
492+
},
493+
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
494+
defaultSetting: make(map[string]string),
495+
nodeFiltered: true,
496+
},
497+
{
498+
name: "when all key-value filters match and a key value contains equals sign, node should be filtered",
499+
nodeLabels: map[string]string{"k1": "v1", "k2": "v2=true"},
500+
service: &corev1.Service{
501+
ObjectMeta: v1.ObjectMeta{
502+
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: "k1=v1,k2=v2=true"},
503+
},
504+
},
505+
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
506+
defaultSetting: make(map[string]string),
507+
nodeFiltered: true,
508+
},
509+
{
510+
name: "when all just-key filter match, node should be filtered",
511+
nodeLabels: map[string]string{"k1": "v1", "k2": "v2"},
512+
service: &corev1.Service{
513+
ObjectMeta: v1.ObjectMeta{
514+
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: "k1,k2"},
515+
},
516+
},
517+
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
518+
defaultSetting: make(map[string]string),
519+
nodeFiltered: true,
520+
},
521+
{
522+
name: "when some filters do not match, node should not be filtered",
523+
nodeLabels: map[string]string{"k1": "v1"},
524+
service: &corev1.Service{
525+
ObjectMeta: v1.ObjectMeta{
526+
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: " k1=v1, k2 "},
527+
},
528+
},
529+
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
530+
defaultSetting: make(map[string]string),
531+
nodeFiltered: false,
532+
},
533+
{
534+
name: "when no filter matches, node should not be filtered",
535+
nodeLabels: map[string]string{"k1": "v1", "k2": "v2"},
536+
service: &corev1.Service{
537+
ObjectMeta: v1.ObjectMeta{
538+
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: "k3=v3"},
539+
},
540+
},
541+
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
542+
defaultSetting: make(map[string]string),
543+
nodeFiltered: false,
544+
},
545+
}
546+
547+
for _, test := range tests {
548+
t.Run(test.name, func(t *testing.T) {
549+
node := &corev1.Node{}
550+
node.Labels = test.nodeLabels
551+
552+
// TODO: add testArgs
553+
targetNodeLabels := getKeyValueFromServiceAnnotation(test.service, ServiceAnnotationLoadBalancerNodeSelector, "")
554+
555+
nodes := []*corev1.Node{node}
556+
filteredNodes := filterNodes(nodes, targetNodeLabels)
557+
558+
if test.nodeFiltered {
559+
assert.Equal(t, nodes, filteredNodes)
560+
} else {
561+
assert.Empty(t, filteredNodes)
562+
}
563+
})
564+
}
565+
}

pkg/openstack/openstack.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ type LoadBalancerOpts struct {
114114
MonitorMaxRetries uint `gcfg:"monitor-max-retries"`
115115
MonitorMaxRetriesDown uint `gcfg:"monitor-max-retries-down"`
116116
ManageSecurityGroups bool `gcfg:"manage-security-groups"`
117-
InternalLB bool `gcfg:"internal-lb"` // default false
117+
InternalLB bool `gcfg:"internal-lb"` // default false
118+
NodeSelector string `gcfg:"node-selector"` // If specified, the loadbalancer members will be assined only from nodes list filtered by node-selector labels
118119
CascadeDelete bool `gcfg:"cascade-delete"`
119120
FlavorID string `gcfg:"flavor-id"`
120121
AvailabilityZone string `gcfg:"availability-zone"`
@@ -222,6 +223,7 @@ func ReadConfig(config io.Reader) (Config, error) {
222223
// Set default values explicitly
223224
cfg.LoadBalancer.Enabled = true
224225
cfg.LoadBalancer.InternalLB = false
226+
cfg.LoadBalancer.NodeSelector = ""
225227
cfg.LoadBalancer.LBProvider = "amphora"
226228
cfg.LoadBalancer.LBMethod = "ROUND_ROBIN"
227229
cfg.LoadBalancer.CreateMonitor = false

pkg/util/util.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strings"
78
"time"
89

910
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -77,6 +78,31 @@ func Contains(list []string, strToSearch string) bool {
7778
return false
7879
}
7980

81+
// StringToMap converts a string of comma-separated key-values into a map
82+
func StringToMap(str string) map[string]string {
83+
// break up a "key1=val,key2=val2,key3=,key4" string into a list
84+
values := strings.Split(strings.TrimSpace(str), ",")
85+
keyValues := make(map[string]string, len(values))
86+
87+
for _, kv := range values {
88+
kv := strings.SplitN(strings.TrimSpace(kv), "=", 2)
89+
90+
k := kv[0]
91+
if len(kv) == 1 {
92+
if k != "" {
93+
// process "key=" or "key"
94+
keyValues[k] = ""
95+
}
96+
continue
97+
}
98+
99+
// process "key=val" or "key=val=foo"
100+
keyValues[k] = kv[1]
101+
}
102+
103+
return keyValues
104+
}
105+
80106
// RoundUpSize calculates how many allocation units are needed to accommodate
81107
// a volume of given size. E.g. when user wants 1500MiB volume, while AWS EBS
82108
// allocates volumes in gibibyte-sized chunks,

0 commit comments

Comments
 (0)