
Commit 219d7be

(chore) update some docs
1 parent 221eaeb commit 219d7be

2 files changed (+32, -8 lines)

README.md

Lines changed: 6 additions & 6 deletions
@@ -5,19 +5,19 @@
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Maven](https://img.shields.io/github/release/leancloud/kafka-java-consumer.svg)](https://github.com/leancloud/kafka-java-consumer/releases)

- Kafka is a great lib which is very versatile and flexible, but many things may go wrong if you use it without great care and a good understanding of Kafka internals. Next we will talk about some pitfalls on the consumer side which are easy to run into.
+ Kafka provides a Java Kafka Client to communicate with it. It's a great lib which is very versatile and flexible, but many things may go wrong if you use it without good care or a good understanding of Kafka internals. We will talk about some of the common pitfalls on the consumer side which are easy to run into, and this lib is here to help you overcome them painlessly.

Usually, after we have subscribed the consumer to some topics, we need a loop to do these things (see the sketch after this list):

* Fetch records from the Kafka broker by using the `poll` method on `KafkaConsumer`;
* Process the fetched records;
* Commit the offsets of these fetched records, so they will not be consumed again;

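In plain Java-client terms, that loop looks roughly like the sketch below; the topic name, types and processing step are placeholders, and `consumer` is assumed to be an already configured `KafkaConsumer`:

```java
// A minimal sketch; assumes `consumer` is a configured org.apache.kafka.clients.consumer.KafkaConsumer<String, String>.
consumer.subscribe(Collections.singletonList("example-topic"));
while (true) {
    // 1. Fetch records from the broker.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    // 2. Process the fetched records.
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());  // placeholder processing
    }

    // 3. Commit the offsets of these fetched records so they are not consumed again.
    consumer.commitSync();
}
```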
- We need to call `poll` constantly and ensure that the interval between each call is not too long, otherwise after a session timeout or a poll timeout, the broker may think our consumer is not alive and revoke every partition assigned to the consumer. If we need to do a lot of things with the records we fetched, we may need to set the Kafka consumer configuration "max.poll.interval.ms" to a comparatively large value to give us enough time to process all these records. But it's not trivial to set `max.poll.interval.ms` to a large value. The larger the `max.poll.interval.ms` value is, the longer it takes for the broker to realize that a consumer is dead when something goes wrong with the consumer. In addition to tuning the `max.poll.interval.ms` configuration, we can dedicate the polling thread to only polling records from the broker and submit all the fetched records to a thread pool which takes charge of processing them. But to do it this way, we need to pause the partitions of all the fetched records before processing them, to prevent the polling thread from polling more records while the previous ones are still being processed. Of course, we should remember to resume a paused partition after we have processed all records from that partition. Furthermore, after a partition reassignment, we should remember which partitions were paused before the reassignment and pause them again.
+ We need to call `poll` constantly and ensure that the interval between each call is not too long, otherwise after a session timeout or a poll timeout, the broker may think our consumer is not alive and revoke every partition assigned to our consumer. If we need to do a lot of things with the records we fetched, we may need to set the Kafka consumer configuration `max.poll.interval.ms` to a comparatively large value to give us enough time to process all these records. But it's not trivial to set `max.poll.interval.ms` to a large value. The larger the `max.poll.interval.ms` value is, the longer it takes for the broker to realize that a consumer is dead when something goes wrong with the consumer. In addition to tuning the `max.poll.interval.ms` configuration, we can dedicate the polling thread to only polling records from the broker and submit all the fetched records to a thread pool which takes charge of processing them. But to do it this way, we need to pause the partitions of all the fetched records before processing them, to prevent the polling thread from polling more records while the previous ones are still being processed. Of course, we should remember to resume a paused partition after we have processed all records from that partition. Furthermore, after a partition reassignment, we should remember which partitions were paused before the reassignment and pause them again.

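A rough sketch of that pause-and-resume pattern with the plain client is shown below; `consumer`, the `running` flag, the pool size and the processing step are placeholders, and a real implementation must also handle rebalances, since partitions waiting to be resumed may no longer be assigned:

```java
// Dedicate the polling thread to polling; hand the fetched records to a worker pool.
ExecutorService pool = Executors.newFixedThreadPool(4);
BlockingQueue<TopicPartition> finished = new LinkedBlockingQueue<>();

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (!records.isEmpty()) {
        // Stop fetching from these partitions while their records are still being processed.
        consumer.pause(records.partitions());
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(partition);
            pool.submit(() -> {
                batch.forEach(r -> System.out.println(r.value()));  // placeholder processing
                finished.add(partition);                            // hand the partition back to the polling thread
            });
        }
    }

    // Resume partitions whose records have all been processed (on the polling thread only).
    List<TopicPartition> done = new ArrayList<>();
    finished.drainTo(done);
    if (!done.isEmpty()) {
        consumer.resume(done);
    }
}
```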
- Kafka provides a synchronous and an asynchronous way to commit offsets of records. In addition to them, Kafka also provides a way to commit for a specific partition and offset, and a way to commit all the records fetched at once. We should remember to commit all the processed records from a partition before this partition is revoked. We should remember to commit all the processed records before the consumer shuts down. If we commit the offset for a specific record, we should remember to add one to the offset of that record, which means that, assuming the record to commit has partition 0 and offset 100, we should commit partition 0 to 101 instead of 100, otherwise that processed record will be fetched again. If a consumer is assigned a partition which has no records for a long time, we should still remember to commit the committed offset of that partition periodically, otherwise after the committed offset of that partition is removed from the broker because of the retention timeout, the broker will not remember where the committed offset of that partition for the consumer was. If the consumer sets the Kafka consumer configuration `auto.offset.reset` to **earliest**, the consumer will poll the records it already processed and committed after a reboot.
+ Kafka Client provides a synchronous and an asynchronous way to commit offsets of records. In addition to them, Kafka Client also provides a way to commit for a specific partition and offset, and a way to commit all the records fetched at once. We should remember to commit all the processed records from a partition before this partition is revoked. We should remember to commit all the processed records before the consumer shuts down. If we commit the offset for a specific record, we should remember to add one to the offset of that record: assuming the record to commit has partition 0 and offset 100, we should commit partition 0 to 101 instead of 100, otherwise that processed record will be fetched again. If a consumer is assigned a partition which has no records for a long time, we should still remember to commit the committed offset of that partition periodically, otherwise after the committed offset of that partition is removed from the broker because of the retention timeout, the broker will not remember where the committed offset of that partition for the consumer was. If the consumer sets the Kafka configuration `auto.offset.reset` to **earliest**, after a reboot the consumer will poll all the records from the partition for which the broker forgot where we committed and process all of them over again.

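For example, committing the offset of one processed record with the synchronous API looks roughly like this (the helper method is only illustrative); for a record at partition 0 and offset 100, it commits offset 101:

```java
// Commit the position *after* the processed record, i.e. its offset plus one.
static void commitProcessedRecord(Consumer<String, String> consumer, ConsumerRecord<String, String> record) {
    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
    OffsetAndMetadata next = new OffsetAndMetadata(record.offset() + 1);
    consumer.commitSync(Collections.singletonMap(partition, next));
}
```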
- All in all, Kafka is not a tool which can be used without thinking and doing some research. The purpose of this lib is to flatten these pits. With this lib, you can consume records from a subscribed topic and process them more safely and easily.
+ All in all, Kafka Client is not a tool which can be used directly without good care and some research. But with the help of this lib, you can consume records from a subscribed topic and process them, with or without a dedicated thread pool, more safely and easily. It encapsulates loads of best practices to achieve that goal.

## Usage

@@ -46,7 +46,7 @@ Next, we need to choose the type of consumer to use. We have five kinds of consu

commit policy | description
------ | ------------
- automatic commit | Commit offsets of records fetched from kafka automatically in a fixed interval.
+ automatic commit | Commit offsets of records fetched from the broker automatically at a fixed interval.
sync commit | Commit offsets synchronously only when all the fetched records have been processed.
async commit | Commit offsets asynchronously only when all the fetched records have been processed. If there are too many pending async commit requests or the last async commit request failed, it will switch to synchronous mode and commit synchronously, then switch back when the next synchronous commit succeeds.
partial sync commit | Whenever a consumer record has been processed, commit synchronously only the offsets of the records that have already been processed, leaving the ones not yet processed to be committed later.
@@ -63,7 +63,7 @@ final LcKafkaConsumer<Integer, String> consumer = LcKafkaConsumerBuilder
consumer.subscribe(Collections.singletonList("LeanCloud-Topic"));
```

- Please note that we passed a `ExecutorService` to build the `LcKafkaConsumer` , all the records consumed from the subscribed topic will be handled by this `ExecutorService` using the input `ConsumerRecordHandler`.
+ Please note that we passed an `ExecutorService` to build the `LcKafkaConsumer`; all the records consumed from the subscribed topic will be handled by this `ExecutorService` using the input `ConsumerRecordHandler`.

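For example, the `ExecutorService` could simply be a fixed-size pool; the size below is only an illustration and should be tuned to the processing workload:

```java
// Worker pool that will run the ConsumerRecordHandler for every fetched record.
ExecutorService workerPool = Executors.newFixedThreadPool(8);
```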
When we are done with this consumer, we need to close it:

src/main/java/cn/leancloud/kafka/consumer/ProcessRecordsProgress.java

Lines changed: 26 additions & 2 deletions
@@ -9,6 +9,10 @@
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toSet;

+ /**
+  * A class to help {@link LcKafkaConsumer} remember which records it fetched from the broker, which records it has
+  * processed, and the offsets the consumer can commit to, etc.
+  */
class ProcessRecordsProgress {
    private final Map<TopicPartition, Long> topicOffsetHighWaterMark;
    private final Map<TopicPartition, CompletedOffsets> completedOffsets;
@@ -19,7 +23,7 @@ public ProcessRecordsProgress() {
    }

    /**
-     * Mark an {@link ConsumerRecord} as pending before processing it. So {@link CommitPolicy} can know which and
+     * Mark a {@link ConsumerRecord} as pending before processing it. So {@link LcKafkaConsumer} can know which and
     * how many records we need to process. It is called by {@link Fetcher} when {@code Fetcher} fetches any
     * {@link ConsumerRecord}s from the broker.
     *
@@ -40,7 +44,7 @@ void markPendingRecord(ConsumerRecord<?, ?> record) {
    }

    /**
-     * Mark an {@link ConsumerRecord} as completed after processing it. So {@link CommitPolicy} can know which and
+     * Mark a {@link ConsumerRecord} as completed after processing it. So {@link LcKafkaConsumer} can know which and
     * how many records we have processed. It is called by {@link Fetcher} when {@code Fetcher} makes sure that
     * a {@code ConsumerRecord} was processed successfully.
     *
@@ -54,31 +58,51 @@ void markCompletedRecord(ConsumerRecord<?, ?> record) {
        }
    }

+     /**
+      * Clear all saved progress. Usually it is called when we are sure that all the known records we fetched have been committed.
+      */
    void clearAll() {
        topicOffsetHighWaterMark.clear();
        completedOffsets.clear();
    }

+     /**
+      * Clear progress for some partitions.
+      *
+      * @param partitions the partitions to clear progress for
+      */
    void clearFor(Collection<TopicPartition> partitions) {
        for (TopicPartition p : partitions) {
            topicOffsetHighWaterMark.remove(p);
            completedOffsets.remove(p);
        }
    }

+     /**
+      * @return offsets for all the known records, both those that have been processed and those that are being processed
+      */
    @VisibleForTesting
    Map<TopicPartition, Long> pendingRecordOffsets() {
        return topicOffsetHighWaterMark;
    }

+     /**
+      * @return true when no known records have been processed or are being processed
+      */
    boolean noPendingRecords() {
        return topicOffsetHighWaterMark.isEmpty();
    }

+     /**
+      * @return partitions for all the known records
+      */
    Set<TopicPartition> allPartitions() {
        return new HashSet<>(topicOffsetHighWaterMark.keySet());
    }

+     /**
+      * @return true when no known records have been processed
+      */
    boolean noCompletedRecords() {
        return completedOffsets.isEmpty();
    }

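Taken together, the new javadoc outlines the lifecycle of this tracker. A rough sketch of that flow, using only the methods shown above (the wrapper method names are illustrative; in the real code it is classes like `Fetcher` that make these calls):

```java
// Illustrative flow only, based on the javadoc above.
void onRecordsFetched(ProcessRecordsProgress progress, List<ConsumerRecord<?, ?>> fetched) {
    // Remember every fetched record as pending before it is processed.
    for (ConsumerRecord<?, ?> record : fetched) {
        progress.markPendingRecord(record);
    }
}

void onRecordProcessed(ProcessRecordsProgress progress, ConsumerRecord<?, ?> record) {
    // Mark the record completed so its offset becomes eligible for commit.
    progress.markCompletedRecord(record);
}

void onPartitionsRevoked(ProcessRecordsProgress progress, Collection<TopicPartition> revoked) {
    // Drop the progress kept for partitions this consumer no longer owns.
    progress.clearFor(revoked);
}
```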
0 commit comments
