Skip to content

Commit d19f2ec

Browse files
authored
Merge pull request #9 from leancloud/feat/handle-timeout
handle record timeout
2 parents 41a2b30 + cb8a8e5 commit d19f2ec

File tree

10 files changed

+718
-84
lines changed

10 files changed

+718
-84
lines changed

src/main/java/cn/leancloud/kafka/consumer/CatchAllExceptionConsumerRecordHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import java.util.function.BiConsumer;
88

9+
import static java.util.Objects.requireNonNull;
10+
911
/**
1012
* A wrapper over a {@link ConsumerRecordHandler} to catch and swallow all the exceptions thrown from the
1113
* wrapped {@code ConsumerRecordHandler} when it fails to handle a consumed record.
@@ -31,6 +33,7 @@ public final class CatchAllExceptionConsumerRecordHandler<K, V> implements Consu
3133
* the wrapped handler failed on calling {@link ConsumerRecordHandler#handleRecord(ConsumerRecord)}.
3234
*
3335
* @param wrappedHandler the wrapped {@link ConsumerRecordHandler}.
36+
* @throws NullPointerException when {@code wrappedHandler} is null
3437
*/
3538
public CatchAllExceptionConsumerRecordHandler(ConsumerRecordHandler<K, V> wrappedHandler) {
3639
this(wrappedHandler, (record, throwable) -> logger.error("Handle kafka consumer record: " + record + " failed.", throwable));
@@ -43,9 +46,12 @@ public CatchAllExceptionConsumerRecordHandler(ConsumerRecordHandler<K, V> wrappe
4346
* @param wrappedHandler the wrapped {@link ConsumerRecordHandler}.
4447
* @param errorConsumer a {@link BiConsumer} to consume the failed record and the exception thrown from
4548
* the {@link ConsumerRecordHandler#handleRecord(ConsumerRecord)}
49+
* @throws NullPointerException when {@code wrappedHandler} or {@code errorConsumer} is null
4650
*/
4751
public CatchAllExceptionConsumerRecordHandler(ConsumerRecordHandler<K, V> wrappedHandler,
4852
BiConsumer<ConsumerRecord<K, V>, Throwable> errorConsumer) {
53+
requireNonNull(wrappedHandler, "wrappedHandler");
54+
requireNonNull(errorConsumer, "errorConsumer");
4955
this.wrappedHandler = wrappedHandler;
5056
this.errorConsumer = errorConsumer;
5157
}

src/main/java/cn/leancloud/kafka/consumer/Fetcher.java

Lines changed: 164 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
import org.apache.kafka.clients.consumer.ConsumerRecords;
66
import org.apache.kafka.common.TopicPartition;
77
import org.apache.kafka.common.errors.WakeupException;
8+
import org.apache.kafka.common.utils.Time;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011

1112
import java.io.Closeable;
13+
import java.time.Duration;
1214
import java.util.HashMap;
1315
import java.util.Map;
1416
import java.util.Set;
@@ -17,53 +19,119 @@
1719
final class Fetcher<K, V> implements Runnable, Closeable {
1820
private static final Logger logger = LoggerFactory.getLogger(Fetcher.class);
1921

20-
private final long pollTimeout;
22+
@VisibleForTesting
23+
static class TimeoutFuture<K, V> implements Future<ConsumerRecord<K, V>> {
24+
private final Future<ConsumerRecord<K, V>> wrappedFuture;
25+
private final long timeoutAtNanos;
26+
private final Time time;
27+
28+
TimeoutFuture(Future<ConsumerRecord<K, V>> wrappedFuture) {
29+
this(wrappedFuture, Long.MAX_VALUE);
30+
}
31+
32+
TimeoutFuture(Future<ConsumerRecord<K, V>> wrappedFuture, long timeoutInNanos) {
33+
this(wrappedFuture, timeoutInNanos, Time.SYSTEM);
34+
}
35+
36+
TimeoutFuture(Future<ConsumerRecord<K, V>> wrappedFuture, long timeoutInNanos, Time time) {
37+
assert timeoutInNanos >= 0;
38+
this.wrappedFuture = wrappedFuture;
39+
long timeoutAtNanos = time.nanoseconds() + timeoutInNanos;
40+
if (timeoutAtNanos < 0) {
41+
timeoutAtNanos = Long.MAX_VALUE;
42+
}
43+
this.timeoutAtNanos = timeoutAtNanos;
44+
this.time = time;
45+
}
46+
47+
@Override
48+
public boolean cancel(boolean mayInterruptIfRunning) {
49+
return wrappedFuture.cancel(mayInterruptIfRunning);
50+
}
51+
52+
@Override
53+
public boolean isCancelled() {
54+
return wrappedFuture.isCancelled();
55+
}
56+
57+
@Override
58+
public boolean isDone() {
59+
return wrappedFuture.isDone();
60+
}
61+
62+
@Override
63+
public ConsumerRecord<K, V> get() throws InterruptedException, ExecutionException {
64+
return wrappedFuture.get();
65+
}
66+
67+
@Override
68+
public ConsumerRecord<K, V> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
69+
// if it has already timed out, throw the exception immediately
70+
if (timeout()) {
71+
throw new TimeoutException();
72+
}
73+
74+
final long timeoutNanos = Math.max(0, Math.min(unit.toNanos(timeout), timeoutAtNanos - time.nanoseconds()));
75+
return wrappedFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
76+
}
77+
78+
boolean timeout() {
79+
return time.nanoseconds() >= timeoutAtNanos;
80+
}
81+
}
82+
83+
private final long pollTimeoutMillis;
2184
private final Consumer<K, V> consumer;
2285
private final ConsumerRecordHandler<K, V> handler;
2386
private final ExecutorCompletionService<ConsumerRecord<K, V>> service;
2487
private final Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures;
2588
private final CommitPolicy<K, V> policy;
26-
private final long gracefulShutdownMillis;
89+
private final long gracefulShutdownTimeoutNanos;
2790
private final CompletableFuture<UnsubscribedStatus> unsubscribeStatusFuture;
91+
private final long handleRecordTimeoutNanos;
2892
private volatile boolean closed;
2993

3094
Fetcher(LcKafkaConsumerBuilder<K, V> consumerBuilder) {
3195
this(consumerBuilder.getConsumer(), consumerBuilder.getPollTimeout(), consumerBuilder.getConsumerRecordHandler(),
32-
consumerBuilder.getWorkerPool(), consumerBuilder.getPolicy(), consumerBuilder.gracefulShutdownMillis());
96+
consumerBuilder.getWorkerPool(), consumerBuilder.getPolicy(), consumerBuilder.gracefulShutdownTimeout(),
97+
consumerBuilder.handleRecordTimeout());
3398
}
3499

35100
Fetcher(Consumer<K, V> consumer,
36-
long pollTimeout,
101+
Duration pollTimeout,
37102
ConsumerRecordHandler<K, V> handler,
38103
ExecutorService workerPool,
39104
CommitPolicy<K, V> policy,
40-
long gracefulShutdownMillis) {
105+
Duration gracefulShutdownTimeout,
106+
Duration handleRecordTimeout) {
41107
this.pendingFutures = new HashMap<>();
42108
this.consumer = consumer;
43-
this.pollTimeout = pollTimeout;
109+
this.pollTimeoutMillis = pollTimeout.toMillis();
44110
this.handler = handler;
45111
this.service = new ExecutorCompletionService<>(workerPool);
46112
this.policy = policy;
47-
this.gracefulShutdownMillis = gracefulShutdownMillis;
113+
this.gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.toNanos();
48114
this.unsubscribeStatusFuture = new CompletableFuture<>();
115+
this.handleRecordTimeoutNanos = handleRecordTimeout.toNanos();
49116
}
50117

51118
@Override
52119
public void run() {
53120
logger.debug("Fetcher thread started.");
54-
final long pollTimeout = this.pollTimeout;
121+
final long pollTimeoutMillis = this.pollTimeoutMillis;
55122
final Consumer<K, V> consumer = this.consumer;
56123
UnsubscribedStatus unsubscribedStatus = UnsubscribedStatus.CLOSED;
57124
while (true) {
58125
try {
59-
final ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
126+
final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMillis);
60127

61128
if (logger.isDebugEnabled()) {
62129
logger.debug("Fetched " + records.count() + " records from: " + records.partitions());
63130
}
64131

65132
dispatchFetchedRecords(records);
66133
processCompletedRecords();
134+
processTimeoutRecords();
67135

68136
if (!pendingFutures.isEmpty() && !records.isEmpty()) {
69137
consumer.pause(records.partitions());
@@ -74,7 +142,15 @@ public void run() {
74142
if (closed()) {
75143
break;
76144
}
145+
} catch (ExecutionException ex) {
146+
unsubscribedStatus = UnsubscribedStatus.ERROR;
147+
close();
148+
break;
77149
} catch (Exception ex) {
150+
if (ex instanceof InterruptedException) {
151+
Thread.currentThread().interrupt();
152+
}
153+
78154
unsubscribedStatus = UnsubscribedStatus.ERROR;
79155
close();
80156
logger.error("Fetcher quit with unexpected exception. Will rebalance after poll timeout.", ex);
@@ -83,6 +159,7 @@ public void run() {
83159
}
84160

85161
gracefulShutdown(unsubscribedStatus);
162+
logger.debug("Fetcher thread exit.");
86163
}
87164

88165
@Override
@@ -111,19 +188,49 @@ private void dispatchFetchedRecords(ConsumerRecords<K, V> records) {
111188
handler.handleRecord(record);
112189
return record;
113190
});
114-
pendingFutures.put(record, future);
191+
pendingFutures.put(record, timeoutAwareFuture(future));
115192
policy.addPendingRecord(record);
116193
}
117194
}
118195

196+
private TimeoutFuture<K, V> timeoutAwareFuture(Future<ConsumerRecord<K, V>> future) {
197+
if (unlimitedHandleRecordTime()) {
198+
return new TimeoutFuture<>(future);
199+
} else {
200+
return new TimeoutFuture<>(future, handleRecordTimeoutNanos);
201+
}
202+
}
203+
119204
private void processCompletedRecords() throws InterruptedException, ExecutionException {
120205
Future<ConsumerRecord<K, V>> f;
121206
while ((f = service.poll()) != null) {
122-
assert f.isDone();
123-
final ConsumerRecord<K, V> r = f.get();
124-
final Future<ConsumerRecord<K, V>> v = pendingFutures.remove(r);
125-
assert v != null;
126-
policy.completeRecord(r);
207+
processCompletedRecord(f);
208+
}
209+
}
210+
211+
private void processCompletedRecord(Future<ConsumerRecord<K, V>> future) throws InterruptedException, ExecutionException {
212+
assert future.isDone();
213+
final ConsumerRecord<K, V> record = future.get();
214+
assert record != null;
215+
assert !future.isCancelled();
216+
final Future<ConsumerRecord<K, V>> v = pendingFutures.remove(record);
217+
assert v != null;
218+
policy.completeRecord(record);
219+
}
220+
221+
private void processTimeoutRecords() throws TimeoutException {
222+
if (unlimitedHandleRecordTime()) {
223+
return;
224+
}
225+
226+
for (Map.Entry<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> entry : pendingFutures.entrySet()) {
227+
final TimeoutFuture<K, V> future = (TimeoutFuture<K, V>) entry.getValue();
228+
if (future.timeout()) {
229+
future.cancel(false);
230+
// do not wait for it again on graceful shutdown
231+
pendingFutures.remove(entry.getKey(), entry.getValue());
232+
throw new TimeoutException("timeout on handling record: " + entry.getKey());
233+
}
127234
}
128235
}
129236

@@ -143,37 +250,54 @@ private void tryCommitRecordOffsets() {
143250
}
144251
}
145252

253+
private boolean unlimitedHandleRecordTime() {
254+
return handleRecordTimeoutNanos == 0;
255+
}
256+
146257
private void gracefulShutdown(UnsubscribedStatus unsubscribedStatus) {
147-
final long start = System.currentTimeMillis();
148-
long remain = gracefulShutdownMillis;
258+
long shutdownTimeout = 0L;
149259
try {
150-
consumer.unsubscribe();
151-
for (Future<ConsumerRecord<K, V>> future : pendingFutures.values()) {
152-
try {
153-
if (remain > 0) {
154-
future.get(remain, TimeUnit.MILLISECONDS);
155-
remain = gracefulShutdownMillis - (System.currentTimeMillis() - start);
156-
} else {
157-
future.cancel(false);
158-
}
159-
} catch (TimeoutException ex) {
160-
remain = 0;
161-
}
260+
shutdownTimeout = waitPendingFuturesDone();
261+
policy.partialCommit();
262+
pendingFutures.clear();
263+
} catch (Exception ex) {
264+
logger.error("Graceful shutdown got unexpected exception", ex);
265+
} finally {
266+
try {
267+
consumer.close(shutdownTimeout, TimeUnit.NANOSECONDS);
268+
} finally {
269+
unsubscribeStatusFuture.complete(unsubscribedStatus);
162270
}
163-
processCompletedRecords();
164-
} catch (InterruptedException ex) {
165-
logger.warn("Graceful shutdown was interrupted.");
166-
Thread.currentThread().interrupt();
167-
} catch (ExecutionException ex) {
168-
logger.error("Handle message got unexpected exception. Continue shutdown without wait handling message done.", ex);
169271
}
272+
}
170273

171-
policy.partialCommit();
172-
173-
pendingFutures.clear();
274+
private long waitPendingFuturesDone() {
275+
final long start = System.nanoTime();
276+
long remain = gracefulShutdownTimeoutNanos;
174277

175-
unsubscribeStatusFuture.complete(unsubscribedStatus);
278+
for (Map.Entry<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> entry : pendingFutures.entrySet()) {
279+
final Future<ConsumerRecord<K, V>> future = entry.getValue();
280+
try {
281+
assert remain >= 0;
282+
// remain is tracked in nanoseconds (it starts from gracefulShutdownTimeoutNanos and is
// recomputed from System.nanoTime()), so the wait must use the same unit
final ConsumerRecord<K, V> record = future.get(remain, TimeUnit.NANOSECONDS);
283+
assert record != null;
284+
policy.completeRecord(record);
285+
} catch (TimeoutException ex) {
286+
future.cancel(false);
287+
} catch (InterruptedException ex) {
288+
future.cancel(false);
289+
Thread.currentThread().interrupt();
290+
} catch (CancellationException ex) {
291+
// ignore
292+
} catch (ExecutionException ex) {
293+
logger.error("Fetcher quit with unexpected exception on handling consumer record: " + entry.getKey(), ex.getCause());
294+
} finally {
295+
if (remain >= 0) {
296+
remain = Math.max(0, gracefulShutdownTimeoutNanos - (System.nanoTime() - start));
297+
}
298+
}
299+
}
176300

177-
logger.debug("Fetcher thread exit.");
301+
return remain;
178302
}
179303
}

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ public void close() {
156156
fetcher.close();
157157
fetcherThread.join();
158158
}
159-
consumer.close();
159+
160+
// consumer was closed in fetcher
160161
if (shutdownWorkerPoolOnStop) {
161162
workerPool.shutdown();
162163
workerPool.awaitTermination(1, TimeUnit.DAYS);

0 commit comments

Comments
 (0)