
Commit 336d49c

committed
(chore) readme
1 parent 475cb7d commit 336d49c

File tree

2 files changed: +74 -1 lines changed


.travis.yml

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
language: java

README.md

Lines changed: 73 additions & 1 deletion
@@ -1,2 +1,74 @@
# Kafka Java Consumer
[![Build Status](https://api.travis-ci.org/leancloud/kafka-java-consumer.svg?branch=master)](https://travis-ci.org/leancloud/kafka-java-consumer)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

Kafka is a great library which is very versatile and flexible, but many things can go wrong if you use it without great care and a good understanding of Kafka internals. Below we will talk about some pitfalls on the consumer side which are easy to run into.

Usually, after we have subscribed the consumer to some topics, we need a loop to do these things (a bare-bones version is sketched after the list):

* Fetch records from the Kafka broker by calling the `poll` method on `KafkaConsumer`;
* Process the fetched records;
* Commit the offsets of the fetched records, so that they will not be consumed again.

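To make these steps concrete, here is a bare-bones sketch of that loop written against the plain `KafkaConsumer` API; the topic name, configuration values, and processing logic are placeholders rather than anything provided by this library:

```java
final Map<String, Object> kafkaConfigs = new HashMap<>();
kafkaConfigs.put("bootstrap.servers", "localhost:9092");
kafkaConfigs.put("group.id", "LeanCloud");
kafkaConfigs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
kafkaConfigs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<Integer, String> plainConsumer = new KafkaConsumer<>(kafkaConfigs)) {
    plainConsumer.subscribe(Collections.singletonList("LeanCloud-Topic"));
    while (true) {
        // 1. fetch records from the broker
        ConsumerRecords<Integer, String> records = plainConsumer.poll(Duration.ofMillis(100));
        // 2. process the fetched records
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("got record: " + record);
        }
        // 3. commit the offsets of the fetched records so they will not be consumed again
        plainConsumer.commitSync();
    }
}
```
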
We need to call `poll` constantly and make sure that the interval between calls is not too long, otherwise, after a session timeout or a poll timeout, the broker may decide our consumer is not alive and revoke every partition assigned to it. If we need to do a lot of work on the fetched records, we may have to set the Kafka consumer configuration "max.poll.interval.ms" to a comparatively large value to give us enough time to process them all. But raising "max.poll.interval.ms" is not trivial: the larger the value, the longer it takes the broker to realize that a consumer is dead when something goes wrong with it.

Instead of tuning "max.poll.interval.ms", we can dedicate the polling thread to polling records from the broker and submit all the fetched records to a thread pool that is in charge of processing them. But to do it this way, we need to pause the partitions of all the fetched records before processing them, so that the polling thread does not poll more records while the previous ones are still being processed. Of course, we should remember to resume a paused partition after we have processed all records from that partition. Furthermore, after a partition reassignment, we should remember which partitions were paused before the reassignment and pause them again.

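As an illustration of how involved that is, here is a much-simplified sketch of the pause/submit/resume pattern (it reuses the `plainConsumer` from the previous sketch, and it leaves out offset committing, error handling, and the rebalance bookkeeping just mentioned); this library takes care of all of this for you:

```java
ExecutorService workerPool = Executors.newCachedThreadPool();
Map<TopicPartition, Future<?>> processingPartitions = new HashMap<>();

while (true) {
    ConsumerRecords<Integer, String> records = plainConsumer.poll(Duration.ofMillis(100));

    // pause the partitions of the fetched records, so that subsequent poll() calls
    // keep the session alive without fetching more records from those partitions
    plainConsumer.pause(records.partitions());
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<Integer, String>> batch = records.records(partition);
        processingPartitions.put(partition, workerPool.submit(() -> {
            for (ConsumerRecord<Integer, String> record : batch) {
                // application-specific processing of one record
            }
        }));
    }

    // resume every paused partition whose batch has been fully processed; resume()
    // must be called from the polling thread because KafkaConsumer is not thread safe
    processingPartitions.entrySet().removeIf(entry -> {
        if (!entry.getValue().isDone()) {
            return false;
        }
        plainConsumer.resume(Collections.singleton(entry.getKey()));
        return true;
    });
}
```
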
Kafka provides both a synchronous and an asynchronous way to commit record offsets. In addition, it provides a way to commit a specific offset for a specific partition, and a way to commit all the fetched records at once. We should remember to commit all the processed records of a partition before that partition is revoked, and to commit all the processed records before the consumer shuts down.

If we commit the offset of a specific record, we should remember to add one to that record's offset: assuming the record to commit is at partition 0 with offset 100, we should commit offset 101 for partition 0, not 100, otherwise that already-processed record will be fetched again. If a consumer is assigned a partition that has had no records for a long time, we should still commit the committed offset of that partition periodically, otherwise, once the committed offset of that partition is removed from the broker because of the retention timeout, the broker will no longer remember where the consumer's committed offset for that partition was. If the consumer sets the Kafka consumer configuration "auto.offset.reset" to "earliest", it will then re-poll records it has already processed and committed after a restart.

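For instance, committing the offset of a single processed record with the plain `KafkaConsumer` API could look like the following sketch (the method name here is only for illustration):

```java
// after `record` has been processed, commit record.offset() + 1: the position
// of the next record the group should consume from that partition
static void commitProcessedRecord(KafkaConsumer<Integer, String> consumer,
                                  ConsumerRecord<Integer, String> record) {
    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
    // for a record at partition 0 with offset 100, this commits offset 101
    OffsetAndMetadata nextOffset = new OffsetAndMetadata(record.offset() + 1);
    consumer.commitSync(Collections.singletonMap(partition, nextOffset));
}
```
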
All in all, Kafka is not a tool that can be used without thought and research. The purpose of this library is to smooth out these pitfalls. With this library, you can consume records from a subscribed topic and process them more safely and easily.

## Usage
First, we need a configuration for the Kafka consumer. For example:

```java
final Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("group.id", "LeanCloud");
configs.put("auto.offset.reset", "earliest");
configs.put("max.poll.records", 10);
configs.put("max.poll.interval.ms", 30_000);
configs.put("key.deserializer", "...");
configs.put("value.deserializer", "...");
```

Then, define how you want to handle a record consumed from Kafka. Here we just log the consumed record:

```java
ConsumerRecordHandler<Integer, String> handler = record -> {
    logger.info("I got a record: {}", record);
};
```

Next, we need to choose the type of consumer to use. There are five kinds of consumers, each with a different commit policy. Here is a brief description of them:

commit policy | description
------ | ------------
automatic commit | Commits the offsets of records consumed from Kafka automatically at a fixed interval.
sync commit | Commits offsets synchronously, and only when all the consumed records have been handled.
async commit | Commits offsets asynchronously when all the consumed records have been handled. If there are too many pending async commit requests, or the last async commit request failed, it switches to synchronous mode and switches back once the next synchronous commit succeeds.
partial sync commit | Whenever any consumed records have been handled, commits only those handled records to the broker synchronously. Records that are still being handled are left to be committed after they are done.
partial async commit | Whenever any consumed records have been handled, commits only those handled records to the broker asynchronously. Records that are still being handled are left to be committed after they are done. If there are too many pending async commit requests, or the last async commit request failed, it switches to synchronous mode and switches back once the next synchronous commit succeeds.

Taking the sync-committing consumer as an example, you can create a consumer backed by a thread pool and subscribe it to a topic like this:

```java
final LcKafkaConsumer<Integer, String> consumer = LcKafkaConsumerBuilder
        .newBuilder(configs, handler)
        // true means the LcKafkaConsumer should shut down the input thread pool when it is shutting down
        .workerPool(Executors.newCachedThreadPool(), true)
        .buildSync();
consumer.subscribe(Collections.singletonList("LeanCloud-Topic"));
```

Please note that we passed an `ExecutorService` to build the `LcKafkaConsumer`; all the records consumed from the subscribed topic will be handled by this `ExecutorService` using the input `ConsumerRecordHandler`.

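Any `ExecutorService` can be used here. For example, if you want to bound how many records are handled at the same time, you could pass a fixed-size pool instead (a sketch using the same builder calls as above; the pool size is arbitrary):

```java
final LcKafkaConsumer<Integer, String> boundedConsumer = LcKafkaConsumerBuilder
        .newBuilder(configs, handler)
        // a fixed-size pool caps how many records are handled concurrently
        .workerPool(Executors.newFixedThreadPool(4), true)
        .buildSync();
```
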
When we are done with this consumer, we need to close it:

```java
consumer.close();
```

For the full API and descriptions of all the kinds of consumers, please refer to the Javadoc.
- a consumer used by leancloud
