diff --git a/config/crd/bases/skupper_multikeylistener_crd.yaml b/config/crd/bases/skupper_multikeylistener_crd.yaml index 1c565773c..09cec0bb7 100644 --- a/config/crd/bases/skupper_multikeylistener_crd.yaml +++ b/config/crd/bases/skupper_multikeylistener_crd.yaml @@ -105,12 +105,33 @@ spec: required: - routingKeys type: object + weighted: + description: |- + WeightedStrategySpec defines a mapping of routing keys to weights. + + The listener distributes traffic among reachable routing keys according to + their weights. Routing keys with higher weights receive a larger portion of + the traffic. If all keys are assigned the same weight, traffic is + split equally between them. + properties: + routingKeys: + additionalProperties: + type: integer + description: routingKeys to route traffic to according to + their weight values + maxProperties: 256 + minProperties: 1 + type: object + x-kubernetes-map-type: granular + required: + - routingKeys + type: object type: object x-kubernetes-validations: - - message: exactly one of the fields in [priority] must be + - message: exactly one of the fields in [priority weighted] must be set - rule: '[has(self.priority)].filter(x,x==true).size() == - 1' + rule: '[has(self.priority),has(self.weighted)].filter(x,x==true).size() + == 1' tlsCredentials: description: tlsCredentials for client-to-listener type: string @@ -214,12 +235,27 @@ spec: required: - routingKeysReachable type: object + weighted: + description: weighted status + properties: + routingKeysReachable: + additionalProperties: + type: integer + description: |- + routingKeysReachable is a mapping of routingKeys to weights with at + least one reachable connector. The value of each routingKey is the + weight in the map. + type: object + x-kubernetes-map-type: granular + required: + - routingKeysReachable + type: object type: object x-kubernetes-validations: - - message: exactly one of the fields in [priority] must be + - message: exactly one of the fields in [priority weighted] must be set - rule: '[has(self.priority)].filter(x,x==true).size() == - 1' + rule: '[has(self.priority),has(self.weighted)].filter(x,x==true).size() + == 1' type: object required: - spec diff --git a/config/samples/skupper_v2alpha1_multikeylistener.yaml b/config/samples/skupper_v2alpha1_multikeylistener.yaml index 9d1588ff2..022235c31 100644 --- a/config/samples/skupper_v2alpha1_multikeylistener.yaml +++ b/config/samples/skupper_v2alpha1_multikeylistener.yaml @@ -1,12 +1,26 @@ apiVersion: skupper.io/v2alpha1 kind: MultiKeyListener metadata: - name: backend + name: backend-priority spec: - host: backend + host: backend-priority port: 8080 strategy: priority: routingKeys: - backend-primary - backend-secondary + +--- +apiVersion: skupper.io/v2alpha1 +kind: MultiKeyListener +metadata: + name: backend-weighted +spec: + host: backend-weighted + port: 8081 + strategy: + weighted: + routingKeys: + backend-single: 1 + backend-double: 2 \ No newline at end of file diff --git a/internal/nonkube/common/fs_config_renderer_test.go b/internal/nonkube/common/fs_config_renderer_test.go index a5da25021..b7c5d44e8 100644 --- a/internal/nonkube/common/fs_config_renderer_test.go +++ b/internal/nonkube/common/fs_config_renderer_test.go @@ -314,6 +314,24 @@ func fakeSiteState() *api.SiteState { }, }, }, + "mkl-two": { + TypeMeta: metav1.TypeMeta{ + Kind: "MultiKeyListener", + APIVersion: "skupper.io/v2alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "mkl-two", + }, + Spec: v2alpha1.MultiKeyListenerSpec{ + Host: "10.0.0.3", + Port: 5679, + Strategy: v2alpha1.MultiKeyListenerStrategy{ + Weighted: &v2alpha1.WeightedStrategySpec{ + RoutingKeys: map[string]uint{"key-primary": 1, "key-secondary": 2}, + }, + }, + }, + }, }, ConfigMaps: make(map[string]*corev1.ConfigMap), } diff --git a/internal/nonkube/common/site_state_validator.go b/internal/nonkube/common/site_state_validator.go index c609c0361..8c9df8d0d 100644 --- a/internal/nonkube/common/site_state_validator.go +++ b/internal/nonkube/common/site_state_validator.go @@ -198,15 +198,32 @@ func (s *SiteStateValidator) validateMultiKeyListeners(multiKeyListeners map[str return fmt.Errorf("port %d is already mapped for host %q (multikeylistener: %q)", mkl.Spec.Port, mkl.Spec.Host, name) } hostPorts[mkl.Spec.Host] = append(hostPorts[mkl.Spec.Host], mkl.Spec.Port) - if mkl.Spec.Strategy.Priority == nil { - return fmt.Errorf("invalid multikeylistener: %s - strategy.priority is required", mkl.Name) - } - if len(mkl.Spec.Strategy.Priority.RoutingKeys) == 0 { - return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name) + if mkl.Spec.Strategy.Priority == nil && mkl.Spec.Strategy.Weighted == nil { + return fmt.Errorf("invalid multikeylistener: %s - either strategy.priority or strategy.weighted is required", mkl.Name) + } else if mkl.Spec.Strategy.Priority != nil && mkl.Spec.Strategy.Weighted != nil { + return fmt.Errorf("invalid multikeylistener: %s - only one of strategy.priority or strategy.weighted must be defined", mkl.Name) + } + if mkl.Spec.Strategy.Priority != nil { + if len(mkl.Spec.Strategy.Priority.RoutingKeys) == 0 { + return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name) + } + for _, key := range mkl.Spec.Strategy.Priority.RoutingKeys { + if key == "" { + return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name) + } + } } - for _, key := range mkl.Spec.Strategy.Priority.RoutingKeys { - if key == "" { - return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name) + if mkl.Spec.Strategy.Weighted != nil { + if len(mkl.Spec.Strategy.Weighted.RoutingKeys) == 0 { + return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name) + } + for key, weight := range mkl.Spec.Strategy.Weighted.RoutingKeys { + if key == "" { + return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name) + } + if weight <= 0 { + return fmt.Errorf("invalid multikeylistener: %s - weight value must be positive", mkl.Name) + } } } } diff --git a/internal/nonkube/common/site_state_validator_test.go b/internal/nonkube/common/site_state_validator_test.go index 3be0f9be1..c26ccdc8d 100644 --- a/internal/nonkube/common/site_state_validator_test.go +++ b/internal/nonkube/common/site_state_validator_test.go @@ -268,21 +268,52 @@ func TestSiteStateValidator_Validate(t *testing.T) { siteState: customize(func(siteState *api.SiteState) { for _, mkl := range siteState.MultiKeyListeners { mkl.Spec.Strategy.Priority = nil + mkl.Spec.Strategy.Weighted = nil } }), valid: false, - errorContains: "strategy.priority is required", + errorContains: "either strategy.priority or strategy.weighted is required", + }, + { + info: "invalid-multikeylistener-too-many-strategies", + siteState: customize(func(siteState *api.SiteState) { + for _, mkl := range siteState.MultiKeyListeners { + mkl.Spec.Strategy.Priority = &v2alpha1.PriorityStrategySpec{} + mkl.Spec.Strategy.Weighted = &v2alpha1.WeightedStrategySpec{} + } + }), + valid: false, + errorContains: "only one of strategy.priority or strategy.weighted must be defined", }, { info: "invalid-multikeylistener-empty-routing-keys", siteState: customize(func(siteState *api.SiteState) { for _, mkl := range siteState.MultiKeyListeners { - mkl.Spec.Strategy.Priority.RoutingKeys = []string{} + if mkl.Spec.Strategy.Priority != nil { + mkl.Spec.Strategy.Priority.RoutingKeys = []string{} + } + if mkl.Spec.Strategy.Weighted != nil { + mkl.Spec.Strategy.Weighted.RoutingKeys = map[string]uint{} + } } }), valid: false, errorContains: "routingKeys must not be empty", }, + { + info: "invalid-multikeylistener-zero-weight", + siteState: customize(func(siteState *api.SiteState) { + for _, mkl := range siteState.MultiKeyListeners { + if mkl.Spec.Strategy.Weighted != nil { + for key := range mkl.Spec.Strategy.Weighted.RoutingKeys { + mkl.Spec.Strategy.Weighted.RoutingKeys[key] = 0 + } + } + } + }), + valid: false, + errorContains: "weight value must be positive", + }, { info: "invalid-multikeylistener-port-conflict-with-listener", siteState: customize(func(siteState *api.SiteState) { diff --git a/internal/site/bindings.go b/internal/site/bindings.go index d23d51c52..72da8de09 100644 --- a/internal/site/bindings.go +++ b/internal/site/bindings.go @@ -196,6 +196,19 @@ func (b *Bindings) updateMultiKeyListener(mkl *skupperv2alpha1.MultiKeyListener) if ok && reflect.DeepEqual(existing.Spec, mkl.Spec) { return nil } + + if mkl.Spec.Strategy.Weighted != nil && mkl.Status.Strategy != nil && mkl.Status.Strategy.Weighted != nil { + // update weight values in status if they changed in the spec + for k, w := range mkl.Status.Strategy.Weighted.RoutingKeysReachable { + if ws, ok := mkl.Spec.Strategy.Weighted.RoutingKeys[k]; ok { + if w != ws { + mkl.Status.Strategy.Weighted.RoutingKeysReachable[k] = ws + } + } else { + delete(mkl.Status.Strategy.Weighted.RoutingKeysReachable, k) + } + } + } return b } diff --git a/internal/site/multikeylistener.go b/internal/site/multikeylistener.go index f212c8ea7..bfcec48df 100644 --- a/internal/site/multikeylistener.go +++ b/internal/site/multikeylistener.go @@ -30,19 +30,18 @@ func UpdateBridgeConfigForMultiKeyListener(siteId string, mkl *skupperv2alpha1.M func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *skupperv2alpha1.MultiKeyListener, host string, port int, config *qdr.BridgeConfig) { name := mkl.Name tcpListenerName := multiAddressTcpListenerName(name) - - config.AddTcpListener(qdr.TcpEndpoint{ - Name: tcpListenerName, - SiteId: siteId, - Host: host, - Port: strconv.Itoa(port), - SslProfile: mkl.Spec.TlsCredentials, - MultiAddressStrategy: "priority", - AuthenticatePeer: mkl.Spec.RequireClientCert, - }) + tcpListenerConfig := qdr.TcpEndpoint{ + Name: tcpListenerName, + SiteId: siteId, + Host: host, + Port: strconv.Itoa(port), + SslProfile: mkl.Spec.TlsCredentials, + AuthenticatePeer: mkl.Spec.RequireClientCert, + } // Create listenerAddress entities for each routing key in the strategy if mkl.Spec.Strategy.Priority != nil { + tcpListenerConfig.MultiAddressStrategy = "priority" numKeys := len(mkl.Spec.Strategy.Priority.RoutingKeys) for i, routingKey := range mkl.Spec.Strategy.Priority.RoutingKeys { laName := listenerAddressName(name, routingKey) @@ -53,7 +52,19 @@ func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *sk Listener: tcpListenerName, }) } + } else if mkl.Spec.Strategy.Weighted != nil { + tcpListenerConfig.MultiAddressStrategy = "weighted" + for routingKey, weight := range mkl.Spec.Strategy.Weighted.RoutingKeys { + laName := listenerAddressName(name, routingKey) + config.AddListenerAddress(qdr.ListenerAddress{ + Name: laName, + Address: routingKey, + Value: int(weight), + Listener: tcpListenerName, + }) + } } + config.AddTcpListener(tcpListenerConfig) } // RemoveBridgeConfigForMultiKeyListener removes the tcpListener and listenerAddress diff --git a/pkg/apis/skupper/v2alpha1/multikeylistener_types.go b/pkg/apis/skupper/v2alpha1/multikeylistener_types.go index ba8c1582d..be67069bc 100644 --- a/pkg/apis/skupper/v2alpha1/multikeylistener_types.go +++ b/pkg/apis/skupper/v2alpha1/multikeylistener_types.go @@ -60,10 +60,12 @@ type MultiKeyListenerStatus struct { Strategy *StrategyStatus `json:"strategy,omitempty"` } -// +kubebuilder:validation:ExactlyOneOf=priority +// +kubebuilder:validation:ExactlyOneOf=priority;weighted type StrategyStatus struct { // priority status Priority *PriorityStrategyStatus `json:"priority,omitempty"` + // weighted status + Weighted *WeightedStrategyStatus `json:"weighted,omitempty"` } type PriorityStrategyStatus struct { @@ -72,6 +74,15 @@ type PriorityStrategyStatus struct { RoutingKeysReachable []string `json:"routingKeysReachable"` } +type WeightedStrategyStatus struct { + // routingKeysReachable is a mapping of routingKeys to weights with at + // least one reachable connector. The value of each routingKey is the + // weight in the map. + // + // +mapType=granular + RoutingKeysReachable map[string]uint `json:"routingKeysReachable"` +} + type MultiKeyListenerSpec struct { // host is the hostname or IP address of the local listener. Clients at // this site use the listener host and port to establish connections to the @@ -100,11 +111,10 @@ type MultiKeyListenerSpec struct { // MultiKeyListenerStrategy contains configuration for each strategy. Only one // strategy can be specified at a time. // -// Presently Priority Failover is the only strategy available. -// -// +kubebuilder:validation:ExactlyOneOf=priority +// +kubebuilder:validation:ExactlyOneOf=priority;weighted type MultiKeyListenerStrategy struct { Priority *PriorityStrategySpec `json:"priority,omitempty"` + Weighted *WeightedStrategySpec `json:"weighted,omitempty"` } // PriorityStrategySpec specifies an ordered set of routing keys to @@ -121,6 +131,21 @@ type PriorityStrategySpec struct { RoutingKeys []string `json:"routingKeys"` } +// WeightedStrategySpec defines a mapping of routing keys to weights. +// +// The listener distributes traffic among reachable routing keys according to +// their weights. Routing keys with higher weights receive a larger portion of +// the traffic. If all keys are assigned the same weight, traffic is +// split equally between them. +type WeightedStrategySpec struct { + // +kubebuilder:validation:MinProperties=1 + // +kubebuilder:validation:MaxProperties=256 + // +mapType=granular + // + // routingKeys to route traffic to according to their weight values + RoutingKeys map[string]uint `json:"routingKeys"` +} + func (s *MultiKeyListenerStatus) SetCondition(conditionType string, state ConditionState, generation int64) bool { condition := metav1.Condition{ Type: conditionType, @@ -198,15 +223,43 @@ func (m *MultiKeyListener) SetRoutingKeysReachable(keys []string) bool { if m.Status.Strategy == nil { m.Status.Strategy = &StrategyStatus{} } - if m.Status.Strategy.Priority == nil { - m.Status.Strategy.Priority = &PriorityStrategyStatus{} - } + if keys == nil { keys = []string{} } - if !reflect.DeepEqual(m.Status.Strategy.Priority.RoutingKeysReachable, keys) { - m.Status.Strategy.Priority.RoutingKeysReachable = keys - return true + + if m.Spec.Strategy.Priority != nil { + if m.Status.Strategy.Priority == nil { + m.Status.Strategy.Priority = &PriorityStrategyStatus{} + } + + if !reflect.DeepEqual(m.Status.Strategy.Priority.RoutingKeysReachable, keys) { + m.Status.Strategy.Priority.RoutingKeysReachable = keys + return true + } + } else if m.Spec.Strategy.Weighted != nil { + if m.Status.Strategy.Weighted == nil { + m.Status.Strategy.Weighted = &WeightedStrategyStatus{} + } + var changed bool = false + if len(keys) != len(m.Status.Strategy.Weighted.RoutingKeysReachable) { + changed = true + } else { + for _, k := range keys { + if _, ok := m.Status.Strategy.Weighted.RoutingKeysReachable[k]; !ok { + changed = true + break + } + } + } + + if changed { + m.Status.Strategy.Weighted.RoutingKeysReachable = make(map[string]uint, len(keys)) + for _, k := range keys { + m.Status.Strategy.Weighted.RoutingKeysReachable[k] = m.Spec.Strategy.Weighted.RoutingKeys[k] + } + return true + } } return false } @@ -214,6 +267,12 @@ func (m *MultiKeyListener) SetRoutingKeysReachable(keys []string) bool { func (m *MultiKeyListener) GetRoutingKeys() []string { if m.Spec.Strategy.Priority != nil { return m.Spec.Strategy.Priority.RoutingKeys + } else if m.Spec.Strategy.Weighted != nil { + routingKeys := []string{} + for k := range m.Spec.Strategy.Weighted.RoutingKeys { + routingKeys = append(routingKeys, k) + } + return routingKeys } return nil } diff --git a/pkg/apis/skupper/v2alpha1/zz_generated.deepcopy.go b/pkg/apis/skupper/v2alpha1/zz_generated.deepcopy.go index 293b3d0c0..4deb897ce 100644 --- a/pkg/apis/skupper/v2alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/skupper/v2alpha1/zz_generated.deepcopy.go @@ -1039,6 +1039,11 @@ func (in *MultiKeyListenerStrategy) DeepCopyInto(out *MultiKeyListenerStrategy) *out = new(PriorityStrategySpec) (*in).DeepCopyInto(*out) } + if in.Weighted != nil { + in, out := &in.Weighted, &out.Weighted + *out = new(WeightedStrategySpec) + (*in).DeepCopyInto(*out) + } return } @@ -1595,6 +1600,11 @@ func (in *StrategyStatus) DeepCopyInto(out *StrategyStatus) { *out = new(PriorityStrategyStatus) (*in).DeepCopyInto(*out) } + if in.Weighted != nil { + in, out := &in.Weighted, &out.Weighted + *out = new(WeightedStrategyStatus) + (*in).DeepCopyInto(*out) + } return } @@ -1607,3 +1617,49 @@ func (in *StrategyStatus) DeepCopy() *StrategyStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WeightedStrategySpec) DeepCopyInto(out *WeightedStrategySpec) { + *out = *in + if in.RoutingKeys != nil { + in, out := &in.RoutingKeys, &out.RoutingKeys + *out = make(map[string]uint, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WeightedStrategySpec. +func (in *WeightedStrategySpec) DeepCopy() *WeightedStrategySpec { + if in == nil { + return nil + } + out := new(WeightedStrategySpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WeightedStrategyStatus) DeepCopyInto(out *WeightedStrategyStatus) { + *out = *in + if in.RoutingKeysReachable != nil { + in, out := &in.RoutingKeysReachable, &out.RoutingKeysReachable + *out = make(map[string]uint, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WeightedStrategyStatus. +func (in *WeightedStrategyStatus) DeepCopy() *WeightedStrategyStatus { + if in == nil { + return nil + } + out := new(WeightedStrategyStatus) + in.DeepCopyInto(out) + return out +}