Skip to content

Commit 431c3cc

Browse files
committed
(fix) retrieable handler
1 parent 0083f5e commit 431c3cc

File tree

4 files changed

+34
-19
lines changed

4 files changed

+34
-19
lines changed

pom.xml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,6 @@
174174
<version>3.0.1</version>
175175
<configuration>
176176
<source>8</source>
177-
<!-- sorry for get rid of doc warnings because we lack so much of docs... :tada -->
178-
<additionalOptions>-Xdoclint:none</additionalOptions>
179-
<additionalJOption>-Xdoclint:none</additionalJOption>
180177
</configuration>
181178
<executions>
182179
<execution>

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public synchronized void subscribe(Collection<String> topics) {
8585

8686
/**
8787
* Get the metrics kept by the consumer
88+
*
89+
* @return the metrics kept by the consumer
8890
*/
8991
public Map<MetricName, ? extends Metric> metrics() {
9092
return consumer.metrics();

src/main/java/cn/leancloud/kafka/consumer/RetriableConsumerRecordHandler.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,22 @@ public RetriableConsumerRecordHandler(ConsumerRecordHandler<K, V> wrappedHandler
5353
@Override
5454
public void handleRecord(ConsumerRecord<K, V> record) {
5555
Exception lastException = null;
56-
int tried = 0;
57-
while (tried++ < maxRetryTimes) {
56+
int retried = 0;
57+
while (retried <= maxRetryTimes) {
5858
try {
5959
wrappedHandler.handleRecord(record);
6060
return;
6161
} catch (Exception ex) {
6262
lastException = ex;
63-
if (tried != maxRetryTimes) {
64-
try {
65-
Thread.sleep(retryInterval.toMillis());
66-
} catch (InterruptedException e) {
67-
Thread.currentThread().interrupt();
68-
break;
69-
}
63+
try {
64+
Thread.sleep(retryInterval.toMillis());
65+
} catch (InterruptedException e) {
66+
// keep the interrupt status and still retry for the next time
67+
// because interrupt means we can't block this thread, but
68+
// it does not mean we should quit our job
69+
Thread.currentThread().interrupt();
7070
}
71+
++retried;
7172
}
7273
}
7374

src/test/java/cn/leancloud/kafka/consumer/RetriableConsumerRecordHandlerTest.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void testInvalidRetryTimes() {
3434
@Test
3535
public void testRetry() {
3636
final Exception expectedEx = new RuntimeException();
37-
final int retryTimes = 10;
37+
final int retryTimes = 9;
3838

3939
doThrow(expectedEx).when(wrappedHandler).handleRecord(testingRecord);
4040

@@ -44,7 +44,7 @@ public void testRetry() {
4444
.isInstanceOf(HandleMessageFailedException.class)
4545
.hasCause(expectedEx);
4646

47-
verify(wrappedHandler, times(retryTimes)).handleRecord(testingRecord);
47+
verify(wrappedHandler, times(retryTimes + 1)).handleRecord(testingRecord);
4848
}
4949

5050
@Test
@@ -61,8 +61,8 @@ public void testNoRetry() {
6161
}
6262

6363
@Test
64-
public void testRetrySuccess() {
65-
final Exception expectedEx = new RuntimeException();
64+
public void testRetryToSuccess() {
65+
final Exception expectedEx = new RuntimeException("intended exception");
6666
final int retryTimes = 10;
6767

6868
doThrow(expectedEx).doNothing().when(wrappedHandler).handleRecord(testingRecord);
@@ -75,7 +75,7 @@ public void testRetrySuccess() {
7575
}
7676

7777
@Test
78-
public void testInterruptInRetryInterval() throws Exception {
78+
public void testInterruptInRetryIntervalAndFailForever() throws Exception {
7979
final Exception expectedEx = new RuntimeException();
8080
final int retryTimes = 10;
8181

@@ -87,8 +87,23 @@ public void testInterruptInRetryInterval() throws Exception {
8787
.isInstanceOf(HandleMessageFailedException.class)
8888
.hasCause(expectedEx);
8989

90-
assertThat(Thread.currentThread().isInterrupted()).isTrue();
91-
verify(wrappedHandler, times(1)).handleRecord(testingRecord);
90+
assertThat(Thread.interrupted()).isTrue();
91+
verify(wrappedHandler, times(retryTimes + 1)).handleRecord(testingRecord);
92+
}
93+
94+
@Test
95+
public void testInterruptInRetryIntervalAndRetryToSuccess() throws Exception {
96+
final Exception expectedEx = new RuntimeException("intended exception");
97+
final int retryTimes = 2;
98+
99+
doThrow(expectedEx).doThrow(expectedEx).doNothing().when(wrappedHandler).handleRecord(testingRecord);
100+
101+
final ConsumerRecordHandler<Object, Object> handler = new RetriableConsumerRecordHandler<>(wrappedHandler, retryTimes, Duration.ofHours(1));
102+
interruptAfter(Thread.currentThread(), 200);
103+
handler.handleRecord(testingRecord);
104+
105+
assertThat(Thread.interrupted()).isTrue();
106+
verify(wrappedHandler, times(3)).handleRecord(testingRecord);
92107
}
93108

94109
private void interruptAfter(Thread threadToInterrupt, long triggerDelayMillis) throws Exception {

0 commit comments

Comments
 (0)