Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions config/crd/bases/skupper_multikeylistener_crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,33 @@ spec:
required:
- routingKeys
type: object
weighted:
description: |-
WeightedStrategySpec defines a mapping of routing keys to weights.

The listener distributes traffic among reachable routing keys according to
their weights. Routing keys with higher weights receive a larger portion of
the traffic. If all keys are assigned the same weight, traffic is
split equally between them.
properties:
routingKeys:
additionalProperties:
type: integer
description: routingKeys to route traffic to according to
their weight values
maxProperties: 256
minProperties: 1
type: object
x-kubernetes-map-type: granular
required:
- routingKeys
type: object
type: object
x-kubernetes-validations:
- message: exactly one of the fields in [priority] must be
- message: exactly one of the fields in [priority weighted] must be
set
rule: '[has(self.priority)].filter(x,x==true).size() ==
1'
rule: '[has(self.priority),has(self.weighted)].filter(x,x==true).size()
== 1'
tlsCredentials:
description: tlsCredentials for client-to-listener
type: string
Expand Down Expand Up @@ -214,12 +235,27 @@ spec:
required:
- routingKeysReachable
type: object
weighted:
description: weighted status
properties:
routingKeysReachable:
additionalProperties:
type: integer
description: |-
routingKeysReachable is a mapping of routingKeys to weights with at
least one reachable connector. The value of each routingKey is the
weight in the map.
type: object
x-kubernetes-map-type: granular
required:
- routingKeysReachable
type: object
type: object
x-kubernetes-validations:
- message: exactly one of the fields in [priority] must be
- message: exactly one of the fields in [priority weighted] must be
set
rule: '[has(self.priority)].filter(x,x==true).size() ==
1'
rule: '[has(self.priority),has(self.weighted)].filter(x,x==true).size()
== 1'
type: object
required:
- spec
Expand Down
18 changes: 16 additions & 2 deletions config/samples/skupper_v2alpha1_multikeylistener.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
apiVersion: skupper.io/v2alpha1
kind: MultiKeyListener
metadata:
name: backend
name: backend-priority
spec:
host: backend
host: backend-priority
port: 8080
strategy:
priority:
routingKeys:
- backend-primary
- backend-secondary

---
apiVersion: skupper.io/v2alpha1
kind: MultiKeyListener
metadata:
name: backend-weighted
spec:
host: backend-weighted
port: 8081
strategy:
weighted:
routingKeys:
backend-single: 1
backend-double: 2
18 changes: 18 additions & 0 deletions internal/nonkube/common/fs_config_renderer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,24 @@ func fakeSiteState() *api.SiteState {
},
},
},
"mkl-two": {
TypeMeta: metav1.TypeMeta{
Kind: "MultiKeyListener",
APIVersion: "skupper.io/v2alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "mkl-two",
},
Spec: v2alpha1.MultiKeyListenerSpec{
Host: "10.0.0.3",
Port: 5679,
Strategy: v2alpha1.MultiKeyListenerStrategy{
Weighted: &v2alpha1.WeightedStrategySpec{
RoutingKeys: map[string]uint{"key-primary": 1, "key-secondary": 2},
},
},
},
},
},
ConfigMaps: make(map[string]*corev1.ConfigMap),
}
Expand Down
33 changes: 25 additions & 8 deletions internal/nonkube/common/site_state_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,32 @@ func (s *SiteStateValidator) validateMultiKeyListeners(multiKeyListeners map[str
return fmt.Errorf("port %d is already mapped for host %q (multikeylistener: %q)", mkl.Spec.Port, mkl.Spec.Host, name)
}
hostPorts[mkl.Spec.Host] = append(hostPorts[mkl.Spec.Host], mkl.Spec.Port)
if mkl.Spec.Strategy.Priority == nil {
return fmt.Errorf("invalid multikeylistener: %s - strategy.priority is required", mkl.Name)
}
if len(mkl.Spec.Strategy.Priority.RoutingKeys) == 0 {
return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name)
if mkl.Spec.Strategy.Priority == nil && mkl.Spec.Strategy.Weighted == nil {
return fmt.Errorf("invalid multikeylistener: %s - either strategy.priority or strategy.weighted is required", mkl.Name)
} else if mkl.Spec.Strategy.Priority != nil && mkl.Spec.Strategy.Weighted != nil {
return fmt.Errorf("invalid multikeylistener: %s - only one of strategy.priority or strategy.weighted must be defined", mkl.Name)
}
if mkl.Spec.Strategy.Priority != nil {
if len(mkl.Spec.Strategy.Priority.RoutingKeys) == 0 {
return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name)
}
for _, key := range mkl.Spec.Strategy.Priority.RoutingKeys {
if key == "" {
return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name)
}
}
}
for _, key := range mkl.Spec.Strategy.Priority.RoutingKeys {
if key == "" {
return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name)
if mkl.Spec.Strategy.Weighted != nil {
if len(mkl.Spec.Strategy.Weighted.RoutingKeys) == 0 {
return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name)
}
for key, weight := range mkl.Spec.Strategy.Weighted.RoutingKeys {
if key == "" {
return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name)
}
if weight <= 0 {
return fmt.Errorf("invalid multikeylistener: %s - weight value must be positive", mkl.Name)
}
}
}
}
Expand Down
35 changes: 33 additions & 2 deletions internal/nonkube/common/site_state_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,21 +268,52 @@ func TestSiteStateValidator_Validate(t *testing.T) {
siteState: customize(func(siteState *api.SiteState) {
for _, mkl := range siteState.MultiKeyListeners {
mkl.Spec.Strategy.Priority = nil
mkl.Spec.Strategy.Weighted = nil
}
}),
valid: false,
errorContains: "strategy.priority is required",
errorContains: "either strategy.priority or strategy.weighted is required",
},
{
info: "invalid-multikeylistener-too-many-strategies",
siteState: customize(func(siteState *api.SiteState) {
for _, mkl := range siteState.MultiKeyListeners {
mkl.Spec.Strategy.Priority = &v2alpha1.PriorityStrategySpec{}
mkl.Spec.Strategy.Weighted = &v2alpha1.WeightedStrategySpec{}
}
}),
valid: false,
errorContains: "only one of strategy.priority or strategy.weighted must be defined",
},
{
info: "invalid-multikeylistener-empty-routing-keys",
siteState: customize(func(siteState *api.SiteState) {
for _, mkl := range siteState.MultiKeyListeners {
mkl.Spec.Strategy.Priority.RoutingKeys = []string{}
if mkl.Spec.Strategy.Priority != nil {
mkl.Spec.Strategy.Priority.RoutingKeys = []string{}
}
if mkl.Spec.Strategy.Weighted != nil {
mkl.Spec.Strategy.Weighted.RoutingKeys = map[string]uint{}
}
}
}),
valid: false,
errorContains: "routingKeys must not be empty",
},
{
info: "invalid-multikeylistener-zero-weight",
siteState: customize(func(siteState *api.SiteState) {
for _, mkl := range siteState.MultiKeyListeners {
if mkl.Spec.Strategy.Weighted != nil {
for key := range mkl.Spec.Strategy.Weighted.RoutingKeys {
mkl.Spec.Strategy.Weighted.RoutingKeys[key] = 0
}
}
}
}),
valid: false,
errorContains: "weight value must be positive",
},
{
info: "invalid-multikeylistener-port-conflict-with-listener",
siteState: customize(func(siteState *api.SiteState) {
Expand Down
13 changes: 13 additions & 0 deletions internal/site/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ func (b *Bindings) updateMultiKeyListener(mkl *skupperv2alpha1.MultiKeyListener)
if ok && reflect.DeepEqual(existing.Spec, mkl.Spec) {
return nil
}

if mkl.Spec.Strategy.Weighted != nil && mkl.Status.Strategy != nil && mkl.Status.Strategy.Weighted != nil {
// update weight values in status if they changed in the spec
for k, w := range mkl.Status.Strategy.Weighted.RoutingKeysReachable {
if ws, ok := mkl.Spec.Strategy.Weighted.RoutingKeys[k]; ok {
if w != ws {
mkl.Status.Strategy.Weighted.RoutingKeysReachable[k] = ws
}
} else {
delete(mkl.Status.Strategy.Weighted.RoutingKeysReachable, k)
}
}
}
return b
}

Expand Down
31 changes: 21 additions & 10 deletions internal/site/multikeylistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,18 @@ func UpdateBridgeConfigForMultiKeyListener(siteId string, mkl *skupperv2alpha1.M
func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *skupperv2alpha1.MultiKeyListener, host string, port int, config *qdr.BridgeConfig) {
name := mkl.Name
tcpListenerName := multiAddressTcpListenerName(name)

config.AddTcpListener(qdr.TcpEndpoint{
Name: tcpListenerName,
SiteId: siteId,
Host: host,
Port: strconv.Itoa(port),
SslProfile: mkl.Spec.TlsCredentials,
MultiAddressStrategy: "priority",
AuthenticatePeer: mkl.Spec.RequireClientCert,
})
tcpListenerConfig := qdr.TcpEndpoint{
Name: tcpListenerName,
SiteId: siteId,
Host: host,
Port: strconv.Itoa(port),
SslProfile: mkl.Spec.TlsCredentials,
AuthenticatePeer: mkl.Spec.RequireClientCert,
}

// Create listenerAddress entities for each routing key in the strategy
if mkl.Spec.Strategy.Priority != nil {
tcpListenerConfig.MultiAddressStrategy = "priority"
numKeys := len(mkl.Spec.Strategy.Priority.RoutingKeys)
for i, routingKey := range mkl.Spec.Strategy.Priority.RoutingKeys {
laName := listenerAddressName(name, routingKey)
Expand All @@ -53,7 +52,19 @@ func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *sk
Listener: tcpListenerName,
})
}
} else if mkl.Spec.Strategy.Weighted != nil {
tcpListenerConfig.MultiAddressStrategy = "weighted"
for routingKey, weight := range mkl.Spec.Strategy.Weighted.RoutingKeys {
laName := listenerAddressName(name, routingKey)
config.AddListenerAddress(qdr.ListenerAddress{
Name: laName,
Address: routingKey,
Value: int(weight),
Listener: tcpListenerName,
})
}
}
config.AddTcpListener(tcpListenerConfig)
}

// RemoveBridgeConfigForMultiKeyListener removes the tcpListener and listenerAddress
Expand Down
Loading
Loading