Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
* **This version introduces a new API version to our CRDs.**
**Before upgrading to Strimzi 0.49 or newer, make sure that you update your `KafkaUser` resources to use the `.spec.authorization.acls[]operations` field instead of the deprecated `.spec.authorization.acls[]operation`.**
**Especially when using Helm, make sure that the CRDs are updated when you upgrade the operator.**
* **When rack-awareness is enabled in the `Kafka` custom resource (`.spec.kafka.rack`), Strimzi will not automatically add the best-effort-affinity rules for spreading the Kafka Pods between the zones.**
**Please make sure to set your own `topologySpreadConstraint` or `affinity` rules instead.**
* The `.status.kafkaMetadataState` field in the `Kafka` custom resource is deprecated and not used anymore.
* The `type: oauth` authentication in Kafka brokers and Kafka clients (Kafka Connect, MirrorMaker 2, and Strimzi HTTP Bridge) has been deprecated.
Please use the `type: custom` authentication instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package io.strimzi.operator.cluster.model;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerPort;
Expand Down Expand Up @@ -370,7 +368,7 @@ public Deployment generateDeployment(Map<String, String> annotations, boolean is
templatePod,
DEFAULT_POD_LABELS,
annotations,
getMergedAffinity(),
ModelUtils.affinityWithRackLabelSelector(templatePod, rack),
ContainerUtils.listOrNull(createInitContainer(imagePullPolicy)),
List.of(createContainer(imagePullPolicy)),
getVolumes(isOpenShift),
Expand Down Expand Up @@ -510,18 +508,6 @@ public KafkaBridgeHttpConfig getHttp() {
return this.http;
}

/**
* Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity.
*/
protected Affinity getMergedAffinity() {
Affinity userAffinity = templatePod != null && templatePod.getAffinity() != null ? templatePod.getAffinity() : new Affinity();
AffinityBuilder builder = new AffinityBuilder(userAffinity);
if (rack != null) {
builder = ModelUtils.populateAffinityBuilderWithRackLabelSelector(builder, userAffinity, rack.getTopologyKey());
}
return builder.build();
}

/**
* Creates the ClusterRoleBinding which is used to bind the Kafka Bridge SA to the ClusterRole
* which permissions the Kafka init container to access K8S nodes (necessary for rack-awareness).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*/
package io.strimzi.operator.cluster.model;

import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerPort;
Expand Down Expand Up @@ -1207,7 +1205,7 @@ public List<StrimziPodSet> generatePodSets(boolean isOpenShift,
DEFAULT_POD_LABELS,
podAnnotationsProvider.apply(node),
KafkaResources.brokersServiceName(cluster),
getMergedAffinity(pool),
ModelUtils.affinityWithRackLabelSelector(pool.templatePod, rack),
ContainerUtils.listOrNull(createInitContainer(imagePullPolicy, pool)),
List.of(createContainer(imagePullPolicy, pool)),
getPodSetVolumes(node, pool.storage, pool.templatePod, isOpenShift),
Expand Down Expand Up @@ -1479,39 +1477,6 @@ private List<VolumeMount> getInitContainerVolumeMounts(KafkaPool pool) {
return volumeMountList;
}

/**
* Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity.
*
* @param pool Node pool with custom affinity configuration
*
* @return Combined affinity
*/
private Affinity getMergedAffinity(KafkaPool pool) {
Affinity userAffinity = pool.templatePod != null && pool.templatePod.getAffinity() != null ? pool.templatePod.getAffinity() : new Affinity();
AffinityBuilder builder = new AffinityBuilder(userAffinity);
if (rack != null) {
// If there's a rack config, we need to add a podAntiAffinity to spread the brokers among the racks
// We add the affinity even for controller only nodes as we prefer them to be spread even if they don't directly use rack awareness
builder = builder
.editOrNewPodAntiAffinity()
.addNewPreferredDuringSchedulingIgnoredDuringExecution()
.withWeight(100)
.withNewPodAffinityTerm()
.withTopologyKey(rack.getTopologyKey())
.withNewLabelSelector()
.addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, cluster)
.addToMatchLabels(Labels.STRIMZI_NAME_LABEL, componentName)
.endLabelSelector()
.endPodAffinityTerm()
.endPreferredDuringSchedulingIgnoredDuringExecution()
.endPodAntiAffinity();

builder = ModelUtils.populateAffinityBuilderWithRackLabelSelector(builder, userAffinity, rack.getTopologyKey());
}

return builder.build();
}

private List<EnvVar> getInitContainerEnvVars(KafkaPool pool) {
List<EnvVar> varList = new ArrayList<>();
varList.add(ContainerUtils.createEnvVarFromFieldRef(ENV_VAR_KAFKA_INIT_NODE_NAME, "spec.nodeName"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*/
package io.strimzi.operator.cluster.model;

import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
import io.fabric8.kubernetes.api.model.Container;
Expand Down Expand Up @@ -604,18 +602,6 @@ private List<VolumeMount> getExternalConfigurationVolumeMounts() {
return volumeMountList;
}

/**
* Returns a combined affinity: Adding the affinity needed for the "kafka-rack" to the user-provided affinity.
*/
protected Affinity getMergedAffinity() {
Affinity userAffinity = templatePod != null && templatePod.getAffinity() != null ? templatePod.getAffinity() : new Affinity();
AffinityBuilder builder = new AffinityBuilder(userAffinity);
if (rack != null) {
builder = ModelUtils.populateAffinityBuilderWithRackLabelSelector(builder, userAffinity, rack.getTopologyKey());
}
return builder.build();
}

/**
* Generates the StrimziPodSet for the Kafka cluster.
* enabled.
Expand Down Expand Up @@ -659,7 +645,7 @@ public StrimziPodSet generatePodSet(int replicas,
defaultPodLabels(),
podAnnotations,
componentName,
getMergedAffinity(),
ModelUtils.affinityWithRackLabelSelector(templatePod, rack),
ContainerUtils.listOrNull(createInitContainer(imagePullPolicy)),
List.of(createContainer(imagePullPolicy, customContainerImage)),
getVolumes(isOpenShift),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.SecretBuilder;
import io.strimzi.api.kafka.model.common.CertificateAuthority;
import io.strimzi.api.kafka.model.common.Rack;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.Util;
Expand Down Expand Up @@ -186,54 +188,65 @@ public static List<String> getLinesWithoutCommentsAndEmptyLines(String config) {
}

/**
* Adds user-configured affinity to the AffinityBuilder
* Mixes the user-configured affinity with the rack-awareness node selector affinity
*
* @param builder the builder which is used to populate the node affinity
* @param userAffinity the userAffinity which is defined by the user
* @param topologyKey the topology key which is used to select the node
* @return the AffinityBuilder which has the node selector with topology key which is needed to make sure
* the pods are scheduled only on nodes with the rack label
* @param podTemplate PodTemplate with user affinity configuration
* @param rack Rack awareness configuration
*
* @return Affinity which has the node selector with the topology-key needed to make sure the pods are scheduled only
* on nodes with the rack label
*/
public static AffinityBuilder populateAffinityBuilderWithRackLabelSelector(AffinityBuilder builder, Affinity userAffinity, String topologyKey) {
// We need to add node affinity to make sure the pods are scheduled only on nodes with the rack label
NodeSelectorRequirement selector = new NodeSelectorRequirementBuilder()
.withOperator("Exists")
.withKey(topologyKey)
.build();
public static Affinity affinityWithRackLabelSelector(PodTemplate podTemplate, Rack rack) {
// Extract the user-defined affinity if set
Affinity userAffinity = podTemplate != null && podTemplate.getAffinity() != null ? podTemplate.getAffinity() : null;

if (rack != null) {
// When rack-awareness is used, we need to add node affinity to make sure the pods are scheduled only on nodes with the rack label
NodeSelectorRequirement rackAwareSelector = new NodeSelectorRequirementBuilder()
.withOperator("Exists")
.withKey(rack.getTopologyKey())
.build();

AffinityBuilder affinityBuilder = new AffinityBuilder(userAffinity);

if (userAffinity != null
if (userAffinity != null
&& userAffinity.getNodeAffinity() != null
&& userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution() != null
&& userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms() != null) {
// User has specified some Node Selector Terms => we should enhance them
List<NodeSelectorTerm> oldTerms = userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms();
List<NodeSelectorTerm> enhancedTerms = new ArrayList<>(oldTerms.size());

for (NodeSelectorTerm term : oldTerms) {
NodeSelectorTerm enhancedTerm = new NodeSelectorTermBuilder(term)
.addToMatchExpressions(selector)
.build();
enhancedTerms.add(enhancedTerm);
// User has specified some Node Selector Terms => we should enhance them by injecting our own selector to the matchExpressions
List<NodeSelectorTerm> oldTerms = userAffinity.getNodeAffinity().getRequiredDuringSchedulingIgnoredDuringExecution().getNodeSelectorTerms();
List<NodeSelectorTerm> enhancedTerms = new ArrayList<>(oldTerms.size());

for (NodeSelectorTerm term : oldTerms) {
NodeSelectorTerm enhancedTerm = new NodeSelectorTermBuilder(term)
.addToMatchExpressions(rackAwareSelector)
.build();
enhancedTerms.add(enhancedTerm);
}

affinityBuilder
.editOrNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.withNodeSelectorTerms(enhancedTerms)
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity();
} else {
// User has not specified any selector terms => we add our own
affinityBuilder
.editOrNewNodeAffinity()
.editOrNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTerm()
.withMatchExpressions(rackAwareSelector)
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity();
}

builder = builder
.editOrNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.withNodeSelectorTerms(enhancedTerms)
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity();
return affinityBuilder.build();
} else {
// User has not specified any selector terms => we add our own
builder = builder
.editOrNewNodeAffinity()
.editOrNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTerm()
.withMatchExpressions(selector)
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity();
// Rack-awareness is not used. We just keep what the user configured.
return userAffinity;
}
return builder;
}

/**
Expand Down
Loading