Skip to content

Commit 73d0bcb

Browse files
committed
(feat) graceful shutdown
1 parent 8399f31 commit 73d0bcb

File tree

3 files changed

+9
-9
lines changed

3 files changed

+9
-9
lines changed

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
113113
* @param pollTimeoutMs the poll timeout in milliseconds
114114
* @return this
115115
*/
116-
public LcKafkaConsumerBuilder<K, V> pollTimeoutMs(long pollTimeoutMs) {
117-
requireArgument(pollTimeoutMs >= 0, "pollTimeoutMs: %s (expect >= 0)", pollTimeoutMs);
116+
public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMs) {
117+
requireArgument(pollTimeoutMs >= 0, "pollTimeoutMillis: %s (expect >= 0)", pollTimeoutMs);
118118
this.pollTimeout = pollTimeoutMs;
119119
return this;
120120
}
@@ -137,13 +137,13 @@ public LcKafkaConsumerBuilder<K, V> pollTimeout(Duration pollTimeout) {
137137
return this;
138138
}
139139

140-
public LcKafkaConsumerBuilder<K,V> gracefulShutdownMs(long gracefulShutdownMs) {
140+
public LcKafkaConsumerBuilder<K,V> gracefulShutdownTimeoutMillis(long gracefulShutdownMs) {
141141
requireArgument(gracefulShutdownMs >= 0, "gracefulShutdownMillis: %s (expected >= 0)", gracefulShutdownMs);
142142
this.gracefulShutdownMillis = gracefulShutdownMs;
143143
return this;
144144
}
145145

146-
public LcKafkaConsumerBuilder<K,V> gracefulShutdownMs(Duration duration) {
146+
public LcKafkaConsumerBuilder<K,V> gracefulShutdownTimeout(Duration duration) {
147147
requireNonNull(duration);
148148
this.gracefulShutdownMillis = duration.toMillis();
149149
return this;

src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ public void testNullDeserializers() {
8282
@Test
8383
public void testNegativePollTimeoutMs() {
8484
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
85-
.pollTimeoutMs(-1 * ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE)))
85+
.pollTimeoutMillis(-1 * ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE)))
8686
.isInstanceOf(IllegalArgumentException.class)
87-
.hasMessageContaining("pollTimeoutMs");
87+
.hasMessageContaining("pollTimeoutMillis");
8888
}
8989

9090
@Test
@@ -157,7 +157,7 @@ public void testAutoConsumerWithShouldShutdownWorkerPool() {
157157
configs.put("auto.offset.reset", "latest");
158158
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
159159
.mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
160-
.pollTimeoutMs(1000)
160+
.pollTimeoutMillis(1000)
161161
.maxPendingAsyncCommits(100)
162162
.workerPool(workerPool, true)
163163
.buildAuto())
@@ -173,7 +173,7 @@ public void testAutoConsumer() {
173173
configs.put("auto.offset.reset", "latest");
174174
final LcKafkaConsumer<Object, Object> consumer = LcKafkaConsumerBuilder.newBuilder(configs, testingHandler)
175175
.mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
176-
.pollTimeoutMs(1000)
176+
.pollTimeoutMillis(1000)
177177
.maxPendingAsyncCommits(100)
178178
.workerPool(workerPool, false)
179179
.buildAuto();

src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public LcKafkaConsumer<Integer, String> buildConsumer(String consumerName, TestS
5353
configs.put("auto.offset.reset", "earliest");
5454
configs.put("group.id", "2614911922612339122");
5555
configs.put("max.poll.records", 2);
56-
configs.put("max.poll.interval.ms", "5000");
56+
configs.put("max.poll.interval.ms", 5000);
5757

5858
return LcKafkaConsumerBuilder.newBuilder(
5959
configs,

0 commit comments

Comments
 (0)