
Commit 3ec88f3

(chore) more docs
1 parent aabea80 commit 3ec88f3

4 files changed, +163 -75 lines changed

4 files changed

+163
-75
lines changed

src/main/java/cn/leancloud/kafka/consumer/ImmediateExecutorService.java

Lines changed: 3 additions & 1 deletion

@@ -6,7 +6,9 @@
 import java.util.concurrent.TimeUnit;
 
 /**
- * This is an shared {@link java.util.concurrent.ExecutorService} so it can not be shutdown
+ * An {@link java.util.concurrent.ExecutorService} which runs every task in the thread that submitted it.
+ * <p>
+ * This is a shared {@link java.util.concurrent.ExecutorService}, so it cannot be shut down.
  */
 final class ImmediateExecutorService extends AbstractExecutorService {
     static final ImmediateExecutorService INSTANCE = new ImmediateExecutorService();
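For context on what this Javadoc describes: an immediate executor simply runs each submitted task inline in the caller's thread. A minimal sketch of the idea (an illustration only, not the project's actual class, whose remaining method bodies are outside this diff):

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.AbstractExecutorService;
    import java.util.concurrent.TimeUnit;

    final class DirectExecutorService extends AbstractExecutorService {
        @Override public void execute(Runnable command) { command.run(); } // run in the caller's thread
        @Override public void shutdown() { /* shared instance: shutdown is a no-op */ }
        @Override public List<Runnable> shutdownNow() { return Collections.emptyList(); }
        @Override public boolean isShutdown() { return false; }
        @Override public boolean isTerminated() { return false; }
        @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return false; }
    }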

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 3 additions & 3 deletions
@@ -11,9 +11,9 @@
  * {@code LcKafkaConsumer} is a wrapper over {@link Consumer}. It will use {@link Consumer} to consume
  * records from Kafka broker.
  * <p>
- * With {@link LcKafkaConsumer}, you can subscribe to several topics and handle all the records from these topic
- * in a dedicated thread pool without warring polling timeout or session timeout due to the polling thread failed
- * to poll spend too much time on process records
+ * With {@link LcKafkaConsumer}, you can subscribe to several topics and handle all the records from these topics
+ * in a dedicated thread pool, without worrying about polling timeouts or session timeouts caused by the polling thread
+ * spending too much time handling records and failing to poll the broker at least once within {@code max.poll.interval.ms}.
  * <p>
  * All the public methods in {@code LcKafkaConsumer} is thread safe.
  *
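A hedged usage sketch of the builder API shown in this commit (the config values, the StringDeserializer choice, and the handler variable are illustrative assumptions, not taken from this diff):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Map<String, Object> configs = new HashMap<>();
    configs.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    configs.put("group.id", "example-group");
    configs.put("max.poll.records", 10);
    configs.put("auto.offset.reset", "earliest");

    // handler is assumed to be a ConsumerRecordHandler<String, String> implementation
    LcKafkaConsumer<String, String> consumer = LcKafkaConsumerBuilder
            .newBuilder(configs, handler, new StringDeserializer(), new StringDeserializer())
            .pollTimeoutMillis(100)
            .buildSync();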

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 156 additions & 54 deletions
@@ -25,9 +25,9 @@ public final class LcKafkaConsumerBuilder<K, V> {
     /**
      * Create a {@code LcKafkaConsumerBuilder} used to build {@link LcKafkaConsumer}.
      *
-     * @param kafkaConfigs the kafka configs for {@link KafkaConsumer}. Please refer
-     *        <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >this document</a> for
-     *        valid configurations.
+     * @param kafkaConfigs          the kafka configs for {@link KafkaConsumer}. Please refer to
+     *                              <a href="http://kafka.apache.org/documentation.html#consumerconfigs">this document</a> for
+     *                              valid configurations.
      * @param consumerRecordHandler a {@link ConsumerRecordHandler} to handle the consumed record from kafka
      * @return a new {@code LcKafkaConsumerBuilder}
      */
@@ -41,12 +41,12 @@ public static <K, V> LcKafkaConsumerBuilder<K, V> newBuilder(Map<String, Object>
     /**
      * Create a {@code LcKafkaConsumerBuilder} used to build {@link LcKafkaConsumer}.
      *
-     * @param kafkaConfigs the kafka configs for {@link KafkaConsumer}. Please refer
-     *        <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >this document</a> for
-     *        valid configurations.
-     * @param consumerRecordHandler a {@link ConsumerRecordHandler} to handle the consumed record from kafka
-     * @param keyDeserializer The deserializer for key that implements {@link Deserializer}
-     * @param valueDeserializer The deserializer for value that implements {@link Deserializer}
+     * @param kafkaConfigs          the kafka configs for {@link KafkaConsumer}. Please refer to
+     *                              <a href="http://kafka.apache.org/documentation.html#consumerconfigs">this document</a> for
+     *                              valid configurations.
+     * @param consumerRecordHandler a {@link ConsumerRecordHandler} to handle the consumed record from kafka
+     * @param keyDeserializer       the deserializer for keys, implementing {@link Deserializer}
+     * @param valueDeserializer     the deserializer for values, implementing {@link Deserializer}
      * @return a new {@code LcKafkaConsumerBuilder}
      */
     public static <K, V> LcKafkaConsumerBuilder<K, V> newBuilder(Map<String, Object> kafkaConfigs,
@@ -127,7 +127,6 @@ public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMs) {
      * If 0, poll operation will return immediately with any records that are available currently in the buffer,
      * else returns empty.
      * <p>
-     * Must not be negative.
      *
      * @param pollTimeout the poll timeout duration
      * @return this
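For reference, the setters in the hunks above and below configure the same poll timeout; an illustrative pair of calls (builder is assumed to be an existing LcKafkaConsumerBuilder instance):

    builder.pollTimeoutMillis(100);              // long variant
    builder.pollTimeout(Duration.ofMillis(100)); // Duration variant, same effect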
@@ -138,12 +137,27 @@ public LcKafkaConsumerBuilder<K, V> pollTimeout(Duration pollTimeout) {
         return this;
     }
 
-    public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeoutMillis(long gracefulShutdownMs) {
-        requireArgument(gracefulShutdownMs >= 0, "gracefulShutdownMillis: %s (expected >= 0)", gracefulShutdownMs);
-        this.gracefulShutdownMillis = gracefulShutdownMs;
+    /**
+     * Sets the amount of time to wait after calling {@link LcKafkaConsumer#close()} for
+     * consumed records to be handled before actually shutting down.
+     *
+     * @param gracefulShutdownTimeoutMillis the graceful shutdown timeout in milliseconds
+     * @return this
+     */
+    public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
+        requireArgument(gracefulShutdownTimeoutMillis >= 0,
+                "gracefulShutdownTimeoutMillis: %s (expected >= 0)", gracefulShutdownTimeoutMillis);
+        this.gracefulShutdownMillis = gracefulShutdownTimeoutMillis;
         return this;
     }
 
+    /**
+     * Sets the amount of time to wait after calling {@link LcKafkaConsumer#close()} for
+     * consumed records to be handled before actually shutting down.
+     *
+     * @param gracefulShutdownTimeout the graceful shutdown timeout duration
+     * @return this
+     */
     public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeout(Duration gracefulShutdownTimeout) {
         requireNonNull(gracefulShutdownTimeout, "gracefulShutdownTimeout");
         this.gracefulShutdownMillis = gracefulShutdownTimeout.toMillis();
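Both graceful-shutdown setters write the same gracefulShutdownMillis field, so the two calls below are equivalent (builder is assumed to be an existing LcKafkaConsumerBuilder instance):

    builder.gracefulShutdownTimeout(Duration.ofSeconds(10));
    builder.gracefulShutdownTimeoutMillis(10_000); // same effect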
@@ -152,7 +166,8 @@ public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeout(Duration gracefulShu
 
     /**
      * When using async consumer to commit offset asynchronously, this argument can force consumer to do a synchronous
-     * commit after there's already {@code maxPendingAsyncCommits} async commits on the fly without response from broker.
+     * commit once there are already {@code maxPendingAsyncCommits} async commits in flight without a response
+     * from the broker.
      *
      * @param maxPendingAsyncCommits do a synchronous commit when pending async commits beyond this limit
      * @return this
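A sketch of the fallback behavior this knob controls, per the Javadoc above; this is an illustration, not the AsyncCommitPolicy source, and pendingAsyncCommits is a hypothetical counter:

    if (pendingAsyncCommits >= maxPendingAsyncCommits) {
        consumer.commitSync();  // blocks until the broker acknowledges, draining the backlog
        pendingAsyncCommits = 0;
    } else {
        pendingAsyncCommits++;
        consumer.commitAsync((offsets, exception) -> {
            // count the broker's response; fall back to a sync commit on error
        });
    }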
@@ -164,38 +179,45 @@ public LcKafkaConsumerBuilder<K, V> maxPendingAsyncCommits(int maxPendingAsyncCo
         return this;
     }
 
-    /**
-     * Change the {@link ConsumerRecordHandler} to handle the consumed record from kafka.
-     *
-     * @param consumerRecordHandler the handler to handle consumed record
-     * @return this
-     */
-    public LcKafkaConsumerBuilder<K, V> messageHandler(ConsumerRecordHandler<K, V> consumerRecordHandler) {
-        requireNonNull(consumerRecordHandler, "consumerRecordHandler");
-        this.consumerRecordHandler = consumerRecordHandler;
-        return this;
-    }
-
     /**
      * Internal testing usage only.
-     * Passing a {@link Consumer} as as the underlying kafka consumer. Usually this would be a {@link MockConsumer}.
+     * <p>
+     * Passes a {@link Consumer} in as the underlying consumer. Usually this would be a {@link MockConsumer}.
      *
+     * @param mockedConsumer the injected consumer
      * @return this
      */
-    LcKafkaConsumerBuilder<K, V> mockKafkaConsumer(Consumer<K, V> consumer) {
-        requireNonNull(consumer, "cn/leancloud/kafka/consumer");
-        if (consumer instanceof KafkaConsumer) {
+    LcKafkaConsumerBuilder<K, V> mockKafkaConsumer(Consumer<K, V> mockedConsumer) {
+        requireNonNull(mockedConsumer, "consumer");
+        if (mockedConsumer instanceof KafkaConsumer) {
             throw new IllegalArgumentException("need a mocked Consumer");
         }
-        this.consumer = consumer;
+        this.consumer = mockedConsumer;
         return this;
     }
 
     /**
-     * The thread pool used by consumer to handle the consumed messages from kafka. Please note that if you are
-     * using auto commit consumer, this thread pool is not be used.
+     * The thread pool used by the consumer to handle the records consumed from the Kafka broker. If no worker pool
+     * is provided, the created {@link LcKafkaConsumer} will use {@link ImmediateExecutorService} to handle records
+     * in the polling thread instead.
+     * <p>
+     * When a worker pool is provided, after each poll, the polling thread will take one thread from this worker pool
+     * for each polled {@link org.apache.kafka.clients.consumer.ConsumerRecord} to handle the record. Please tune
+     * <code>max.poll.records</code> in the kafka configs so that the number of records polled each time does not
+     * exceed the max size of the provided worker pool. Otherwise, a
+     * {@link java.util.concurrent.RejectedExecutionException} will be thrown when the polling thread submits too
+     * many tasks to the pool, and that exception will cause the sole polling thread to exit.
+     * <p>
+     * If you build a partial sync/async commit consumer with {@link LcKafkaConsumerBuilder#buildPartialSync()} or
+     * {@link LcKafkaConsumerBuilder#buildPartialAsync()} but provide no worker pool, it degrades to the sync/async
+     * commit consumer built with {@link LcKafkaConsumerBuilder#buildSync()} or
+     * {@link LcKafkaConsumerBuilder#buildAsync()}.
+     * <p>
+     * If no worker pool is provided, you also need to tune {@code max.poll.interval.ms} in the kafka configs to
+     * ensure the polling thread can poll at least once within {@code max.poll.interval.ms} while handling consumed
+     * records, to protect itself from session timeout or polling timeout.
      *
-     * @param workerPool a thread pool to handle consumed messages
+     * @param workerPool a thread pool to handle consumed records
      * @param shutdownOnStop true to shutdown the input worker pool when this consumer closed
     * @return this
      */
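A hedged sizing sketch for the rule above, keeping max.poll.records within the pool size (configs and builder are assumed to exist as in the earlier sketch; the pool size is arbitrary):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Keep max.poll.records at or below the pool size so the polling thread can
    // never submit more record-handling tasks than the pool accepts at once.
    int poolSize = 16;
    configs.put("max.poll.records", poolSize);
    ExecutorService workerPool = Executors.newFixedThreadPool(poolSize);
    builder.workerPool(workerPool, true); // true: shut the pool down when the consumer closes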
@@ -207,33 +229,23 @@ public LcKafkaConsumerBuilder<K, V> workerPool(ExecutorService workerPool, boole
     }
 
     /**
-     * Build a consumer which commit offset automatically in fixed interval. This consumer will not using other thread
-     * pool to handle consumed messages and will not pause any partition when after polling. This consumer is
-     * equivalent to:
-     * <pre>
-     * while (true) {
-     *     final ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
-     *     for (ConsumerRecord<K, V> record : records) {
-     *         handler.handleRecord(record.topic(), record.value());
-     *     }
-     * }
-     * </pre>
-     *
+     * Build a consumer which commits offsets automatically at a fixed interval. It works both with and without a
+     * worker thread pool; but without one, please tune {@code max.poll.interval.ms} in the
+     * Kafka configs as mentioned in {@link LcKafkaConsumerBuilder#workerPool(ExecutorService, boolean)}.
      * <p>
-     * Please note that this consumer requires these kafka configs must be set, otherwise
+     * This kind of consumer requires the following kafka configs to be set, otherwise an
      * {@link IllegalArgumentException} will be thrown:
      * <ol>
-     * <li><code>max.poll.interval.ms</code></li>
-     * <li><code>max.poll.records</code></li>
-     * <li><code>auto.commit.interval.ms</code></li>
+     * <li><code>max.poll.interval.ms</code></li>
+     * <li><code>max.poll.records</code></li>
+     * <li><code>auto.offset.reset</code></li>
+     * <li><code>auto.commit.interval.ms</code></li>
      * </ol>
      * <p>
      * Though all of these configs have default values in kafka, we still require every user to set them specifically.
-     * Because these configs is vital for using this consumer safely. You should tune them to ensure the polling thread
-     * in this consumer can at least poll once within {@code max.poll.interval.ms} during handling consumed messages
-     * to prevent itself from session timeout.
+     * These configs are vital for using this consumer safely.
      * <p>
-     * Note that if you set {@code enable.auto.commit} to false, this consumer will set it to true by itself.
+     * If you set {@code enable.auto.commit} to false, this consumer will set it to true by itself.
      *
      * @return this
      */
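For readers of this commit, the deleted <pre> block showed what the auto-commit consumer is roughly equivalent to; reproduced here, with the note that offset commits happen inside the Kafka client itself every auto.commit.interval.ms:

    while (true) {
        final ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
        for (ConsumerRecord<K, V> record : records) {
            handler.handleRecord(record.topic(), record.value());
        }
    }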
@@ -244,24 +256,114 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAuto() {
         return doBuild();
     }
 
+    /**
+     * Build a consumer in which the polling thread always does a sync commit after all the polled records have been
+     * handled. Because it only commits after all the polled records are handled, the longer the handling takes,
+     * the longer the interval between commits, and the greater the chance of consuming the same record repeatedly
+     * when the consumer crashes.
+     * <p>
+     * This kind of consumer guarantees a sync commit of all finished records when the consumer is shut down or any
+     * partition is revoked. It requires the following kafka configs to be set, otherwise an
+     * {@link IllegalArgumentException} will be thrown:
+     * <ol>
+     * <li><code>max.poll.records</code></li>
+     * <li><code>auto.offset.reset</code></li>
+     * </ol>
+     * <p>
+     * Though all of these configs have default values in kafka, we still require every user to set them specifically,
+     * because these configs are vital for using this consumer safely.
+     * <p>
+     * If you set {@code enable.auto.commit} to true, this consumer will set it to false by itself.
+     *
+     * @return this
+     */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildSync() {
         consumer = buildConsumer(false);
         policy = new SyncCommitPolicy<>(consumer);
         return doBuild();
     }
 
+    /**
+     * Build a consumer in which the polling thread does a sync commit whenever some polled records have been
+     * handled. It commits often, so after a consumer crash comparatively few records may be handled more than
+     * once; but, also because it commits often, the committing overhead is relatively high.
+     * <p>
+     * This kind of consumer guarantees a sync commit of all finished records when the consumer is shut down or any
+     * partition is revoked. It requires the following kafka configs to be set, otherwise an
+     * {@link IllegalArgumentException} will be thrown:
+     * <ol>
+     * <li><code>max.poll.records</code></li>
+     * <li><code>auto.offset.reset</code></li>
+     * </ol>
+     * <p>
+     * Though all of these configs have default values in kafka, we still require every user to set them specifically,
+     * because these configs are vital for using this consumer safely.
+     * <p>
+     * If you set {@code enable.auto.commit} to true, this consumer will set it to false by itself.
+     *
+     * @return this
+     */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialSync() {
         consumer = buildConsumer(false);
         policy = new PartialSyncCommitPolicy<>(consumer);
         return doBuild();
     }
 
+    /**
+     * Build a consumer in which the polling thread always does an async commit after all the polled records have
+     * been handled. Because it only commits after all the polled records are handled, the longer the handling takes,
+     * the longer the interval between commits, and the greater the chance of consuming the same record repeatedly
+     * when the consumer crashes.
+     * <p>
+     * If any async commit fails, or the number of pending async commits goes beyond the limit set by
+     * {@link LcKafkaConsumerBuilder#maxPendingAsyncCommits(int)}, this consumer will do a sync commit to commit all
+     * the records which have been handled.
+     * <p>
+     * This kind of consumer guarantees a sync commit of all finished records when the consumer is shut down or any
+     * partition is revoked. It requires the following kafka configs to be set, otherwise an
+     * {@link IllegalArgumentException} will be thrown:
+     * <ol>
+     * <li><code>max.poll.records</code></li>
+     * <li><code>auto.offset.reset</code></li>
+     * </ol>
+     * <p>
+     * Though all of these configs have default values in kafka, we still require every user to set them specifically,
+     * because these configs are vital for using this consumer safely.
+     * <p>
+     * If you set {@code enable.auto.commit} to true, this consumer will set it to false by itself.
+     *
+     * @return this
+     */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAsync() {
         consumer = buildConsumer(false);
         policy = new AsyncCommitPolicy<>(consumer, maxPendingAsyncCommits);
         return doBuild();
     }
 
+    /**
+     * Build a consumer in which the polling thread does an async commit whenever some polled records have been
+     * handled. It commits often, so after a consumer crash comparatively few records may be handled more than once.
+     * It uses async commits to mitigate the overhead caused by committing so frequently.
+     * <p>
+     * If any async commit fails, or the number of pending async commits goes beyond the limit set by
+     * {@link LcKafkaConsumerBuilder#maxPendingAsyncCommits(int)}, this consumer will do a sync commit to commit all
+     * the records which have been handled.
+     * <p>
+     * This kind of consumer guarantees a sync commit of all finished records when the consumer is shut down or any
+     * partition is revoked. It requires the following kafka configs to be set, otherwise an
+     * {@link IllegalArgumentException} will be thrown:
+     * <ol>
+     * <li><code>max.poll.records</code></li>
+     * <li><code>auto.offset.reset</code></li>
+     * </ol>
+     * <p>
+     * Though all of these configs have default values in kafka, we still require every user to set them specifically,
+     * because these configs are vital for using this consumer safely.
+     * <p>
+     * If you set {@code enable.auto.commit} to true, this consumer will set it to false by itself.
+     *
+     * @return this
+     */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialAsync() {
         consumer = buildConsumer(false);
         policy = new PartialAsyncCommitPolicy<>(consumer, maxPendingAsyncCommits);
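Taken together, the four build methods trade commit frequency against commit overhead; a brief summary in comments (policy class names from this diff, semantics from the Javadoc above):

    // buildSync():         sync commit after each fully-handled poll batch; simple, larger replay window on crash
    // buildPartialSync():  sync commit whenever some records finish; small replay window, highest commit overhead
    // buildAsync():        async commit after each fully-handled poll batch; falls back to sync on failure or
    //                      when pending commits exceed maxPendingAsyncCommits(int)
    // buildPartialAsync(): async commit whenever some records finish; frequent but cheap commits, same sync fallback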

src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java

Lines changed: 1 addition & 17 deletions
@@ -57,22 +57,6 @@ public void testNullKafkaConfigs() {
                 .hasMessage("kafkaConfigs");
     }
 
-    @Test
-    public void testNullMessageHandler() {
-        assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, null))
-                .isInstanceOf(NullPointerException.class)
-                .hasMessage("consumerRecordHandler");
-
-        assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, null, keyDeserializer, valueDeserializer))
-                .isInstanceOf(NullPointerException.class)
-                .hasMessage("consumerRecordHandler");
-
-        assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
-                .messageHandler(null))
-                .isInstanceOf(NullPointerException.class)
-                .hasMessage("consumerRecordHandler");
-    }
-
     @Test
     public void testNullDeserializers() {
         assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, null, valueDeserializer))

@@ -104,7 +88,7 @@ public void testNegativeShutdownTimeout() {
         assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
                 .gracefulShutdownTimeoutMillis(-1 * ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE)))
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("gracefulShutdownMillis");
+                .hasMessageContaining("gracefulShutdownTimeoutMillis");
     }
 
     @Test
