Skip to content

Commit 2f17f21

Browse files
committed
(feat) unsubscribe topics before close
1 parent 3ab0f00 commit 2f17f21

File tree

2 files changed

+3
-0
lines changed

2 files changed

+3
-0
lines changed

src/main/java/cn/leancloud/kafka/consumer/Fetcher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ private void gracefulShutdown() {
138138
final long start = System.currentTimeMillis();
139139
long remain = gracefulShutdownMillis;
140140
try {
141+
consumer.unsubscribe();
141142
for (Future<ConsumerRecord<K, V>> future : pendingFutures.values()) {
142143
try {
143144
if (remain > 0) {
@@ -153,6 +154,7 @@ private void gracefulShutdown() {
153154
processCompletedRecords();
154155
} catch (InterruptedException ex) {
155156
logger.warn("Graceful shutdown was interrupted.");
157+
Thread.currentThread().interrupt();
156158
} catch (ExecutionException ex) {
157159
logger.error("Handle message got unexpected exception. Continue shutdown without wait handling message done.", ex);
158160
}

src/test/java/cn/leancloud/kafka/consumer/FetcherTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public void testGracefulShutdown() throws Exception {
7575
fetcherThread.join();
7676
assertThat(fetcher.pendingFutures()).isEmpty();
7777
verify(policy, times(1)).partialCommit();
78+
assertThat(consumer.subscription()).isEmpty();
7879
}
7980

8081
@Test

0 commit comments

Comments
 (0)