Skip to content

Commit 611f412

Browse files
authored
MINOR: Enable streams rebalance protocol in EosIntegrationTest (#20592)
Remove stalling instance in EOSIntegrationTest, since it doesn’t matter what it thinks what the assignment is but blocks the test with streams group protocol Reviewers: Lucas Brutschy <[email protected]>
1 parent d76442e commit 611f412

File tree

1 file changed

+77
-51
lines changed

1 file changed

+77
-51
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java

Lines changed: 77 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@
7272
import org.junit.jupiter.api.BeforeAll;
7373
import org.junit.jupiter.api.BeforeEach;
7474
import org.junit.jupiter.api.Tag;
75-
import org.junit.jupiter.api.Test;
7675
import org.junit.jupiter.api.Timeout;
7776
import org.junit.jupiter.params.ParameterizedTest;
77+
import org.junit.jupiter.params.provider.Arguments;
78+
import org.junit.jupiter.params.provider.MethodSource;
7879
import org.junit.jupiter.params.provider.ValueSource;
7980
import org.slf4j.Logger;
8081
import org.slf4j.LoggerFactory;
@@ -169,6 +170,15 @@ public static void closeCluster() {
169170

170171
private String stateTmpDir;
171172

173+
private static java.util.stream.Stream<Arguments> groupProtocolAndProcessingThreadsParameters() {
174+
return java.util.stream.Stream.of(
175+
Arguments.of("classic", true),
176+
Arguments.of("classic", false),
177+
Arguments.of("streams", true),
178+
Arguments.of("streams", false)
179+
);
180+
}
181+
172182
@BeforeEach
173183
public void createTopics() throws Exception {
174184
applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
@@ -181,16 +191,19 @@ public void createTopics() throws Exception {
181191
CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
182192
CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1);
183193
CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
194+
CLUSTER.setGroupStandbyReplicas(applicationId, 1);
184195
}
185196

186-
@Test
187-
public void shouldBeAbleToRunWithEosEnabled() throws Exception {
188-
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
197+
@ParameterizedTest
198+
@ValueSource(strings = {"classic", "streams"})
199+
public void shouldBeAbleToRunWithEosEnabled(final String groupProtocol) throws Exception {
200+
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
189201
}
190202

191-
@Test
192-
public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception {
193-
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true);
203+
@ParameterizedTest
204+
@ValueSource(strings = {"classic", "streams"})
205+
public void shouldCommitCorrectOffsetIfInputTopicIsTransactional(final String groupProtocol) throws Exception {
206+
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true, groupProtocol);
194207

195208
try (final Admin adminClient = Admin.create(mkMap(mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())));
196209
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(mkMap(
@@ -215,36 +228,42 @@ public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except
215228
}
216229
}
217230

218-
@Test
219-
public void shouldBeAbleToRestartAfterClose() throws Exception {
220-
runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
231+
@ParameterizedTest
232+
@ValueSource(strings = {"classic", "streams"})
233+
public void shouldBeAbleToRestartAfterClose(final String groupProtocol) throws Exception {
234+
runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
221235
}
222236

223-
@Test
224-
public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
225-
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false);
237+
@ParameterizedTest
238+
@ValueSource(strings = {"classic", "streams"})
239+
public void shouldBeAbleToCommitToMultiplePartitions(final String groupProtocol) throws Exception {
240+
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
226241
}
227242

228-
@Test
229-
public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
230-
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
243+
@ParameterizedTest
244+
@ValueSource(strings = {"classic", "streams"})
245+
public void shouldBeAbleToCommitMultiplePartitionOffsets(final String groupProtocol) throws Exception {
246+
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
231247
}
232248

233-
@Test
234-
public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
235-
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false);
249+
@ParameterizedTest
250+
@ValueSource(strings = {"classic", "streams"})
251+
public void shouldBeAbleToRunWithTwoSubtopologies(final String groupProtocol) throws Exception {
252+
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
236253
}
237254

238-
@Test
239-
public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
240-
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false);
255+
@ParameterizedTest
256+
@ValueSource(strings = {"classic", "streams"})
257+
public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions(final String groupProtocol) throws Exception {
258+
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
241259
}
242260

243261
private void runSimpleCopyTest(final int numberOfRestarts,
244262
final String inputTopic,
245263
final String throughTopic,
246264
final String outputTopic,
247-
final boolean inputTopicTransactional) throws Exception {
265+
final boolean inputTopicTransactional,
266+
final String groupProtocol) throws Exception {
248267
final StreamsBuilder builder = new StreamsBuilder();
249268
final KStream<Long, Long> input = builder.stream(inputTopic);
250269
KStream<Long, Long> output = input;
@@ -263,6 +282,7 @@ private void runSimpleCopyTest(final int numberOfRestarts,
263282
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
264283
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), MAX_POLL_INTERVAL_MS - 1);
265284
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
285+
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
266286

267287
for (int i = 0; i < numberOfRestarts; ++i) {
268288
final Properties config = StreamsTestUtils.getStreamsConfig(
@@ -326,8 +346,9 @@ private List<KeyValue<Long, Long>> getAllRecordPerKey(final Long key, final List
326346
return recordsPerKey;
327347
}
328348

329-
@Test
330-
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
349+
@ParameterizedTest
350+
@ValueSource(strings = {"classic", "streams"})
351+
public void shouldBeAbleToPerformMultipleTransactions(final String groupProtocol) throws Exception {
331352
final StreamsBuilder builder = new StreamsBuilder();
332353
builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
333354

@@ -337,6 +358,7 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
337358
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
338359
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
339360
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
361+
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
340362

341363
final Properties config = StreamsTestUtils.getStreamsConfig(
342364
applicationId,
@@ -374,8 +396,8 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
374396
}
375397

376398
@ParameterizedTest
377-
@ValueSource(booleans = {true, false})
378-
public void shouldNotViolateEosIfOneTaskFails(final boolean processingThreadsEnabled) throws Exception {
399+
@MethodSource("groupProtocolAndProcessingThreadsParameters")
400+
public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
379401

380402
// this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
381403
// the app is supposed to copy all 40 records into the output topic
@@ -386,7 +408,7 @@ public void shouldNotViolateEosIfOneTaskFails(final boolean processingThreadsEna
386408
// -> the failure only kills one thread
387409
// after fail over, we should read 40 committed records (even if 50 record got written)
388410

389-
try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, processingThreadsEnabled)) {
411+
try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
390412
startApplicationAndWaitUntilRunning(streams);
391413

392414
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -476,8 +498,8 @@ public void shouldNotViolateEosIfOneTaskFails(final boolean processingThreadsEna
476498
}
477499

478500
@ParameterizedTest
479-
@ValueSource(booleans = {true, false})
480-
public void shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingThreadsEnabled) throws Exception {
501+
@MethodSource("groupProtocolAndProcessingThreadsParameters")
502+
public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
481503

482504
// this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
483505
// the app is supposed to emit all 40 update records into the output topic
@@ -493,7 +515,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingT
493515

494516
// We need more processing time under "with state" situation, so increasing the max.poll.interval.ms
495517
// to avoid unexpected rebalance during test, which will cause unexpected fail over triggered
496-
try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, processingThreadsEnabled)) {
518+
try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
497519
startApplicationAndWaitUntilRunning(streams);
498520

499521
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -594,8 +616,8 @@ public void shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingT
594616
}
595617

596618
@ParameterizedTest
597-
@ValueSource(booleans = {true, false})
598-
public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final boolean processingThreadsEnabled) throws Exception {
619+
@MethodSource("groupProtocolAndProcessingThreadsParameters")
620+
public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
599621
// this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
600622
// the app is supposed to copy all 60 records into the output topic
601623
//
@@ -607,10 +629,9 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
607629
//
608630
// afterward, the "stalling" thread resumes, and another rebalance should get triggered
609631
// we write the remaining 20 records and verify to read 60 result records
610-
611632
try (
612-
final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, processingThreadsEnabled);
613-
final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, processingThreadsEnabled)
633+
final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, groupProtocol, processingThreadsEnabled);
634+
final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, groupProtocol, processingThreadsEnabled)
614635
) {
615636
startApplicationAndWaitUntilRunning(streams1);
616637
startApplicationAndWaitUntilRunning(streams2);
@@ -667,13 +688,10 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
667688
"Expected a host to start stalling"
668689
);
669690
final String observedStallingHost = stallingHost.get();
670-
final KafkaStreams stallingInstance;
671691
final KafkaStreams remainingInstance;
672692
if ("streams1".equals(observedStallingHost)) {
673-
stallingInstance = streams1;
674693
remainingInstance = streams2;
675694
} else if ("streams2".equals(observedStallingHost)) {
676-
stallingInstance = streams2;
677695
remainingInstance = streams1;
678696
} else {
679697
throw new IllegalArgumentException("unexpected host name: " + observedStallingHost);
@@ -683,8 +701,7 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
683701
// the assignment is. We only really care that the remaining instance only sees one host
684702
// that owns both partitions.
685703
waitForCondition(
686-
() -> stallingInstance.metadataForAllStreamsClients().size() == 2
687-
&& remainingInstance.metadataForAllStreamsClients().size() == 1
704+
() -> remainingInstance.metadataForAllStreamsClients().size() == 1
688705
&& remainingInstance.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2,
689706
MAX_WAIT_TIME_MS,
690707
() -> "Should have rebalanced.\n" +
@@ -755,12 +772,12 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
755772
}
756773

757774
@ParameterizedTest
758-
@ValueSource(booleans = {true, false})
759-
public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final boolean processingThreadsEnabled) throws Exception {
775+
@MethodSource("groupProtocolAndProcessingThreadsParameters")
776+
public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
760777
final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
761778
final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);
762779

763-
try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, processingThreadsEnabled)) {
780+
try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled)) {
764781
writeInputData(writtenData);
765782

766783
startApplicationAndWaitUntilRunning(streams);
@@ -787,9 +804,9 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final boolean process
787804
}
788805

789806
@ParameterizedTest
790-
@ValueSource(booleans = {true, false})
807+
@MethodSource("groupProtocolAndProcessingThreadsParameters")
791808
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
792-
final boolean processingThreadsEnabled) throws Exception {
809+
final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
793810

794811
final Properties streamsConfiguration = new Properties();
795812
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -801,6 +818,7 @@ public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
801818
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
802819
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
803820
streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
821+
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
804822
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
805823
final String stateStoreName = "stateStore";
806824

@@ -934,8 +952,13 @@ public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
934952
static final AtomicReference<TaskId> TASK_WITH_DATA = new AtomicReference<>();
935953
static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false);
936954

937-
@Test
938-
public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
955+
@ParameterizedTest
956+
@ValueSource(strings = {"classic", "streams"})
957+
public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String groupProtocol) throws Exception {
958+
// Reset static variables to ensure test isolation
959+
TASK_WITH_DATA.set(null);
960+
DID_REVOKE_IDLE_TASK.set(false);
961+
939962
final AtomicBoolean requestCommit = new AtomicBoolean(false);
940963

941964
final StreamsBuilder builder = new StreamsBuilder();
@@ -970,6 +993,7 @@ public void process(final Record<Long, Long> record) {
970993
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
971994
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), Integer.MAX_VALUE);
972995
properties.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, TestTaskAssignor.class.getName());
996+
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
973997

974998
final Properties config = StreamsTestUtils.getStreamsConfig(
975999
applicationId,
@@ -1003,9 +1027,9 @@ public void process(final Record<Long, Long> record) {
10031027
// add second thread, to trigger rebalance
10041028
// expect idle task to get revoked -- this should not trigger a TX commit
10051029
streams.addStreamThread();
1006-
1007-
waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected.");
1008-
1030+
if (groupProtocol.equals("classic")) {
1031+
waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected.");
1032+
}
10091033
// best-effort sanity check (might pass and not detect issue in slow environments)
10101034
try {
10111035
readResult(SINGLE_PARTITION_OUTPUT_TOPIC, 1, "consumer", 10_000L);
@@ -1104,6 +1128,7 @@ private KafkaStreams getKafkaStreams(final String dummyHostName,
11041128
final boolean withState,
11051129
final String appDir,
11061130
final int numberOfStreamsThreads,
1131+
final String groupProtocol,
11071132
final boolean processingThreadsEnabled) {
11081133
commitRequested = new AtomicInteger(0);
11091134
errorInjected = new AtomicBoolean(false);
@@ -1212,6 +1237,7 @@ public void process(final Record<Long, Long> record) {
12121237
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
12131238
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
12141239
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
1240+
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
12151241

12161242
final Properties config = StreamsTestUtils.getStreamsConfig(
12171243
applicationId,

0 commit comments

Comments
 (0)