@@ -4,6 +4,8 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.time.Duration;
@@ -22,6 +24,8 @@
  * @param <V> the type of value for records consumed from Kafka
  */
 public final class LcKafkaConsumerBuilder<K, V> {
+    private static final Logger logger = LoggerFactory.getLogger(LcKafkaConsumerBuilder.class);
+
     /**
      * Create a {@code LcKafkaConsumerBuilder} used to build {@link LcKafkaConsumer}.
      *
@@ -84,6 +88,8 @@ private static void requireArgument(boolean expression, String template, Object.
     private Deserializer<V> valueDeserializer;
     @Nullable
     private CommitPolicy<K, V> policy;
+    @Nullable
+    private Duration forceWholeCommitInterval;
 
     private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
                                    ConsumerRecordHandler<K, V> consumerRecordHandler) {
@@ -109,14 +115,14 @@ private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
      * If 0, the poll operation returns immediately with any records that are currently available in the buffer;
      * otherwise it returns empty.
      * <p>
-     * Must not be negative.
+     * Must not be negative. The default {@code pollTimeoutMillis} is 100.
      *
-     * @param pollTimeoutMs the poll timeout in milliseconds
+     * @param pollTimeoutMillis the poll timeout in milliseconds
      * @return this
      */
-    public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMs) {
-        requireArgument(pollTimeoutMs >= 0, "pollTimeoutMillis: %s (expect >= 0)", pollTimeoutMs);
-        this.pollTimeout = pollTimeoutMs;
+    public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMillis) {
+        requireArgument(pollTimeoutMillis >= 0, "pollTimeoutMillis: %s (expect >= 0)", pollTimeoutMillis);
+        this.pollTimeout = pollTimeoutMillis;
         return this;
     }
 
@@ -127,6 +133,7 @@ public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMs) {
      * If 0, the poll operation returns immediately with any records that are currently available in the buffer;
      * otherwise it returns empty.
      * <p>
+     * The default {@code pollTimeout} is 100 milliseconds.
      *
      * @param pollTimeout the poll timeout duration
      * @return this
@@ -140,6 +147,8 @@ public LcKafkaConsumerBuilder<K, V> pollTimeout(Duration pollTimeout) {
     /**
      * Sets the amount of time to wait after calling {@link LcKafkaConsumer#close()} for
      * consumed records to be handled before actually shutting down.
+     * <p>
+     * The default {@code gracefulShutdownTimeoutMillis} is 10_000 (10 seconds).
      *
      * @param gracefulShutdownTimeoutMillis the graceful shutdown timeout in milliseconds
      * @return this
@@ -154,6 +163,8 @@ public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeoutMillis(long gracefulS
     /**
      * Sets the amount of time to wait after calling {@link LcKafkaConsumer#close()} for
      * consumed records to be handled before actually shutting down.
+     * <p>
+     * The default {@code gracefulShutdownTimeout} is 10 seconds.
      *
      * @param gracefulShutdownTimeout the graceful shutdown timeout duration
      * @return this
@@ -168,6 +179,8 @@ public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeout(Duration gracefulShu
      * When using an async consumer to commit offsets asynchronously, this argument can force the consumer to do a
      * synchronous commit after there are already this ({@code maxPendingAsyncCommits}) many async commits in flight
      * without a response from the broker.
+     * <p>
+     * The default {@code maxPendingAsyncCommits} is 10.
      *
      * @param maxPendingAsyncCommits do a synchronous commit when pending async commits exceed this limit
      * @return this
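
Taken together, the setters above compose fluently. The following is a minimal usage sketch, not part of this commit: the static factory newBuilder(...) and the single-record handler lambda are assumed shapes (the diff only shows the private constructor), and handle(record) is a hypothetical application method; the setter names and defaults come from the Javadoc above.

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;

    Map<String, Object> configs = new HashMap<>();
    configs.put("bootstrap.servers", "localhost:9092");
    configs.put("group.id", "example-group");

    // newBuilder(...) is an assumed entry point; ConsumerRecordHandler is assumed
    // to be a functional interface so it can be given as a lambda.
    LcKafkaConsumer<String, String> consumer = LcKafkaConsumerBuilder
            .<String, String>newBuilder(configs, record -> handle(record))
            .pollTimeoutMillis(100)                          // the default; pollTimeout(Duration.ofMillis(100)) is equivalent
            .gracefulShutdownTimeout(Duration.ofSeconds(10)) // the default
            .maxPendingAsyncCommits(10)                      // the default; only consulted by async commit consumers
            .buildAsync();
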
@@ -179,6 +192,56 @@ public LcKafkaConsumerBuilder<K, V> maxPendingAsyncCommits(int maxPendingAsyncCo
         return this;
     }
 
+    /**
+     * The interval at which to commit the completed offsets of all subscribed partitions to the broker on a
+     * partial commit consumer.
+     * <p>
+     * This configuration only takes effect on partial commit consumers built with
+     * {@link LcKafkaConsumerBuilder#buildPartialSync()} or {@link LcKafkaConsumerBuilder#buildPartialAsync()}.
+     * Usually, these kinds of consumers only commit the offsets of a partition when records were consumed from
+     * that partition and all of the consumed records were handled successfully. But the subscribed partitions
+     * from which no records were consumed must be committed periodically too. Otherwise, after the committed
+     * offsets have passed their retention timeout, the Kafka broker may forget the current committed offsets of
+     * those partitions for this consumer. Then, if the consumer crashes and recovers with the "auto.offset.reset"
+     * configuration set to "earliest", it may consume already-consumed records again. So please make sure that
+     * {@code forceWholeCommitIntervalInMillis} is within the offsets retention time configured on the Kafka broker.
+     * <p>
+     * If not set, a default is applied at build time: 1 hour for {@link LcKafkaConsumerBuilder#buildPartialSync()}
+     * and 30 seconds for {@link LcKafkaConsumerBuilder#buildPartialAsync()}.
+     *
+     * @param forceWholeCommitIntervalInMillis the interval in milliseconds at which to do a whole commit
+     * @return this
+     */
+    public LcKafkaConsumerBuilder<K, V> forceWholeCommitIntervalInMillis(long forceWholeCommitIntervalInMillis) {
+        requireArgument(forceWholeCommitIntervalInMillis > 0,
+                "forceWholeCommitIntervalInMillis: %s (expected > 0)", forceWholeCommitIntervalInMillis);
+
+        this.forceWholeCommitInterval = Duration.ofMillis(forceWholeCommitIntervalInMillis);
+        return this;
+    }
+
+    /**
+     * The interval at which to commit the completed offsets of all subscribed partitions to the broker on a
+     * partial commit consumer.
+     * <p>
+     * This configuration only takes effect on partial commit consumers built with
+     * {@link LcKafkaConsumerBuilder#buildPartialSync()} or {@link LcKafkaConsumerBuilder#buildPartialAsync()}.
+     * Usually, these kinds of consumers only commit the offsets of a partition when records were consumed from
+     * that partition and all of the consumed records were handled successfully. But the subscribed partitions
+     * from which no records were consumed must be committed periodically too. Otherwise, after the committed
+     * offsets have passed their retention timeout, the Kafka broker may forget the current committed offsets of
+     * those partitions for this consumer. Then, if the consumer crashes and recovers with the "auto.offset.reset"
+     * configuration set to "earliest", it may consume already-consumed records again. So please make sure that
+     * {@code forceWholeCommitInterval} is within the offsets retention time configured on the Kafka broker.
+     * <p>
+     * If not set, a default is applied at build time: 1 hour for {@link LcKafkaConsumerBuilder#buildPartialSync()}
+     * and 30 seconds for {@link LcKafkaConsumerBuilder#buildPartialAsync()}.
+     *
+     * @param forceWholeCommitInterval the interval at which to do a whole commit
+     * @return this
+     */
+    public LcKafkaConsumerBuilder<K, V> forceWholeCommitInterval(Duration forceWholeCommitInterval) {
+        requireNonNull(forceWholeCommitInterval, "forceWholeCommitInterval");
+        this.forceWholeCommitInterval = forceWholeCommitInterval;
+        return this;
+    }
+
     /**
      * Internal testing usage only.
      * <p>
@@ -304,8 +367,15 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildSync() {
      * @return this
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialSync() {
+        if (forceWholeCommitInterval == null) {
+            logger.warn("Force whole commit interval is not set for a partial commit consumer, the default " +
+                    "interval of 1 hour will be used.");
+            forceWholeCommitInterval = Duration.ofHours(1);
+        }
+        assert forceWholeCommitInterval != null;
+
         consumer = buildConsumer(false);
-        policy = new PartialSyncCommitPolicy<>(consumer);
+        policy = new PartialSyncCommitPolicy<>(consumer, forceWholeCommitInterval);
         return doBuild();
     }
 
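
The same assumed setup from the earlier sketch also serves for the partial sync path; here the whole-commit interval is set explicitly, with a 30-minute value chosen purely as an illustration that stays well within a typical offsets retention window on the broker:

    LcKafkaConsumer<String, String> partialSyncConsumer = LcKafkaConsumerBuilder
            .<String, String>newBuilder(configs, record -> handle(record))
            .forceWholeCommitInterval(Duration.ofMinutes(30)) // explicit; otherwise buildPartialSync() falls back to 1 hour
            .buildPartialSync();
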
@@ -365,8 +435,15 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAsync() {
      * @return this
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialAsync() {
+        if (forceWholeCommitInterval == null) {
+            logger.warn("Force whole commit interval is not set for a partial commit consumer, the default " +
+                    "interval of 30 seconds will be used.");
+            forceWholeCommitInterval = Duration.ofSeconds(30);
+        }
+        assert forceWholeCommitInterval != null;
+
         consumer = buildConsumer(false);
-        policy = new PartialAsyncCommitPolicy<>(consumer, maxPendingAsyncCommits);
+        policy = new PartialAsyncCommitPolicy<>(consumer, forceWholeCommitInterval, maxPendingAsyncCommits);
         return doBuild();
     }
 
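
And the async counterpart, again under the assumptions of the earlier sketches; forceWholeCommitInterval is deliberately left unset here, so buildPartialAsync() logs the warning above and falls back to a 30-second whole-commit interval:

    LcKafkaConsumer<String, String> partialAsyncConsumer = LcKafkaConsumerBuilder
            .<String, String>newBuilder(configs, record -> handle(record))
            .maxPendingAsyncCommits(10)  // force a sync commit once 10 async commits are in flight without a response
            .buildPartialAsync();        // whole-commit interval defaults to 30 seconds, with a logged warning
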