diff --git a/CHANGELOG.md b/CHANGELOG.md index 413b68d3fb2..13b6de275cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ * **This version introduces a new API version to our CRDs.** **Before upgrading to Strimzi 0.49 or newer, make sure that you update your `KafkaUser` resources to use the `.spec.authorization.acls[].operations` field instead of the deprecated `.spec.authorization.acls[].operation`.** **Especially when using Helm, make sure that the CRDs are updated when you upgrade the operator.** +* **When rack awareness is enabled in the `Kafka` custom resource (`.spec.kafka.rack`), Strimzi no longer automatically adds the best-effort affinity rules for spreading the Kafka Pods across the zones.** + **Please make sure to set your own `topologySpreadConstraints` or `affinity` rules instead.** * The `.status.kafkaMetadataState` field in the `Kafka` custom resource is deprecated and not used anymore. * The `type: oauth` authentication in Kafka brokers and Kafka clients (Kafka Connect, MirrorMaker 2, and Strimzi HTTP Bridge) has been deprecated. Please use the `type: custom` authentication instead. diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java index fe173cbca0b..99c93e51530 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java @@ -5,8 +5,6 @@ package io.strimzi.operator.cluster.model; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.fabric8.kubernetes.api.model.Affinity; -import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerPort; @@ -370,7 +368,7 @@ public Deployment generateDeployment(Map<String, String> annotations, boolean is templatePod, DEFAULT_POD_LABELS, annotations, - getMergedAffinity(), + ModelUtils.affinityWithRackLabelSelector(templatePod, rack), ContainerUtils.listOrNull(createInitContainer(imagePullPolicy)), List.of(createContainer(imagePullPolicy)), getVolumes(isOpenShift), @@ -510,18 +508,6 @@ public KafkaBridgeHttpConfig getHttp() { return this.http; } - /** - * Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity. - */ - protected Affinity getMergedAffinity() { - Affinity userAffinity = templatePod != null && templatePod.getAffinity() != null ? templatePod.getAffinity() : new Affinity(); - AffinityBuilder builder = new AffinityBuilder(userAffinity); - if (rack != null) { - builder = ModelUtils.populateAffinityBuilderWithRackLabelSelector(builder, userAffinity, rack.getTopologyKey()); - } - return builder.build(); - } - /** * Creates the ClusterRoleBinding which is used to bind the Kafka Bridge SA to the ClusterRole * which permissions the Kafka init container to access K8S nodes (necessary for rack-awareness).
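Since the operator no longer injects the best-effort zone-spreading rule mentioned in the changelog entry above, clusters that relied on it need explicit spreading configuration. Below is a minimal sketch of the replacement, mirroring the example this PR adds in `proc-distributing-brokers-across-zones.adoc`; the cluster name `my-cluster` is a placeholder.

```yaml
# Sketch: explicit zone spreading for brokers via a KafkaNodePool pod template.
# "my-cluster" is a placeholder; the labels match those used in this PR's docs.
apiVersion: {KafkaApiVersion}
kind: KafkaNodePool
metadata:
  name: brokers
  labels:
    strimzi.io/cluster: my-cluster
spec:
  roles:
    - broker
  template:
    pod:
      topologySpreadConstraints:
        - maxSkew: 1                                # at most one pod difference between zones
          topologyKey: topology.kubernetes.io/zone  # same key as .spec.kafka.rack.topologyKey
          whenUnsatisfiable: DoNotSchedule          # hard constraint
          labelSelector:
            matchLabels:
              strimzi.io/cluster: my-cluster
              strimzi.io/broker-role: "true"
```

Using `whenUnsatisfiable: ScheduleAnyway` instead would approximate the best-effort behavior of the removed `preferredDuringSchedulingIgnoredDuringExecution` rule.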
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index f5cb549ae13..7708d58751a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -4,8 +4,6 @@ */ package io.strimzi.operator.cluster.model; -import io.fabric8.kubernetes.api.model.Affinity; -import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerPort; @@ -1207,7 +1205,7 @@ public List<StrimziPodSet> generatePodSets(boolean isOpenShift, DEFAULT_POD_LABELS, podAnnotationsProvider.apply(node), KafkaResources.brokersServiceName(cluster), - getMergedAffinity(pool), + ModelUtils.affinityWithRackLabelSelector(pool.templatePod, rack), ContainerUtils.listOrNull(createInitContainer(imagePullPolicy, pool)), List.of(createContainer(imagePullPolicy, pool)), getPodSetVolumes(node, pool.storage, pool.templatePod, isOpenShift), @@ -1479,39 +1477,6 @@ private List<VolumeMount> getInitContainerVolumeMounts(KafkaPool pool) { return volumeMountList; } - /** - * Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity. - * - * @param pool Node pool with custom affinity configuration - * - * @return Combined affinity - */ - private Affinity getMergedAffinity(KafkaPool pool) { - Affinity userAffinity = pool.templatePod != null && pool.templatePod.getAffinity() != null ? pool.templatePod.getAffinity() : new Affinity(); - AffinityBuilder builder = new AffinityBuilder(userAffinity); - if (rack != null) { - // If there's a rack config, we need to add a podAntiAffinity to spread the brokers among the racks - // We add the affinity even for controller only nodes as we prefer them to be spread even if they don't directly use rack awareness - builder = builder - .editOrNewPodAntiAffinity() - .addNewPreferredDuringSchedulingIgnoredDuringExecution() - .withWeight(100) - .withNewPodAffinityTerm() - .withTopologyKey(rack.getTopologyKey()) - .withNewLabelSelector() - .addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, cluster) - .addToMatchLabels(Labels.STRIMZI_NAME_LABEL, componentName) - .endLabelSelector() - .endPodAffinityTerm() - .endPreferredDuringSchedulingIgnoredDuringExecution() - .endPodAntiAffinity(); - - builder = ModelUtils.populateAffinityBuilderWithRackLabelSelector(builder, userAffinity, rack.getTopologyKey()); - } - - return builder.build(); - } - private List<EnvVar> getInitContainerEnvVars(KafkaPool pool) { List<EnvVar> varList = new ArrayList<>(); varList.add(ContainerUtils.createEnvVarFromFieldRef(ENV_VAR_KAFKA_INIT_NODE_NAME, "spec.nodeName")); diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java index 7ac9852f578..51afbe86f64 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java @@ -4,8 +4,6 @@ */ package io.strimzi.operator.cluster.model; -import io.fabric8.kubernetes.api.model.Affinity; -import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
import io.fabric8.kubernetes.api.model.Container; @@ -604,18 +602,6 @@ private List<VolumeMount> getExternalConfigurationVolumeMounts() { return volumeMountList; } - /** - * Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity. - */ - protected Affinity getMergedAffinity() { - Affinity userAffinity = templatePod != null && templatePod.getAffinity() != null ? templatePod.getAffinity() : new Affinity(); - AffinityBuilder builder = new AffinityBuilder(userAffinity); - if (rack != null) { - builder = ModelUtils.populateAffinityBuilderWithRackLabelSelector(builder, userAffinity, rack.getTopologyKey()); - } - return builder.build(); - } - /** * Generates the StrimziPodSet for the Kafka cluster. * enabled. @@ -659,7 +645,7 @@ public StrimziPodSet generatePodSet(int replicas, defaultPodLabels(), podAnnotations, componentName, - getMergedAffinity(), + ModelUtils.affinityWithRackLabelSelector(templatePod, rack), ContainerUtils.listOrNull(createInitContainer(imagePullPolicy)), List.of(createContainer(imagePullPolicy, customContainerImage)), getVolumes(isOpenShift), diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ModelUtils.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ModelUtils.java index a53d86ed9a8..620fe2ba30f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ModelUtils.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ModelUtils.java @@ -20,6 +20,8 @@ import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.SecretBuilder; import io.strimzi.api.kafka.model.common.CertificateAuthority; +import io.strimzi.api.kafka.model.common.Rack; +import io.strimzi.api.kafka.model.common.template.PodTemplate; import io.strimzi.api.kafka.model.kafka.Storage; import io.strimzi.operator.common.ReconciliationLogger; import io.strimzi.operator.common.Util; @@ -186,54 +188,65 @@ public static List<String> getLinesWithoutCommentsAndEmptyLines(String config) { } /** - * Adds user-configured affinity to the AffinityBuilder + * Mixes the user-configured affinity with the rack-awareness node selector affinity * - * @param builder the builder which is used to populate the node affinity - * @param userAffinity the userAffinity which is defined by the user - * @param topologyKey the topology key which is used to select the node - * @return the AffinityBuilder which has the node selector with topology key which is needed to make sure - * the pods are scheduled only on nodes with the rack label + * @param podTemplate PodTemplate with user affinity configuration + * @param rack Rack awareness configuration + * + * @return Affinity which has the node selector with the topology-key needed to make sure the pods are scheduled only + * on nodes with the rack label */ - public static AffinityBuilder populateAffinityBuilderWithRackLabelSelector(AffinityBuilder builder, Affinity userAffinity, String topologyKey) { - // We need to add node affinity to make sure the pods are scheduled only on nodes with the rack label - NodeSelectorRequirement selector = new NodeSelectorRequirementBuilder() - .withOperator("Exists") - .withKey(topologyKey) - .build(); + public static Affinity affinityWithRackLabelSelector(PodTemplate podTemplate, Rack rack) { + // Extract the user-defined affinity if set + Affinity userAffinity = podTemplate != null && podTemplate.getAffinity() != null ?
podTemplate.getAffinity() : null; + + if (rack != null) { + // When rack-awareness is used, we need to add node affinity to make sure the pods are scheduled only on nodes with the rack label + NodeSelectorRequirement rackAwareSelector = new NodeSelectorRequirementBuilder() + .withOperator("Exists") + .withKey(rack.getTopologyKey()) + .build(); + + AffinityBuilder affinityBuilder = new AffinityBuilder(userAffinity); - if (userAffinity != null + if (userAffinity != null && userAffinity.getNodeAffinity() != null && userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution() != null && userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms() != null) { - // User has specified some Node Selector Terms => we should enhance them - List<NodeSelectorTerm> oldTerms = userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms(); - List<NodeSelectorTerm> enhancedTerms = new ArrayList<>(oldTerms.size()); - - for (NodeSelectorTerm term : oldTerms) { - NodeSelectorTerm enhancedTerm = new NodeSelectorTermBuilder(term) - .addToMatchExpressions(selector) - .build(); - enhancedTerms.add(enhancedTerm); + // User has specified some Node Selector Terms => we should enhance them by injecting our own selector to the matchExpressions + List<NodeSelectorTerm> oldTerms = userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms(); + List<NodeSelectorTerm> enhancedTerms = new ArrayList<>(oldTerms.size()); + + for (NodeSelectorTerm term : oldTerms) { + NodeSelectorTerm enhancedTerm = new NodeSelectorTermBuilder(term) + .addToMatchExpressions(rackAwareSelector) + .build(); + enhancedTerms.add(enhancedTerm); + } + + affinityBuilder + .editOrNewNodeAffinity() + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .withNodeSelectorTerms(enhancedTerms) + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity(); + } else { + // User has not specified any selector terms => we add our own + affinityBuilder + .editOrNewNodeAffinity() + .editOrNewRequiredDuringSchedulingIgnoredDuringExecution() + .addNewNodeSelectorTerm() + .withMatchExpressions(rackAwareSelector) + .endNodeSelectorTerm() + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity(); } - builder = builder - .editOrNewNodeAffinity() - .withNewRequiredDuringSchedulingIgnoredDuringExecution() - .withNodeSelectorTerms(enhancedTerms) - .endRequiredDuringSchedulingIgnoredDuringExecution() - .endNodeAffinity(); + return affinityBuilder.build(); } else { - // User has not specified any selector terms => we add our own - builder = builder - .editOrNewNodeAffinity() - .editOrNewRequiredDuringSchedulingIgnoredDuringExecution() - .addNewNodeSelectorTerm() - .withMatchExpressions(selector) - .endNodeSelectorTerm() - .endRequiredDuringSchedulingIgnoredDuringExecution() - .endNodeAffinity(); + // Rack-awareness is not used. We just keep what the user configured.
+ return userAffinity; } - return builder; } /** diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java index c127eb6bc0f..df656eb55cb 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java @@ -2413,19 +2413,6 @@ public void testRackAffinity() { .build()) .endRequiredDuringSchedulingIgnoredDuringExecution() .endNodeAffinity() - .withNewPodAntiAffinity() - .withPreferredDuringSchedulingIgnoredDuringExecution( - new WeightedPodAffinityTermBuilder() - .withWeight(100) - .withNewPodAffinityTerm() - .withNewLabelSelector() - .withMatchLabels(Map.of("strimzi.io/cluster", "foo", "strimzi.io/name", "foo-kafka")) - .endLabelSelector() - .withTopologyKey("failure-domain.beta.kubernetes.io/zone") - .endPodAffinityTerm() - .build() - ) - .endPodAntiAffinity() .build(); Kafka kafka = new KafkaBuilder(KAFKA) @@ -2643,7 +2630,7 @@ public void testAffinityAndTolerationsInKafkaPool() { assertThat(pod.getSpec().getAffinity(), is(poolAffinity)); assertThat(pod.getSpec().getTolerations(), is(poolToleration)); } else { - assertThat(pod.getSpec().getAffinity(), is(new Affinity())); + assertThat(pod.getSpec().getAffinity(), is(nullValue())); assertThat(pod.getSpec().getTolerations(), is(List.of())); } } @@ -2680,15 +2667,6 @@ public void testAffinityAndRack() { .endLabelSelector() .withTopologyKey("kubernetes.io/hostname") .endPodAffinityTerm() - .build(), - new WeightedPodAffinityTermBuilder() - .withWeight(100) - .withNewPodAffinityTerm() - .withNewLabelSelector() - .withMatchLabels(Map.of("strimzi.io/cluster", "foo", "strimzi.io/name", "foo-kafka")) - .endLabelSelector() - .withTopologyKey("failure-domain.beta.kubernetes.io/zone") - .endPodAffinityTerm() .build() ) .endPodAntiAffinity() @@ -2783,15 +2761,6 @@ public void testAffinityAndRackInKafkaAndKafkaPool() { .endLabelSelector() .withTopologyKey("kubernetes.io/hostname") .endPodAffinityTerm() - .build(), - new WeightedPodAffinityTermBuilder() - .withWeight(100) - .withNewPodAffinityTerm() - .withNewLabelSelector() - .withMatchLabels(Map.of("strimzi.io/cluster", "foo", "strimzi.io/name", "foo-kafka")) - .endLabelSelector() - .withTopologyKey("failure-domain.beta.kubernetes.io/zone") - .endPodAffinityTerm() .build() ) .endPodAntiAffinity() @@ -2825,15 +2794,6 @@ public void testAffinityAndRackInKafkaAndKafkaPool() { .endLabelSelector() .withTopologyKey("kubernetes.io/hostname") .endPodAffinityTerm() - .build(), - new WeightedPodAffinityTermBuilder() - .withWeight(100) - .withNewPodAffinityTerm() - .withNewLabelSelector() - .withMatchLabels(Map.of("strimzi.io/cluster", "foo", "strimzi.io/name", "foo-kafka")) - .endLabelSelector() - .withTopologyKey("failure-domain.beta.kubernetes.io/zone") - .endPodAffinityTerm() .build() ) .endPodAntiAffinity() @@ -2949,19 +2909,6 @@ public void testAffinityAndRackInKafkaPool() { .build()) .endRequiredDuringSchedulingIgnoredDuringExecution() .endNodeAffinity() - .withNewPodAntiAffinity() - .withPreferredDuringSchedulingIgnoredDuringExecution( - new WeightedPodAffinityTermBuilder() - .withWeight(100) - .withNewPodAffinityTerm() - .withNewLabelSelector() - .withMatchLabels(Map.of("strimzi.io/cluster", "foo", "strimzi.io/name", "foo-kafka")) - .endLabelSelector() - .withTopologyKey("failure-domain.beta.kubernetes.io/zone") - .endPodAffinityTerm() - .build() - 
) - .endPodAntiAffinity() .build(); Affinity mergedRackAffinity = new AffinityBuilder() @@ -2992,15 +2939,6 @@ public void testAffinityAndRackInKafkaPool() { .endLabelSelector() .withTopologyKey("kubernetes.io/hostname") .endPodAffinityTerm() - .build(), - new WeightedPodAffinityTermBuilder() - .withWeight(100) - .withNewPodAffinityTerm() - .withNewLabelSelector() - .withMatchLabels(Map.of("strimzi.io/cluster", "foo", "strimzi.io/name", "foo-kafka")) - .endLabelSelector() - .withTopologyKey("failure-domain.beta.kubernetes.io/zone") - .endPodAffinityTerm() .build() ) .endPodAntiAffinity() @@ -4467,7 +4405,7 @@ public void testCustomizedPodSetInNodePool() { assertThat(pod.getSpec().getTerminationGracePeriodSeconds(), is(30L)); assertThat(pod.getSpec().getImagePullSecrets().size(), is(0)); assertThat(pod.getSpec().getSecurityContext().getFsGroup(), is(0L)); - assertThat(pod.getSpec().getAffinity(), is(new Affinity())); + assertThat(pod.getSpec().getAffinity(), is(nullValue())); assertThat(pod.getSpec().getTolerations(), is(List.of())); assertThat(pod.getSpec().getVolumes().size(), is(4)); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/ModelUtilsTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/ModelUtilsTest.java index ec6778bbd50..4e70ab09366 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/ModelUtilsTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/ModelUtilsTest.java @@ -4,11 +4,20 @@ */ package io.strimzi.operator.cluster.model; +import io.fabric8.kubernetes.api.model.Affinity; +import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.NodeSelectorRequirementBuilder; +import io.fabric8.kubernetes.api.model.NodeSelectorTermBuilder; import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodAffinityTermBuilder; import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PreferredSchedulingTermBuilder; +import io.strimzi.api.kafka.model.common.RackBuilder; +import io.strimzi.api.kafka.model.common.template.PodTemplate; +import io.strimzi.api.kafka.model.common.template.PodTemplateBuilder; import io.strimzi.api.kafka.model.kafka.EphemeralStorageBuilder; import io.strimzi.api.kafka.model.kafka.JbodStorageBuilder; import io.strimzi.api.kafka.model.kafka.Kafka; @@ -26,6 +35,7 @@ import static io.strimzi.operator.common.Util.parseMap; import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; public class ModelUtilsTest { @@ -178,4 +188,102 @@ public void testServiceDnsNames() { assertThat(dnsNames.size(), is(4)); assertThat(dnsNames, hasItems("my-service", "my-service.my-namespace", "my-service.my-namespace.svc", "my-service.my-namespace.svc.cluster.local")); } + + @Test + public void testNullAffinity() { + assertThat(ModelUtils.affinityWithRackLabelSelector(null, null), is(nullValue())); + assertThat(ModelUtils.affinityWithRackLabelSelector(new PodTemplate(), null), is(nullValue())); + assertThat(ModelUtils.affinityWithRackLabelSelector(new PodTemplateBuilder().withAffinity(null).build(), null), is(nullValue())); + } + + @Test + public void testAffinityWithRack() { + Affinity 
expectedAffinity = new AffinityBuilder() + .withNewNodeAffinity() + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .withNodeSelectorTerms(new NodeSelectorTermBuilder().withMatchExpressions(new NodeSelectorRequirementBuilder().withOperator("Exists").withKey("topology.key/my").build()).build()) + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity() + .build(); + + assertThat(ModelUtils.affinityWithRackLabelSelector(null, new RackBuilder().withTopologyKey("topology.key/my").build()), is(expectedAffinity)); + } + + @Test + public void testCombinedPodAffinityWithRack() { + Affinity userAffinity = new AffinityBuilder() + .withNewPodAntiAffinity() + .withRequiredDuringSchedulingIgnoredDuringExecution(new PodAffinityTermBuilder().withTopologyKey("kubernetes.io/hostname").withNewLabelSelector().withMatchLabels(Map.of("database", "true")).endLabelSelector().build()) + .endPodAntiAffinity() + .build(); + + Affinity expectedAffinity = new AffinityBuilder() + .withNewNodeAffinity() + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .withNodeSelectorTerms(new NodeSelectorTermBuilder().withMatchExpressions(new NodeSelectorRequirementBuilder().withOperator("Exists").withKey("topology.key/my").build()).build()) + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity() + .withNewPodAntiAffinity() + .withRequiredDuringSchedulingIgnoredDuringExecution(new PodAffinityTermBuilder().withTopologyKey("kubernetes.io/hostname").withNewLabelSelector().withMatchLabels(Map.of("database", "true")).endLabelSelector().build()) + .endPodAntiAffinity() + .build(); + + assertThat(ModelUtils.affinityWithRackLabelSelector(new PodTemplateBuilder().withAffinity(userAffinity).build(), new RackBuilder().withTopologyKey("topology.key/my").build()), is(expectedAffinity)); + } + + @Test + public void testCombinedPreferredNodeAffinityWithRack() { + Affinity userAffinity = new AffinityBuilder() + .withNewPodAntiAffinity() + .withRequiredDuringSchedulingIgnoredDuringExecution(new PodAffinityTermBuilder().withTopologyKey("kubernetes.io/hostname").withNewLabelSelector().withMatchLabels(Map.of("database", "true")).endLabelSelector().build()) + .endPodAntiAffinity() + .withNewNodeAffinity() + .withPreferredDuringSchedulingIgnoredDuringExecution(new PreferredSchedulingTermBuilder().withWeight(100).withPreference(new NodeSelectorTermBuilder().withMatchExpressions(new NodeSelectorRequirementBuilder().withOperator("Exists").withKey("topology.key/other").build()).build()).build()) + .endNodeAffinity() + .build(); + + Affinity expectedAffinity = new AffinityBuilder() + .withNewNodeAffinity() + .withPreferredDuringSchedulingIgnoredDuringExecution(new PreferredSchedulingTermBuilder().withWeight(100).withPreference(new NodeSelectorTermBuilder().withMatchExpressions(new NodeSelectorRequirementBuilder().withOperator("Exists").withKey("topology.key/other").build()).build()).build()) + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .withNodeSelectorTerms(new NodeSelectorTermBuilder().withMatchExpressions(new NodeSelectorRequirementBuilder().withOperator("Exists").withKey("topology.key/my").build()).build()) + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity() + .withNewPodAntiAffinity() + .withRequiredDuringSchedulingIgnoredDuringExecution(new PodAffinityTermBuilder().withTopologyKey("kubernetes.io/hostname").withNewLabelSelector().withMatchLabels(Map.of("database", "true")).endLabelSelector().build()) + .endPodAntiAffinity() + .build(); + + 
assertThat(ModelUtils.affinityWithRackLabelSelector(new PodTemplateBuilder().withAffinity(userAffinity).build(), new RackBuilder().withTopologyKey("topology.key/my").build()), is(expectedAffinity)); + } + + @Test + public void testCombinedRequiredNodeAffinityWithRack() { + Affinity userAffinity = new AffinityBuilder() + .withNewPodAntiAffinity() + .withRequiredDuringSchedulingIgnoredDuringExecution(new PodAffinityTermBuilder().withTopologyKey("kubernetes.io/hostname").withNewLabelSelector().withMatchLabels(Map.of("database", "true")).endLabelSelector().build()) + .endPodAntiAffinity() + .withNewNodeAffinity() + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .withNodeSelectorTerms(new NodeSelectorTermBuilder().withMatchExpressions(new NodeSelectorRequirementBuilder().withOperator("Exists").withKey("topology.key/other").build()).build()) + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity() + .build(); + + Affinity expectedAffinity = new AffinityBuilder() + .withNewNodeAffinity() + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .withNodeSelectorTerms(new NodeSelectorTermBuilder().withMatchExpressions( + new NodeSelectorRequirementBuilder().withOperator("Exists").withKey("topology.key/other").build(), + new NodeSelectorRequirementBuilder().withOperator("Exists").withKey("topology.key/my").build() + ).build()) + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity() + .withNewPodAntiAffinity() + .withRequiredDuringSchedulingIgnoredDuringExecution(new PodAffinityTermBuilder().withTopologyKey("kubernetes.io/hostname").withNewLabelSelector().withMatchLabels(Map.of("database", "true")).endLabelSelector().build()) + .endPodAntiAffinity() + .build(); + + assertThat(ModelUtils.affinityWithRackLabelSelector(new PodTemplateBuilder().withAffinity(userAffinity).build(), new RackBuilder().withTopologyKey("topology.key/my").build()), is(expectedAffinity)); + } } diff --git a/documentation/api/io.strimzi.api.kafka.model.common.Rack.adoc b/documentation/api/io.strimzi.api.kafka.model.common.Rack.adoc index 012fec57edc..7da84e0bfc7 100644 --- a/documentation/api/io.strimzi.api.kafka.model.common.Rack.adoc +++ b/documentation/api/io.strimzi.api.kafka.model.common.Rack.adoc @@ -38,8 +38,8 @@ NOTE: The _rack_ in which brokers are running can change in some cases when the As a result, the replicas running in different racks might then share the same rack. Use Cruise Control and the `KafkaRebalance` resource with the `RackAwareGoal` to make sure that replicas remain distributed across different racks. -When rack awareness is enabled in the `Kafka` custom resource, Strimzi will automatically add the Kubernetes `preferredDuringSchedulingIgnoredDuringExecution` affinity rule to distribute the Kafka brokers across the different racks. -However, the _preferred_ rule does not guarantee that the brokers will be spread. +When rack awareness is enabled, Strimzi will automatically add a node affinity rule to make sure the Kafka nodes are scheduled only on Kubernetes worker nodes with the `topologyKey` label present. +It will not add any rules for distributing the Kafka nodes across the racks or zones. Depending on your exact Kubernetes and Kafka configurations, you should add additional `affinity` rules or configure `topologySpreadConstraints` for Kafka to make sure the nodes are properly distributed across as many racks as possible. For more information see link:{BookURLDeploying}#assembly-scheduling-str[Configuring pod scheduling^].
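To make the new `Rack.adoc` wording above concrete, this is roughly the only affinity the operator now derives from `rack.topologyKey`, a sketch based on the expectations in the updated `ModelUtilsTest`; `topology.kubernetes.io/zone` stands in for whatever key is configured.

```yaml
# Sketch of the pod affinity generated from rack awareness alone; it pins pods
# to nodes that carry the topology key label but does not spread them.
affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: topology.kubernetes.io/zone   # the configured rack.topologyKey
              operator: Exists
```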
diff --git a/documentation/assemblies/configuring/assembly-scheduling.adoc b/documentation/assemblies/configuring/assembly-scheduling.adoc index 578ec04c6b9..1d6246837fe 100644 --- a/documentation/assemblies/configuring/assembly-scheduling.adoc +++ b/documentation/assemblies/configuring/assembly-scheduling.adoc @@ -19,6 +19,8 @@ include::../../modules/configuring/ref-affinity.adoc[leveloffset=+1] include::../../modules/configuring/proc-scheduling-brokers-on-different-worker-nodes.adoc[leveloffset=+1] +include::../../modules/configuring/proc-distributing-brokers-across-zones.adoc[leveloffset=+1] + include::../../modules/configuring/proc-scheduling-based-on-other-pods.adoc[leveloffset=+1] include::../../modules/configuring/proc-scheduling-deployment-to-node-using-node-affinity.adoc[leveloffset=+1] diff --git a/documentation/modules/configuring/con-common-configuration.adoc b/documentation/modules/configuring/con-common-configuration.adoc index a398ecb7cd0..ef02dc93744 100644 --- a/documentation/modules/configuring/con-common-configuration.adoc +++ b/documentation/modules/configuring/con-common-configuration.adoc @@ -149,6 +149,7 @@ spec: # ... ---- +[id='con-common-config-rack-awareness-{context}'] == Rack awareness Enable rack-aware broker assignment to improve fault tolerance. @@ -158,12 +159,17 @@ Enable rack-aware broker assignment to improve fault tolerance. [source,yaml] ---- # ... -spec: - rack: - topologyKey: topology.kubernetes.io/zone - # ... +rack: + topologyKey: topology.kubernetes.io/zone +# ... ---- +IMPORTANT: With rack awareness enabled, Strimzi will automatically set the rack (zone) information from the `topologyKey` label in the Kafka configuration. +It will also automatically add a node affinity rule to make sure the Pods are scheduled only on the nodes that have the `topologyKey` label. +However, it will not add any anti-affinity or topology spread constraints to spread the Pods across the racks/zones. +To spread the Pods, make sure to add your own rules in the `Kafka` or `KafkaNodePool` custom resources. +For more information, see xref:proc-distributing-brokers-across-zones-{context}[Distributing brokers across availability zones]. + == Distributed tracing configuration Enable distributed tracing using OpenTelemetry to monitor Kafka component operations. diff --git a/documentation/modules/configuring/con-config-kafka-kraft.adoc b/documentation/modules/configuring/con-config-kafka-kraft.adoc index 38558b41927..3a68e251f00 100644 --- a/documentation/modules/configuring/con-config-kafka-kraft.adoc +++ b/documentation/modules/configuring/con-config-kafka-kraft.adoc @@ -169,6 +169,7 @@ spec: <16> ADVANCED OPTION: Container image configuration, which is recommended only in special situations. <17> Authorization enables simple, OAuth 2.0, custom, or OPA (deprecated) authorization on the Kafka broker. Simple authorization uses the `StandardAuthorizer` Kafka plugin. <18> Rack awareness configuration to spread replicas across different racks, data centers, or availability zones. The `topologyKey` must match a node label containing the rack ID. The example used in this configuration specifies a zone using the standard `{K8sZoneLabel}` label. + You should use it together with custom `topologySpreadConstraints` or `affinity` rules to spread the Pods across the zones. <19> Prometheus metrics enabled. In this example, metrics are configured for the Prometheus JMX Exporter (the default metrics exporter).
<20> Rules for exporting metrics in Prometheus format to a Grafana dashboard through the Prometheus JMX Exporter, which are enabled by referencing a ConfigMap containing configuration for the Prometheus JMX exporter. You can enable metrics without further configuration using a reference to a ConfigMap containing an empty file under `metricsConfig.valueFrom.configMapKeyRef.key`. <21> Entity Operator configuration, which specifies the configuration for the Topic Operator and User Operator. diff --git a/documentation/modules/configuring/proc-distributing-brokers-across-zones.adoc b/documentation/modules/configuring/proc-distributing-brokers-across-zones.adoc new file mode 100644 index 00000000000..58697cee480 --- /dev/null +++ b/documentation/modules/configuring/proc-distributing-brokers-across-zones.adoc @@ -0,0 +1,84 @@ +:_mod-docs-content-type: PROCEDURE + +// Module included in the following assemblies: +// +// assembly-scheduling.adoc + +[id='proc-distributing-brokers-across-zones-{context}'] += Distributing brokers across availability zones + +[role="_abstract"] +To improve fault tolerance, you should distribute your Kafka nodes across multiple availability zones (racks). +This is typically combined with the xref:con-common-config-rack-awareness-{context}[Rack awareness] feature. + +This procedure provides configuration examples for a `Kafka` resource with rack awareness enabled and for the pool-specific `KafkaNodePool` resources with topology spread constraints that balance the pods across zones. + +.Prerequisites + +* xref:deploying-cluster-operator-str[The Cluster Operator must be deployed.] + +.Procedure + +. Configure `rack` in the `Kafka` resource and `topologySpreadConstraints` in the `KafkaNodePool` resources. ++ +Make sure the `strimzi.io/cluster` label in the `topologySpreadConstraints[].labelSelector` fields is set to the name of your Kafka cluster. ++ +.Example cluster with rack awareness and topology spread constraints +[source,yaml,subs="+attributes"] +---- +apiVersion: {KafkaApiVersion} +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + # ... + rack: + topologyKey: topology.kubernetes.io/zone + # ... +# ... +--- +apiVersion: {KafkaApiVersion} +kind: KafkaNodePool +metadata: + name: controllers + labels: + strimzi.io/cluster: my-cluster +spec: + roles: + - controller + template: + pod: + topologySpreadConstraints: + - maxSkew: 1 + topologyKey: topology.kubernetes.io/zone + whenUnsatisfiable: DoNotSchedule + labelSelector: + matchLabels: + strimzi.io/cluster: my-cluster + strimzi.io/controller-role: "true" + # ... +--- +apiVersion: {KafkaApiVersion} +kind: KafkaNodePool +metadata: + name: brokers + labels: + strimzi.io/cluster: my-cluster +spec: + roles: + - broker + template: + pod: + topologySpreadConstraints: + - maxSkew: 1 + topologyKey: topology.kubernetes.io/zone + whenUnsatisfiable: DoNotSchedule + labelSelector: + matchLabels: + strimzi.io/cluster: my-cluster + strimzi.io/broker-role: "true" + # ... +---- + +. Apply the changes to your custom resource configuration.
\ No newline at end of file diff --git a/systemtest/src/test/java/io/strimzi/systemtest/specific/RackAwarenessST.java b/systemtest/src/test/java/io/strimzi/systemtest/specific/RackAwarenessST.java index 04fc62fb340..44358f8beb6 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/specific/RackAwarenessST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/specific/RackAwarenessST.java @@ -8,7 +8,6 @@ import io.fabric8.kubernetes.api.model.NodeAffinity; import io.fabric8.kubernetes.api.model.NodeSelectorRequirement; import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodAffinityTerm; import io.skodjob.testframe.resources.KubeResourceManager; import io.strimzi.api.kafka.model.connect.KafkaConnectResources; import io.strimzi.api.kafka.model.kafka.KafkaResources; @@ -49,7 +48,6 @@ import static io.strimzi.systemtest.TestTags.MIRROR_MAKER2; import static io.strimzi.systemtest.TestTags.REGRESSION; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; @Tag(REGRESSION) @@ -109,13 +107,6 @@ void testKafkaRackAwareness() { assertThat(specNodeRequirement.getKey(), is(TOPOLOGY_KEY)); assertThat(specNodeRequirement.getOperator(), is("Exists")); - PodAffinityTerm specPodAntiAffinityTerm = specAffinity.getPodAntiAffinity().getPreferredDuringSchedulingIgnoredDuringExecution().get(0).getPodAffinityTerm(); - PodAffinityTerm podAntiAffinityTerm = pod.getSpec().getAffinity().getPodAntiAffinity().getPreferredDuringSchedulingIgnoredDuringExecution().get(0).getPodAffinityTerm(); - assertThat(podAntiAffinityTerm, is(specPodAntiAffinityTerm)); - assertThat(specPodAntiAffinityTerm.getTopologyKey(), is(TOPOLOGY_KEY)); - assertThat(specPodAntiAffinityTerm.getLabelSelector().getMatchLabels(), hasEntry("strimzi.io/cluster", testStorage.getClusterName())); - assertThat(specPodAntiAffinityTerm.getLabelSelector().getMatchLabels(), hasEntry("strimzi.io/name", KafkaResources.kafkaComponentName(testStorage.getClusterName()))); - // check Kafka rack awareness configuration String podNodeName = pod.getSpec().getNodeName(); String hostname = podNodeName.contains(".") ? podNodeName.substring(0, podNodeName.indexOf(".")) : podNodeName;
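To summarize the refactor for reviewers, here is a small sketch of the new helper's contract, using only types that appear in this diff. The class name `AffinityContractSketch` is made up, and the sketch assumes the cluster-operator module's classpath; the topology key value is arbitrary.

```java
import io.fabric8.kubernetes.api.model.Affinity;
import io.strimzi.api.kafka.model.common.Rack;
import io.strimzi.api.kafka.model.common.RackBuilder;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
import io.strimzi.api.kafka.model.common.template.PodTemplateBuilder;
import io.strimzi.operator.cluster.model.ModelUtils;

public class AffinityContractSketch {
    public static void main(String[] args) {
        Rack rack = new RackBuilder().withTopologyKey("topology.kubernetes.io/zone").build();
        PodTemplate template = new PodTemplateBuilder().build(); // no user-supplied affinity

        // With rack awareness: a required node affinity on the topology key, and nothing else
        Affinity withRack = ModelUtils.affinityWithRackLabelSelector(template, rack);
        System.out.println(withRack.getNodeAffinity()
                .getRequiredDuringSchedulingIgnoredDuringExecution()
                .getNodeSelectorTerms().get(0)
                .getMatchExpressions().get(0).getKey()); // topology.kubernetes.io/zone
        System.out.println(withRack.getPodAntiAffinity()); // null: no zone-spreading rule is added anymore

        // Without rack awareness: the user affinity is returned as-is (null when none was set)
        System.out.println(ModelUtils.affinityWithRackLabelSelector(template, null)); // null
    }
}
```

This mirrors the behavior asserted by `testNullAffinity` and `testAffinityWithRack` in the updated `ModelUtilsTest`.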