Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
* **This version introduces a new API version to our CRDs.**
**Before upgrading to Strimzi 0.49 or newer, make sure that you update your `KafkaUser` resources to use the `.spec.authorization.acls[].operations` field instead of the deprecated `.spec.authorization.acls[].operation`.**
**Especially when using Helm, make sure that the CRDs are updated when you upgrade the operator.**
* **When rack-awareness is enabled in the `Kafka` custom resource (`.spec.kafka.rack`), Strimzi will not automatically add the best-effort-affinity rules for spreading the Kafka Pods between the zones.**
**Please make sure to set your own `topologySpreadConstraint` or `affinity` rules instead.**
* The `.status.kafkaMetadataState` field in the `Kafka` custom resource is deprecated and not used anymore.
* The `type: oauth` authentication in Kafka brokers and Kafka clients (Kafka Connect, MirrorMaker 2, and Strimzi HTTP Bridge) has been deprecated.
Please use the `type: custom` authentication instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package io.strimzi.operator.cluster.model;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerPort;
Expand Down Expand Up @@ -370,7 +368,7 @@ public Deployment generateDeployment(Map<String, String> annotations, boolean is
templatePod,
DEFAULT_POD_LABELS,
annotations,
getMergedAffinity(),
ModelUtils.affinityWithRackLabelSelector(templatePod, rack),
ContainerUtils.listOrNull(createInitContainer(imagePullPolicy)),
List.of(createContainer(imagePullPolicy)),
getVolumes(isOpenShift),
Expand Down Expand Up @@ -510,18 +508,6 @@ public KafkaBridgeHttpConfig getHttp() {
return this.http;
}

/**
 * Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity.
 *
 * @return Affinity merging the Pod template affinity with the rack-awareness node selector (when rack is configured)
 */
protected Affinity getMergedAffinity() {
    // Start from the affinity the user configured in the Pod template, or an empty one when none was set
    Affinity baseAffinity;
    if (templatePod == null || templatePod.getAffinity() == null) {
        baseAffinity = new Affinity();
    } else {
        baseAffinity = templatePod.getAffinity();
    }

    AffinityBuilder merged = new AffinityBuilder(baseAffinity);

    if (rack != null) {
        // Rack-awareness is enabled => inject the node selector matching the rack topology label
        merged = ModelUtils.populateAffinityBuilderWithRackLabelSelector(merged, baseAffinity, rack.getTopologyKey());
    }

    return merged.build();
}

/**
* Creates the ClusterRoleBinding which is used to bind the Kafka Bridge SA to the ClusterRole
* which permissions the Kafka init container to access K8S nodes (necessary for rack-awareness).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*/
package io.strimzi.operator.cluster.model;

import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerPort;
Expand Down Expand Up @@ -1207,7 +1205,7 @@ public List<StrimziPodSet> generatePodSets(boolean isOpenShift,
DEFAULT_POD_LABELS,
podAnnotationsProvider.apply(node),
KafkaResources.brokersServiceName(cluster),
getMergedAffinity(pool),
ModelUtils.affinityWithRackLabelSelector(pool.templatePod, rack),
ContainerUtils.listOrNull(createInitContainer(imagePullPolicy, pool)),
List.of(createContainer(imagePullPolicy, pool)),
getPodSetVolumes(node, pool.storage, pool.templatePod, isOpenShift),
Expand Down Expand Up @@ -1479,39 +1477,6 @@ private List<VolumeMount> getInitContainerVolumeMounts(KafkaPool pool) {
return volumeMountList;
}

/**
 * Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity.
 *
 * @param pool Node pool with custom affinity configuration
 *
 * @return Combined affinity
 */
private Affinity getMergedAffinity(KafkaPool pool) {
    // Start from the user-configured Pod template affinity of this pool; fall back to an empty Affinity when unset
    Affinity userAffinity = pool.templatePod != null && pool.templatePod.getAffinity() != null ? pool.templatePod.getAffinity() : new Affinity();
    AffinityBuilder builder = new AffinityBuilder(userAffinity);
    if (rack != null) {
        // If there's a rack config, we need to add a podAntiAffinity to spread the brokers among the racks
        // We add the affinity even for controller only nodes as we prefer them to be spread even if they don't directly use rack awareness
        // Weight 100 makes this a strong preference, but it is still "preferred" (best-effort), not "required" scheduling
        builder = builder
                .editOrNewPodAntiAffinity()
                    .addNewPreferredDuringSchedulingIgnoredDuringExecution()
                        .withWeight(100)
                        .withNewPodAffinityTerm()
                            .withTopologyKey(rack.getTopologyKey())
                            .withNewLabelSelector()
                                // Match the other Pods of this same Kafka cluster / component so they repel each other across racks
                                .addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, cluster)
                                .addToMatchLabels(Labels.STRIMZI_NAME_LABEL, componentName)
                            .endLabelSelector()
                        .endPodAffinityTerm()
                    .endPreferredDuringSchedulingIgnoredDuringExecution()
                .endPodAntiAffinity();

        // Additionally constrain scheduling to nodes that carry the rack topology label at all
        builder = ModelUtils.populateAffinityBuilderWithRackLabelSelector(builder, userAffinity, rack.getTopologyKey());
    }

    return builder.build();
}

private List<EnvVar> getInitContainerEnvVars(KafkaPool pool) {
List<EnvVar> varList = new ArrayList<>();
varList.add(ContainerUtils.createEnvVarFromFieldRef(ENV_VAR_KAFKA_INIT_NODE_NAME, "spec.nodeName"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*/
package io.strimzi.operator.cluster.model;

import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
import io.fabric8.kubernetes.api.model.Container;
Expand Down Expand Up @@ -604,18 +602,6 @@ private List<VolumeMount> getExternalConfigurationVolumeMounts() {
return volumeMountList;
}

/**
 * Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity.
 *
 * @return Affinity combining the user-provided Pod template affinity with the rack node selector (when rack is set)
 */
protected Affinity getMergedAffinity() {
    // Use the affinity from the Pod template when present; otherwise start from an empty affinity
    Affinity configuredAffinity = (templatePod != null && templatePod.getAffinity() != null)
            ? templatePod.getAffinity()
            : new Affinity();

    AffinityBuilder affinityBuilder = new AffinityBuilder(configuredAffinity);

    if (rack != null) {
        // Rack-awareness configured => restrict scheduling to nodes carrying the rack topology label
        affinityBuilder = ModelUtils.populateAffinityBuilderWithRackLabelSelector(affinityBuilder, configuredAffinity, rack.getTopologyKey());
    }

    return affinityBuilder.build();
}

/**
* Generates the StrimziPodSet for the Kafka cluster.
* enabled.
Expand Down Expand Up @@ -659,7 +645,7 @@ public StrimziPodSet generatePodSet(int replicas,
defaultPodLabels(),
podAnnotations,
componentName,
getMergedAffinity(),
ModelUtils.affinityWithRackLabelSelector(templatePod, rack),
ContainerUtils.listOrNull(createInitContainer(imagePullPolicy)),
List.of(createContainer(imagePullPolicy, customContainerImage)),
getVolumes(isOpenShift),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.SecretBuilder;
import io.strimzi.api.kafka.model.common.CertificateAuthority;
import io.strimzi.api.kafka.model.common.Rack;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.Util;
Expand Down Expand Up @@ -186,54 +188,65 @@ public static List<String> getLinesWithoutCommentsAndEmptyLines(String config) {
}

/**
* Adds user-configured affinity to the AffinityBuilder
* Mixes the user-configured affinity with the rack-awareness node selector affinity
*
* @param builder the builder which is used to populate the node affinity
* @param userAffinity the userAffinity which is defined by the user
* @param topologyKey the topology key which is used to select the node
* @return the AffinityBuilder which has the node selector with topology key which is needed to make sure
* the pods are scheduled only on nodes with the rack label
* @param podTemplate PodTemplate with user affinity configuration
* @param rack Rack awareness configuration
*
* @return Affinity which has the node selector with the topology-key needed to make sure the pods are scheduled only
* on nodes with the rack label
*/
public static AffinityBuilder populateAffinityBuilderWithRackLabelSelector(AffinityBuilder builder, Affinity userAffinity, String topologyKey) {
// We need to add node affinity to make sure the pods are scheduled only on nodes with the rack label
NodeSelectorRequirement selector = new NodeSelectorRequirementBuilder()
.withOperator("Exists")
.withKey(topologyKey)
.build();
public static Affinity affinityWithRackLabelSelector(PodTemplate podTemplate, Rack rack) {
// Extract the user-defined affinity if set
Affinity userAffinity = podTemplate != null && podTemplate.getAffinity() != null ? podTemplate.getAffinity() : null;

if (rack != null) {
// When rack-awareness is used, we need to add node affinity to make sure the pods are scheduled only on nodes with the rack label
NodeSelectorRequirement rackAwareSelector = new NodeSelectorRequirementBuilder()
.withOperator("Exists")
.withKey(rack.getTopologyKey())
.build();

AffinityBuilder affinityBuilder = new AffinityBuilder(userAffinity);

if (userAffinity != null
if (userAffinity != null
&& userAffinity.getNodeAffinity() != null
&& userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution() != null
&& userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms() != null) {
// User has specified some Node Selector Terms => we should enhance them
List<NodeSelectorTerm> oldTerms = userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms();
List<NodeSelectorTerm> enhancedTerms = new ArrayList<>(oldTerms.size());

for (NodeSelectorTerm term : oldTerms) {
NodeSelectorTerm enhancedTerm = new NodeSelectorTermBuilder(term)
.addToMatchExpressions(selector)
.build();
enhancedTerms.add(enhancedTerm);
// User has specified some Node Selector Terms => we should enhance them by injecting out own selector to the matchExpressions
List<NodeSelectorTerm> oldTerms = userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms();
List<NodeSelectorTerm> enhancedTerms = new ArrayList<>(oldTerms.size());

for (NodeSelectorTerm term : oldTerms) {
NodeSelectorTerm enhancedTerm = new NodeSelectorTermBuilder(term)
.addToMatchExpressions(rackAwareSelector)
.build();
enhancedTerms.add(enhancedTerm);
}

affinityBuilder
.editOrNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.withNodeSelectorTerms(enhancedTerms)
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity();
} else {
// User has not specified any selector terms => we add our own
affinityBuilder
.editOrNewNodeAffinity()
.editOrNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTerm()
.withMatchExpressions(rackAwareSelector)
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity();
}

builder = builder
.editOrNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.withNodeSelectorTerms(enhancedTerms)
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity();
return affinityBuilder.build();
} else {
// User has not specified any selector terms => we add our own
builder = builder
.editOrNewNodeAffinity()
.editOrNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTerm()
.withMatchExpressions(selector)
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity();
// Rack-awareness is not used. We just keep what the user configured.
return userAffinity;
}
return builder;
}

/**
Expand Down
Loading