Skip to content

Commit 410065a

Browse files
authored
KAFKA-18517: Enable ConsumerBounceTest to run for new async consumer (apache#18532)
Reviewers: Andrew Schofield <[email protected]>, Kirk True <[email protected]>
1 parent f4d9039 commit 410065a

File tree

1 file changed

+31
-14
lines changed

1 file changed

+31
-14
lines changed

core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
2929
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
3030
import org.apache.kafka.server.util.ShutdownableThread
3131
import org.junit.jupiter.api.Assertions._
32-
import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo}
32+
import org.junit.jupiter.api.{AfterEach, Disabled, TestInfo}
3333
import org.junit.jupiter.params.ParameterizedTest
3434
import org.junit.jupiter.params.provider.MethodSource
3535

@@ -59,7 +59,12 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
5959
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1",
6060
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set small enough session timeout
6161
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0",
62+
63+
// Tests will run for CONSUMER and CLASSIC group protocol, so set the group max size property
64+
// required for each.
65+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
6266
GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
67+
6368
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false",
6469
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true",
6570
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50",
@@ -94,7 +99,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
9499
}
95100

96101
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
97-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
102+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
98103
def testConsumptionWithBrokerFailures(quorum: String, groupProtocol: String): Unit = consumeWithBrokerFailures(10)
99104

100105
/*
@@ -139,7 +144,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
139144
}
140145

141146
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
142-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
147+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
143148
def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol: String): Unit = seekAndCommitWithBrokerFailures(5)
144149

145150
def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
@@ -183,7 +188,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
183188
}
184189

185190
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
186-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
191+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
187192
def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol: String): Unit = {
188193
val numRecords = 1000
189194
val newtopic = "newtopic"
@@ -243,7 +248,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
243248

244249
checkCloseGoodPath(numRecords, "group1")
245250
checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
246-
checkCloseWithClusterFailure(numRecords, "group4", "group5")
251+
checkCloseWithClusterFailure(numRecords, "group4", "group5", groupProtocol)
247252
}
248253

249254
/**
@@ -297,12 +302,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
297302
* there is no coordinator, but close should timeout and return. If close is invoked with a very
298303
* large timeout, close should timeout after request timeout.
299304
*/
300-
private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String): Unit = {
305+
private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String,
306+
groupProtocol: String): Unit = {
301307
val consumer1 = createConsumerAndReceive(group1, manualAssign = false, numRecords)
302308

303309
val requestTimeout = 6000
304-
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
305-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
310+
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
311+
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
312+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
313+
}
306314
this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
307315
val consumer2 = createConsumerAndReceive(group2, manualAssign = true, numRecords)
308316

@@ -319,17 +327,20 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
319327
* the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config.
320328
* Then, 1 consumer should be left out of the group.
321329
*/
322-
@Test
330+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
331+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
323332
@Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13421)
324-
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
333+
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(quorum: String, groupProtocol: String): Unit = {
325334
val group = "group-max-size-test"
326335
val topic = "group-max-size-test"
327336
val maxGroupSize = 2
328337
val consumerCount = maxGroupSize + 1
329338
val partitionCount = consumerCount * 2
330339

331340
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
332-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
341+
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
342+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
343+
}
333344
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
334345
val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)
335346

@@ -361,12 +372,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
361372
* When we have the consumer group max size configured to X, the X+1th consumer trying to join should receive a fatal exception
362373
*/
363374
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
364-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
375+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
365376
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String, groupProtocol: String): Unit = {
366377
val group = "fatal-exception-test"
367378
val topic = "fatal-exception-test"
368379
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
369-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
380+
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
381+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
382+
}
370383
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
371384

372385
val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)
@@ -401,11 +414,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
401414
*/
402415
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
403416
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
417+
// TODO: enable for all protocols after fix for not generating/blocking on unneeded
418+
// FindCoordinator on close for the new consumer
404419
def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
405420
val topic = "closetest"
406421
createTopic(topic, 10, brokerCount)
407422
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
408-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
423+
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
424+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
425+
}
409426
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
410427
checkCloseDuringRebalance("group1", topic, executor, brokersAvailableDuringClose = true)
411428
}

0 commit comments

Comments
 (0)