
Commit aabea80

(feat) rename messageHandler to consumer record handler

1 parent a0235a9

17 files changed: +457 −304 lines
src/main/java/cn/leancloud/kafka/consumer/CatchAllExceptionConsumerRecordHandler.java

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
package cn.leancloud.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.BiConsumer;

/**
 * A wrapper over a {@link ConsumerRecordHandler} to catch and swallow all the exceptions thrown from the
 * wrapped {@code ConsumerRecordHandler} when it fails to handle a consumed record.
 * <p>
 * This handler may seem like a good way to improve the availability of the consumer, because it swallows
 * every exception thrown while handling a record and carries on to the next one. But it can actually lead
 * the consumer into a livelock, where the application does not crash but fails to make progress.
 * Please use it judiciously. Usually failing fast, letting the polling thread exit on exception, is your best choice.
 *
 * @param <K> the type of key for records consumed from Kafka
 * @param <V> the type of value for records consumed from Kafka
 */
public final class CatchAllExceptionConsumerRecordHandler<K, V> implements ConsumerRecordHandler<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(CatchAllExceptionConsumerRecordHandler.class);

    private final ConsumerRecordHandler<K, V> wrappedHandler;
    private final BiConsumer<ConsumerRecord<K, V>, Throwable> errorConsumer;

    /**
     * Constructor for {@code CatchAllExceptionConsumerRecordHandler} which just logs the failed record when
     * the wrapped handler fails on calling {@link ConsumerRecordHandler#handleRecord(ConsumerRecord)}.
     *
     * @param wrappedHandler the wrapped {@link ConsumerRecordHandler}
     */
    public CatchAllExceptionConsumerRecordHandler(ConsumerRecordHandler<K, V> wrappedHandler) {
        this(wrappedHandler, (record, throwable) -> logger.error("Handle kafka consumer record: " + record + " failed.", throwable));
    }

    /**
     * Constructor for {@code CatchAllExceptionConsumerRecordHandler} which uses a {@link BiConsumer} to handle
     * the failed record when the wrapped handler fails on calling {@link ConsumerRecordHandler#handleRecord(ConsumerRecord)}.
     *
     * @param wrappedHandler the wrapped {@link ConsumerRecordHandler}
     * @param errorConsumer  a {@link BiConsumer} to consume the failed record and the exception thrown from
     *                       {@link ConsumerRecordHandler#handleRecord(ConsumerRecord)}
     */
    public CatchAllExceptionConsumerRecordHandler(ConsumerRecordHandler<K, V> wrappedHandler,
                                                  BiConsumer<ConsumerRecord<K, V>, Throwable> errorConsumer) {
        this.wrappedHandler = wrappedHandler;
        this.errorConsumer = errorConsumer;
    }

    @Override
    public void handleRecord(ConsumerRecord<K, V> record) {
        try {
            wrappedHandler.handleRecord(record);
        } catch (Exception ex) {
            errorConsumer.accept(record, ex);
        }
    }
}
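
Usage sketch (not part of this commit): wrapping a handler so a failing record is reported to a custom BiConsumer instead of stopping the polling thread. The business handler, topic name, and error sink below are illustrative assumptions.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import cn.leancloud.kafka.consumer.CatchAllExceptionConsumerRecordHandler;
import cn.leancloud.kafka.consumer.ConsumerRecordHandler;

final class CatchAllExample {
    public static void main(String[] args) {
        // A hypothetical business handler that fails on null (tombstone) values.
        ConsumerRecordHandler<String, String> businessHandler = record -> {
            if (record.value() == null) {
                throw new IllegalStateException("unexpected tombstone");
            }
            System.out.println("processed " + record.value());
        };

        // Wrap it so the failed record and its exception are routed to a custom
        // BiConsumer instead of propagating to the polling thread.
        ConsumerRecordHandler<String, String> safeHandler =
                new CatchAllExceptionConsumerRecordHandler<>(
                        businessHandler,
                        (record, error) -> System.err.println(
                                "skipped record at offset " + record.offset() + ": " + error));

        safeHandler.handleRecord(new ConsumerRecord<>("orders", 0, 0L, "k1", null)); // swallowed
        safeHandler.handleRecord(new ConsumerRecord<>("orders", 0, 1L, "k2", "v2")); // processed
    }
}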

src/main/java/cn/leancloud/kafka/consumer/ConsumerRecordHandler.java

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
package cn.leancloud.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * A handler to handle all the consumer records consumed from the Kafka broker.
 *
 * @param <K> the type of key for records consumed from Kafka
 * @param <V> the type of value for records consumed from Kafka
 */
public interface ConsumerRecordHandler<K, V> {
    /**
     * Handle a {@link ConsumerRecord} consumed from the Kafka broker.
     * <p>
     * For the sake of failing fast, if any exception is thrown from this method, the thread fetching records
     * from the Kafka broker will exit with an error message. This triggers a rebalance of partitions among
     * all the consumers in the groups this consumer joined, and lets another consumer try to handle the
     * failed record and the records following it.
     * <p>
     * You can consider using {@link RetriableConsumerRecordHandler} to retry the handling process
     * automatically, or using {@link CatchAllExceptionConsumerRecordHandler} as a safety net to handle all
     * the failed records in the same way.
     *
     * @param record the consumed {@link ConsumerRecord}
     */
    void handleRecord(ConsumerRecord<K, V> record);
}
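
Because {@code ConsumerRecordHandler} has a single abstract method, handlers can be written as lambdas. A minimal fail-fast sketch (not part of this commit; the in-memory store is a placeholder for real processing):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import cn.leancloud.kafka.consumer.ConsumerRecordHandler;

final class FailFastExample {
    private static final Map<String, String> store = new ConcurrentHashMap<>(); // placeholder sink

    public static void main(String[] args) {
        // Any exception thrown here propagates to the polling thread, which then
        // exits and triggers a rebalance, exactly as the Javadoc above describes.
        ConsumerRecordHandler<String, String> handler = record -> {
            if (record.value() == null) {
                throw new IllegalArgumentException("refusing tombstone at offset " + record.offset());
            }
            store.put(record.key(), record.value());
        };

        handler.handleRecord(new ConsumerRecord<>("orders", 0, 0L, "id-1", "{\"qty\":2}"));
    }
}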

src/main/java/cn/leancloud/kafka/consumer/Fetcher.java

Lines changed: 5 additions & 5 deletions
@@ -19,21 +19,21 @@ final class Fetcher<K, V> implements Runnable, Closeable {

     private final long pollTimeout;
     private final Consumer<K, V> consumer;
-    private final MessageHandler<K, V> handler;
+    private final ConsumerRecordHandler<K, V> handler;
     private final ExecutorCompletionService<ConsumerRecord<K, V>> service;
     private final Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures;
     private final CommitPolicy<K, V> policy;
     private final long gracefulShutdownMillis;
     private volatile boolean closed;

     Fetcher(LcKafkaConsumerBuilder<K, V> consumerBuilder) {
-        this(consumerBuilder.getConsumer(), consumerBuilder.getPollTimeout(), consumerBuilder.getMessageHandler(),
+        this(consumerBuilder.getConsumer(), consumerBuilder.getPollTimeout(), consumerBuilder.getConsumerRecordHandler(),
                 consumerBuilder.getWorkerPool(), consumerBuilder.getPolicy(), consumerBuilder.gracefulShutdownMillis());
     }

     Fetcher(Consumer<K, V> consumer,
             long pollTimeout,
-            MessageHandler<K, V> handler,
+            ConsumerRecordHandler<K, V> handler,
             ExecutorService workerPool,
             CommitPolicy<K, V> policy,
             long gracefulShutdownMillis) {
@@ -96,10 +96,10 @@ private boolean closed() {
     }

     private void dispatchFetchedRecords(ConsumerRecords<K, V> records) {
-        final MessageHandler<K, V> handler = this.handler;
+        final ConsumerRecordHandler<K, V> handler = this.handler;
         for (ConsumerRecord<K, V> record : records) {
             final Future<ConsumerRecord<K, V>> future = service.submit(() -> {
-                handler.handleMessage(record);
+                handler.handleRecord(record);
                 return record;
             });
             pendingFutures.put(record, future);
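
For context, the dispatch loop above submits each record to an ExecutorCompletionService so that handled records can be reaped in completion order and later committed. A standalone, simplified sketch of that pattern (not the library's code; the string "records" stand in for real ConsumerRecords):

import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class DispatchSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService workerPool = Executors.newFixedThreadPool(4);
        ExecutorCompletionService<String> service = new ExecutorCompletionService<>(workerPool);

        // Submit each "record"; the callable returns the record itself once handled,
        // so completed records can be collected in completion order and committed.
        for (String record : List.of("r1", "r2", "r3")) {
            service.submit(() -> {
                // handler.handleRecord(record) would run here
                return record;
            });
        }

        for (int i = 0; i < 3; i++) {
            Future<String> done = service.take(); // blocks until some record finishes
            System.out.println("handled " + done.get());
        }
        workerPool.shutdown();
    }
}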

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 21 additions & 1 deletion
@@ -7,6 +7,19 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;

+/**
+ * {@code LcKafkaConsumer} is a wrapper over {@link Consumer}. It uses a {@link Consumer} to consume
+ * records from the Kafka broker.
+ * <p>
+ * With {@link LcKafkaConsumer}, you can subscribe to several topics and handle all the records from these
+ * topics in a dedicated thread pool, without worrying about polling timeouts or session timeouts caused by
+ * the polling thread spending too much time processing records.
+ * <p>
+ * All the public methods in {@code LcKafkaConsumer} are thread safe.
+ *
+ * @param <K> the type of key for records consumed from Kafka
+ * @param <V> the type of value for records consumed from Kafka
+ */
 public final class LcKafkaConsumer<K, V> implements Closeable {
     enum State {
         INIT(0),
@@ -42,6 +55,13 @@ int code() {
         this.fetcherThread = new Thread(fetcher);
     }

+    /**
+     * Subscribe to some Kafka topics to consume records from them.
+     *
+     * @param topics the topics to subscribe to
+     * @throws IllegalStateException if this {@code LcKafkaConsumer} has been closed or has already
+     *                               subscribed to some topics
+     * @throws IllegalArgumentException if the input {@code topics} is empty
+     */
     public synchronized void subscribe(Collection<String> topics) {
         if (topics.isEmpty()) {
             throw new IllegalArgumentException("subscribe empty topics");
@@ -95,7 +115,7 @@ boolean closed() {
         return state == State.CLOSED;
     }

-    CommitPolicy<K,V> policy() {
+    CommitPolicy<K, V> policy() {
         return policy;
     }
 }
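
End-to-end usage sketch (not part of this commit): the {@code buildAuto()} terminal call below is an assumption for illustration only, since the commit shows just the builder entry points and {@code subscribe}; the configs and topic name are placeholders.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import cn.leancloud.kafka.consumer.LcKafkaConsumer;
import cn.leancloud.kafka.consumer.LcKafkaConsumerBuilder;

final class SubscribeExample {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("group.id", "example-group");
        // The three configs the builder's Javadoc says must be set explicitly:
        configs.put("max.poll.interval.ms", 300_000);
        configs.put("max.poll.records", 500);
        configs.put("auto.commit.interval.ms", 5_000);
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        LcKafkaConsumer<String, String> consumer = LcKafkaConsumerBuilder
                .<String, String>newBuilder(configs, record -> System.out.println(record.value()))
                .buildAuto(); // hypothetical terminal call; not shown in this commit

        consumer.subscribe(List.of("orders")); // throws IllegalArgumentException on an empty collection
        // ... consume until shutdown, then:
        consumer.close();
    }
}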

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 37 additions & 30 deletions
@@ -14,43 +14,50 @@
 import static cn.leancloud.kafka.consumer.BasicConsumerConfigs.ENABLE_AUTO_COMMIT;
 import static java.util.Objects.requireNonNull;

+/**
+ * A builder used to create a {@link LcKafkaConsumer} which uses a {@link KafkaConsumer} to consume
+ * records from the Kafka broker.
+ *
+ * @param <K> the type of key for records consumed from Kafka
+ * @param <V> the type of value for records consumed from Kafka
+ */
 public final class LcKafkaConsumerBuilder<K, V> {
     /**
      * Create a {@code LcKafkaConsumerBuilder} used to build {@link LcKafkaConsumer}.
      *
-     * @param kafkaConfigs the kafka consumer configs. Please refer
+     * @param kafkaConfigs the kafka configs for {@link KafkaConsumer}. Please refer to
      *                     <a href="http://kafka.apache.org/documentation.html#consumerconfigs">this document</a> for
      *                     valid configurations.
-     * @param messageHandler a {@link MessageHandler} to handle the consumed msg from kafka
+     * @param consumerRecordHandler a {@link ConsumerRecordHandler} to handle the records consumed from kafka
      * @return a new {@code LcKafkaConsumerBuilder}
      */
     public static <K, V> LcKafkaConsumerBuilder<K, V> newBuilder(Map<String, Object> kafkaConfigs,
-                                                                 MessageHandler<K, V> messageHandler) {
+                                                                 ConsumerRecordHandler<K, V> consumerRecordHandler) {
         requireNonNull(kafkaConfigs, "kafkaConfigs");
-        requireNonNull(messageHandler, "messageHandler");
-        return new LcKafkaConsumerBuilder<>(new HashMap<>(kafkaConfigs), messageHandler);
+        requireNonNull(consumerRecordHandler, "consumerRecordHandler");
+        return new LcKafkaConsumerBuilder<>(new HashMap<>(kafkaConfigs), consumerRecordHandler);
     }

     /**
      * Create a {@code LcKafkaConsumerBuilder} used to build {@link LcKafkaConsumer}.
      *
-     * @param kafkaConfigs the kafka consumer configs. Please refer
+     * @param kafkaConfigs the kafka configs for {@link KafkaConsumer}. Please refer to
      *                     <a href="http://kafka.apache.org/documentation.html#consumerconfigs">this document</a> for
      *                     valid configurations.
-     * @param messageHandler a {@link MessageHandler} to handle the consumed msg from kafka
+     * @param consumerRecordHandler a {@link ConsumerRecordHandler} to handle the records consumed from kafka
      * @param keyDeserializer the deserializer for keys that implements {@link Deserializer}
      * @param valueDeserializer the deserializer for values that implements {@link Deserializer}
      * @return a new {@code LcKafkaConsumerBuilder}
      */
     public static <K, V> LcKafkaConsumerBuilder<K, V> newBuilder(Map<String, Object> kafkaConfigs,
-                                                                 MessageHandler<K, V> messageHandler,
+                                                                 ConsumerRecordHandler<K, V> consumerRecordHandler,
                                                                  Deserializer<K> keyDeserializer,
                                                                  Deserializer<V> valueDeserializer) {
         requireNonNull(kafkaConfigs, "kafkaConfigs");
-        requireNonNull(messageHandler, "messageHandler");
+        requireNonNull(consumerRecordHandler, "consumerRecordHandler");
         requireNonNull(keyDeserializer, "keyDeserializer");
         requireNonNull(valueDeserializer, "valueDeserializer");
-        return new LcKafkaConsumerBuilder<>(new HashMap<>(kafkaConfigs), messageHandler, keyDeserializer, valueDeserializer);
+        return new LcKafkaConsumerBuilder<>(new HashMap<>(kafkaConfigs), consumerRecordHandler, keyDeserializer, valueDeserializer);
     }
@@ -68,7 +75,7 @@
     private ExecutorService workerPool = ImmediateExecutorService.INSTANCE;
     private boolean shutdownWorkerPoolOnStop = false;
     private Map<String, Object> configs;
-    private MessageHandler<K, V> messageHandler;
+    private ConsumerRecordHandler<K, V> consumerRecordHandler;
     @Nullable
     private Consumer<K, V> consumer;
     @Nullable
@@ -79,18 +86,18 @@
     private CommitPolicy<K, V> policy;

     private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
-                                   MessageHandler<K, V> messageHandler) {
-        this(kafkaConsumerConfigs, messageHandler, null, null);
+                                   ConsumerRecordHandler<K, V> consumerRecordHandler) {
+        this(kafkaConsumerConfigs, consumerRecordHandler, null, null);
     }

     private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
-                                   MessageHandler<K, V> messageHandler,
+                                   ConsumerRecordHandler<K, V> consumerRecordHandler,
                                    @Nullable Deserializer<K> keyDeserializer,
                                    @Nullable Deserializer<V> valueDeserializer) {
         this.configs = kafkaConsumerConfigs;
-        this.messageHandler = messageHandler;
+        this.consumerRecordHandler = consumerRecordHandler;
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
     }
@@ -131,13 +138,13 @@ public LcKafkaConsumerBuilder<K, V> pollTimeout(Duration pollTimeout) {
         return this;
     }

-    public LcKafkaConsumerBuilder<K,V> gracefulShutdownTimeoutMillis(long gracefulShutdownMs) {
+    public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeoutMillis(long gracefulShutdownMs) {
         requireArgument(gracefulShutdownMs >= 0, "gracefulShutdownMillis: %s (expected >= 0)", gracefulShutdownMs);
         this.gracefulShutdownMillis = gracefulShutdownMs;
         return this;
     }

-    public LcKafkaConsumerBuilder<K,V> gracefulShutdownTimeout(Duration gracefulShutdownTimeout) {
+    public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeout(Duration gracefulShutdownTimeout) {
         requireNonNull(gracefulShutdownTimeout, "gracefulShutdownTimeout");
         this.gracefulShutdownMillis = gracefulShutdownTimeout.toMillis();
         return this;
@@ -158,14 +165,14 @@
     }

     /**
-     * Change the {@link MessageHandler} to handle the consumed msg from kafka.
+     * Change the {@link ConsumerRecordHandler} used to handle the records consumed from kafka.
      *
-     * @param messageHandler the handler to handle consumed msg
+     * @param consumerRecordHandler the handler for consumed records
      * @return this
      */
-    public LcKafkaConsumerBuilder<K, V> messageHandler(MessageHandler<K, V> messageHandler) {
-        requireNonNull(messageHandler, "messageHandler");
-        this.messageHandler = messageHandler;
+    public LcKafkaConsumerBuilder<K, V> messageHandler(ConsumerRecordHandler<K, V> consumerRecordHandler) {
+        requireNonNull(consumerRecordHandler, "consumerRecordHandler");
+        this.consumerRecordHandler = consumerRecordHandler;
         return this;
     }
@@ -207,7 +214,7 @@
      * while (true) {
      *     final ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
      *     for (ConsumerRecord<K, V> record : records) {
-     *         handler.handleMessage(record.topic(), record.value());
+     *         handler.handleRecord(record);
      *     }
      * }
      * </pre>
@@ -216,9 +223,9 @@
      * Please note that this consumer requires the following kafka configs to be set, otherwise an
      * {@link IllegalArgumentException} will be thrown:
      * <ol>
-     * <li><code>max.poll.interval.ms</code></li>
-     * <li><code>max.poll.records</code></li>
-     * <li><code>auto.commit.interval.ms</code></li>
+     *     <li><code>max.poll.interval.ms</code></li>
+     *     <li><code>max.poll.records</code></li>
+     *     <li><code>auto.commit.interval.ms</code></li>
      * </ol>
      * <p>
      * Though all of these configs have default values in kafka, we still require every user to set them explicitly.
@@ -266,8 +273,8 @@ Consumer<K, V> getConsumer() {
         return consumer;
     }

-    MessageHandler<K, V> getMessageHandler() {
-        return messageHandler;
+    ConsumerRecordHandler<K, V> getConsumerRecordHandler() {
+        return consumerRecordHandler;
     }

     ExecutorService getWorkerPool() {
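
Since the second {@code newBuilder} overload above takes explicit deserializers, here is a minimal sketch of creating a builder that way (not part of this commit; the broker address, group id, and handler body are illustrative assumptions):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;

import cn.leancloud.kafka.consumer.LcKafkaConsumerBuilder;

final class BuilderExample {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        configs.put("group.id", "example-group");

        // The handler lambda is an illustrative assumption; deserializers are passed
        // explicitly rather than via key.deserializer/value.deserializer configs.
        LcKafkaConsumerBuilder<String, String> builder = LcKafkaConsumerBuilder.newBuilder(
                configs,
                record -> System.out.println("got " + record.value()),
                new StringDeserializer(),
                new StringDeserializer());

        // Further configuration (pollTimeout, workerPool, gracefulShutdownTimeout, ...)
        // would follow here before building the consumer.
    }
}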

src/main/java/cn/leancloud/kafka/consumer/MessageHandler.java

Lines changed: 0 additions & 7 deletions
This file was deleted.
