Skip to content

Commit 2dffe32

Browse files
authored
KAFKA-19249: Replace Consumer#close(Duration) with Consumer#close(CloseOptions) (#20983)
# Description `Consumer#close(Duration)` was deprecated since 4.1. Replaced calls with `Consumer#close(CloseOptions)`. Reviewers: Kirk True <[email protected]>, Andrew Schofield <[email protected]>
1 parent 437337c commit 2dffe32

File tree

4 files changed

+9
-8
lines changed

4 files changed

+9
-8
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartiti
572572

573573
@Override
574574
public void close() {
575-
close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
575+
close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
576576
}
577577

578578
@Deprecated

core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package kafka.api
1919

2020
import java.time.Duration
21-
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, KafkaShareConsumer, ShareConsumer}
21+
import org.apache.kafka.clients.consumer.{CloseOptions, Consumer, ConsumerConfig, KafkaConsumer, KafkaShareConsumer, ShareConsumer}
2222
import kafka.utils.TestUtils
2323
import kafka.utils.Implicits._
2424

@@ -307,13 +307,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
307307
@AfterEach
308308
override def tearDown(): Unit = {
309309
try {
310+
val closeOptions = CloseOptions.timeout(Duration.ZERO)
310311
producers.foreach(_.close(Duration.ZERO))
311312
consumers.foreach(_.wakeup())
312-
consumers.foreach(_.close(Duration.ZERO))
313+
consumers.foreach(_.close(closeOptions))
313314
shareConsumers.foreach(_.wakeup())
314315
shareConsumers.foreach(_.close(Duration.ZERO))
315316
streamsConsumers.foreach(_.wakeup())
316-
streamsConsumers.foreach(_.close(Duration.ZERO))
317+
streamsConsumers.foreach(_.close(closeOptions))
317318
adminClients.foreach(_.close(Duration.ZERO))
318319

319320
producers.clear()

core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.kafka.clients.CommonClientConfigs
3838
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
3939
import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
4040
import org.apache.kafka.clients.admin._
41-
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
41+
import org.apache.kafka.clients.consumer.{CloseOptions, Consumer, ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
4242
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
4343
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition, TopicPartitionInfo}
4444
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
@@ -171,7 +171,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
171171
clientThreads.foreach(_.join(5 * 1000))
172172
executors.foreach(_.shutdownNow())
173173
producers.foreach(_.close(Duration.ZERO))
174-
consumers.foreach(_.close(Duration.ofMillis(0)))
174+
consumers.foreach(_.close(CloseOptions.timeout(Duration.ZERO)))
175175
adminClients.foreach(_.close())
176176
TestUtils.shutdownServers(servers)
177177
super.tearDown()

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.server.log.remote.metadata.storage;
1818

19+
import org.apache.kafka.clients.consumer.CloseOptions;
1920
import org.apache.kafka.clients.consumer.Consumer;
2021
import org.apache.kafka.clients.consumer.ConsumerRecord;
2122
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -154,10 +155,9 @@ void ingestRecords() {
154155
}
155156

156157
// visible for testing
157-
@SuppressWarnings("deprecation")
158158
void closeConsumer() {
159159
try {
160-
consumer.close(Duration.ofSeconds(30));
160+
consumer.close(CloseOptions.timeout(Duration.ofSeconds(30)));
161161
} catch (final Exception e) {
162162
log.error("Error encountered while closing the consumer", e);
163163
}

0 commit comments

Comments
 (0)