1616
1717package org .springframework .kafka .listener ;
1818
19+ import java .time .Duration ;
1920import java .util .Map ;
2021import java .util .Objects ;
2122import java .util .function .Supplier ;
2223
24+ import org .apache .kafka .clients .consumer .Consumer ;
2325import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
2426import org .jspecify .annotations .Nullable ;
2527
3436 * @author Francois Rosiere
3537 * @author Antonio Tomac
3638 * @author Wang Zhiyang
39+ * @author Sanghyeok An
3740 * @since 2.0
3841 *
3942 */
@@ -147,6 +150,38 @@ public static void conditionalSleep(Supplier<Boolean> shouldSleepCondition, long
147150 while (System .currentTimeMillis () < timeout );
148151 }
149152
153+ /**
154+ * Sleep for the desired timeout, as long as shouldSleepCondition supplies true.
155+ * This method requires that the consumer is paused; otherwise, ConsumerRecord may be lost.
156+ * Periodically calls {@code Consumer.poll(Duration.ZERO)} to prevent a paused consumer from being rebalanced.
157+ * @param shouldSleepCondition to.
158+ * @param interval the timeout.
159+ * @param consumer the kafka consumer to call poll().
160+ * @throws InterruptedException if the thread is interrupted.
161+ */
162+ public static void conditionalSleepWithPoll (Supplier <Boolean > shouldSleepCondition ,
163+ long interval ,
164+ Consumer <?, ?> consumer ) throws InterruptedException {
165+ boolean isFirst = true ;
166+ long timeout = System .currentTimeMillis () + interval ;
167+ long sleepInterval = interval > SMALL_INTERVAL_THRESHOLD ? DEFAULT_SLEEP_INTERVAL : SMALL_SLEEP_INTERVAL ;
168+ do {
169+ Thread .sleep (sleepInterval );
170+ if (!shouldSleepCondition .get ()) {
171+ break ;
172+ }
173+
174+ if (isFirst ) {
175+ isFirst = false ;
176+ }
177+ else {
178+ // To prevent consumer group rebalancing during retry backoff.
179+ consumer .poll (Duration .ZERO );
180+ }
181+ }
182+ while (System .currentTimeMillis () < timeout );
183+ }
184+
150185 /**
151186 * Create a new {@link OffsetAndMetadata} using the given container and offset.
152187 * @param container a container.
@@ -165,4 +200,3 @@ public static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListene
165200 return new OffsetAndMetadata (offset );
166201 }
167202}
168-
0 commit comments