Skip to content

Commit 55acef8

Browse files
authored
Merge pull request #8 from leancloud/feat/unsubscribe-status
unsubscribed status
2 parents 5cad939 + 2969c53 commit 55acef8

File tree

11 files changed

+139
-16
lines changed

11 files changed

+139
-16
lines changed

src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,12 @@ void clearCachedCompletedPartitionsRecords(Set<TopicPartition> completedPartitio
7575
}
7676
}
7777

78+
@VisibleForTesting
7879
Map<TopicPartition, Long> topicOffsetHighWaterMark() {
7980
return topicOffsetHighWaterMark;
8081
}
8182

83+
@VisibleForTesting
8284
Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets() {
8385
return completedTopicOffsets;
8486
}

src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ void updateNextRecommitTime() {
4040
updateNextRecommitTime(System.nanoTime());
4141
}
4242

43+
@VisibleForTesting
4344
long nextRecommitNanos() {
4445
return nextRecommitNanos;
4546
}

src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,17 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
4646
return partitions;
4747
}
4848

49+
@VisibleForTesting
4950
int pendingAsyncCommitCount() {
5051
return pendingAsyncCommitCounter;
5152
}
5253

54+
@VisibleForTesting
5355
boolean forceSync() {
5456
return forceSync;
5557
}
5658

59+
@VisibleForTesting
5760
void setForceSync(boolean forceSync) {
5861
this.forceSync = forceSync;
5962
}

src/main/java/cn/leancloud/kafka/consumer/Fetcher.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ final class Fetcher<K, V> implements Runnable, Closeable {
2424
private final Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures;
2525
private final CommitPolicy<K, V> policy;
2626
private final long gracefulShutdownMillis;
27+
private final CompletableFuture<UnsubscribedStatus> unsubscribeStatusFuture;
2728
private volatile boolean closed;
2829

2930
Fetcher(LcKafkaConsumerBuilder<K, V> consumerBuilder) {
@@ -44,13 +45,15 @@ final class Fetcher<K, V> implements Runnable, Closeable {
4445
this.service = new ExecutorCompletionService<>(workerPool);
4546
this.policy = policy;
4647
this.gracefulShutdownMillis = gracefulShutdownMillis;
48+
this.unsubscribeStatusFuture = new CompletableFuture<>();
4749
}
4850

4951
@Override
5052
public void run() {
5153
logger.debug("Fetcher thread started.");
5254
final long pollTimeout = this.pollTimeout;
5355
final Consumer<K, V> consumer = this.consumer;
56+
UnsubscribedStatus unsubscribedStatus = UnsubscribedStatus.CLOSED;
5457
while (true) {
5558
try {
5659
final ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
@@ -72,13 +75,14 @@ public void run() {
7275
break;
7376
}
7477
} catch (Exception ex) {
78+
unsubscribedStatus = UnsubscribedStatus.ERROR;
7579
close();
7680
logger.error("Fetcher quit with unexpected exception. Will rebalance after poll timeout.", ex);
7781
break;
7882
}
7983
}
8084

81-
gracefulShutdown();
85+
gracefulShutdown(unsubscribedStatus);
8286
}
8387

8488
@Override
@@ -87,6 +91,11 @@ public void close() {
8791
consumer.wakeup();
8892
}
8993

94+
CompletableFuture<UnsubscribedStatus> unsubscribeStatusFuture() {
95+
return unsubscribeStatusFuture;
96+
}
97+
98+
@VisibleForTesting
9099
Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures() {
91100
return pendingFutures;
92101
}
@@ -134,7 +143,7 @@ private void tryCommitRecordOffsets() {
134143
}
135144
}
136145

137-
private void gracefulShutdown() {
146+
private void gracefulShutdown(UnsubscribedStatus unsubscribedStatus) {
138147
final long start = System.currentTimeMillis();
139148
long remain = gracefulShutdownMillis;
140149
try {
@@ -163,6 +172,8 @@ private void gracefulShutdown() {
163172

164173
pendingFutures.clear();
165174

175+
unsubscribeStatusFuture.complete(unsubscribedStatus);
176+
166177
logger.debug("Fetcher thread exit.");
167178
}
168179
}

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import java.io.Closeable;
88
import java.util.Collection;
99
import java.util.Map;
10-
import java.util.Objects;
10+
import java.util.concurrent.CompletableFuture;
1111
import java.util.concurrent.ExecutorService;
1212
import java.util.concurrent.TimeUnit;
1313
import java.util.regex.Pattern;
@@ -67,11 +67,15 @@ int code() {
6767
* Subscribe some Kafka topics to consume records from them.
6868
*
6969
* @param topics the topics to consume.
70+
* @return a {@link CompletableFuture} which will be complete when the internal
71+
* {@link org.apache.kafka.clients.consumer.KafkaConsumer} unsubscribed all the topics that have subscribed to
72+
* due to an error occurred or {@link LcKafkaConsumer#close()} was called. If it is due to an error occurred, this
73+
* {@code LcKafkaConsumer} will be closed before the returned {@code CompletableFuture} complete.
7074
* @throws IllegalStateException if this {@code LcKafkaConsumer} has closed or subscribed to some topics
7175
* @throws NullPointerException if the input {@code topics} is null
7276
* @throws IllegalArgumentException if the input {@code topics} is empty or contains null or empty topic
7377
*/
74-
public synchronized void subscribe(Collection<String> topics) {
78+
public synchronized CompletableFuture<UnsubscribedStatus> subscribe(Collection<String> topics) {
7579
requireNonNull(topics, "topics");
7680

7781
if (topics.isEmpty()) {
@@ -91,6 +95,8 @@ public synchronized void subscribe(Collection<String> topics) {
9195
fetcherThread.start();
9296

9397
state = State.SUBSCRIBED;
98+
99+
return setupUnsubscribedFuture();
94100
}
95101

96102
/**
@@ -100,10 +106,14 @@ public synchronized void subscribe(Collection<String> topics) {
100106
* the max metadata age, the consumer will refresh metadata more often and check for matching topics.
101107
*
102108
* @param pattern {@link Pattern} to subscribe to.
103-
* @throws IllegalStateException if this {@code LcKafkaConsumer} has closed or subscribed to some topics
104-
* @throws NullPointerException if the input {@code pattern} is null
109+
* @return a {@link CompletableFuture} which will be complete when the internal
110+
* {@link org.apache.kafka.clients.consumer.KafkaConsumer} unsubscribed all the topics that have subscribed to
111+
* due to an error occurred or {@link LcKafkaConsumer#close()} was called. If it is due to an error occurred, this
112+
* {@code LcKafkaConsumer} will be closed before the returned {@code CompletableFuture} complete.
113+
* @throws IllegalStateException if this {@code LcKafkaConsumer} has closed or subscribed to some topics
114+
* @throws NullPointerException if the input {@code pattern} is null
105115
*/
106-
public synchronized void subscribe(Pattern pattern) {
116+
public synchronized CompletableFuture<UnsubscribedStatus> subscribe(Pattern pattern) {
107117
requireNonNull(pattern, "pattern");
108118

109119
ensureInInit();
@@ -114,6 +124,8 @@ public synchronized void subscribe(Pattern pattern) {
114124
fetcherThread.start();
115125

116126
state = State.SUBSCRIBED;
127+
128+
return setupUnsubscribedFuture();
117129
}
118130

119131
/**
@@ -139,9 +151,11 @@ public void close() {
139151
state = State.CLOSED;
140152
}
141153

142-
fetcher.close();
143154
try {
144-
fetcherThread.join();
155+
if (Thread.currentThread() != fetcherThread) {
156+
fetcher.close();
157+
fetcherThread.join();
158+
}
145159
consumer.close();
146160
if (shutdownWorkerPoolOnStop) {
147161
workerPool.shutdown();
@@ -152,21 +166,37 @@ public void close() {
152166
}
153167
}
154168

169+
@VisibleForTesting
155170
boolean subscribed() {
156171
return state.code() > State.INIT.code();
157172
}
158173

174+
@VisibleForTesting
159175
boolean closed() {
160176
return state == State.CLOSED;
161177
}
162178

179+
@VisibleForTesting
163180
CommitPolicy<K, V> policy() {
164181
return policy;
165182
}
166183

184+
private CompletableFuture<UnsubscribedStatus> setupUnsubscribedFuture() {
185+
assert !fetcher.unsubscribeStatusFuture().isDone();
186+
187+
// use a new CompletableFuture to ensure that close() should be called first, then do the pipeline bind to the
188+
// returned CompletableFuture
189+
final CompletableFuture<UnsubscribedStatus> ret = new CompletableFuture<>();
190+
fetcher.unsubscribeStatusFuture().thenAccept(status -> {
191+
close();
192+
ret.complete(status);
193+
});
194+
return ret;
195+
}
196+
167197
private void ensureInInit() {
168198
if (subscribed() || closed()) {
169-
throw new IllegalStateException("client is in " + state + " state. expect: " + State.INIT);
199+
throw new IllegalStateException("consumer is closed or have subscribed to some topics or pattern");
170200
}
171201
}
172202

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ private Consumer<K, V> buildConsumer(boolean autoCommit) {
484484
return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer);
485485
}
486486

487-
Duration getRecommitInterval() {
487+
private Duration getRecommitInterval() {
488488
if (recommitInterval == null) {
489489
logger.warn("Recommit interval is not set for a non-automatic commit consumer, the default " +
490490
"interval of 1 hour will be used.");

src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,12 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
4747
}
4848
}
4949

50+
@VisibleForTesting
5051
int pendingAsyncCommitCount() {
5152
return pendingAsyncCommitCounter;
5253
}
5354

55+
@VisibleForTesting
5456
boolean forceSync() {
5557
return forceSync;
5658
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package cn.leancloud.kafka.consumer;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRecord;
4+
5+
/**
6+
* A status of {@link LcKafkaConsumer} when it unsubscribed to all the topics that have subscribed to.
7+
*/
8+
public enum UnsubscribedStatus {
9+
/**
10+
* Indicate that the unsubscription is due to {@link LcKafkaConsumer#close()} was called.
11+
* Usually, this is an expected status.
12+
*/
13+
CLOSED,
14+
15+
/**
16+
* Indicate that the unsubscription is due to an error occurred, maybe an exception was thrown from
17+
* {@link ConsumerRecordHandler#handleRecord(ConsumerRecord)}, or some unrecoverable error happened
18+
* within {@link org.apache.kafka.clients.consumer.KafkaConsumer}. You can check the specific exception
19+
* in log.
20+
*/
21+
ERROR
22+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package cn.leancloud.kafka.consumer;
2+
3+
/**
4+
* Indicates that the visibility of a type or member has been relaxed to make the code testable. Idea borrowed from
5+
* Guava
6+
*
7+
*/
8+
public @interface VisibleForTesting {}
9+

src/test/java/cn/leancloud/kafka/consumer/FetcherTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.HashSet;
1313
import java.util.List;
1414
import java.util.Set;
15+
import java.util.concurrent.CompletableFuture;
1516
import java.util.concurrent.CyclicBarrier;
1617
import java.util.concurrent.ExecutorService;
1718
import java.util.concurrent.Executors;
@@ -62,6 +63,7 @@ public void tearDown() throws Exception {
6263
public void testGracefulShutdown() throws Exception {
6364
executorService = mock(ExecutorService.class);
6465
fetcher = new Fetcher<>(consumer, pollTimeout, consumerRecordHandler, executorService, policy, 0);
66+
final CompletableFuture<UnsubscribedStatus> unsubscribedStatusFuture = fetcher.unsubscribeStatusFuture();
6567
fetcherThread = new Thread(fetcher);
6668

6769
doNothing().when(executorService).execute(any(Runnable.class));
@@ -76,11 +78,13 @@ public void testGracefulShutdown() throws Exception {
7678
assertThat(fetcher.pendingFutures()).isEmpty();
7779
verify(policy, times(1)).partialCommit();
7880
assertThat(consumer.subscription()).isEmpty();
81+
assertThat(unsubscribedStatusFuture).isCompletedWithValue(UnsubscribedStatus.CLOSED);
7982
}
8083

8184
@Test
8285
public void testHandleMsgFailed() throws Exception {
8386
fetcher = new Fetcher<>(consumer, pollTimeout, consumerRecordHandler, executorService, policy, 10_000);
87+
final CompletableFuture<UnsubscribedStatus> unsubscribedStatusFuture = fetcher.unsubscribeStatusFuture();
8488
fetcherThread = new Thread(fetcher);
8589

8690
assignPartitions(consumer, toPartitions(singletonList(0)), 0L);
@@ -96,6 +100,7 @@ public void testHandleMsgFailed() throws Exception {
96100
verify(policy, never()).completeRecord(any());
97101
verify(policy, never()).tryCommit(anyBoolean());
98102
verify(consumerRecordHandler, times(1)).handleRecord(defaultTestingRecord);
103+
assertThat(unsubscribedStatusFuture).isCompletedWithValue(UnsubscribedStatus.ERROR);
99104
}
100105

101106
@Test

0 commit comments

Comments
 (0)