Skip to content

Commit ac63ce9

Browse files
authored
KAFKA-19544 Improve MetadataVersion.fromVersionString() to take an enableUnstableFeature flag (#20248)
Improve `MetadataVersion.fromVersionString()` to take an `enableUnstableFeature` flag, and enable `FeatureCommand` and `StorageTool` to leverage the exception message from `fromVersionString`. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 348e64c commit ac63ce9

File tree

8 files changed

+101
-99
lines changed

8 files changed

+101
-99
lines changed

core/src/main/scala/kafka/tools/StorageTool.scala

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,12 @@ object StorageTool extends Logging {
129129
setControllerListenerName(config.controllerListenerNames.get(0)).
130130
setMetadataLogDirectory(config.metadataLogDir)
131131

132-
def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = {
133-
val versions = MetadataVersion.VERSIONS.slice(first.ordinal, last.ordinal + 1)
134-
versions.map(_.toString).mkString(", ")
135-
}
136132
Option(namespace.getString("release_version")).foreach(releaseVersion => {
137133
try {
138-
formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))
134+
formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion, config.unstableFeatureVersionsEnabled))
139135
} catch {
140-
case _: Throwable =>
141-
throw new TerseFailure(s"Unknown metadata.version $releaseVersion. Supported metadata.version are " +
142-
s"${metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestProduction())}")
136+
case e: Throwable =>
137+
throw new TerseFailure(e.getMessage)
143138
}
144139
})
145140

@@ -184,9 +179,9 @@ object StorageTool extends Logging {
184179
* Maps the given release version to the corresponding metadata version
185180
* and prints the corresponding features.
186181
*
187-
* @param namespace Arguments containing the release version.
188-
* @param printStream The print stream to output the version mapping.
189-
* @param validFeatures List of features to be considered in the output
182+
* @param namespace Arguments containing the release version.
183+
* @param printStream The print stream to output the version mapping.
184+
* @param validFeatures List of features to be considered in the output.
190185
*/
191186
def runVersionMappingCommand(
192187
namespace: Namespace,
@@ -195,7 +190,7 @@ object StorageTool extends Logging {
195190
): Unit = {
196191
val releaseVersion = Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString)
197192
try {
198-
val metadataVersion = MetadataVersion.fromVersionString(releaseVersion)
193+
val metadataVersion = MetadataVersion.fromVersionString(releaseVersion, true)
199194

200195
val metadataVersionLevel = metadataVersion.featureLevel()
201196
printStream.print(f"metadata.version=$metadataVersionLevel%d ($releaseVersion%s)%n")
@@ -206,8 +201,7 @@ object StorageTool extends Logging {
206201
}
207202
} catch {
208203
case e: IllegalArgumentException =>
209-
throw new TerseFailure(s"Unknown release version '$releaseVersion'. Supported versions are: " +
210-
s"${MetadataVersion.MINIMUM_VERSION.version} to ${MetadataVersion.latestTesting().version()}")
204+
throw new TerseFailure(e.getMessage)
211205
}
212206
}
213207

core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,7 @@ class KRaftClusterTest {
807807
val cluster = new KafkaClusterTestKit.Builder(
808808
new TestKitNodes.Builder().
809809
setNumBrokerNodes(4).
810-
setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString)).
810+
setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString, true)).
811811
setNumControllerNodes(3).build()).
812812
build()
813813
try {

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ Found problem:
315315
val stream = new ByteArrayOutputStream()
316316
val failure = assertThrows(classOf[TerseFailure], () =>
317317
runFormatCommand(stream, properties, Seq("--release-version", "3.3-IV1"))).getMessage
318-
assertTrue(failure.contains("Unknown metadata.version 3.3-IV1"))
318+
assertTrue(failure.contains("Unknown metadata.version '3.3-IV1'"))
319319
assertTrue(failure.contains(MetadataVersion.MINIMUM_VERSION.version))
320320
assertTrue(failure.contains(MetadataVersion.latestProduction().version))
321321
}
@@ -735,18 +735,18 @@ Found problem:
735735
runVersionMappingCommand(stream, "2.9-IV2")
736736
})
737737

738-
assertEquals("Unknown release version '2.9-IV2'." +
739-
" Supported versions are: " + MetadataVersion.MINIMUM_VERSION.version +
740-
" to " + MetadataVersion.latestTesting().version, exception.getMessage
738+
assertEquals("Unknown metadata.version '2.9-IV2'. Supported metadata.version are: " +
739+
MetadataVersion.metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting()),
740+
exception.getMessage
741741
)
742742

743743
val exception2 = assertThrows(classOf[TerseFailure], () => {
744744
runVersionMappingCommand(stream, "invalid")
745745
})
746746

747-
assertEquals("Unknown release version 'invalid'." +
748-
" Supported versions are: " + MetadataVersion.MINIMUM_VERSION.version +
749-
" to " + MetadataVersion.latestTesting().version, exception2.getMessage
747+
assertEquals("Unknown metadata.version 'invalid'. Supported metadata.version are: " +
748+
MetadataVersion.metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting()),
749+
exception2.getMessage
750750
)
751751
}
752752

metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public void testNoLeaderEpochBumpIfNothingChanged(short version) {
310310
@ParameterizedTest
311311
@ValueSource(strings = {"3.6-IV0", "3.7-IV2", "4.0-IV0"})
312312
public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
313-
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
313+
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true);
314314
testTriggerLeaderEpochBumpIfNeeded(
315315
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
316316
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1))),
@@ -325,7 +325,7 @@ public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
325325
@ParameterizedTest
326326
@ValueSource(strings = {"3.4-IV0", "3.5-IV2"})
327327
public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
328-
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
328+
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true);
329329
testTriggerLeaderEpochBumpIfNeeded(
330330
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
331331
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1))),
@@ -339,7 +339,7 @@ public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
339339
@ParameterizedTest
340340
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "4.0-IV0"})
341341
public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) {
342-
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
342+
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true);
343343
testTriggerLeaderEpochBumpIfNeeded(
344344
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
345345
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1, 3, 4))),
@@ -354,7 +354,7 @@ public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) {
354354
@ParameterizedTest
355355
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "4.0-IV0"})
356356
public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionString) {
357-
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
357+
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true);
358358
testTriggerLeaderEpochBumpIfNeeded(
359359
createFooBuilder(metadataVersion).setTargetReplicas(List.of(2, 1, 4)),
360360
new PartitionChangeRecord(),
@@ -368,7 +368,7 @@ public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionStr
368368
@ParameterizedTest
369369
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2"})
370370
public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) {
371-
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
371+
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true);
372372
PartitionRegistration partition = new PartitionRegistration.Builder().
373373
setReplicas(new int[] {2}).
374374
setDirectories(new Uuid[]{

server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
import java.util.Arrays;
2121
import java.util.HashMap;
22+
import java.util.List;
2223
import java.util.Map;
2324
import java.util.Optional;
2425
import java.util.regex.Pattern;
26+
import java.util.stream.Collectors;
2527

2628
/**
2729
* This class contains the different Kafka versions.
@@ -340,11 +342,12 @@ Optional<MetadataVersion> previous() {
340342

341343
/**
342344
* Return an `MetadataVersion` instance for `versionString`, which can be in a variety of formats (e.g. "3.8", "3.8.x",
343-
* "3.8.0", "3.8-IV0"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `MetadataVersion`.
345+
* "3.8.0", "3.8-IV0"). The `unstableFeatureVersionsEnabled` parameter determines whether unstable versions are permitted.
346+
* `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `MetadataVersion`.
344347
* Note that 'misconfigured' values such as "3.8.1" will be parsed to `IBP_3_8_IV0` as we ignore anything after the first
345348
* two segments.
346349
*/
347-
public static MetadataVersion fromVersionString(String versionString) {
350+
public static MetadataVersion fromVersionString(String versionString, boolean unstableFeatureVersionsEnabled) {
348351
String[] versionSegments = versionString.split(Pattern.quote("."));
349352
int numSegments = 2;
350353
String key;
@@ -353,10 +356,22 @@ public static MetadataVersion fromVersionString(String versionString) {
353356
} else {
354357
key = String.join(".", Arrays.copyOfRange(versionSegments, 0, numSegments));
355358
}
356-
return Optional.ofNullable(IBP_VERSIONS.get(key)).orElseThrow(() ->
357-
new IllegalArgumentException("Version " + versionString + " is not a valid version. The minimum version is " + MINIMUM_VERSION
358-
+ " and the maximum version is " + latestTesting())
359-
);
359+
360+
MetadataVersion metadataVersion = IBP_VERSIONS.get(key);
361+
if (metadataVersion == null || (!unstableFeatureVersionsEnabled && !metadataVersion.isProduction())) {
362+
String errorMsg = "Unknown metadata.version '" + versionString + "'. Supported metadata.version are: "
363+
+ metadataVersionsToString(MetadataVersion.MINIMUM_VERSION,
364+
unstableFeatureVersionsEnabled ? MetadataVersion.latestTesting() : MetadataVersion.latestProduction());
365+
throw new IllegalArgumentException(errorMsg);
366+
}
367+
return metadataVersion;
368+
}
369+
370+
public static String metadataVersionsToString(MetadataVersion first, MetadataVersion last) {
371+
List<MetadataVersion> versions = List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
372+
return versions.stream()
373+
.map(String::valueOf)
374+
.collect(Collectors.joining(", "));
360375
}
361376

362377
public static MetadataVersion fromFeatureLevel(short version) {

server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.apache.kafka.server.common.MetadataVersion.*;
2727
import static org.junit.jupiter.api.Assertions.assertEquals;
2828
import static org.junit.jupiter.api.Assertions.assertFalse;
29+
import static org.junit.jupiter.api.Assertions.assertThrows;
2930
import static org.junit.jupiter.api.Assertions.assertTrue;
3031

3132
class MetadataVersionTest {
@@ -42,55 +43,69 @@ public void testFeatureLevels() {
4243
@SuppressWarnings("checkstyle:JavaNCSS")
4344
public void testFromVersionString() {
4445
// 3.3-IV3 is the latest production version in the 3.3 line
45-
assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3"));
46-
assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3-IV3"));
46+
assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3", true));
47+
assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3-IV3", true));
4748

4849
// 3.4-IV0 is the latest production version in the 3.4 line
49-
assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4"));
50-
assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4-IV0"));
50+
assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4", true));
51+
assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4-IV0", true));
5152

5253
// 3.5-IV2 is the latest production version in the 3.5 line
53-
assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5"));
54-
assertEquals(IBP_3_5_IV0, MetadataVersion.fromVersionString("3.5-IV0"));
55-
assertEquals(IBP_3_5_IV1, MetadataVersion.fromVersionString("3.5-IV1"));
56-
assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5-IV2"));
54+
assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5", true));
55+
assertEquals(IBP_3_5_IV0, MetadataVersion.fromVersionString("3.5-IV0", true));
56+
assertEquals(IBP_3_5_IV1, MetadataVersion.fromVersionString("3.5-IV1", true));
57+
assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5-IV2", true));
5758

5859
// 3.6-IV2 is the latest production version in the 3.6 line
59-
assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6"));
60-
assertEquals(IBP_3_6_IV0, MetadataVersion.fromVersionString("3.6-IV0"));
61-
assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1"));
62-
assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2"));
60+
assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6", true));
61+
assertEquals(IBP_3_6_IV0, MetadataVersion.fromVersionString("3.6-IV0", true));
62+
assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1", true));
63+
assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2", true));
6364

6465
// 3.7-IV4 is the latest production version in the 3.7 line
65-
assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7"));
66-
assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0"));
67-
assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1"));
68-
assertEquals(IBP_3_7_IV2, MetadataVersion.fromVersionString("3.7-IV2"));
69-
assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3"));
70-
assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4"));
66+
assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7", true));
67+
assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0", true));
68+
assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1", true));
69+
assertEquals(IBP_3_7_IV2, MetadataVersion.fromVersionString("3.7-IV2", true));
70+
assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3", true));
71+
assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4", true));
7172

7273
// 3.8-IV0 is the latest production version in the 3.8 line
73-
assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8"));
74-
assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0"));
74+
assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8", true));
75+
assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0", true));
7576

7677
// 3.9-IV0 is the latest production version in the 3.9 line
77-
assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9"));
78-
assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0"));
78+
assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9", true));
79+
assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0", true));
7980

8081
// 4.0-IV3 is the latest production version in the 4.0 line
81-
assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0"));
82-
assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0"));
83-
assertEquals(IBP_4_0_IV1, MetadataVersion.fromVersionString("4.0-IV1"));
84-
assertEquals(IBP_4_0_IV2, MetadataVersion.fromVersionString("4.0-IV2"));
85-
assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0-IV3"));
82+
assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0", true));
83+
assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0", true));
84+
assertEquals(IBP_4_0_IV1, MetadataVersion.fromVersionString("4.0-IV1", true));
85+
assertEquals(IBP_4_0_IV2, MetadataVersion.fromVersionString("4.0-IV2", true));
86+
assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0-IV3", true));
8687

8788
// 4.1-IV1 is the latest production version in the 4.1 line
88-
assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1"));
89-
assertEquals(IBP_4_1_IV0, MetadataVersion.fromVersionString("4.1-IV0"));
90-
assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1-IV1"));
89+
assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1", true));
90+
assertEquals(IBP_4_1_IV0, MetadataVersion.fromVersionString("4.1-IV0", true));
91+
assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1-IV1", true));
92+
93+
assertEquals(IBP_4_2_IV0, MetadataVersion.fromVersionString("4.2-IV0", true));
94+
assertEquals(IBP_4_2_IV1, MetadataVersion.fromVersionString("4.2-IV1", true));
95+
96+
// Throws exception when unstableFeatureVersionsEnabled is false
97+
assertEquals("Unknown metadata.version '4.2-IV0'. Supported metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
98+
+ "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3, 3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0, 4.1-IV1",
99+
assertThrows(IllegalArgumentException.class, () -> fromVersionString("4.2-IV0", false)).getMessage());
100+
assertEquals("Unknown metadata.version '4.2-IV1'. Supported metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
101+
+ "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3, 3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0, 4.1-IV1",
102+
assertThrows(IllegalArgumentException.class, () -> fromVersionString("4.2-IV1", false)).getMessage());
103+
}
91104

92-
assertEquals(IBP_4_2_IV0, MetadataVersion.fromVersionString("4.2-IV0"));
93-
assertEquals(IBP_4_2_IV1, MetadataVersion.fromVersionString("4.2-IV1"));
105+
@Test
106+
public void testMetadataVersionsToString() {
107+
assertEquals("3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0",
108+
MetadataVersion.metadataVersionsToString(MetadataVersion.IBP_3_5_IV0, MetadataVersion.IBP_3_6_IV0));
94109
}
95110

96111
@Test

0 commit comments

Comments
 (0)