diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java
index 105f31f3a51..2f7d892dad1 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java
@@ -44,149 +44,6 @@ public class KafkaConfiguration extends AbstractConfiguration {
         DEFAULTS.put("min.insync.replicas", "1");
     }
 
-    /**
-     * List of configuration options that are relevant to controllers and should be considered when deciding whether
-     * a controller-only node needs to be rolled or not.
-     */
-    private static final Set<String> CONTROLLER_RELEVANT_CONFIGS = Set.of(
-            "alter.config.policy.class.name",
-            "authorizer.class.name",
-            "auto.create.topics.enable",
-            "background.threads",
-            "broker.heartbeat.interval.ms",
-            "broker.rack",
-            "broker.session.timeout.ms",
-            "connection.failed.authentication.delay.ms",
-            "connections.max.idle.ms",
-            "connections.max.reauth.ms",
-            "controlled.shutdown.enable",
-            "controlled.shutdown.max.retries",
-            "controlled.shutdown.retry.backoff.ms",
-            "controller.listener.names",
-            "controller.quorum.append.linger.ms",
-            "controller.quorum.election.backoff.max.ms",
-            "controller.quorum.election.timeout.ms",
-            "controller.quorum.fetch.timeout.ms",
-            "controller.quorum.request.timeout.ms",
-            "controller.quorum.retry.backoff.ms",
-            "controller.quorum.voters",
-            "controller.quota.window.num",
-            "controller.quota.window.size.seconds",
-            "controller.socket.timeout.ms",
-            "create.topic.policy.class.name",
-            "default.replication.factor",
-            "delete.topic.enable",
-            "early.start.listeners",
-            "kafka.metrics.polling.interval.secs",
-            "kafka.metrics.reporters",
-            "leader.imbalance.check.interval.seconds",
-            "leader.imbalance.per.broker.percentage",
-            "listener.name.controlplane-9090.ssl.keystore.location",
-            "listener.name.controlplane-9090.ssl.keystore.password",
-            "listener.name.controlplane-9090.ssl.keystore.type",
-            "listener.name.controlplane-9090.ssl.truststore.location",
-            "listener.name.controlplane-9090.ssl.truststore.password",
-            "listener.name.controlplane-9090.ssl.truststore.type",
-            "listener.name.controlplane-9090.ssl.client.auth",
-            "listener.security.protocol.map",
-            "listeners",
-            "log.dir",
-            "log.dirs",
-            "min.insync.replicas",
-            "max.connection.creation.rate",
-            "max.connections.per.ip.overrides",
-            "max.connections.per.ip",
-            "max.connections",
-            "metadata.log.dir",
-            "metadata.log.max.record.bytes.between.snapshots",
-            "metadata.log.max.snapshot.interval.ms",
-            "metadata.log.segment.bytes",
-            "metadata.log.segment.min.bytes",
-            "metadata.log.segment.ms",
-            "metadata.max.idle.interval.ms",
-            "metadata.max.retention.bytes",
-            "metadata.max.retention.ms",
-            "metric.reporters",
-            "metrics.num.samples",
-            "metrics.recording.level",
-            "metrics.sample.window.ms",
-            "node.id",
-            "num.io.threads",
-            "num.network.threads",
-            "num.partitions",
-            "offsets.topic.replication.factor",
-            "principal.builder.class",
-            "process.roles",
-            "remote.log.storage.system.enable",
-            "replica.selector.class",
-            "reserved.broker.max.id",
-            "sasl.enabled.mechanisms",
-            "sasl.kerberos.kinit.cmd",
-            "sasl.kerberos.min.time.before.relogin",
-            "sasl.kerberos.principal.to.local.rules",
-            "sasl.kerberos.service.name",
-            "sasl.kerberos.ticket.renew.jitter",
-            "sasl.kerberos.ticket.renew.window.factor",
-            "sasl.login.callback.handler.class",
-            "sasl.login.class",
-            "sasl.login.connect.timeout.ms",
-            "sasl.login.read.timeout.ms",
-            "sasl.login.refresh.buffer.seconds",
-            "sasl.login.refresh.min.period.seconds",
-            "sasl.login.refresh.window.factor",
-            "sasl.login.refresh.window.jitter",
-            "sasl.login.retry.backoff.max.ms",
-            "sasl.login.retry.backoff.ms",
-            "sasl.mechanism.controller.protocol",
-            "sasl.oauthbearer.clock.skew.seconds",
-            "sasl.oauthbearer.expected.audience",
-            "sasl.oauthbearer.expected.issuer",
-            "sasl.oauthbearer.jwks.endpoint.refresh.ms",
-            "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms",
-            "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms",
-            "sasl.oauthbearer.jwks.endpoint.url",
-            "sasl.oauthbearer.scope.claim.name",
-            "sasl.oauthbearer.sub.claim.name",
-            "sasl.oauthbearer.token.endpoint.url",
-            "sasl.server.callback.handler.class",
-            "sasl.server.max.receive.size",
-            "security.providers",
-            "server.max.startup.time.ms",
-            "socket.connection.setup.timeout.max.ms",
-            "socket.connection.setup.timeout.ms",
-            "socket.listen.backlog.size",
-            "socket.receive.buffer.bytes",
-            "socket.request.max.bytes",
-            "socket.send.buffer.bytes",
-            "ssl.cipher.suites",
-            "ssl.client.auth",
-            "ssl.enabled.protocols",
-            "ssl.endpoint.identification.algorithm",
-            "ssl.engine.factory.class",
-            "ssl.key.password",
-            "ssl.keymanager.algorithm",
-            "ssl.keystore.certificate.chain",
-            "ssl.keystore.key",
-            "ssl.keystore.location",
-            "ssl.keystore.password",
-            "ssl.keystore.type",
-            "ssl.principal.mapping.rules",
-            "ssl.protocol",
-            "ssl.provider",
-            "ssl.secure.random.implementation",
-            "ssl.trustmanager.algorithm",
-            "ssl.truststore.certificates",
-            "ssl.truststore.location",
-            "ssl.truststore.password",
-            "ssl.truststore.type",
-            "super.users",
-            "transaction.state.log.min.isr",
-            "transaction.state.log.replication.factor",
-            "queued.max.requests",
-            "queued.max.requests.bytes",
-            "unclean.leader.election.enable"
-    );
-
     /**
      * Copy constructor which creates new instance of the Kafka Configuration from existing configuration. It is
      * useful when you need to modify an instance of the configuration without permanently changing the original.
@@ -287,24 +144,6 @@ public Set<String> unknownConfigsWithValues(KafkaVersion kafkaVersion) {
         return result;
     }
 
-    /**
-     * Return the config properties with their values in this KafkaConfiguration which are known to be relevant for the
-     * Kafka controller nodes.
-     *
-     * @return  The configuration options relevant for controllers
-     */
-    public Set<String> controllerConfigsWithValues() {
-        Set<String> result = new HashSet<>();
-
-        for (Map.Entry<String, String> e : this.asOrderedProperties().asMap().entrySet()) {
-            if (CONTROLLER_RELEVANT_CONFIGS.contains(e.getKey())) {
-                result.add(e.getKey() + "=" + e.getValue());
-            }
-        }
-
-        return result;
-    }
-
     /**
      * @return True if the configuration is empty. False otherwise.
      */
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
index 8ff3c7d15e7..9e89c31ef06 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
@@ -723,12 +723,6 @@ protected Future<Void> perBrokerKafkaConfiguration(MetricsAndLogging metricsAndL
         // We collect the configuration options related to various plugins
         nodeConfiguration += kc.unknownConfigsWithValues(kafka.getKafkaVersion()).toString();
 
-        // We collect the information relevant to controller-only nodes
-        if (pool.isController() && !pool.isBroker()) {
-            // For controllers only, we extract the controller-relevant configurations and use it in the configuration annotations
-            nodeConfiguration = kc.controllerConfigsWithValues().toString();
-        }
-
         // We store hash of the broker configurations for later use in Pod and in rolling updates
         this.brokerConfigurationHash.put(nodeId, Util.hashStub(nodeConfiguration));
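With the controller-only branch gone, the configuration hash annotation is derived from the same full configuration string for brokers, controllers, and combined nodes alike; role-specific filtering now happens only when the running configuration is diffed. A simplified sketch of the remaining flow (illustrative; the surrounding method builds nodeConfiguration from more sources than shown here):

    // Same hash input for every node role; rolling decisions for controller-only
    // nodes are made later by KafkaConfigurationDiff rather than by this hash.
    nodeConfiguration += kc.unknownConfigsWithValues(kafka.getKafkaVersion()).toString();
    this.brokerConfigurationHash.put(nodeId, Util.hashStub(nodeConfiguration));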
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiff.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaConfigurationDiff.java
similarity index 54%
rename from cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiff.java
rename to cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaConfigurationDiff.java
index 5fcadc1301c..14e4a9d92aa 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiff.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaConfigurationDiff.java
@@ -24,6 +24,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -36,18 +37,19 @@
  * 3b. If entry was removed from desired, add it to the diff with null value.
  * 3c. If custom entry was removed, delete property
  */
-public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
-    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaBrokerConfigurationDiff.class);
-
+public class KafkaConfigurationDiff extends AbstractJsonDiff {
+    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaConfigurationDiff.class);
     private final Reconciliation reconciliation;
-    private final Collection<AlterConfigOp> brokerConfigDiff;
+    private final Collection<AlterConfigOp> nodeConfigDiff;
     private final Map<String, ConfigModel> configModel;
+    private final boolean isController;
+    private final boolean isBroker;
 
     /**
      * These options are skipped because they contain placeholders
      * 909[1-4] is for skipping all (internal, plain, secured, external) listeners properties
      */
-    public static final Pattern IGNORABLE_PROPERTIES = Pattern.compile(
+    public static final Pattern IGNORABLE_PROPERTIES_PATTERN = Pattern.compile(
         "^(broker\\.id"
         + "|.*-[0-9]{2,5}\\.ssl\\.keystore\\.location"
         + "|.*-[0-9]{2,5}\\.ssl\\.keystore\\.password"
@@ -62,22 +64,158 @@ public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
         + "|broker\\.rack)$");
 
     /**
-     * KRaft controller configuration options are skipped if it is not combined node
+     * Controller configuration options which should be skipped for broker-only nodes
      */
-    private static final Pattern IGNORABLE_CONTROLLER_PROPERTIES = Pattern.compile("controller\\.quorum\\..*");
+    private static final Pattern IGNORABLE_CONTROLLER_PROPERTIES_PATTERN = Pattern.compile("controller\\.quorum\\..*");
+
+    /**
+     * Configuration options that are relevant to controllers. This set must not overlap with IGNORABLE_PROPERTIES_PATTERN.
+     */
+    private static final Set<String> CONTROLLER_RELEVANT_PROPERTIES = Set.of(
+            "alter.config.policy.class.name",
+            "authorizer.class.name",
+            "auto.create.topics.enable",
+            "background.threads",
+            "broker.heartbeat.interval.ms",
+            "broker.session.timeout.ms",
+            "connection.failed.authentication.delay.ms",
+            "connections.max.idle.ms",
+            "connections.max.reauth.ms",
+            "controlled.shutdown.enable",
+            "controlled.shutdown.max.retries",
+            "controlled.shutdown.retry.backoff.ms",
+            "controller.listener.names",
+            "controller.quorum.append.linger.ms",
+            "controller.quorum.election.backoff.max.ms",
+            "controller.quorum.election.timeout.ms",
+            "controller.quorum.fetch.timeout.ms",
+            "controller.quorum.request.timeout.ms",
+            "controller.quorum.retry.backoff.ms",
+            "controller.quorum.voters",
+            "controller.quota.window.num",
+            "controller.quota.window.size.seconds",
+            "controller.socket.timeout.ms",
+            "create.topic.policy.class.name",
+            "default.replication.factor",
+            "delete.topic.enable",
+            "early.start.listeners",
+            "kafka.metrics.polling.interval.secs",
+            "kafka.metrics.reporters",
+            "leader.imbalance.check.interval.seconds",
+            "leader.imbalance.per.broker.percentage",
+            "listener.security.protocol.map",
+            "listeners",
+            "log.dir",
+            "log.dirs",
+            "min.insync.replicas",
+            "max.connection.creation.rate",
+            "max.connections.per.ip.overrides",
+            "max.connections.per.ip",
+            "max.connections",
+            "metadata.log.dir",
+            "metadata.log.max.record.bytes.between.snapshots",
+            "metadata.log.max.snapshot.interval.ms",
+            "metadata.log.segment.bytes",
+            "metadata.log.segment.min.bytes",
+            "metadata.log.segment.ms",
+            "metadata.max.idle.interval.ms",
+            "metadata.max.retention.bytes",
+            "metadata.max.retention.ms",
+            "metric.reporters",
+            "metrics.num.samples",
+            "metrics.recording.level",
+            "metrics.sample.window.ms",
+            "num.io.threads",
+            "num.network.threads",
+            "num.partitions",
+            "offsets.topic.replication.factor",
+            "principal.builder.class",
+            "process.roles",
+            "replica.selector.class",
+            "reserved.broker.max.id",
+            "sasl.kerberos.kinit.cmd",
+            "sasl.kerberos.min.time.before.relogin",
+            "sasl.kerberos.principal.to.local.rules",
+            "sasl.kerberos.service.name",
+            "sasl.kerberos.ticket.renew.jitter",
+            "sasl.kerberos.ticket.renew.window.factor",
+            "sasl.login.callback.handler.class",
+            "sasl.login.class",
+            "sasl.login.connect.timeout.ms",
+            "sasl.login.read.timeout.ms",
+            "sasl.login.refresh.buffer.seconds",
+            "sasl.login.refresh.min.period.seconds",
+            "sasl.login.refresh.window.factor",
+            "sasl.login.refresh.window.jitter",
+            "sasl.login.retry.backoff.max.ms",
+            "sasl.login.retry.backoff.ms",
+            "sasl.mechanism.controller.protocol",
+            "sasl.oauthbearer.clock.skew.seconds",
+            "sasl.oauthbearer.expected.audience",
+            "sasl.oauthbearer.expected.issuer",
+            "sasl.oauthbearer.jwks.endpoint.refresh.ms",
+            "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms",
+            "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms",
+            "sasl.oauthbearer.jwks.endpoint.url",
+            "sasl.oauthbearer.scope.claim.name",
+            "sasl.oauthbearer.sub.claim.name",
+            "sasl.oauthbearer.token.endpoint.url",
+            "sasl.server.callback.handler.class",
+            "sasl.server.max.receive.size",
+            "security.providers",
+            "server.max.startup.time.ms",
+            "socket.connection.setup.timeout.max.ms",
+            "socket.connection.setup.timeout.ms",
+            "socket.listen.backlog.size",
+            "socket.receive.buffer.bytes",
+            "socket.request.max.bytes",
+            "socket.send.buffer.bytes",
+            "ssl.cipher.suites",
+            "ssl.client.auth",
+            "ssl.enabled.protocols",
+            "ssl.endpoint.identification.algorithm",
+            "ssl.engine.factory.class",
+            "ssl.key.password",
+            "ssl.keymanager.algorithm",
+            "ssl.keystore.certificate.chain",
+            "ssl.keystore.key",
+            "ssl.keystore.location",
+            "ssl.keystore.password",
+            "ssl.keystore.type",
+            "ssl.principal.mapping.rules",
+            "ssl.protocol",
+            "ssl.provider",
+            "ssl.secure.random.implementation",
+            "ssl.trustmanager.algorithm",
+            "ssl.truststore.certificates",
+            "ssl.truststore.location",
+            "ssl.truststore.password",
+            "ssl.truststore.type",
+            "super.users",
+            "transaction.state.log.min.isr",
+            "transaction.state.log.replication.factor",
+            "queued.max.requests",
+            "queued.max.requests.bytes",
+            "unclean.leader.election.enable"
+    );
 
     /**
      * Constructor
      *
      * @param reconciliation Reconciliation marker
-     * @param brokerConfigs  Broker configuration from Kafka Admin API
+     * @param nodeConfigs    Kafka node configuration from Kafka Admin API
      * @param desired        Desired configuration
      * @param kafkaVersion   Kafka version
-     * @param brokerNodeRef  Broker node reference
+     * @param nodeRef        Node reference
+     * @param isController   Whether node has a controller role
+     * @param isBroker       Whether node has a broker role
      */
-    protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired, KafkaVersion kafkaVersion, NodeRef brokerNodeRef) {
+    protected KafkaConfigurationDiff(Reconciliation reconciliation, Config nodeConfigs, String desired, KafkaVersion kafkaVersion, NodeRef nodeRef, boolean isController, boolean isBroker) {
         this.reconciliation = reconciliation;
         this.configModel = KafkaConfiguration.readConfigModel(kafkaVersion);
-        this.brokerConfigDiff = diff(brokerNodeRef, desired, brokerConfigs, configModel);
+        this.isController = isController;
+        this.isBroker = isBroker;
+        this.nodeConfigDiff = diff(nodeRef, desired, nodeConfigs, configModel);
     }
 
     /**
@@ -85,7 +223,7 @@ protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config bro
      */
     protected boolean canBeUpdatedDynamically() {
         boolean result = true;
-        for (AlterConfigOp entry : brokerConfigDiff) {
+        for (AlterConfigOp entry : nodeConfigDiff) {
             if (isScope(entry.configEntry(), Scope.READ_ONLY)) {
                 result = false;
                 LOGGER.infoCr(reconciliation, "Configuration can't be updated dynamically due to: {}", entry);
@@ -100,44 +238,48 @@ protected boolean canBeUpdatedDynamically() {
      * @return Collection of AlterConfigOp containing difference between current and desired configuration
      */
     protected Collection<AlterConfigOp> getConfigDiff() {
-        return brokerConfigDiff;
+        return nodeConfigDiff;
     }
 
     /**
      * @return The number of broker configs which are different.
      */
     protected int getDiffSize() {
-        return brokerConfigDiff.size();
+        return nodeConfigDiff.size();
     }
 
-    private static boolean isIgnorableProperty(final String key, final boolean nodeIsController) {
-        // If node is not a KRaft controller, ignore KRaft controller config properties.
-        if (!nodeIsController) {
-            return IGNORABLE_PROPERTIES.matcher(key).matches() || IGNORABLE_CONTROLLER_PROPERTIES.matcher(key).matches();
+    private boolean isIgnorableProperty(final String key) {
+        if (isController && !isBroker) {
+            // Controller only -> a property is ignorable if it is not relevant to controllers
+            return !CONTROLLER_RELEVANT_PROPERTIES.contains(key);
+        } else if (isController) {
+            // Combined node -> a property is ignorable if it matches the ignorable pattern
+            return IGNORABLE_PROPERTIES_PATTERN.matcher(key).matches();
         } else {
-            return IGNORABLE_PROPERTIES.matcher(key).matches();
+            // Broker only -> a property is ignorable if it matches the ignorable pattern or is a controller property, which is not relevant to brokers
+            return IGNORABLE_PROPERTIES_PATTERN.matcher(key).matches() || IGNORABLE_CONTROLLER_PROPERTIES_PATTERN.matcher(key).matches();
         }
     }
 
     /**
      * Computes diff between two maps. Entries in IGNORABLE_PROPERTIES are skipped
-     * @param brokerNodeRef broker node reference of compared broker
+     * @param nodeRef node reference of compared node
      * @param desired desired configuration, may be null if the related ConfigMap does not exist yet or no changes are required
-     * @param brokerConfigs current configuration
+     * @param nodeConfigs current configuration
      * @param configModel default configuration for {@code kafkaVersion} of broker
      * @return Collection of AlterConfigOp containing all entries which were changed from current in desired configuration
      */
-    private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
-                                           Config brokerConfigs,
+    private Collection<AlterConfigOp> diff(NodeRef nodeRef, String desired,
+                                           Config nodeConfigs,
                                            Map<String, ConfigModel> configModel) {
-        if (brokerConfigs == null || desired == null) {
+        if (nodeConfigs == null || desired == null) {
             return Collections.emptyList();
         }
         Map<String, String> currentMap;
         Collection<AlterConfigOp> updatedCE = new ArrayList<>();
-        currentMap = brokerConfigs.entries().stream().collect(
+        currentMap = nodeConfigs.entries().stream().collect(
                 Collectors.toMap(
                         ConfigEntry::name,
                         configEntry -> configEntry.value() == null ? "null" : configEntry.value()));
@@ -154,7 +296,7 @@ private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
             String pathValue = d.get("path").asText();
             String pathValueWithoutSlash = pathValue.substring(1);
 
-            Optional<ConfigEntry> optEntry = brokerConfigs.entries().stream()
+            Optional<ConfigEntry> optEntry = nodeConfigs.entries().stream()
                     .filter(configEntry -> configEntry.name().equals(pathValueWithoutSlash))
                     .findFirst();
 
@@ -162,25 +304,25 @@ private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
             if (optEntry.isPresent()) {
                 ConfigEntry entry = optEntry.get();
                 if ("remove".equals(op)) {
-                    removeProperty(configModel, updatedCE, pathValueWithoutSlash, entry, brokerNodeRef.controller());
+                    removeProperty(configModel, updatedCE, pathValueWithoutSlash, entry);
                 } else if ("replace".equals(op)) {
                     // entry is in the current, desired is updated value
-                    updateOrAdd(entry.name(), configModel, desiredMap, updatedCE, brokerNodeRef.controller());
+                    updateOrAdd(entry.name(), configModel, desiredMap, updatedCE);
                 }
             } else {
                 if ("add".equals(op)) {
                     // entry is not in the current, it is added
-                    updateOrAdd(pathValueWithoutSlash, configModel, desiredMap, updatedCE, brokerNodeRef.controller());
+                    updateOrAdd(pathValueWithoutSlash, configModel, desiredMap, updatedCE);
                 }
             }
 
             if ("remove".equals(op)) {
                 // there is a lot of properties set by default - not having them in desired causes very noisy log output
-                LOGGER.traceCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
+                LOGGER.traceCr(reconciliation, "Kafka Broker {} Config Differs : {}", nodeRef.nodeId(), d);
                 LOGGER.traceCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
                 LOGGER.traceCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
             } else {
-                LOGGER.debugCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
+                LOGGER.debugCr(reconciliation, "Kafka Broker {} Config Differs : {}", nodeRef.nodeId(), d);
                 LOGGER.debugCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
                 LOGGER.debugCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
             }
@@ -189,8 +331,8 @@ private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
         return updatedCE;
     }
 
-    private void updateOrAdd(String propertyName, Map<String, ConfigModel> configModel, Map<String, String> desiredMap, Collection<AlterConfigOp> updatedCE, boolean nodeIsController) {
-        if (!isIgnorableProperty(propertyName, nodeIsController)) {
+    private void updateOrAdd(String propertyName, Map<String, ConfigModel> configModel, Map<String, String> desiredMap, Collection<AlterConfigOp> updatedCE) {
+        if (!isIgnorableProperty(propertyName)) {
             if (KafkaConfiguration.isCustomConfigurationOption(propertyName, configModel)) {
                 LOGGER.traceCr(reconciliation, "custom property {} has been updated/added {}", propertyName, desiredMap.get(propertyName));
             } else {
@@ -202,7 +344,7 @@ private void updateOrAdd(String propertyName, Map<String, ConfigModel> configMod
         }
     }
 
-    private void removeProperty(Map<String, ConfigModel> configModel, Collection<AlterConfigOp> updatedCE, String pathValueWithoutSlash, ConfigEntry entry, boolean nodeIsController) {
+    private void removeProperty(Map<String, ConfigModel> configModel, Collection<AlterConfigOp> updatedCE, String pathValueWithoutSlash, ConfigEntry entry) {
         if (KafkaConfiguration.isCustomConfigurationOption(entry.name(), configModel)) {
             // we are deleting custom option
             LOGGER.traceCr(reconciliation, "removing custom property {}", entry.name());
@@ -215,7 +357,7 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Al
         } else {
             // entry is not default -> it was using non-default value and was removed
             // if the entry was custom, it should be deleted
-            if (!isIgnorableProperty(pathValueWithoutSlash, nodeIsController)) {
+            if (!isIgnorableProperty(pathValueWithoutSlash)) {
                 updatedCE.add(new AlterConfigOp(new ConfigEntry(pathValueWithoutSlash, null), AlterConfigOp.OpType.DELETE));
                 LOGGER.infoCr(reconciliation, "{} not set in desired, unsetting back to default {}", entry.name(), "deleted entry");
             } else {
@@ -229,7 +371,7 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Al
      */
    protected Collection<AlterConfigOp> getConfigDiff(Scope scope) {
        Collection<AlterConfigOp> configDiff = new ArrayList<>();
-        for (AlterConfigOp entry : brokerConfigDiff) {
+        for (AlterConfigOp entry : nodeConfigDiff) {
             if (isScope(entry.configEntry(), scope)) {
                 configDiff.add(entry);
             }
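The behavioural core of the renamed class is the three-way skip rule above. A self-contained illustration of the same logic as a static helper (names borrowed from the class; simplified for readability):

    // Which properties are ignored when diffing, by node role:
    //   controller-only: everything except CONTROLLER_RELEVANT_PROPERTIES
    //   combined:        only placeholder options (IGNORABLE_PROPERTIES_PATTERN)
    //   broker-only:     placeholder options plus controller.quorum.*
    static boolean isIgnorable(String key, boolean isController, boolean isBroker) {
        if (isController && !isBroker) {
            return !CONTROLLER_RELEVANT_PROPERTIES.contains(key);
        } else if (isController) {
            return IGNORABLE_PROPERTIES_PATTERN.matcher(key).matches();
        } else {
            return IGNORABLE_PROPERTIES_PATTERN.matcher(key).matches()
                    || IGNORABLE_CONTROLLER_PROPERTIES_PATTERN.matcher(key).matches();
        }
    }

For example, isIgnorable("log.retention.hours", true, false) is true (a broker option on a pure controller), and isIgnorable("controller.quorum.voters", false, true) is also true (a quorum option on a pure broker) — exactly the cases the updated tests below assert.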
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
index b6049359af8..daf5e81ef25 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
@@ -297,7 +297,7 @@ protected static class RestartContext {
         boolean needsReconfig;
         boolean forceRestart;
         boolean podStuck;
-        KafkaBrokerConfigurationDiff brokerConfigDiff;
+        KafkaConfigurationDiff configDiff;
         KafkaQuorumCheck quorumCheck;
 
         RestartContext(Supplier<BackOff> backOffSupplier) {
@@ -437,7 +437,7 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
                     throw new UnforceableProblem("Pod " + nodeRef.podName() + " cannot be updated right now.");
                 } else {
                     // Check for rollability before trying a dynamic update so that if the dynamic update fails we can go to a full restart
-                    if (!maybeDynamicUpdateBrokerConfig(nodeRef, restartContext)) {
+                    if (!maybeDynamicUpdateBrokerConfig(nodeRef, restartContext, isController && !isBroker)) {
                         LOGGER.infoCr(reconciliation, "Rolling Pod {} due to {}", nodeRef, restartContext.restartReasons.getAllReasonNotes());
                         restartAndAwaitReadiness(pod, operationTimeoutMs, TimeUnit.MILLISECONDS, restartContext);
                     } else {
@@ -518,12 +518,16 @@ private boolean isPodStuck(Pod pod) {
      * Dynamically update the broker config if the plan says we can.
      * Return true if the broker was successfully updated dynamically.
      */
-    private boolean maybeDynamicUpdateBrokerConfig(NodeRef nodeRef, RestartContext restartContext) throws InterruptedException {
+    private boolean maybeDynamicUpdateBrokerConfig(NodeRef nodeRef, RestartContext restartContext, boolean isControllerOnly) throws InterruptedException {
         boolean updatedDynamically;
-
+        // needsReconfig is true if the node does not need to be restarted but its updated configuration can be applied dynamically.
         if (restartContext.needsReconfig) {
             try {
-                dynamicUpdateBrokerConfig(nodeRef, brokerAdminClient, restartContext.brokerConfigDiff);
+                if (isControllerOnly) {
+                    dynamicUpdateKafkaConfig(nodeRef, controllerAdminClient, restartContext.configDiff);
+                } else {
+                    dynamicUpdateKafkaConfig(nodeRef, brokerAdminClient, restartContext.configDiff);
+                }
                 updatedDynamically = true;
             } catch (ForceableProblem e) {
                 LOGGER.debugCr(reconciliation, "Pod {} could not be updated dynamically ({}), will restart", nodeRef, e);
@@ -546,7 +550,7 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) {
         restartContext.needsRestart = false;
         restartContext.needsReconfig = false;
         restartContext.forceRestart = true;
-        restartContext.brokerConfigDiff = null;
+        restartContext.configDiff = null;
     }
 
     /**
@@ -570,7 +574,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
         }
 
         boolean needsRestart = reasonToRestartPod.shouldRestart();
-        KafkaBrokerConfigurationDiff brokerConfigDiff = null;
+        KafkaConfigurationDiff configDiff = null;
         boolean needsReconfig = false;
 
         if (isController) {
@@ -591,34 +595,35 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
                 markRestartContextWithForceRestart(restartContext);
                 return;
             }
+        }
 
-            // Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can
-            // connect to the broker and that it's capable of responding.
-            Config brokerConfig;
-            try {
-                brokerConfig = brokerConfig(nodeRef);
-            } catch (ForceableProblem e) {
-                if (restartContext.backOff.done()) {
-                    needsRestart = true;
-                    brokerConfig = null;
-                } else {
-                    throw e;
-                }
+        // Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can
+        // connect to the broker and that it's capable of responding.
+        Config config;
+        try {
+            config = getConfig(nodeRef, isController && !isBroker);
+        } catch (ForceableProblem e) {
+            if (restartContext.backOff.done()) {
+                needsRestart = true;
+                restartContext.restartReasons.add(RestartReason.CONFIG_CHANGE_REQUIRES_RESTART);
+                config = null;
+            } else {
+                throw e;
             }
+        }
 
-            if (!needsRestart && allowReconfiguration) {
-                LOGGER.traceCr(reconciliation, "Pod {}: description {}", nodeRef, brokerConfig);
-                brokerConfigDiff = new KafkaBrokerConfigurationDiff(reconciliation, brokerConfig, kafkaConfigProvider.apply(nodeRef.nodeId()), kafkaVersion, nodeRef);
+        if (!needsRestart && allowReconfiguration) {
+            LOGGER.traceCr(reconciliation, "Pod {}: description {}", nodeRef, config);
+            configDiff = new KafkaConfigurationDiff(reconciliation, config, kafkaConfigProvider.apply(nodeRef.nodeId()), kafkaVersion, nodeRef, isController, isBroker);
 
-                if (brokerConfigDiff.getDiffSize() > 0) {
-                    if (brokerConfigDiff.canBeUpdatedDynamically()) {
-                        LOGGER.debugCr(reconciliation, "Pod {} needs to be reconfigured.", nodeRef);
-                        needsReconfig = true;
-                    } else {
-                        LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, dynamic update cannot be done.", nodeRef);
-                        restartContext.restartReasons.add(RestartReason.CONFIG_CHANGE_REQUIRES_RESTART);
-                        needsRestart = true;
-                    }
+            if (configDiff.getDiffSize() > 0) {
+                if (configDiff.canBeUpdatedDynamically()) {
+                    LOGGER.debugCr(reconciliation, "Pod {} needs to be reconfigured.", nodeRef);
+                    needsReconfig = true;
+                } else {
+                    LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, dynamic update cannot be done.", nodeRef);
+                    restartContext.restartReasons.add(RestartReason.CONFIG_CHANGE_REQUIRES_RESTART);
+                    needsRestart = true;
                 }
             }
         }
@@ -626,23 +631,32 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
         restartContext.needsRestart = needsRestart;
         restartContext.needsReconfig = needsReconfig;
         restartContext.forceRestart = false;
-        restartContext.brokerConfigDiff = brokerConfigDiff;
+        restartContext.configDiff = configDiff;
     }
 
     /**
      * Returns a config of the given broker.
      * @param nodeRef The reference of the broker.
+     * @param isPureController Whether this node is a pure controller (not a combined node)
      * @return the config of the given broker.
      */
-    /* test */ Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, InterruptedException {
+    /* test */ Config getConfig(NodeRef nodeRef, boolean isPureController) throws ForceableProblem, InterruptedException {
         ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId()));
-        return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
-                30, TimeUnit.SECONDS,
-                error -> new ForceableProblem("Error getting broker config", error)
-        );
+        if (isPureController) {
+            return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, controllerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
+                    30, TimeUnit.SECONDS,
+                    error -> new ForceableProblem("Error getting controller config: " + error, error)
+            );
+        } else {
+            return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
+                    30, TimeUnit.SECONDS,
+                    error -> new ForceableProblem("Error getting broker config: " + error, error)
+            );
+        }
     }
 
-    /* test */ void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff)
+    /* test */ void dynamicUpdateKafkaConfig(NodeRef nodeRef, Admin ac, KafkaConfigurationDiff configurationDiff)
             throws ForceableProblem, InterruptedException {
         Map<ConfigResource, Collection<AlterConfigOp>> updatedPerBrokerConfig = new HashMap<>(2);
         Map<ConfigResource, Collection<AlterConfigOp>> updatedClusterWideConfig = new HashMap<>(1);
@@ -667,8 +681,8 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
         });
         await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerConfigFuture), 30, TimeUnit.SECONDS, error -> {
-            LOGGER.errorCr(reconciliation, "Error updating broker configuration for pod {}", nodeRef, error);
-            return new ForceableProblem("Error updating broker configuration for pod " + nodeRef, error);
+            LOGGER.errorCr(reconciliation, "Error updating Kafka configuration for pod {}", nodeRef, error);
+            return new ForceableProblem("Error updating Kafka configuration for pod " + nodeRef, error);
         });
         LOGGER.infoCr(reconciliation, "Dynamic update of pod {} was successful.", nodeRef);
     }
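Condensed view of the client selection that getConfig and maybeDynamicUpdateBrokerConfig now share (illustrative sketch only; the real code goes through VertxUtil/await with a 30s timeout and ForceableProblem wrapping):

    // Pure controllers are reachable only via the controller admin client; brokers
    // and combined nodes keep using the broker admin client. Note that both paths
    // describe the node with ConfigResource.Type.BROKER, as in getConfig above.
    Admin admin = (isController && !isBroker) ? controllerAdminClient : brokerAdminClient;
    ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId()));
    Config config = admin.describeConfigs(singletonList(resource)).values().get(resource).get(30, TimeUnit.SECONDS);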
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
index b3593e9bfae..fac5b63513b 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
@@ -61,7 +61,6 @@
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.CoreMatchers.startsWith;
@@ -720,83 +719,4 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) {
             async.flag();
         })));
     }
-
-    /**
-     * Tests how the KRaft controller-only nodes have their configuration changes tracked using a Pod annotations. The
-     * annotation on controller-only pods should change when the controller-relevant config is changed. On broker pods
-     * it should never change. To test this, the test does 3 reconciliations:
-     *   - First initial one to establish the pods and collects the annotations
-     *   - Second with change that is not relevant to controllers => annotations should be the same for all nodes as
-     *     before
-     *   - Third with change to a controller-relevant option => annotations for controller nodes should change, for
-     *     broker nodes should be the same
-     *
-     * @param context   Test context
-     */
-    @Test
-    public void testReconcileWithControllerRelevantConfigChange(VertxTestContext context) {
-        Checkpoint async = context.checkpoint();
-
-        Map<String, String> brokerConfigurationAnnotations = new HashMap<>();
-
-        operator.reconcile(new Reconciliation("initial-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    // Collect the configuration annotations
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> brokerConfigurationAnnotations.put(pod.getMetadata().getName(), pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH)));
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> brokerConfigurationAnnotations.put(pod.getMetadata().getName(), pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH)));
-
-                    // Update Kafka with dynamically changeable option that is not controller relevant => controller pod annotations should not change
-                    Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME)
-                            .edit(k -> new KafkaBuilder(k).editSpec().editKafka().addToConfig(Map.of("compression.type", "gzip")).endKafka().endSpec().build());
-                })))
-                .compose(v -> operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME)))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Controller annotations be the same
-                        assertThat(pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is(brokerConfigurationAnnotations.get(pod.getMetadata().getName())));
-                    });
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Broker annotations should be the same
-                        assertThat(pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is(brokerConfigurationAnnotations.get(pod.getMetadata().getName())));
-                    });
-
-                    // Update Kafka with dynamically changeable controller relevant option => controller pod annotations should change
-                    Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME)
-                            .edit(k -> new KafkaBuilder(k).editSpec().editKafka().addToConfig(Map.of("max.connections", "1000")).endKafka().endSpec().build());
-                })))
-                .compose(v -> operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME)))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Controller annotations should differ
-                        assertThat(pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is(not(brokerConfigurationAnnotations.get(pod.getMetadata().getName()))));
-                    });
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Broker annotations should be the same
-                        assertThat(pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is(brokerConfigurationAnnotations.get(pod.getMetadata().getName())));
-                    });
-
-                    async.flag();
-                })));
-    }
 }
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiffTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaConfigurationDiffTest.java
similarity index 58%
rename from cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiffTest.java
rename to cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaConfigurationDiffTest.java
index 24a87c32d72..1568b88af5e 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiffTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaConfigurationDiffTest.java
@@ -32,9 +32,11 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class KafkaBrokerConfigurationDiffTest {
+public class KafkaConfigurationDiffTest {
     KafkaVersion kafkaVersion = KafkaVersionTestUtils.getKafkaVersionLookup().defaultVersion();
-    private final NodeRef nodeRef = new NodeRef("broker-0", 0, "broker", false, true);
+    private final NodeRef brokerNodeRef = new NodeRef("broker-0", 0, "broker", false, true);
+    private final NodeRef controllerNodeRef = new NodeRef("controller-1", 1, "controller", true, false);
+    private final NodeRef combinedNodeRef = new NodeRef("combined-2", 2, "combined", true, true);
 
     private ConfigEntry instantiateConfigEntry(String name, String val) {
         // use reflection to instantiate ConfigEntry
@@ -86,7 +88,7 @@ private Config getCurrentConfiguration(List<ConfigEntry> additional) {
         return new Config(entryList);
     }
 
-    private void assertConfig(KafkaBrokerConfigurationDiff kcd, ConfigEntry ce) {
+    private void assertConfig(KafkaConfigurationDiff kcd, ConfigEntry ce) {
         Collection<AlterConfigOp> brokerDiffConf = kcd.getConfigDiff();
         long appearances = brokerDiffConf.stream().filter(entry -> entry.configEntry().name().equals(ce.name())).count();
         Optional<AlterConfigOp> en = brokerDiffConf.stream().filter(entry -> entry.configEntry().name().equals(ce.name())).findFirst();
@@ -100,7 +102,8 @@ private void assertConfig(KafkaBrokerConfigurationDiff kcd, ConfigEntry ce) {
     public void testCustomPropertyAdded() {
         ArrayList<ConfigEntry> ces = new ArrayList<>();
         ces.add(new ConfigEntry("custom.property", "42"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(new ArrayList<>()), getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(new ArrayList<>()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
 
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
@@ -109,8 +112,8 @@ public void testCustomPropertyAdded() {
     @Test
     public void testCustomPropertyRemoved() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("custom.property", "42"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(emptyList()), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(emptyList()), kafkaVersion, brokerNodeRef, false, true);
 
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
         // custom changes are applied by changing STS
@@ -119,8 +122,8 @@ public void testCustomPropertyRemoved() {
     @Test
     public void testCustomPropertyKept() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("custom.property", "42"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -129,8 +132,8 @@ public void testCustomPropertyKept() {
     public void testCustomPropertyChanged() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("custom.property", "42"));
         List<ConfigEntry> ces2 = singletonList(new ConfigEntry("custom.property", "43"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces2), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces2), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -138,8 +141,8 @@ public void testCustomPropertyChanged() {
     @Test
     public void testChangedPresentValue() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("min.insync.replicas", "1"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
         assertConfig(kcd, new ConfigEntry("min.insync.replicas", "1"));
@@ -148,45 +151,103 @@ public void testChangedPresentValue() {
     @Test
     public void testChangedPresentValueToDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("min.insync.replicas", "2"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
 
     @Test
-    public void testChangedKRaftControllerConfig() {
+    public void testChangedControllerConfigForBrokerNode() {
         List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "5000"));
         List<ConfigEntry> currentControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "1000"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
-                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(0));
     }
 
     @Test
-    public void testChangedKRaftControllerConfigForCombinedNode() {
-        NodeRef combinedNodeId = new NodeRef("broker-0", 0, "broker", true, true);
+    public void testChangedControllerConfigForCombinedNode() {
         List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "5000"));
         List<ConfigEntry> currentControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "1000"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
-                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, combinedNodeId);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, combinedNodeRef, true, true);
         assertThat(kcd.getDiffSize(), is(1));
+        assertThat(kcd.canBeUpdatedDynamically(), is(false));
+    }
+
+    @Test
+    public void testChangedBrokerConfigForCombinedNode() {
+        List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("log.retention.hours", "72"));
+        List<ConfigEntry> currentControllerConfig = singletonList(new ConfigEntry("log.retention.hours", "168"));
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, combinedNodeRef, true, true);
+        assertThat(kcd.getDiffSize(), is(1));
+        assertThat(kcd.canBeUpdatedDynamically(), is(false));
+    }
+
+    @Test
+    public void testChangedControllerConfigForControllerNode() {
+        List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "5000"));
+        List<ConfigEntry> currentControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "1000"));
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, controllerNodeRef, true, false);
+        assertThat(kcd.getDiffSize(), is(1));
+        assertThat(kcd.canBeUpdatedDynamically(), is(false));
+    }
+
+    @Test
+    public void testChangedControllerDynamicConfig() {
+        List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("max.connections", "1000"));
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, controllerNodeRef, true, false);
+        assertThat(kcd.getDiffSize(), is(1));
+        assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
 
     @Test
-    public void testChangedAdvertisedListener() {
+    public void testChangedBrokerConfigForControllerNode() {
+        List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("log.retention.hours", "72"));
+        List<ConfigEntry> currentControllerConfig = singletonList(new ConfigEntry("log.retention.hours", "168"));
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, controllerNodeRef, true, false);
+        assertThat(kcd.getDiffSize(), is(0));
+    }
+
+
+    @Test
+    public void testChangedAdvertisedListenerForBroker() {
+        List<ConfigEntry> ces = singletonList(new ConfigEntry("advertised.listeners", "karel"));
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
+        assertThat(kcd.getDiffSize(), is(0));
+        assertThat(kcd.canBeUpdatedDynamically(), is(true));
+    }
+
+    @Test
+    public void testChangedAdvertisedListenerForController() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("advertised.listeners", "karel"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, controllerNodeRef, true, false);
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
 
+    @Test
+    public void testChangedAdvertisedListenerForCombined() {
+        List<ConfigEntry> ces = singletonList(new ConfigEntry("advertised.listeners", "karel"));
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, combinedNodeRef, true, true);
+        assertThat(kcd.getDiffSize(), is(0));
+        assertThat(kcd.canBeUpdatedDynamically(), is(true));
+    }
+
+
     @Test
     public void testChangedAdvertisedListenerFromNothingToDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("advertised.listeners", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -195,8 +256,8 @@ public void testChangedAdvertisedListenerFromNonDefaultToDefault() {
         // advertised listeners are filled after the pod started
         List<ConfigEntry> ces = singletonList(new ConfigEntry("advertised.listeners", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -204,8 +265,8 @@ public void testChangedAdvertisedListenerFromNonDefaultToDefault() {
     @Test
     public void testChangedLogDirs() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.dirs", "/var/lib/kafka/data/karel"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(false));
         assertConfig(kcd, new ConfigEntry("log.dirs", "/var/lib/kafka/data/karel"));
@@ -214,8 +275,8 @@ public void testChangedLogDirs() {
     @Test
     public void testLogDirsNonDefaultToDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.dirs", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(false));
         assertConfig(kcd, new ConfigEntry("log.dirs", "null"));
@@ -224,8 +285,8 @@ public void testLogDirsNonDefaultToDefault() {
     @Test
     public void testLogDirsDefaultToDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.dirs", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -233,8 +294,8 @@ public void testLogDirsDefaultToDefault() {
     @Test
     public void testUnchangedLogDirs() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.dirs", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(0));
 
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
@@ -243,8 +304,8 @@ public void testUnchangedLogDirs() {
     @Test
     public void testChangedInterBrokerListenerName() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("inter.broker.listener.name", "david"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(false));
     }
@@ -252,8 +313,8 @@ public void testChangedInterBrokerListenerName() {
     @Test
     public void testChangedListenerSecurityProtocolMap() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("listener.security.protocol.map", "david"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -262,8 +323,8 @@ public void testChangedListenerSecurityProtocolMap() {
     public void testChangedListenerSecurityProtocolMapFromNonDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("listener.security.protocol.map", "REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT,TLS-9093:SSL,EXTERNAL-9094:SSL"));
         List<ConfigEntry> ces2 = singletonList(new ConfigEntry("listener.security.protocol.map", "REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT,TLS-9093:SSL"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces2), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces2), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
         assertConfig(kcd, new ConfigEntry("listener.security.protocol.map", "REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT,TLS-9093:SSL"));
@@ -276,8 +337,8 @@ public void testChangedMoreProperties() {
         ces.add(new ConfigEntry("inter.broker.listener.name", "david"));
         ces.add(new ConfigEntry("group.min.session.timeout.ms", "42"));
         ces.add(new ConfigEntry("auto.create.topics.enable", "false"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(3));
         assertThat(kcd.canBeUpdatedDynamically(), is(false));
     }
@@ -286,8 +347,8 @@ public void testChangedMoreProperties() {
     public void testRemoveDefaultPropertyWhichIsNotDefault() {
         // it is not seen as default because the ConfigEntry.ConfigSource.DEFAULT_CONFIG is not set
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.retention.hours", "168"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(emptyList()), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(emptyList()), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(false));
     }
@@ -302,8 +363,8 @@ public void testClusterWideChanged() {
             new ConfigEntry("min.insync.replicas", "2"),
             new ConfigEntry("log.retention.bytes", "3000000")
         );
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(current),
-                getDesiredConfiguration(desired), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(current),
+                getDesiredConfiguration(desired), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(2));
         assertThat(kcd.getConfigDiff(Scope.CLUSTER_WIDE).size(), is(2));
         assertThat(kcd.getConfigDiff(Scope.PER_BROKER).size(), is(0));
@@ -323,8 +384,8 @@ public void testPropertiesAllScopesChanged() {
             new ConfigEntry("listener.security.protocol.map", "bar"),
             new ConfigEntry("auto.create.topics.enable", "false")
         );
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(current),
-                getDesiredConfiguration(desired), kafkaVersion, nodeRef);
+        KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(current),
+                getDesiredConfiguration(desired), kafkaVersion, brokerNodeRef, false, true);
         assertThat(kcd.getDiffSize(), is(4));
         assertThat(kcd.getConfigDiff(Scope.CLUSTER_WIDE).size(), is(2));
         assertThat(kcd.getConfigDiff(Scope.PER_BROKER).size(), is(1));
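The new unit tests above replace the deleted annotation-based mock test: max.connections — the controller-relevant option the old test edited — now shows up directly as a dynamically updatable diff on a controller-only node. Usage in the style of this test class (a hypothetical extra assertion, mirroring testChangedControllerDynamicConfig):

    KafkaConfigurationDiff kcd = new KafkaConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION,
            getCurrentConfiguration(emptyList()),
            getDesiredConfiguration(singletonList(new ConfigEntry("max.connections", "1000"))),
            kafkaVersion, controllerNodeRef, true, false);
    assertThat(kcd.getDiffSize(), is(1));
    assertThat(kcd.canBeUpdatedDynamically(), is(true));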
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
index ec8a58d8a74..1c75e54b0ed 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
@@ -523,6 +523,19 @@ public void testRollHandlesErrorWhenGettingCombinedConfig(VertxTestContext testC
                 asList(0, 2, 3, 4, 1));
     }
 
+    @Test
+    public void testRollHandlesErrorWhenGettingControllerConfig(VertxTestContext testContext) {
+        PodOperator podOps = mockPodOps(podId -> succeededFuture());
+        TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addPodNames(0, 0, REPLICAS), podOps,
+                noException(), null,
+                noException(), noException(), podId -> podId == 1 ? new KafkaRoller.ForceableProblem("could not get config exception") : null,
+                brokerId -> succeededFuture(true), new DefaultAdminClientProvider(), new DefaultKafkaAgentClientProvider(), false, null, -1);
+        // The algorithm should carry on rolling the pods
+        doSuccessfulRollingRestart(testContext, kafkaRoller,
+                asList(0, 1, 2, 3, 4),
+                asList(0, 2, 3, 4, 1));
+    }
+
     @Test
     public void testRollHandlesErrorWhenGettingConfigFromController(VertxTestContext testContext) {
         int controller = 2;
@@ -552,7 +565,7 @@ public void testRollHandlesErrorWhenAlteringConfig(VertxTestContext testContext)
     @Test
     public void testSuccessfulAlteringConfigNotRoll(VertxTestContext testContext) {
         PodOperator podOps = mockPodOps(podId -> succeededFuture());
-        TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addPodNames(REPLICAS, 0, 0), podOps,
+        TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addPodNames(3, 3, 3), podOps,
                 noException(), null,
                 noException(), noException(), noException(),
                 brokerId -> succeededFuture(true), new DefaultAdminClientProvider(), new DefaultKafkaAgentClientProvider(), false, null, -1);
@@ -1027,7 +1040,7 @@ boolean deferController(NodeRef nodeRef, RestartContext restartContext) throws E
         }
 
         @Override
-        protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem {
+        protected Config getConfig(NodeRef nodeRef, boolean isPureController) throws ForceableProblem {
             ForceableProblem problem = getConfigsException.apply(nodeRef.nodeId());
             if (problem != null) {
                 throw problem;
@@ -1035,7 +1048,7 @@ protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem {
         }
 
         @Override
-        protected void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff) throws ForceableProblem {
+        protected void dynamicUpdateKafkaConfig(NodeRef nodeRef, Admin ac, KafkaConfigurationDiff configurationDiff) throws ForceableProblem {
             ForceableProblem problem = alterConfigsException.apply(nodeRef.nodeId());
             if (problem != null) {
                 throw problem;