From 83e37a93174bf1ca513f00efcdf49b77d0b26b14 Mon Sep 17 00:00:00 2001 From: souvikrakshit Date: Fri, 25 Jul 2025 12:45:07 +0530 Subject: [PATCH 01/16] make cloudwatch metrics enablement config driven --- terraform/kafka_runner/run-test.py | 4 +++- terraform/kafka_runner/util.py | 1 + vagrant/aws-packer.json | 12 ++++++++++-- vagrant/cloudwatch-agent-setup.sh | 7 +++++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/terraform/kafka_runner/run-test.py b/terraform/kafka_runner/run-test.py index 256bb590e3838..203dbdcad387e 100644 --- a/terraform/kafka_runner/run-test.py +++ b/terraform/kafka_runner/run-test.py @@ -264,7 +264,9 @@ def main(): ssh_account=ssh_account, instance_name=args.instance_name, jdk_version=args.jdk_version, - jdk_arch=args.jdk_arch + jdk_arch=args.jdk_arch, + JOB_ID=JOB_ID, + enable_cloudwatch=args.enable_cloudwatch ) else: logging.info(f"using existing ami: {args.existing_ami}") diff --git a/terraform/kafka_runner/util.py b/terraform/kafka_runner/util.py index e9426fbfc7372..0f341b2bf0395 100644 --- a/terraform/kafka_runner/util.py +++ b/terraform/kafka_runner/util.py @@ -170,6 +170,7 @@ def parse_args(): parser.add_argument("--jdk-arch", action="store", type=str, default="x64", help="JDK arch to execute."), parser.add_argument("--nightly", action="store_true", default=False, help="Mark this as a nightly run") + parser.add_argument("--enable-cloudwatch", action="store", type=parse_bool, default=False, help="Enable cloudwatch metrics if enabled") parser.add_argument("--new-globals", action="store", type=str, default=None, help="Additional global params to be passed in ducktape") parser.add_argument("--arm-image", action="store_true", help="load the ARM based image of specified distro") parser.add_argument("--existing-ami", action="store", type=str, default=None, help="AMI ID to use for the instance, skipping ami creation") diff --git a/vagrant/aws-packer.json b/vagrant/aws-packer.json index 7025202447e2e..de101dccbfe9e 100644 
--- a/vagrant/aws-packer.json +++ b/vagrant/aws-packer.json @@ -47,15 +47,23 @@ "type": "shell", "inline": ["while [ ! -f /var/lib/cloud/instance/boot-finished ]; do echo 'Waiting for cloud-init...'; sleep 1; done"] }, + { + "type": "file", + "source": "../vagrant/cloudwatch-agent-configuration.json", + "destination": "/tmp/cloudwatch-agent-configuration.json" + }, { "environment_vars": [ "JDK_MAJOR={{ user `jdk_version` }}", - "JDK_ARCH={{ user `jdk_arch` }}" + "JDK_ARCH={{ user `jdk_arch` }}", + "SEMAPHORE_JOB_ID={{ user `JOB_ID` }}", + "ENABLE_CLOUDWATCH={{ user `enable_cloudwatch` }}" ], "execute_command": "echo 'packer' | {{ .Vars }} sudo -E -S bash '{{ .Path }}'", "type": "shell", "scripts": [ - "../vagrant/base.sh" + "../vagrant/base.sh", + "../vagrant/cloudwatch-agent-setup.sh" ] } ] diff --git a/vagrant/cloudwatch-agent-setup.sh b/vagrant/cloudwatch-agent-setup.sh index a3fde878d69ca..1ae6f3094d675 100644 --- a/vagrant/cloudwatch-agent-setup.sh +++ b/vagrant/cloudwatch-agent-setup.sh @@ -11,6 +11,13 @@ set -ex +if [ "${ENABLE_CLOUDWATCH,,}" = "false" ]; then + echo "Cloudwatch is disabled, skipping setup" + exit 0 +else + echo "CloudWatch setup enabled, installing amazon-cloudwatch-agent..." +fi + architecture=arm64 arch=$(uname -m) From 1d4b22bc3e6477eebaf07e191d59780872eec38a Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 25 Jul 2025 12:22:09 -0500 Subject: [PATCH 02/16] MINOR: Improve Kafka Streams Protocol Upgrade Doc (#20241) As a follow-up minor addition to PR for [KSTREAMS-7735](https://confluentinc.atlassian.net/browse/KSTREAMS-7735 ) (https://github.com/apache/kafka/pull/20029) , add the instructions for upgrading `streams.version` parameter for KIP-1071 EA. 
Reviewers: Andrew Schofield --- docs/streams/upgrade-guide.html | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 8816f3b933a35..58938c9b34787 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -213,6 +213,11 @@

Early Access of the Streams Rebalance Protocol

Set unstable.feature.versions.enable=true for controllers and brokers, and set unstable.api.versions.enable=true on the brokers as well. In your Kafka Streams application configuration, set group.protocol=streams. + Upgrade the `streams.version` feature to 1. To do this, run + kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature streams.version=1 + to upgrade, or use Admin#updateFeatures API to update `streams.version`. After the feature is updated, check + kafka-features.sh --bootstrap-server localhost:9092 describe + and `streams.version` should now have FinalizedVersionLevel 1.

From 9c83c6d1f30445267fadb51e861db8c00224518f Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Tue, 29 Jul 2025 07:28:30 -0500 Subject: [PATCH 03/16] MINOR: Delete the redundant feature upgrade instruction for running KIP-1071 EA (#20250) Follow up on https://github.com/apache/kafka/pull/20241. Delete the instruction that manually set `streams.version=1` for running Kafka 4.1 since it is already achieved in previous setup steps. Reviewers: Lucas Brutschy --- docs/streams/upgrade-guide.html | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 58938c9b34787..2230ffb8aaae9 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -213,9 +213,7 @@

Early Access of the Streams Rebalance Protocol

Set unstable.feature.versions.enable=true for controllers and brokers, and set unstable.api.versions.enable=true on the brokers as well. In your Kafka Streams application configuration, set group.protocol=streams. - Upgrade the `streams.version` feature to 1. To do this, run - kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature streams.version=1 - to upgrade, or use Admin#updateFeatures API to update `streams.version`. After the feature is updated, check + After the new feature is configured, check kafka-features.sh --bootstrap-server localhost:9092 describe and `streams.version` should now have FinalizedVersionLevel 1.

From 0179193b75fc727be64dbf7036acfa0179378287 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 1 Aug 2025 14:57:33 +0200 Subject: [PATCH 04/16] KAFKA-19529: State updater sensor names should be unique (#20262) (#20274) All state updater threads use the same metrics instance, but do not use unique names for their sensors. This can have the following symptoms: 1) Data inserted into one sensor by one thread can affect the metrics of all state updater threads. 2) If one state updater thread is shutdown, the metrics associated to all state updater threads are removed. 3) If one state updater thread is started, while another one is removed, it can happen that a metric is registered with the `Metrics` instance, but not associated to any `Sensor` (because it is concurrently removed), which means that the metric will not be removed upon shutdown. If a thread with the same name later tries to register the same metric, we may run into a `java.lang.IllegalArgumentException: A metric named ... already exists`, as described in the ticket. This change fixes the bug giving unique names to the sensors. A test is added that there is no interference of the removal of sensors and metrics during shutdown. Reviewers: Matthias J. 
Sax --- .../internals/DefaultStateUpdater.java | 52 ++++++++--------- .../processor/internals/StreamThread.java | 2 +- .../internals/DefaultStateUpdaterTest.java | 56 ++++++++++++++++++- 3 files changed, 82 insertions(+), 28 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 716d8c42ec5ff..a3a44f6f02d31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task.State; import org.apache.kafka.streams.processor.internals.TaskAndAction.Action; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -89,7 +90,7 @@ private class StateUpdaterThread extends Thread { private volatile KafkaFutureImpl clientInstanceIdFuture = new KafkaFutureImpl<>(); public StateUpdaterThread(final String name, - final Metrics metrics, + final StreamsMetricsImpl metrics, final ChangelogReader changelogReader) { super(name); this.changelogReader = changelogReader; @@ -745,7 +746,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t private final Time time; private final Logger log; private final String name; - private final Metrics metrics; + private final StreamsMetricsImpl metrics; private final Consumer restoreConsumer; private final ChangelogReader changelogReader; private final TopologyMetadata topologyMetadata; @@ -766,7 +767,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t private StateUpdaterThread stateUpdaterThread = null; public DefaultStateUpdater(final String name, - final Metrics metrics, + final StreamsMetricsImpl metrics, final 
StreamsConfig config, final Consumer restoreConsumer, final ChangelogReader changelogReader, @@ -1062,70 +1063,71 @@ private class StateUpdaterMetrics { private final Sensor standbyRestoreRatioSensor; private final Sensor checkpointRatioSensor; - private final Deque allSensorNames = new LinkedList<>(); + private final Deque allSensors = new LinkedList<>(); private final Deque allMetricNames = new LinkedList<>(); - private StateUpdaterMetrics(final Metrics metrics, final String threadId) { + private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) { final Map threadLevelTags = new LinkedHashMap<>(); threadLevelTags.put(THREAD_ID_TAG, threadId); + final Metrics metricsRegistry = metrics.metricsRegistry(); - MetricName metricName = metrics.metricName("active-restoring-tasks", + MetricName metricName = metricsRegistry.metricName("active-restoring-tasks", STATE_LEVEL_GROUP, "The number of active tasks currently undergoing restoration", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numRestoringActiveTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("standby-updating-tasks", + metricName = metricsRegistry.metricName("standby-updating-tasks", STATE_LEVEL_GROUP, "The number of standby tasks currently undergoing state update", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? 
stateUpdaterThread.numUpdatingStandbyTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("active-paused-tasks", + metricName = metricsRegistry.metricName("active-paused-tasks", STATE_LEVEL_GROUP, "The number of active tasks paused restoring", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numPausedActiveTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("standby-paused-tasks", + metricName = metricsRegistry.metricName("standby-paused-tasks", STATE_LEVEL_GROUP, "The number of standby tasks paused state update", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numPausedStandbyTasks() : 0); allMetricNames.push(metricName); - this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO); + this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO); this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("idle-ratio"); + allSensors.add(this.idleRatioSensor); - this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO); + this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO); this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("active-restore-ratio"); + allSensors.add(this.activeRestoreRatioSensor); - this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO); + this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, 
"standby-update-ratio", RecordingLevel.INFO); this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("standby-update-ratio"); + allSensors.add(this.standbyRestoreRatioSensor); - this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO); + this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO); this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("checkpoint-ratio"); + allSensors.add(this.checkpointRatioSensor); - this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO); + this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO); this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate()); this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount())); - allSensorNames.add("restore-records"); + allSensors.add(this.restoreSensor); } void clear() { - while (!allSensorNames.isEmpty()) { - metrics.removeSensor(allSensorNames.pop()); + while (!allSensors.isEmpty()) { + metrics.removeSensor(allSensors.pop()); } while (!allMetricNames.isEmpty()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 89cc988313bdf..ad66a822e7269 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -646,7 +646,7 @@ private static StateUpdater maybeCreateAndStartStateUpdater(final boolean 
stateU final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx; final StateUpdater stateUpdater = new DefaultStateUpdater( name, - streamsMetrics.metricsRegistry(), + streamsMetrics, streamsConfig, restoreConsumer, changelogReader, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index abb128698a081..b6d41966257a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask; import org.apache.kafka.streams.processor.internals.Task.State; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterEach; @@ -105,7 +106,7 @@ class DefaultStateUpdaterTest { // need an auto-tick timer to work for draining with timeout private final Time time = new MockTime(1L); - private final Metrics metrics = new Metrics(time); + private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time); private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final TopologyMetadata topologyMetadata = unnamedTopology().build(); @@ -1680,8 +1681,59 @@ public void shouldRecordMetrics() throws Exception { assertThat(metrics.metrics().size(), is(1)); } + @Test + public void shouldRemoveMetricsWithoutInterference() { + final DefaultStateUpdater stateUpdater2 = + new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time); + final List threadMetrics = 
getMetricNames("test-state-updater"); + final List threadMetrics2 = getMetricNames("test-state-updater2"); + + stateUpdater.start(); + stateUpdater2.start(); + + for (final MetricName metricName : threadMetrics) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + + stateUpdater2.shutdown(Duration.ofMinutes(1)); + + for (final MetricName metricName : threadMetrics) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + + stateUpdater.shutdown(Duration.ofMinutes(1)); + + for (final MetricName metricName : threadMetrics) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + } + + private static List getMetricNames(final String threadId) { + final Map tagMap = Map.of("thread-id", threadId); + return List.of( + new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap), + new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap) + ); + } + @SuppressWarnings("unchecked") - private static void verifyMetric(final Metrics metrics, + private static void verifyMetric(final 
StreamsMetricsImpl metrics, final MetricName metricName, final Matcher matcher) { assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description())); From cdc7a4e2b737a614259003028984fce76d6337e7 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 5 Aug 2025 00:22:13 +0800 Subject: [PATCH 05/16] MINOR: improve the min.insync.replicas doc (#20237) Along with the change: https://github.com/apache/kafka/pull/17952 ([KIP-966](https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas)), the semantics of `min.insync.replicas` config has small change, and add some constraints. We should document them clearly. Reviewers: Jun Rao , Calvin Liu , Mickael Maison , Paolo Patierno , Federico Valeri , Chia-Ping Tsai --- .../org/apache/kafka/common/config/TopicConfig.java | 3 ++- docs/ops.html | 13 ++++++++++--- docs/upgrade.html | 2 ++ .../controller/ConfigurationControlManager.java | 2 +- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 08fad4535685e..77c476b49462d 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -181,7 +181,8 @@ public class TopicConfig { "When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. " + "A typical scenario would be to create a topic with a replication factor of 3, " + "set min.insync.replicas to 2, and produce with acks of \"all\". " + - "This will ensure that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers."; + "This ensures that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers." + + "

Note that when the Eligible Leader Replicas feature is enabled, the semantics of this config changes. Please refer to the ELR section for more info.

"; public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " + diff --git a/docs/ops.html b/docs/ops.html index 529bce159bb1e..be0bfe89e8bf9 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -4499,9 +4499,16 @@

Tool

-

The ELR fields can be checked through the API DescribeTopicPartitions. The admin client can fetch the ELR info by describing the topics. - Also note that, if min.insync.replicas is updated for a topic, the ELR field will be cleaned. If cluster default min ISR is updated, - all the ELR fields will be cleaned.

+

The ELR fields can be checked through the API DescribeTopicPartitions. The admin client can fetch the ELR info by describing the topics.

+

Note that when the ELR feature is enabled:

+
    +
  • The cluster-level min.insync.replicas config will be added if it is not already set. The value is the same as the static config in the active controller.
  • +
  • The removal of min.insync.replicas config at the cluster-level is not allowed.
  • +
  • If the cluster-level min.insync.replicas is updated, even if the value is unchanged, all the ELR state will be cleaned.
  • +
  • The previously set min.insync.replicas value at the broker-level config will be removed. Please set it at the cluster level if necessary.
  • +
  • The alteration of min.insync.replicas config at the broker-level is not allowed.
  • +
  • If min.insync.replicas is updated for a topic, the ELR state will be cleaned.
  • +
diff --git a/docs/upgrade.html b/docs/upgrade.html index 1be3a20faa7a7..0b4017eca785c 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -48,6 +48,8 @@
Notable changes in 4
  • The KIP-966 part 1: Eligible Leader Replicas(ELR) will be enabled by default on the new clusters. + After the ELR feature is enabled, the previously set min.insync.replicas value at the broker-level config will be removed. + Please set it at the cluster level if necessary. For further details, please refer to here.
  • diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index b367fda8114d5..d64695bd39b52 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -613,7 +613,7 @@ int getStaticallyConfiguredMinInsyncReplicas() { /** * Generate any configuration records that are needed to make it safe to enable ELR. - * Specifically, we need to remove all cluster-level configurations for min.insync.replicas, + * Specifically, we need to remove all broker-level configurations for min.insync.replicas, * and create a cluster-level configuration for min.insync.replicas. It is always safe to call * this function if ELR is already enabled; it will simply do nothing if the necessary * configurations already exist. From 7722cff6ceab350df802d7670b0530e71b35724f Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Thu, 19 Jun 2025 17:18:58 +0800 Subject: [PATCH 06/16] MINOR: The upgrade.html file contains duplicate IDs on the same page (#19996) According to correct HTML syntax, IDs on the same page should be unique, so we should fix this. Reviewers: TengYao Chi --- docs/upgrade.html | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index 0b4017eca785c..f9bf9a135adbc 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -21,8 +21,8 @@

    Upgrading to 4.1.0

    -
    Upgrading Servers to 4.1.0 from any version 3.3.x through 4.0.x
    -
    Notable changes in 4.1.0
    +
    Upgrading Servers to 4.1.0 from any version 3.3.x through 4.0.x
    +
    Notable changes in 4.1.0
    • Apache Kafka 4.1 ships with a preview of Queues for Kafka (KIP-932). This feature introduces a new kind of group called @@ -84,7 +84,7 @@
      Notable changes in 4
    -

    Upgrading to 4.0.0

    +

    Upgrading to 4.0.0

    Upgrading Clients to 4.0.0
    From fc030b411c3881e752856c443c7b14f393d4c46e Mon Sep 17 00:00:00 2001 From: Jared Harley Date: Mon, 4 Aug 2025 22:22:54 -0600 Subject: [PATCH 07/16] KAFKA-19576 Fix typo in state-change log filename after rotate (#20269) The `state-change.log` file is being incorrectly rotated to `stage-change.log.[date]`. This change fixes the typo to have the log file correctly rotated to `state-change.log.[date]` _No functional changes._ Reviewers: Mickael Maison , Christo Lolov , Luke Chen , Ken Huang , TengYao Chi , Chia-Ping Tsai --- config/log4j2.yaml | 2 +- docs/upgrade.html | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/config/log4j2.yaml b/config/log4j2.yaml index 49bcf78d136c6..de263c57c928e 100644 --- a/config/log4j2.yaml +++ b/config/log4j2.yaml @@ -44,7 +44,7 @@ Configuration: # State Change appender - name: StateChangeAppender fileName: "${sys:kafka.logs.dir}/state-change.log" - filePattern: "${sys:kafka.logs.dir}/stage-change.log.%d{yyyy-MM-dd-HH}" + filePattern: "${sys:kafka.logs.dir}/state-change.log.%d{yyyy-MM-dd-HH}" PatternLayout: pattern: "${logPattern}" TimeBasedTriggeringPolicy: diff --git a/docs/upgrade.html b/docs/upgrade.html index f9bf9a135adbc..257f14e40b04d 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -37,6 +37,9 @@
    Notable changes in 4 The logger class name for LogCleaner has been updated from kafka.log.LogCleaner to org.apache.kafka.storage.internals.log.LogCleaner in the log4j2.yaml configuration file. Added loggers for org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread and org.apache.kafka.storage.internals.log.Cleaner classes to CleanerAppender. +
  • + The filename for rotated state-change.log files has been updated from stage-change.log.[date] to state-change.log.[date] in the log4j2.yaml configuration file. +
  • Broker @@ -84,9 +87,9 @@
    Notable changes in 4
  • -

    Upgrading to 4.0.0

    +

    Upgrading to 4.0.1

    -
    Upgrading Clients to 4.0.0
    +
    Upgrading Clients to 4.0.1

    For a rolling upgrade:

    @@ -97,7 +100,7 @@
    Upgrading Client or KIP-1124. -
    Upgrading Servers to 4.0.0 from any version 3.3.x through 3.9.x
    +
    Upgrading Servers to 4.0.1 from any version 3.3.x through 3.9.x

    Note: Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been removed. As such, broker upgrades to 4.0.0 (and higher) require KRaft mode and the software and metadata versions must be at least 3.3.x (the first version when KRaft mode was deemed production ready). For clusters in KRaft mode @@ -121,7 +124,13 @@

    Upgrading Servers to 4.0.0 from has a boolean parameter that indicates if there are metadata changes (i.e. IBP_4_0_IV1(23, "4.0", "IV1", true) means this version has metadata changes). Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between. - +
    Notable changes in 4.0.1
    +
      +
    • + The filename for rotated state-change.log files has been updated from stage-change.log.[date] to state-change.log.[date] in the log4j2.yaml configuration file. + See KAFKA-19576 for details. +
    • +
    Notable changes in 4.0.0
    +
  • + The filename for rotated state-change.log files incorrectly rotates to stage-change.log.[date] (changing state to stage). This issue is corrected in 4.0.1. + See KAFKA-19576 for details. +
  • From de16dd103af93bb68a329987ff19469941f85cbc Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 28 Jul 2025 15:19:36 +0200 Subject: [PATCH 08/16] KAFKA-19581: Temporary fix for Streams system tests --- tests/kafkatest/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index cb5f70b45818f..cf88ea4cfb94f 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -110,7 +110,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("4.1.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("4.1.0") LATEST_STABLE_TRANSACTION_VERSION = 2 # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java From 6340f437cd2d15be4180febb9505437266080002 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 5 Aug 2025 13:01:40 +0200 Subject: [PATCH 09/16] Revert "Bump version to 4.1.0" This reverts commit e14d849cbf8836cc9e4a592342baf19a1fbd93c9. 
--- gradle.properties | 2 +- streams/quickstart/java/pom.xml | 2 +- .../java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/gradle.properties b/gradle.properties index 34e605f5a5469..52bf115819de5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ group=org.apache.kafka # - streams/quickstart/pom.xml # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml -version=4.1.0 +version=4.1.0-SNAPSHOT scalaVersion=2.13.16 # Adding swaggerVersion in gradle.properties to have a single version in place for swagger swaggerVersion=2.2.25 diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 2d85d3c6a6927..9e6d406898c1a 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart - 4.1.0 + 4.1.0-SNAPSHOT .. 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index b4b2ce160ae89..133d0ee951e5d 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 - 4.1.0 + 4.1.0-SNAPSHOT 2.0.16 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index b452399ae02b2..021e9d0c0d6f9 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom - 4.1.0 + 4.1.0-SNAPSHOT Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index dd8e97fa30ce0..99c9dfe22e7f8 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '4.1.0' +__version__ = '4.1.0.dev0' From 23b64404ae7ba98d89a2d456991abaf2f32af35f Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 5 Aug 2025 14:29:00 +0200 Subject: [PATCH 10/16] Bump version to 4.1.0 --- gradle.properties | 2 +- streams/quickstart/java/pom.xml | 2 +- .../java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py | 2 +- tests/kafkatest/version.py | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/gradle.properties b/gradle.properties index 52bf115819de5..34e605f5a5469 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ group=org.apache.kafka # - streams/quickstart/pom.xml # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml -version=4.1.0-SNAPSHOT +version=4.1.0 scalaVersion=2.13.16 # Adding 
swaggerVersion in gradle.properties to have a single version in place for swagger swaggerVersion=2.2.25 diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 9e6d406898c1a..2d85d3c6a6927 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart - 4.1.0-SNAPSHOT + 4.1.0 .. diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 133d0ee951e5d..b4b2ce160ae89 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 - 4.1.0-SNAPSHOT + 4.1.0 2.0.16 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 021e9d0c0d6f9..b452399ae02b2 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom - 4.1.0-SNAPSHOT + 4.1.0 Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index 99c9dfe22e7f8..dd8e97fa30ce0 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '4.1.0.dev0' +__version__ = '4.1.0' diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index cf88ea4cfb94f..cb5f70b45818f 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -110,7 +110,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("4.1.0") +DEV_VERSION = KafkaVersion("4.1.0-SNAPSHOT") LATEST_STABLE_TRANSACTION_VERSION = 2 # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java 
From d9be929f4acab066f84c6addf37ef24142a06d10 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 5 Aug 2025 14:27:09 -0700 Subject: [PATCH 11/16] MINOR: add missing section to TOC (#20305) Add new group coordinator metrics section to TOC. Reviewers: Chia-Ping Tsai --- docs/toc.html | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/toc.html b/docs/toc.html index 032228de3d219..304bd1c8a53c1 100644 --- a/docs/toc.html +++ b/docs/toc.html @@ -153,6 +153,7 @@
  • 6.7 Monitoring