Skip to content

Commit e0d2c99

Browse files
committed
Add weighted address strategy for multi-key listeners (#2394)
Signed-off-by: Gabor Dozsa <gabor.dozsa@ibm.com>
1 parent 89b9d15 commit e0d2c99

File tree

9 files changed

+283
-37
lines changed

9 files changed

+283
-37
lines changed

config/crd/bases/skupper_multikeylistener_crd.yaml

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,36 @@ spec:
105105
required:
106106
- routingKeys
107107
type: object
108+
weighted:
109+
description: |-
110+
WeightedStrategySpec specifies a map of routing keys to route traffic to.
111+
Each routingKey has a weight value.
112+
113+
With this strategy traffic is distributed randomly among the reachable
114+
routing keys. The larger the weight of a routing key - relative to the
115+
weights of the other routing keys - the higher the likelihood of
116+
receiving more traffic. E.g. if all routing keys have equal weights then the
117+
traffic is distributed in a random uniform fashion among the reachable
118+
routing keys.
119+
properties:
120+
routingKeys:
121+
additionalProperties:
122+
type: integer
123+
description: routingKeys to route traffic to according to
124+
their weight values
125+
maxProperties: 256
126+
minProperties: 1
127+
type: object
128+
x-kubernetes-map-type: granular
129+
required:
130+
- routingKeys
131+
type: object
108132
type: object
109133
x-kubernetes-validations:
110-
- message: exactly one of the fields in [priority] must be
134+
- message: exactly one of the fields in [priority weighted] must be
111135
set
112-
rule: '[has(self.priority)].filter(x,x==true).size() ==
113-
1'
136+
rule: '[has(self.priority),has(self.weighted)].filter(x,x==true).size()
137+
== 1'
114138
tlsCredentials:
115139
description: tlsCredentials for client-to-listener
116140
type: string
@@ -214,12 +238,27 @@ spec:
214238
required:
215239
- routingKeysReachable
216240
type: object
241+
weighted:
242+
description: weighted status
243+
properties:
244+
routingKeysReachable:
245+
additionalProperties:
246+
type: integer
247+
description: |-
248+
routingKeysReachable is a map of routingKeys with at least one
249+
reachable connector. The value of each routingKey is the weight in
250+
the map.
251+
type: object
252+
x-kubernetes-map-type: granular
253+
required:
254+
- routingKeysReachable
255+
type: object
217256
type: object
218257
x-kubernetes-validations:
219-
- message: exactly one of the fields in [priority] must be
258+
- message: exactly one of the fields in [priority weighted] must be
220259
set
221-
rule: '[has(self.priority)].filter(x,x==true).size() ==
222-
1'
260+
rule: '[has(self.priority),has(self.weighted)].filter(x,x==true).size()
261+
== 1'
223262
type: object
224263
required:
225264
- spec
Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,26 @@
11
apiVersion: skupper.io/v2alpha1
22
kind: MultiKeyListener
33
metadata:
4-
name: backend
4+
name: backend-priority
55
spec:
6-
host: backend
6+
host: backend-priority
77
port: 8080
88
strategy:
99
priority:
1010
routingKeys:
1111
- backend-primary
1212
- backend-secondary
13+
14+
---
15+
apiVersion: skupper.io/v2alpha1
16+
kind: MultiKeyListener
17+
metadata:
18+
name: backend-weighted
19+
spec:
20+
host: backend-weighted
21+
port: 8081
22+
strategy:
23+
weighted:
24+
routingKeys:
25+
backend-single: 1
26+
backend-double: 2

internal/nonkube/common/fs_config_renderer_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,24 @@ func fakeSiteState() *api.SiteState {
314314
},
315315
},
316316
},
317+
"mkl-two": {
318+
TypeMeta: metav1.TypeMeta{
319+
Kind: "MultiKeyListener",
320+
APIVersion: "skupper.io/v2alpha1",
321+
},
322+
ObjectMeta: metav1.ObjectMeta{
323+
Name: "mkl-two",
324+
},
325+
Spec: v2alpha1.MultiKeyListenerSpec{
326+
Host: "10.0.0.3",
327+
Port: 5679,
328+
Strategy: v2alpha1.MultiKeyListenerStrategy{
329+
Weighted: &v2alpha1.WeightedStrategySpec{
330+
RoutingKeys: map[string]uint{"key-primary": 1, "key-secondary": 2},
331+
},
332+
},
333+
},
334+
},
317335
},
318336
ConfigMaps: make(map[string]*corev1.ConfigMap),
319337
}

internal/nonkube/common/site_state_validator.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -198,15 +198,30 @@ func (s *SiteStateValidator) validateMultiKeyListeners(multiKeyListeners map[str
198198
return fmt.Errorf("port %d is already mapped for host %q (multikeylistener: %q)", mkl.Spec.Port, mkl.Spec.Host, name)
199199
}
200200
hostPorts[mkl.Spec.Host] = append(hostPorts[mkl.Spec.Host], mkl.Spec.Port)
201-
if mkl.Spec.Strategy.Priority == nil {
202-
return fmt.Errorf("invalid multikeylistener: %s - strategy.priority is required", mkl.Name)
201+
if mkl.Spec.Strategy.Priority == nil && mkl.Spec.Strategy.Weighted == nil {
202+
return fmt.Errorf("invalid multikeylistener: %s - strategy.priority or strategy.weighted is required", mkl.Name)
203203
}
204-
if len(mkl.Spec.Strategy.Priority.RoutingKeys) == 0 {
205-
return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name)
204+
if mkl.Spec.Strategy.Priority != nil {
205+
if len(mkl.Spec.Strategy.Priority.RoutingKeys) == 0 {
206+
return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name)
207+
}
208+
for _, key := range mkl.Spec.Strategy.Priority.RoutingKeys {
209+
if key == "" {
210+
return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name)
211+
}
212+
}
206213
}
207-
for _, key := range mkl.Spec.Strategy.Priority.RoutingKeys {
208-
if key == "" {
209-
return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name)
214+
if mkl.Spec.Strategy.Weighted != nil {
215+
if len(mkl.Spec.Strategy.Weighted.RoutingKeys) == 0 {
216+
return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name)
217+
}
218+
for key, weight := range mkl.Spec.Strategy.Weighted.RoutingKeys {
219+
if key == "" {
220+
return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name)
221+
}
222+
if weight <= 0 {
223+
return fmt.Errorf("invalid multikeylistener: %s - weight value must not be positive", mkl.Name)
224+
}
210225
}
211226
}
212227
}

internal/nonkube/common/site_state_validator_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,21 +268,41 @@ func TestSiteStateValidator_Validate(t *testing.T) {
268268
siteState: customize(func(siteState *api.SiteState) {
269269
for _, mkl := range siteState.MultiKeyListeners {
270270
mkl.Spec.Strategy.Priority = nil
271+
mkl.Spec.Strategy.Weighted = nil
271272
}
272273
}),
273274
valid: false,
274-
errorContains: "strategy.priority is required",
275+
errorContains: "strategy.priority or strategy.weighted is required",
275276
},
276277
{
277278
info: "invalid-multikeylistener-empty-routing-keys",
278279
siteState: customize(func(siteState *api.SiteState) {
279280
for _, mkl := range siteState.MultiKeyListeners {
280-
mkl.Spec.Strategy.Priority.RoutingKeys = []string{}
281+
if mkl.Spec.Strategy.Priority != nil {
282+
mkl.Spec.Strategy.Priority.RoutingKeys = []string{}
283+
}
284+
if mkl.Spec.Strategy.Weighted != nil {
285+
mkl.Spec.Strategy.Weighted.RoutingKeys = map[string]uint{}
286+
}
281287
}
282288
}),
283289
valid: false,
284290
errorContains: "routingKeys must not be empty",
285291
},
292+
{
293+
info: "invalid-multikeylistener-zero-weight",
294+
siteState: customize(func(siteState *api.SiteState) {
295+
for _, mkl := range siteState.MultiKeyListeners {
296+
if mkl.Spec.Strategy.Weighted != nil {
297+
for key := range mkl.Spec.Strategy.Weighted.RoutingKeys {
298+
mkl.Spec.Strategy.Weighted.RoutingKeys[key] = 0
299+
}
300+
}
301+
}
302+
}),
303+
valid: false,
304+
errorContains: "weight value must not be positive",
305+
},
286306
{
287307
info: "invalid-multikeylistener-port-conflict-with-listener",
288308
siteState: customize(func(siteState *api.SiteState) {

internal/site/bindings.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,17 @@ func (b *Bindings) updateMultiKeyListener(mkl *skupperv2alpha1.MultiKeyListener)
196196
if ok && reflect.DeepEqual(existing.Spec, mkl.Spec) {
197197
return nil
198198
}
199+
200+
if mkl.Spec.Strategy.Weighted != nil && mkl.Status.Strategy != nil {
201+
// update weight values in status if they changed in the spec
202+
for k, w := range mkl.Status.Strategy.Weighted.RoutingKeysReachable {
203+
if ws, ok := mkl.Spec.Strategy.Weighted.RoutingKeys[k]; ok {
204+
if w != ws {
205+
mkl.Status.Strategy.Weighted.RoutingKeysReachable[k] = ws
206+
}
207+
}
208+
}
209+
}
199210
return b
200211
}
201212

internal/site/multikeylistener.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,18 @@ func UpdateBridgeConfigForMultiKeyListener(siteId string, mkl *skupperv2alpha1.M
3030
func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *skupperv2alpha1.MultiKeyListener, host string, port int, config *qdr.BridgeConfig) {
3131
name := mkl.Name
3232
tcpListenerName := multiAddressTcpListenerName(name)
33-
34-
config.AddTcpListener(qdr.TcpEndpoint{
35-
Name: tcpListenerName,
36-
SiteId: siteId,
37-
Host: host,
38-
Port: strconv.Itoa(port),
39-
SslProfile: mkl.Spec.TlsCredentials,
40-
MultiAddressStrategy: "priority",
41-
AuthenticatePeer: mkl.Spec.RequireClientCert,
42-
})
33+
tcpListenerConfig := qdr.TcpEndpoint{
34+
Name: tcpListenerName,
35+
SiteId: siteId,
36+
Host: host,
37+
Port: strconv.Itoa(port),
38+
SslProfile: mkl.Spec.TlsCredentials,
39+
AuthenticatePeer: mkl.Spec.RequireClientCert,
40+
}
4341

4442
// Create listenerAddress entities for each routing key in the strategy
4543
if mkl.Spec.Strategy.Priority != nil {
44+
tcpListenerConfig.MultiAddressStrategy = "priority"
4645
numKeys := len(mkl.Spec.Strategy.Priority.RoutingKeys)
4746
for i, routingKey := range mkl.Spec.Strategy.Priority.RoutingKeys {
4847
laName := listenerAddressName(name, routingKey)
@@ -53,7 +52,19 @@ func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *sk
5352
Listener: tcpListenerName,
5453
})
5554
}
55+
} else if mkl.Spec.Strategy.Weighted != nil {
56+
tcpListenerConfig.MultiAddressStrategy = "weighted"
57+
for routingKey, weight := range mkl.Spec.Strategy.Weighted.RoutingKeys {
58+
laName := listenerAddressName(name, routingKey)
59+
config.AddListenerAddress(qdr.ListenerAddress{
60+
Name: laName,
61+
Address: routingKey,
62+
Value: int(weight),
63+
Listener: tcpListenerName,
64+
})
65+
}
5666
}
67+
config.AddTcpListener(tcpListenerConfig)
5768
}
5869

5970
// RemoveBridgeConfigForMultiKeyListener removes the tcpListener and listenerAddress

0 commit comments

Comments
 (0)