Skip to content

Commit 7263c77

Browse files
committed
(fix) partial async commit
1 parent 73d0bcb commit 7263c77

File tree

5 files changed

+43
-21
lines changed

5 files changed

+43
-21
lines changed

src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
3939
if (exception != null) {
4040
logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
4141
forceSync = true;
42-
4342
}
4443
});
4544
}

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ public LcKafkaConsumerBuilder<K,V> gracefulShutdownTimeoutMillis(long gracefulSh
143143
return this;
144144
}
145145

146-
public LcKafkaConsumerBuilder<K,V> gracefulShutdownTimeout(Duration duration) {
147-
requireNonNull(duration);
148-
this.gracefulShutdownMillis = duration.toMillis();
146+
public LcKafkaConsumerBuilder<K,V> gracefulShutdownTimeout(Duration gracefulShutdownTimeout) {
147+
requireNonNull(gracefulShutdownTimeout, "gracefulShutdownTimeout");
148+
this.gracefulShutdownMillis = gracefulShutdownTimeout.toMillis();
149149
return this;
150150
}
151151

src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,18 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
4444
consumer.commitAsync(completedTopicOffsets, (offsets, exception) -> {
4545
--pendingAsyncCommitCounter;
4646
assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
47-
final Map<TopicPartition, OffsetAndMetadata> completeOffsets =
48-
offsets == completedTopicOffsets ? new HashMap<>(offsets) : offsets;
49-
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : completeOffsets.entrySet()) {
50-
completedTopicOffsets.remove(entry.getKey(), entry.getValue());
51-
topicOffsetHighWaterMark.remove(entry.getKey(), entry.getValue().offset());
52-
}
53-
5447
if (exception != null) {
48+
// if the last async commit failed, we do not clean cached completed offsets and let the next
49+
// commit be a sync commit so that all the completed offsets will be committed at that time
5550
logger.warn("Failed to commit offset: " + offsets + " asynchronously", exception);
5651
forceSync = true;
52+
} else {
53+
final Map<TopicPartition, OffsetAndMetadata> completeOffsets =
54+
offsets == completedTopicOffsets ? new HashMap<>(offsets) : offsets;
55+
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : completeOffsets.entrySet()) {
56+
completedTopicOffsets.remove(entry.getKey(), entry.getValue());
57+
topicOffsetHighWaterMark.remove(entry.getKey(), entry.getValue().offset());
58+
}
5759
}
5860
});
5961
}

src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,30 @@ public void testNegativePollTimeoutMs() {
8787
.hasMessageContaining("pollTimeoutMillis");
8888
}
8989

90+
@Test
91+
public void testNullPollTimeout() {
92+
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
93+
.pollTimeout(null))
94+
.isInstanceOf(NullPointerException.class)
95+
.hasMessage("pollTimeout");
96+
}
97+
98+
@Test
99+
public void testNegativeShutdownTimeout() {
100+
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
101+
.gracefulShutdownTimeoutMillis(-1 * ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE)))
102+
.isInstanceOf(IllegalArgumentException.class)
103+
.hasMessageContaining("gracefulShutdownMillis");
104+
}
105+
106+
@Test
107+
public void testNullShutdownTimeout() {
108+
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
109+
.gracefulShutdownTimeout(null))
110+
.isInstanceOf(NullPointerException.class)
111+
.hasMessage("gracefulShutdownTimeout");
112+
}
113+
90114
@Test
91115
public void testNegativeMaxPendingAsyncCommits() {
92116
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
@@ -103,14 +127,6 @@ public void testZeroMaxPendingAsyncCommits() {
103127
.hasMessageContaining("maxPendingAsyncCommits");
104128
}
105129

106-
@Test
107-
public void testNullPollTimeout() {
108-
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
109-
.pollTimeout(null))
110-
.isInstanceOf(NullPointerException.class)
111-
.hasMessage("pollTimeout");
112-
}
113-
114130
@Test
115131
public void testNullWorkerPool() {
116132
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)

src/test/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicyTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ public void testForceSyncAfterAsyncCommitFailed() {
168168

169169
policy = new PartialAsyncCommitPolicy<>(mockConsumer, 10);
170170

171+
// a failed async commit on the first attempt
171172
final ConsumerRecord<Object, Object> triggerFailedRecord = pendingRecords.get(0);
172173
completeRecord(triggerFailedRecord);
173174
assertThat(policy.tryCommit(true))
@@ -177,11 +178,15 @@ public void testForceSyncAfterAsyncCommitFailed() {
177178
verify(mockConsumer, times(1)).commitAsync(any(), any());
178179
verify(mockConsumer, never()).commitSync(any());
179180

181+
// sync commit after the failed async commit
180182
final ConsumerRecord<Object, Object> syncRecord = pendingRecords.get(1);
181183
completeRecord(syncRecord);
182184
assertThat(policy.tryCommit(true))
183-
.hasSize(1)
184-
.isEqualTo(Collections.singleton(new TopicPartition(syncRecord.topic(), syncRecord.partition())));
185+
.hasSize(2)
186+
.containsExactlyInAnyOrderElementsOf(Arrays.asList(
187+
new TopicPartition(syncRecord.topic(), syncRecord.partition()),
188+
new TopicPartition(syncRecord.topic(), triggerFailedRecord.partition())
189+
));
185190

186191
verify(mockConsumer, times(1)).commitAsync(any(), any());
187192
verify(mockConsumer, times(1)).commitSync(any());

0 commit comments

Comments
 (0)