55import org .apache .kafka .clients .consumer .ConsumerRecords ;
66import org .apache .kafka .common .TopicPartition ;
77import org .apache .kafka .common .errors .WakeupException ;
8+ import org .apache .kafka .common .utils .Time ;
89import org .slf4j .Logger ;
910import org .slf4j .LoggerFactory ;
1011
1112import java .io .Closeable ;
13+ import java .time .Duration ;
1214import java .util .HashMap ;
1315import java .util .Map ;
1416import java .util .Set ;
1719final class Fetcher <K , V > implements Runnable , Closeable {
1820 private static final Logger logger = LoggerFactory .getLogger (Fetcher .class );
1921
20- private final long pollTimeout ;
22+ @ VisibleForTesting
23+ static class TimeoutFuture <K , V > implements Future <ConsumerRecord <K , V >> {
24+ private final Future <ConsumerRecord <K , V >> wrappedFuture ;
25+ private final long timeoutAtNanos ;
26+ private final Time time ;
27+
28+ TimeoutFuture (Future <ConsumerRecord <K , V >> wrappedFuture ) {
29+ this (wrappedFuture , Long .MAX_VALUE );
30+ }
31+
32+ TimeoutFuture (Future <ConsumerRecord <K , V >> wrappedFuture , long timeoutInNanos ) {
33+ this (wrappedFuture , timeoutInNanos , Time .SYSTEM );
34+ }
35+
36+ TimeoutFuture (Future <ConsumerRecord <K , V >> wrappedFuture , long timeoutInNanos , Time time ) {
37+ assert timeoutInNanos >= 0 ;
38+ this .wrappedFuture = wrappedFuture ;
39+ long timeoutAtNanos = time .nanoseconds () + timeoutInNanos ;
40+ if (timeoutAtNanos < 0 ) {
41+ timeoutAtNanos = Long .MAX_VALUE ;
42+ }
43+ this .timeoutAtNanos = timeoutAtNanos ;
44+ this .time = time ;
45+ }
46+
47+ @ Override
48+ public boolean cancel (boolean mayInterruptIfRunning ) {
49+ return wrappedFuture .cancel (mayInterruptIfRunning );
50+ }
51+
52+ @ Override
53+ public boolean isCancelled () {
54+ return wrappedFuture .isCancelled ();
55+ }
56+
57+ @ Override
58+ public boolean isDone () {
59+ return wrappedFuture .isDone ();
60+ }
61+
62+ @ Override
63+ public ConsumerRecord <K , V > get () throws InterruptedException , ExecutionException {
64+ return wrappedFuture .get ();
65+ }
66+
67+ @ Override
68+ public ConsumerRecord <K , V > get (long timeout , TimeUnit unit ) throws InterruptedException , ExecutionException , TimeoutException {
69+ // if it is already timeout, throw exception immediately
70+ if (timeout ()) {
71+ throw new TimeoutException ();
72+ }
73+
74+ final long timeoutNanos = Math .max (0 , Math .min (unit .toNanos (timeout ), timeoutAtNanos - time .nanoseconds ()));
75+ return wrappedFuture .get (timeoutNanos , TimeUnit .NANOSECONDS );
76+ }
77+
78+ boolean timeout () {
79+ return time .nanoseconds () >= timeoutAtNanos ;
80+ }
81+ }
82+
83+ private final long pollTimeoutMillis ;
2184 private final Consumer <K , V > consumer ;
2285 private final ConsumerRecordHandler <K , V > handler ;
2386 private final ExecutorCompletionService <ConsumerRecord <K , V >> service ;
2487 private final Map <ConsumerRecord <K , V >, Future <ConsumerRecord <K , V >>> pendingFutures ;
2588 private final CommitPolicy <K , V > policy ;
26- private final long gracefulShutdownMillis ;
89+ private final long gracefulShutdownTimeoutNanos ;
2790 private final CompletableFuture <UnsubscribedStatus > unsubscribeStatusFuture ;
91+ private final long handleRecordTimeoutNanos ;
2892 private volatile boolean closed ;
2993
3094 Fetcher (LcKafkaConsumerBuilder <K , V > consumerBuilder ) {
3195 this (consumerBuilder .getConsumer (), consumerBuilder .getPollTimeout (), consumerBuilder .getConsumerRecordHandler (),
32- consumerBuilder .getWorkerPool (), consumerBuilder .getPolicy (), consumerBuilder .gracefulShutdownMillis ());
96+ consumerBuilder .getWorkerPool (), consumerBuilder .getPolicy (), consumerBuilder .gracefulShutdownTimeout (),
97+ consumerBuilder .handleRecordTimeout ());
3398 }
3499
35100 Fetcher (Consumer <K , V > consumer ,
36- long pollTimeout ,
101+ Duration pollTimeout ,
37102 ConsumerRecordHandler <K , V > handler ,
38103 ExecutorService workerPool ,
39104 CommitPolicy <K , V > policy ,
40- long gracefulShutdownMillis ) {
105+ Duration gracefulShutdownTimeout ,
106+ Duration handleRecordTimeout ) {
41107 this .pendingFutures = new HashMap <>();
42108 this .consumer = consumer ;
43- this .pollTimeout = pollTimeout ;
109+ this .pollTimeoutMillis = pollTimeout . toMillis () ;
44110 this .handler = handler ;
45111 this .service = new ExecutorCompletionService <>(workerPool );
46112 this .policy = policy ;
47- this .gracefulShutdownMillis = gracefulShutdownMillis ;
113+ this .gracefulShutdownTimeoutNanos = gracefulShutdownTimeout . toNanos () ;
48114 this .unsubscribeStatusFuture = new CompletableFuture <>();
115+ this .handleRecordTimeoutNanos = handleRecordTimeout .toNanos ();
49116 }
50117
51118 @ Override
52119 public void run () {
53120 logger .debug ("Fetcher thread started." );
54- final long pollTimeout = this .pollTimeout ;
121+ final long pollTimeoutMillis = this .pollTimeoutMillis ;
55122 final Consumer <K , V > consumer = this .consumer ;
56123 UnsubscribedStatus unsubscribedStatus = UnsubscribedStatus .CLOSED ;
57124 while (true ) {
58125 try {
59- final ConsumerRecords <K , V > records = consumer .poll (pollTimeout );
126+ final ConsumerRecords <K , V > records = consumer .poll (pollTimeoutMillis );
60127
61128 if (logger .isDebugEnabled ()) {
62129 logger .debug ("Fetched " + records .count () + " records from: " + records .partitions ());
63130 }
64131
65132 dispatchFetchedRecords (records );
66133 processCompletedRecords ();
134+ processTimeoutRecords ();
67135
68136 if (!pendingFutures .isEmpty () && !records .isEmpty ()) {
69137 consumer .pause (records .partitions ());
@@ -74,7 +142,15 @@ public void run() {
74142 if (closed ()) {
75143 break ;
76144 }
145+ } catch (ExecutionException ex ) {
146+ unsubscribedStatus = UnsubscribedStatus .ERROR ;
147+ close ();
148+ break ;
77149 } catch (Exception ex ) {
150+ if (ex instanceof InterruptedException ) {
151+ Thread .currentThread ().interrupt ();
152+ }
153+
78154 unsubscribedStatus = UnsubscribedStatus .ERROR ;
79155 close ();
80156 logger .error ("Fetcher quit with unexpected exception. Will rebalance after poll timeout." , ex );
@@ -83,6 +159,7 @@ public void run() {
83159 }
84160
85161 gracefulShutdown (unsubscribedStatus );
162+ logger .debug ("Fetcher thread exit." );
86163 }
87164
88165 @ Override
@@ -111,19 +188,49 @@ private void dispatchFetchedRecords(ConsumerRecords<K, V> records) {
111188 handler .handleRecord (record );
112189 return record ;
113190 });
114- pendingFutures .put (record , future );
191+ pendingFutures .put (record , timeoutAwareFuture ( future ) );
115192 policy .addPendingRecord (record );
116193 }
117194 }
118195
196+ private TimeoutFuture <K , V > timeoutAwareFuture (Future <ConsumerRecord <K , V >> future ) {
197+ if (unlimitedHandleRecordTime ()) {
198+ return new TimeoutFuture <>(future );
199+ } else {
200+ return new TimeoutFuture <>(future , handleRecordTimeoutNanos );
201+ }
202+ }
203+
119204 private void processCompletedRecords () throws InterruptedException , ExecutionException {
120205 Future <ConsumerRecord <K , V >> f ;
121206 while ((f = service .poll ()) != null ) {
122- assert f .isDone ();
123- final ConsumerRecord <K , V > r = f .get ();
124- final Future <ConsumerRecord <K , V >> v = pendingFutures .remove (r );
125- assert v != null ;
126- policy .completeRecord (r );
207+ processCompletedRecord (f );
208+ }
209+ }
210+
211+ private void processCompletedRecord (Future <ConsumerRecord <K , V >> future ) throws InterruptedException , ExecutionException {
212+ assert future .isDone ();
213+ final ConsumerRecord <K , V > record = future .get ();
214+ assert record != null ;
215+ assert !future .isCancelled ();
216+ final Future <ConsumerRecord <K , V >> v = pendingFutures .remove (record );
217+ assert v != null ;
218+ policy .completeRecord (record );
219+ }
220+
221+ private void processTimeoutRecords () throws TimeoutException {
222+ if (unlimitedHandleRecordTime ()) {
223+ return ;
224+ }
225+
226+ for (Map .Entry <ConsumerRecord <K , V >, Future <ConsumerRecord <K , V >>> entry : pendingFutures .entrySet ()) {
227+ final TimeoutFuture <K , V > future = (TimeoutFuture <K , V >) entry .getValue ();
228+ if (future .timeout ()) {
229+ future .cancel (false );
230+ // do not wait for it again on graceful shutdown
231+ pendingFutures .remove (entry .getKey (), entry .getValue ());
232+ throw new TimeoutException ("timeout on handling record: " + entry .getKey ());
233+ }
127234 }
128235 }
129236
@@ -143,37 +250,54 @@ private void tryCommitRecordOffsets() {
143250 }
144251 }
145252
253+ private boolean unlimitedHandleRecordTime () {
254+ return handleRecordTimeoutNanos == 0 ;
255+ }
256+
146257 private void gracefulShutdown (UnsubscribedStatus unsubscribedStatus ) {
147- final long start = System .currentTimeMillis ();
148- long remain = gracefulShutdownMillis ;
258+ long shutdownTimeout = 0L ;
149259 try {
150- consumer .unsubscribe ();
151- for (Future <ConsumerRecord <K , V >> future : pendingFutures .values ()) {
152- try {
153- if (remain > 0 ) {
154- future .get (remain , TimeUnit .MILLISECONDS );
155- remain = gracefulShutdownMillis - (System .currentTimeMillis () - start );
156- } else {
157- future .cancel (false );
158- }
159- } catch (TimeoutException ex ) {
160- remain = 0 ;
161- }
260+ shutdownTimeout = waitPendingFuturesDone ();
261+ policy .partialCommit ();
262+ pendingFutures .clear ();
263+ } catch (Exception ex ) {
264+ logger .error ("Graceful shutdown got unexpected exception" , ex );
265+ } finally {
266+ try {
267+ consumer .close (shutdownTimeout , TimeUnit .NANOSECONDS );
268+ } finally {
269+ unsubscribeStatusFuture .complete (unsubscribedStatus );
162270 }
163- processCompletedRecords ();
164- } catch (InterruptedException ex ) {
165- logger .warn ("Graceful shutdown was interrupted." );
166- Thread .currentThread ().interrupt ();
167- } catch (ExecutionException ex ) {
168- logger .error ("Handle message got unexpected exception. Continue shutdown without wait handling message done." , ex );
169271 }
272+ }
170273
171- policy . partialCommit ();
172-
173- pendingFutures . clear () ;
274+ private long waitPendingFuturesDone () {
275+ final long start = System . nanoTime ();
276+ long remain = gracefulShutdownTimeoutNanos ;
174277
175- unsubscribeStatusFuture .complete (unsubscribedStatus );
278+ for (Map .Entry <ConsumerRecord <K , V >, Future <ConsumerRecord <K , V >>> entry : pendingFutures .entrySet ()) {
279+ final Future <ConsumerRecord <K , V >> future = entry .getValue ();
280+ try {
281+ assert remain >= 0 ;
282+ final ConsumerRecord <K , V > record = future .get (remain , TimeUnit .MILLISECONDS );
283+ assert record != null ;
284+ policy .completeRecord (record );
285+ } catch (TimeoutException ex ) {
286+ future .cancel (false );
287+ } catch (InterruptedException ex ) {
288+ future .cancel (false );
289+ Thread .currentThread ().interrupt ();
290+ } catch (CancellationException ex ) {
291+ // ignore
292+ } catch (ExecutionException ex ) {
293+ logger .error ("Fetcher quit with unexpected exception on handling consumer record: " + entry .getKey (), ex .getCause ());
294+ } finally {
295+ if (remain >= 0 ) {
296+ remain = Math .max (0 , gracefulShutdownTimeoutNanos - (System .nanoTime () - start ));
297+ }
298+ }
299+ }
176300
177- logger . debug ( "Fetcher thread exit." ) ;
301+ return remain ;
178302 }
179303}
0 commit comments