Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,149 +44,6 @@ public class KafkaConfiguration extends AbstractConfiguration {
DEFAULTS.put("min.insync.replicas", "1");
}

/**
 * List of configuration options that are relevant to controllers and should be considered when deciding whether
 * a controller-only node needs to be rolled or not. Options not in this set are ignored for controller-only
 * nodes, since changing them has no effect on a node that runs no broker role.
 *
 * NOTE(review): this list appears to be hand-maintained against the Kafka broker configuration reference —
 * confirm it is kept in sync when new Kafka versions introduce controller-relevant options.
 */
private static final Set<String> CONTROLLER_RELEVANT_CONFIGS = Set.of(
        "alter.config.policy.class.name",
        "authorizer.class.name",
        "auto.create.topics.enable",
        "background.threads",
        "broker.heartbeat.interval.ms",
        "broker.rack",
        "broker.session.timeout.ms",
        "connection.failed.authentication.delay.ms",
        "connections.max.idle.ms",
        "connections.max.reauth.ms",
        "controlled.shutdown.enable",
        "controlled.shutdown.max.retries",
        "controlled.shutdown.retry.backoff.ms",
        "controller.listener.names",
        "controller.quorum.append.linger.ms",
        "controller.quorum.election.backoff.max.ms",
        "controller.quorum.election.timeout.ms",
        "controller.quorum.fetch.timeout.ms",
        "controller.quorum.request.timeout.ms",
        "controller.quorum.retry.backoff.ms",
        "controller.quorum.voters",
        "controller.quota.window.num",
        "controller.quota.window.size.seconds",
        "controller.socket.timeout.ms",
        "create.topic.policy.class.name",
        "default.replication.factor",
        "delete.topic.enable",
        "early.start.listeners",
        "kafka.metrics.polling.interval.secs",
        "kafka.metrics.reporters",
        "leader.imbalance.check.interval.seconds",
        "leader.imbalance.per.broker.percentage",
        // Control-plane listener TLS settings: a change to these certificates/stores requires a controller roll
        "listener.name.controlplane-9090.ssl.keystore.location",
        "listener.name.controlplane-9090.ssl.keystore.password",
        "listener.name.controlplane-9090.ssl.keystore.type",
        "listener.name.controlplane-9090.ssl.truststore.location",
        "listener.name.controlplane-9090.ssl.truststore.password",
        "listener.name.controlplane-9090.ssl.truststore.type",
        "listener.name.controlplane-9090.ssl.client.auth",
        "listener.security.protocol.map",
        "listeners",
        "log.dir",
        "log.dirs",
        "min.insync.replicas",
        "max.connection.creation.rate",
        "max.connections.per.ip.overrides",
        "max.connections.per.ip",
        "max.connections",
        "metadata.log.dir",
        "metadata.log.max.record.bytes.between.snapshots",
        "metadata.log.max.snapshot.interval.ms",
        "metadata.log.segment.bytes",
        "metadata.log.segment.min.bytes",
        "metadata.log.segment.ms",
        "metadata.max.idle.interval.ms",
        "metadata.max.retention.bytes",
        "metadata.max.retention.ms",
        "metric.reporters",
        "metrics.num.samples",
        "metrics.recording.level",
        "metrics.sample.window.ms",
        "node.id",
        "num.io.threads",
        "num.network.threads",
        "num.partitions",
        "offsets.topic.replication.factor",
        "principal.builder.class",
        "process.roles",
        "remote.log.storage.system.enable",
        "replica.selector.class",
        "reserved.broker.max.id",
        "sasl.enabled.mechanisms",
        "sasl.kerberos.kinit.cmd",
        "sasl.kerberos.min.time.before.relogin",
        "sasl.kerberos.principal.to.local.rules",
        "sasl.kerberos.service.name",
        "sasl.kerberos.ticket.renew.jitter",
        "sasl.kerberos.ticket.renew.window.factor",
        "sasl.login.callback.handler.class",
        "sasl.login.class",
        "sasl.login.connect.timeout.ms",
        "sasl.login.read.timeout.ms",
        "sasl.login.refresh.buffer.seconds",
        "sasl.login.refresh.min.period.seconds",
        "sasl.login.refresh.window.factor",
        "sasl.login.refresh.window.jitter",
        "sasl.login.retry.backoff.max.ms",
        "sasl.login.retry.backoff.ms",
        "sasl.mechanism.controller.protocol",
        "sasl.oauthbearer.clock.skew.seconds",
        "sasl.oauthbearer.expected.audience",
        "sasl.oauthbearer.expected.issuer",
        "sasl.oauthbearer.jwks.endpoint.refresh.ms",
        "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms",
        "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms",
        "sasl.oauthbearer.jwks.endpoint.url",
        "sasl.oauthbearer.scope.claim.name",
        "sasl.oauthbearer.sub.claim.name",
        "sasl.oauthbearer.token.endpoint.url",
        "sasl.server.callback.handler.class",
        "sasl.server.max.receive.size",
        "security.providers",
        "server.max.startup.time.ms",
        "socket.connection.setup.timeout.max.ms",
        "socket.connection.setup.timeout.ms",
        "socket.listen.backlog.size",
        "socket.receive.buffer.bytes",
        "socket.request.max.bytes",
        "socket.send.buffer.bytes",
        "ssl.cipher.suites",
        "ssl.client.auth",
        "ssl.enabled.protocols",
        "ssl.endpoint.identification.algorithm",
        "ssl.engine.factory.class",
        "ssl.key.password",
        "ssl.keymanager.algorithm",
        "ssl.keystore.certificate.chain",
        "ssl.keystore.key",
        "ssl.keystore.location",
        "ssl.keystore.password",
        "ssl.keystore.type",
        "ssl.principal.mapping.rules",
        "ssl.protocol",
        "ssl.provider",
        "ssl.secure.random.implementation",
        "ssl.trustmanager.algorithm",
        "ssl.truststore.certificates",
        "ssl.truststore.location",
        "ssl.truststore.password",
        "ssl.truststore.type",
        "super.users",
        "transaction.state.log.min.isr",
        "transaction.state.log.replication.factor",
        "queued.max.requests",
        "queued.max.requests.bytes",
        "unclean.leader.election.enable"
);

/**
* Copy constructor which creates new instance of the Kafka Configuration from existing configuration. It is
* useful when you need to modify an instance of the configuration without permanently changing the original.
Expand Down Expand Up @@ -287,24 +144,6 @@ public Set<String> unknownConfigsWithValues(KafkaVersion kafkaVersion) {
return result;
}

/**
 * Return the config properties with their values in this KafkaConfiguration which are known to be relevant for the
 * Kafka controller nodes. Each entry in the returned set has the form {@code key=value}; options not listed in
 * {@code CONTROLLER_RELEVANT_CONFIGS} are filtered out.
 *
 * @return The configuration options relevant for controllers
 */
public Set<String> controllerConfigsWithValues() {
    Set<String> relevant = new HashSet<>();

    // Walk the ordered configuration map and keep only the controller-relevant options
    this.asOrderedProperties().asMap().forEach((key, value) -> {
        if (CONTROLLER_RELEVANT_CONFIGS.contains(key)) {
            relevant.add(key + "=" + value);
        }
    });

    return relevant;
}

/**
* @return True if the configuration is empty. False otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,12 +723,6 @@ protected Future<Void> perBrokerKafkaConfiguration(MetricsAndLogging metricsAndL
// We collect the configuration options related to various plugins
nodeConfiguration += kc.unknownConfigsWithValues(kafka.getKafkaVersion()).toString();

// We collect the information relevant to controller-only nodes
if (pool.isController() && !pool.isBroker()) {
// For controllers only, we extract the controller-relevant configurations and use it in the configuration annotations
nodeConfiguration = kc.controllerConfigsWithValues().toString();
}

// We store hash of the broker configurations for later use in Pod and in rolling updates
this.brokerConfigurationHash.put(nodeId, Util.hashStub(nodeConfiguration));

Expand Down
Loading
Loading