Commits
26 commits
83e37a9
make cloudwatch metrics enablement config driven
rakshit-souvik Jul 25, 2025
1d4b22b
MINOR: Improve Kafka Streams Protocol Upgrade Doc (#20241)
lucliu1108 Jul 25, 2025
d1ffff4
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Jul 26, 2025
9c83c6d
MINOR: Delete the redundant feature upgrade instruction for running K…
lucliu1108 Jul 29, 2025
0fbfc94
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Jul 29, 2025
0179193
KAFKA-19529: State updater sensor names should be unique (#20262) (#2…
lucasbru Aug 1, 2025
87b60a5
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Aug 1, 2025
cdc7a4e
MINOR: improve the min.insync.replicas doc (#20237)
showuon Aug 4, 2025
37fe08d
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Aug 4, 2025
7722cff
MINOR: The upgrade.html file contains duplicate IDs on the same page …
m1a2st Jun 19, 2025
fc030b4
KAFKA-19576 Fix typo in state-change log filename after rotate (#20269)
jaredharley Aug 5, 2025
b4c5cc3
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Aug 5, 2025
de16dd1
KAFKA-19581: Temporary fix for Streams system tests
mimaison Jul 28, 2025
6340f43
Revert "Bump version to 4.1.0"
mimaison Aug 5, 2025
23b6440
Bump version to 4.1.0
mimaison Aug 5, 2025
73f235f
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Aug 5, 2025
d9be929
MINOR: add missing section to TOC (#20305)
mjsax Aug 5, 2025
a1589f4
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Aug 5, 2025
b6a0778
Change ci_tools import path (#1746)
omkreddy Aug 6, 2025
969a490
DPA-1801 Add run_tags to worker-ami and aws-packer (#1765)
rakshit-souvik Aug 6, 2025
f61a5a5
MINOR: Remove SPAM URL in Streams Documentation (#20321)
rauwuckl Aug 8, 2025
3e3d1b5
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Aug 13, 2025
2dbed66
KAFKA-15307: Kafka Streams configuration docs outdated (#20329)
shashankhs11 Aug 17, 2025
4675ebb
Merge branch '4.1' of https://github.com/confluentinc/kafka into 4.1
semaphore-agent-production[bot] Aug 17, 2025
ea7dca9
make cloudwatch metrics enablement config driven
rakshit-souvik Jul 25, 2025
f11c1ed
Merge remote-tracking branch 'origin/4.1_cw_param' into 4.1_cw_param
rakshit-souvik Aug 22, 2025
2 changes: 1 addition & 1 deletion ci.py
@@ -23,7 +23,7 @@
import subprocess
import sys

from confluent.ci.scripts.ci_utils import run_cmd, regex_replace, replace
from ci_tools.utils import run_cmd, regex_replace, replace
logging.basicConfig(level=logging.INFO, format='%(message)s')
log = logging.getLogger(__name__)

@@ -181,7 +181,8 @@ public class TopicConfig {
"When used together, <code>min.insync.replicas</code> and <code>acks</code> allow you to enforce greater durability guarantees. " +
"A typical scenario would be to create a topic with a replication factor of 3, " +
"set <code>min.insync.replicas</code> to 2, and produce with <code>acks</code> of \"all\". " +
"This will ensure that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers.";
"This ensures that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers." +
"<p>Note that when the Eligible Leader Replicas feature is enabled, the semantics of this config changes. Please refer to <a href=\"#eligible_leader_replicas\">the ELR section</a> for more info.</p>";

public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
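The durability interplay described in the doc string above (replication factor 3, `min.insync.replicas=2`, `acks=all`) can be sketched as a toy check. This is illustrative only — `can_ack` is a made-up helper, not broker code:

```python
# Toy model (not broker code) of how acks=all interacts with
# min.insync.replicas on a topic with replication factor 3.
def can_ack(in_sync_replicas: int, min_insync_replicas: int) -> bool:
    """A produce request with acks=all succeeds only while the
    current in-sync replica count is at least min.insync.replicas."""
    return in_sync_replicas >= min_insync_replicas

# replication.factor=3, min.insync.replicas=2, acks=all:
print(can_ack(3, 2))  # True  - all replicas caught up
print(can_ack(2, 2))  # True  - one follower lagging, write still durable
print(can_ack(1, 2))  # False - broker would answer NotEnoughReplicas
```

With these settings a single lagging follower does not block producers, but a write can never be acknowledged by the leader alone — which is exactly the "majority of replicas" guarantee the doc string describes.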
2 changes: 1 addition & 1 deletion config/log4j2.yaml
@@ -44,7 +44,7 @@ Configuration:
# State Change appender
- name: StateChangeAppender
fileName: "${sys:kafka.logs.dir}/state-change.log"
filePattern: "${sys:kafka.logs.dir}/stage-change.log.%d{yyyy-MM-dd-HH}"
filePattern: "${sys:kafka.logs.dir}/state-change.log.%d{yyyy-MM-dd-HH}"
PatternLayout:
pattern: "${logPattern}"
TimeBasedTriggeringPolicy:
13 changes: 10 additions & 3 deletions docs/ops.html
@@ -4499,9 +4499,16 @@ <h4 class="anchor-heading"><a id="eligible_leader_replicas_upgrade" class="ancho
<p>Downgrades are safe to perform by setting <code>eligible.leader.replicas.version=0</code>.</p>

<h4 class="anchor-heading"><a id="eligible_leader_replicas_tool" class="anchor-link"></a><a href="#eligible_leader_replicas_tool">Tool</a></h4>
<p>The ELR fields can be checked through the API DescribeTopicPartitions. The admin client can fetch the ELR info by describing the topics.
Also note that, if <code>min.insync.replicas</code> is updated for a topic, the ELR field will be cleaned. If cluster default min ISR is updated,
all the ELR fields will be cleaned.</p>
<p>The ELR fields can be checked through the API DescribeTopicPartitions. The admin client can fetch the ELR info by describing the topics.</p>
<p>Note that when the ELR feature is enabled:</p>
<ul>
<li>A cluster-level <code>min.insync.replicas</code> config will be added if one does not already exist; its value matches the static config on the active controller.</li>
<li>Removing the cluster-level <code>min.insync.replicas</code> config is not allowed.</li>
<li>If the cluster-level <code>min.insync.replicas</code> config is updated, even to an unchanged value, all ELR state will be cleaned.</li>
<li>Any previously set broker-level <code>min.insync.replicas</code> config will be removed. Please set it at the cluster level if necessary.</li>
<li>Altering the broker-level <code>min.insync.replicas</code> config is not allowed.</li>
<li>If <code>min.insync.replicas</code> is updated for a topic, that topic's ELR state will be cleaned.</li>
</ul>
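The bookkeeping rules listed above can be summarized as a toy state machine. This is a sketch of the documented rules only — `ElrState` is hypothetical and not the controller's implementation:

```python
# Hedged sketch of the ELR rules above: a toy state machine,
# not the Kafka controller's actual code.
class ElrState:
    def __init__(self):
        self.cluster_min_isr = None  # cluster-level min.insync.replicas
        self.topic_elr = {}          # topic -> set of eligible leader replicas

    def enable_elr(self, static_min_isr):
        # On enablement, a cluster-level config is created from the
        # active controller's static value if none exists yet.
        if self.cluster_min_isr is None:
            self.cluster_min_isr = static_min_isr

    def set_cluster_min_isr(self, value):
        # Any cluster-level update (even to the same value) clears ALL ELR state.
        self.cluster_min_isr = value
        self.topic_elr.clear()

    def set_topic_min_isr(self, topic, value):
        # A topic-level update clears only that topic's ELR state.
        self.topic_elr.pop(topic, None)

s = ElrState()
s.enable_elr(static_min_isr=2)
s.topic_elr = {"t1": {1, 2}, "t2": {3}}
s.set_topic_min_isr("t1", 2)
print(sorted(s.topic_elr))  # ['t2'] - only t1's ELR state was cleaned
s.set_cluster_min_isr(2)    # unchanged value still clears everything
print(s.topic_elr)          # {}
```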

</script>

2 changes: 1 addition & 1 deletion docs/streams/core-concepts.html
@@ -279,7 +279,7 @@ <h2 class="anchor-heading"><a id="streams_processing_guarantee" class="anchor-li
<p>
In stream processing, one of the most frequently asked questions is "does my stream processing system guarantee that each record is processed once and only once, even if some failures are encountered in the middle of processing?"
Failing to guarantee exactly-once stream processing is a deal-breaker for many applications that cannot tolerate any data loss or duplicates, and in that case a batch-oriented framework is usually used in addition
to the stream processing pipeline, known as the <a href="http://lambda-architecture.net/">Lambda Architecture</a>.
to the stream processing pipeline, known as the <a href="https://en.wikipedia.org/wiki/Lambda_architecture">Lambda Architecture</a>.
Prior to 0.11.0.0, Kafka only provides at-least-once delivery guarantees and hence any stream processing systems that leverage it as the backend storage could not guarantee end-to-end exactly-once semantics.
In fact, even for those stream processing systems that claim to support exactly-once processing, as long as they are reading from / writing to Kafka as the source / sink, their applications cannot actually guarantee that
no duplicates will be generated throughout the pipeline.<br />
34 changes: 19 additions & 15 deletions docs/streams/developer-guide/config-streams.html
@@ -298,12 +298,12 @@ <h4><a class="toc-backref" href="#id45">num.standby.replicas</a><a class="header
<tr class="row-even"><td>commit.interval.ms</td>
<td>Low</td>
<td colspan="2">The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.</td>
<td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds)</td>
<td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds) (at-least-once) / <code class="docutils literal"><span class="pre">100</span></code> (exactly-once)</td>
</tr>
<tr class="row-odd"><td>default.deserialization.exception.handler (Deprecated. Use deserialization.exception.handler instead.)</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td>
<td><code class="docutils literal"><span class="pre">LogAndFailExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>default.key.serde</td>
<td>Medium</td>
@@ -328,11 +328,10 @@ <h4><a class="toc-backref" href="#id45">num.standby.replicas</a><a class="header
set by the user or all serdes must be passed in explicitly (see also default.key.serde).</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-even"><td>default.dsl.store</td>
<tr class="row-even"><td>default.dsl.store (Deprecated. Use dsl.store.suppliers.class instead.)</td>
<td>Low</td>
<td colspan="2">
[DEPRECATED] The default state store type used by DSL operators. Deprecated in
favor of <code>dsl.store.suppliers.class</code>
The default state store type used by DSL operators.
</td>
<td><code class="docutils literal"><span class="pre">"ROCKS_DB"</span></code></td>
</tr>
@@ -492,54 +491,59 @@ <h4><a class="toc-backref" href="#id45">num.standby.replicas</a><a class="header
The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td>
</tr>
<tr class="row-odd"><td>retry.backoff.ms</td>
<tr class="row-odd"><td>repartition.purge.interval.ms</td>
<td>Low</td>
<td colspan="2">The frequency in milliseconds with which to delete fully consumed records from repartition topics. Purging will occur after at least this value since the last purge, but may be delayed until later.</td>
<td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds)</td>
</tr>
<tr class="row-even"><td>retry.backoff.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds, before a request is retried.</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr>
<tr class="row-even"><td>rocksdb.config.setter</td>
<tr class="row-odd"><td>rocksdb.config.setter</td>
<td>Medium</td>
<td colspan="2">The RocksDB configuration.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-odd"><td>state.cleanup.delay.ms</td>
<tr class="row-even"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
<td><code class="docutils literal"><span class="pre">600000</span></code> (10 minutes)</td>
</tr>
<tr class="row-even"><td>state.dir</td>
<tr class="row-odd"><td>state.dir</td>
<td>High</td>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
</tr>
<tr class="row-odd"><td>task.assignor.class</td>
<tr class="row-even"><td>task.assignor.class</td>
<td>Medium</td>
<td colspan="2">A task assignor class or class name implementing the <code>TaskAssignor</code> interface.</td>
<td>The high-availability task assignor.</td>
</tr>
<tr class="row-even"><td>task.timeout.ms</td>
<tr class="row-odd"><td>task.timeout.ms</td>
<td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<td><code class="docutils literal"><span class="pre">300000</span></code> (5 minutes)</td>
</tr>
<tr class="row-odd"><td>topology.optimization</td>
<tr class="row-even"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>),
<code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). </td>
<td><code class="docutils literal"><span class="pre">"NO_OPTIMIZATION"</span></code></td>
</tr>
<tr class="row-even"><td>upgrade.from</td>
<tr class="row-odd"><td>upgrade.from</td>
<td>Medium</td>
<td colspan="2">The version you are upgrading from during a rolling upgrade.
See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td><code class="docutils literal"><span class="pre">86400000</span></code> (1 day)</td>
</tr>
<tr class="row-even"><td>window.size.ms</td>
<tr class="row-odd"><td>window.size.ms</td>
<td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
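The <code>commit.interval.ms</code> row in the table above has a default that depends on the processing guarantee. A minimal sketch of that lookup — this mirrors what the table documents, not the actual `StreamsConfig` Java code:

```python
# Sketch of the commit.interval.ms default documented above:
# 100 ms under exactly-once, 30 s under at-least-once
# (illustrative helper, not Kafka Streams' implementation).
def effective_commit_interval_ms(config: dict) -> int:
    if "commit.interval.ms" in config:
        return config["commit.interval.ms"]  # explicit value always wins
    guarantee = config.get("processing.guarantee", "at_least_once")
    if guarantee.startswith("exactly_once"):
        return 100       # exactly-once default
    return 30_000        # at-least-once default

print(effective_commit_interval_ms({}))                                           # 30000
print(effective_commit_interval_ms({"processing.guarantee": "exactly_once_v2"}))  # 100
print(effective_commit_interval_ms({"commit.interval.ms": 5000}))                 # 5000
```

The much smaller exactly-once default exists because commits gate transaction boundaries, and therefore end-to-end latency, under that guarantee.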
3 changes: 3 additions & 0 deletions docs/streams/upgrade-guide.html
@@ -213,6 +213,9 @@ <h4>Early Access of the Streams Rebalance Protocol</h4>
Set <code>unstable.feature.versions.enable=true</code> for controllers and brokers, and
set <code>unstable.api.versions.enable=true</code> on the brokers as well. In your Kafka Streams application
configuration, set <code>group.protocol=streams</code>.
After the feature is configured, run
<code>kafka-features.sh --bootstrap-server localhost:9092 describe</code>
and verify that <code>streams.version</code> now reports <code>FinalizedVersionLevel: 1</code>.
</p>
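The verification step above can be scripted. A hedged sketch that scans the tool's output — the exact output format is an assumption, and `streams_feature_finalized` is a hypothetical helper:

```python
# Hypothetical helper: scan `kafka-features.sh ... describe` output and
# confirm streams.version is finalized at level 1. The line format shown
# in `sample` is an assumption about the tool's output, not a guarantee.
def streams_feature_finalized(describe_output: str) -> bool:
    for line in describe_output.splitlines():
        if "streams.version" in line and "FinalizedVersionLevel: 1" in line:
            return True
    return False

sample = ("Feature: streams.version\tSupportedMinVersion: 0\t"
          "SupportedMaxVersion: 1\tFinalizedVersionLevel: 1\tEpoch: 5")
print(streams_feature_finalized(sample))  # True
```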

<p>
1 change: 1 addition & 0 deletions docs/toc.html
@@ -153,6 +153,7 @@
<li><a href="#monitoring">6.7 Monitoring</a>
<ul>
<li><a href="#remote_jmx">Security Considerations for Remote Monitoring using JMX</a>
<li><a href="#group_coordinator_monitoring">Group Coordinator Monitoring</a>
<li><a href="#tiered_storage_monitoring">Tiered Storage Monitoring</a>
<li><a href="#kraft_monitoring">KRaft Monitoring</a>
<li><a href="#selector_monitoring">Selector Monitoring</a>
29 changes: 22 additions & 7 deletions docs/upgrade.html
@@ -21,8 +21,8 @@

<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

<h5><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading Servers to 4.1.0 from any version 3.3.x through 4.0.x</a></h5>
<h6><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4.1.0</a></h6>
<h5><a id="upgrade_4_1_0_from" href="#upgrade_4_1_0_from">Upgrading Servers to 4.1.0 from any version 3.3.x through 4.0.x</a></h5>
<h5><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4.1.0</a></h5>
<ul>
<li>
Apache Kafka 4.1 ships with a preview of Queues for Kafka (<a href="https://cwiki.apache.org/confluence/x/4hA0Dw">KIP-932</a>). This feature introduces a new kind of group called
@@ -37,6 +37,9 @@ <h6><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4
The logger class name for LogCleaner has been updated from <code>kafka.log.LogCleaner</code> to <code>org.apache.kafka.storage.internals.log.LogCleaner</code> in the log4j2.yaml configuration file.
Added loggers for <code>org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread</code> and <code>org.apache.kafka.storage.internals.log.Cleaner</code> classes to CleanerAppender.
</li>
<li>
The filename for rotated <code>state-change.log</code> files has been updated from <code>stage-change.log.[date]</code> to <code>state-change.log.[date]</code> in the log4j2.yaml configuration file.
</li>
</ul>
</li>
<li><b>Broker</b>
@@ -48,6 +51,8 @@ <h6><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4
</li>
<li>
KIP-966 part 1: Eligible Leader Replicas (ELR) will be enabled by default on new clusters.
After the ELR feature is enabled, any previously set broker-level <code>min.insync.replicas</code> config will be removed.
Please set it at the cluster level if necessary.
For further details, please refer to <a href="/{{version}}/documentation.html#eligible_leader_replicas">here</a>.
</li>
</ul>
@@ -82,9 +87,9 @@ <h6><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4
</li>
</ul>

<h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0</a></h4>
<h4><a id="upgrade_4_0_1_from" href="#upgrade_4_0_1_from">Upgrading to 4.0.1</a></h4>

<h5><a id="upgrade_clients_4_0_0" href="#upgrade_clients_4_0_0">Upgrading Clients to 4.0.0</a></h5>
<h5><a id="upgrade_clients_4_0_1" href="#upgrade_clients_4_0_1">Upgrading Clients to 4.0.1</a></h5>

<p><b>For a rolling upgrade:</b></p>

@@ -95,7 +100,7 @@ <h5><a id="upgrade_clients_4_0_0" href="#upgrade_clients_4_0_0">Upgrading Client
or <a href="https://cwiki.apache.org/confluence/x/y4kgF">KIP-1124</a>.</li>
</ol>

<h5><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading Servers to 4.0.0 from any version 3.3.x through 3.9.x</a></h5>
<h5><a id="upgrade_4_0_1" href="#upgrade_4_0_1">Upgrading Servers to 4.0.1 from any version 3.3.x through 3.9.x</a></h5>

<p>Note: Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been removed. As such, <b>broker upgrades to 4.0.0 (and higher) require KRaft mode and
the software and metadata versions must be at least 3.3.x</b> (the first version when KRaft mode was deemed production ready). For clusters in KRaft mode
@@ -119,7 +124,13 @@ <h5><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading Servers to 4.0.0 from
has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_4_0_IV1(23, "4.0", "IV1", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
</ol>

<h5><a id="upgrade_servers_401_notable" href="#upgrade_servers_401_notable">Notable changes in 4.0.1</a></h5>
<ul>
<li>
The filename for rotated <code>state-change.log</code> files has been updated from <code>stage-change.log.[date]</code> to <code>state-change.log.[date]</code> in the log4j2.yaml configuration file.
See <a href="https://issues.apache.org/jira/browse/KAFKA-19576">KAFKA-19576</a> for details.
</li>
</ul>
<h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Notable changes in 4.0.0</a></h5>
<ul>
<li>
@@ -131,7 +142,7 @@ <h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Nota
</li>
<li>
Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been removed. About version upgrade,
check <a href="/{{version}}/documentation.html#upgrade_4_0_0">Upgrading to 4.0.0 from any version 3.3.x through 3.9.x</a> for more info.
check <a href="/{{version}}/documentation.html#upgrade_4_0_1">Upgrading to 4.0.1 from any version 3.3.x through 3.9.x</a> for more info.
</li>
<li>
Apache Kafka 4.0 ships with a brand-new group coordinator implementation (See <a href="https://cwiki.apache.org/confluence/x/HhD1D">here</a>).
@@ -474,6 +485,10 @@ <h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Nota
<li> See <a href="https://cwiki.apache.org/confluence/x/B40ODg">KIP-890</a> and
<a href="https://cwiki.apache.org/confluence/x/8ItyEg">KIP-1050</a> for more details </li>
</ul>
<li>
Rotated <code>state-change.log</code> files are incorrectly named <code>stage-change.log.[date]</code> (<code>state</code> misspelled as <code>stage</code>). This issue is corrected in 4.0.1.
See <a href="https://issues.apache.org/jira/browse/KAFKA-19576">KAFKA-19576</a> for details.
</li>
</ul>
</li>
</ul>
@@ -613,7 +613,7 @@ int getStaticallyConfiguredMinInsyncReplicas() {

/**
* Generate any configuration records that are needed to make it safe to enable ELR.
* Specifically, we need to remove all cluster-level configurations for min.insync.replicas,
* Specifically, we need to remove all broker-level configurations for min.insync.replicas,
* and create a cluster-level configuration for min.insync.replicas. It is always safe to call
* this function if ELR is already enabled; it will simply do nothing if the necessary
* configurations already exist.
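The Javadoc above can be paraphrased as a toy record generator: drop every broker-level `min.insync.replicas` override and ensure a cluster-level value exists. This is illustrative only — `elr_safety_records` is hypothetical, not the controller code:

```python
# Toy sketch of the record generation the Javadoc describes
# (hypothetical helper, not Kafka's controller implementation).
def elr_safety_records(broker_overrides: dict, cluster_value, static_value):
    records = []
    # Remove every broker-level min.insync.replicas override.
    for broker_id in sorted(broker_overrides):
        records.append(("remove", f"broker:{broker_id}", "min.insync.replicas"))
    # Create a cluster-level value from the static config if none exists.
    if cluster_value is None:
        records.append(("set", "cluster", "min.insync.replicas", static_value))
    return records

# Two brokers carry overrides and no cluster-level value exists yet:
print(elr_safety_records({1: 1, 2: 3}, cluster_value=None, static_value=2))
# Idempotent: nothing to do if the prerequisites already hold.
print(elr_safety_records({}, cluster_value=2, static_value=2))  # []
```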