Skip to content

Commit 7ddd0d7

Browse files
KAFKA-19703: Removed versions 2.3 and below from UpgradeFromValues. (#20539)
Removed versions 2.3 and below from UpgradeFromValues, including all the usagesof them. Reviewers: Matthias J. Sax <[email protected]>
1 parent 68f1da8 commit 7ddd0d7

File tree

9 files changed

+8
-886
lines changed

9 files changed

+8
-886
lines changed

docs/streams/developer-guide/config-streams.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,7 +1222,7 @@ <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="heade
12221222
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
12231223
</p>
12241224
<p>
1225-
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
1225+
Note that you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
12261226
configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
12271227
For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
12281228
</p>
@@ -1235,7 +1235,7 @@ <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="heade
12351235
The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
12361236
You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
12371237
newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
1238-
when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
1238+
when upgrading to 3.4+ from any version lower than 3.4.
12391239
</div>
12401240
</blockquote>
12411241
</div>

docs/streams/upgrade-guide.html

Lines changed: 6 additions & 733 deletions
Large diffs are not rendered by default.

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -287,66 +287,6 @@ public class StreamsConfig extends AbstractConfig {
287287
OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS,
288288
SINGLE_STORE_SELF_JOIN);
289289

290-
/**
291-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
292-
*/
293-
@SuppressWarnings("WeakerAccess")
294-
public static final String UPGRADE_FROM_0100 = UpgradeFromValues.UPGRADE_FROM_0100.toString();
295-
296-
/**
297-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
298-
*/
299-
@SuppressWarnings("WeakerAccess")
300-
public static final String UPGRADE_FROM_0101 = UpgradeFromValues.UPGRADE_FROM_0101.toString();
301-
302-
/**
303-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
304-
*/
305-
@SuppressWarnings("WeakerAccess")
306-
public static final String UPGRADE_FROM_0102 = UpgradeFromValues.UPGRADE_FROM_0102.toString();
307-
308-
/**
309-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
310-
*/
311-
@SuppressWarnings("WeakerAccess")
312-
public static final String UPGRADE_FROM_0110 = UpgradeFromValues.UPGRADE_FROM_0110.toString();
313-
314-
/**
315-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
316-
*/
317-
@SuppressWarnings("WeakerAccess")
318-
public static final String UPGRADE_FROM_10 = UpgradeFromValues.UPGRADE_FROM_10.toString();
319-
320-
/**
321-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
322-
*/
323-
@SuppressWarnings("WeakerAccess")
324-
public static final String UPGRADE_FROM_11 = UpgradeFromValues.UPGRADE_FROM_11.toString();
325-
326-
/**
327-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.0.x}.
328-
*/
329-
@SuppressWarnings("WeakerAccess")
330-
public static final String UPGRADE_FROM_20 = UpgradeFromValues.UPGRADE_FROM_20.toString();
331-
332-
/**
333-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.1.x}.
334-
*/
335-
@SuppressWarnings("WeakerAccess")
336-
public static final String UPGRADE_FROM_21 = UpgradeFromValues.UPGRADE_FROM_21.toString();
337-
338-
/**
339-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.2.x}.
340-
*/
341-
@SuppressWarnings("WeakerAccess")
342-
public static final String UPGRADE_FROM_22 = UpgradeFromValues.UPGRADE_FROM_22.toString();
343-
344-
/**
345-
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.3.x}.
346-
*/
347-
@SuppressWarnings("WeakerAccess")
348-
public static final String UPGRADE_FROM_23 = UpgradeFromValues.UPGRADE_FROM_23.toString();
349-
350290
/**
351291
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.4.x}.
352292
*/

streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,6 @@
1717
package org.apache.kafka.streams.internals;
1818

1919
public enum UpgradeFromValues {
20-
UPGRADE_FROM_0100("0.10.0"),
21-
UPGRADE_FROM_0101("0.10.1"),
22-
UPGRADE_FROM_0102("0.10.2"),
23-
UPGRADE_FROM_0110("0.11.0"),
24-
UPGRADE_FROM_10("1.0"),
25-
UPGRADE_FROM_11("1.1"),
26-
UPGRADE_FROM_20("2.0"),
27-
UPGRADE_FROM_21("2.1"),
28-
UPGRADE_FROM_22("2.2"),
29-
UPGRADE_FROM_23("2.3"),
3020
UPGRADE_FROM_24("2.4"),
3121
UPGRADE_FROM_25("2.5"),
3222
UPGRADE_FROM_26("2.6"),

streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,6 @@ private static boolean isUpgrade(final Map<String, ?> configs) {
5858
}
5959

6060
switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
61-
case UPGRADE_FROM_0100:
62-
case UPGRADE_FROM_0101:
63-
case UPGRADE_FROM_0102:
64-
case UPGRADE_FROM_0110:
65-
case UPGRADE_FROM_10:
66-
case UPGRADE_FROM_11:
67-
case UPGRADE_FROM_20:
68-
case UPGRADE_FROM_21:
69-
case UPGRADE_FROM_22:
70-
case UPGRADE_FROM_23:
7161
case UPGRADE_FROM_24:
7262
case UPGRADE_FROM_25:
7363
case UPGRADE_FROM_26:

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,16 +100,6 @@ private boolean isNotUpgrade(final Map<String, ?> configs) {
100100
}
101101

102102
switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
103-
case UPGRADE_FROM_0100:
104-
case UPGRADE_FROM_0101:
105-
case UPGRADE_FROM_0102:
106-
case UPGRADE_FROM_0110:
107-
case UPGRADE_FROM_10:
108-
case UPGRADE_FROM_11:
109-
case UPGRADE_FROM_20:
110-
case UPGRADE_FROM_21:
111-
case UPGRADE_FROM_22:
112-
case UPGRADE_FROM_23:
113103
case UPGRADE_FROM_24:
114104
case UPGRADE_FROM_25:
115105
case UPGRADE_FROM_26:

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,6 @@ private static boolean upgradeFromV0(final Map<String, ?> configs) {
7777
}
7878

7979
switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
80-
case UPGRADE_FROM_0100:
81-
case UPGRADE_FROM_0101:
82-
case UPGRADE_FROM_0102:
83-
case UPGRADE_FROM_0110:
84-
case UPGRADE_FROM_10:
85-
case UPGRADE_FROM_11:
86-
case UPGRADE_FROM_20:
87-
case UPGRADE_FROM_21:
88-
case UPGRADE_FROM_22:
89-
case UPGRADE_FROM_23:
9080
case UPGRADE_FROM_24:
9181
case UPGRADE_FROM_25:
9282
case UPGRADE_FROM_26:

streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.kafka.common.utils.Utils;
2424
import org.apache.kafka.streams.StreamsConfig;
2525
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
26-
import org.apache.kafka.streams.internals.UpgradeFromValues;
2726
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
2827
import org.apache.kafka.streams.processor.internals.ClientUtils;
2928
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
@@ -59,8 +58,6 @@ public AssignorConfiguration(final Map<String, ?> configs) {
5958
final LogContext logContext = new LogContext(logPrefix);
6059
log = logContext.logger(getClass());
6160

62-
validateUpgradeFrom();
63-
6461
{
6562
final Object o = configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
6663
if (o == null) {
@@ -94,32 +91,6 @@ public ReferenceContainer referenceContainer() {
9491
return referenceContainer;
9592
}
9693

97-
// cooperative rebalancing was introduced in 2.4 and the old protocol (eager rebalancing) was removed
98-
// in 4.0, meaning live upgrades from 2.3 or below to 4.0+ are no longer possible without a bridge release
99-
public void validateUpgradeFrom() {
100-
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
101-
if (upgradeFrom != null) {
102-
switch (UpgradeFromValues.fromString(upgradeFrom)) {
103-
case UPGRADE_FROM_0100:
104-
case UPGRADE_FROM_0101:
105-
case UPGRADE_FROM_0102:
106-
case UPGRADE_FROM_0110:
107-
case UPGRADE_FROM_10:
108-
case UPGRADE_FROM_11:
109-
case UPGRADE_FROM_20:
110-
case UPGRADE_FROM_21:
111-
case UPGRADE_FROM_22:
112-
case UPGRADE_FROM_23:
113-
final String errMsg = String.format(
114-
"The eager rebalancing protocol is no longer supported in 4.0 which means live upgrades from 2.3 or below are not possible."
115-
+ " Please see the Streams upgrade guide for the bridge releases and recommended upgrade path. Got upgrade.from='%s'", upgradeFrom);
116-
log.error(errMsg);
117-
throw new ConfigException(errMsg);
118-
119-
}
120-
}
121-
}
122-
12394
public String logPrefix() {
12495
return logPrefix;
12596
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -570,28 +570,6 @@ public void shouldInterleaveTasksByGroupIdDuringNewAssignment(final Map<String,
570570
assertThat(interleavedTaskIds, equalTo(assignment));
571571
}
572572

573-
@ParameterizedTest
574-
@MethodSource("parameter")
575-
public void shouldThrowOnEagerSubscription(final Map<String, Object> parameterizedConfig) {
576-
setUp(parameterizedConfig, false);
577-
builder.addSource(null, "source1", null, null, null, "topic1");
578-
builder.addSource(null, "source2", null, null, null, "topic2");
579-
builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2");
580-
581-
final Set<TaskId> prevTasks = Set.of(
582-
new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)
583-
);
584-
final Set<TaskId> standbyTasks = Set.of(
585-
new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)
586-
);
587-
588-
createMockTaskManager(prevTasks, standbyTasks);
589-
assertThrows(
590-
ConfigException.class,
591-
() -> configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig)
592-
);
593-
}
594-
595573
@ParameterizedTest
596574
@MethodSource("parameter")
597575
public void testCooperativeSubscription(final Map<String, Object> parameterizedConfig) {

0 commit comments

Comments
 (0)