2525import org .apache .kafka .clients .consumer .Consumer ;
2626import org .apache .kafka .common .TopicPartition ;
2727
28- import org .springframework .beans .factory .annotation .Qualifier ;
2928import org .springframework .context .ApplicationListener ;
3029import org .springframework .core .log .LogAccessor ;
31- import org .springframework .core .task .TaskExecutor ;
3230import org .springframework .kafka .event .ListenerContainerPartitionIdleEvent ;
3331import org .springframework .lang .Nullable ;
34- import org .springframework .retry .backoff .Sleeper ;
3532
3633/**
3734 *
4239 * so the Manager can resume the partition consumption.
4340 *
4441 * Note that when a record backs off the partition consumption gets paused for
45- * approximately that amount of time, so you must have a fixed backoff value per partition
46- * in order to make sure no record waits more than it should.
42+ * approximately that amount of time, so you must have a fixed backoff value per partition.
4743 *
4844 * @author Tomaz Fernandes
4945 * @author Gary Russell
5349public class KafkaConsumerBackoffManager implements ApplicationListener <ListenerContainerPartitionIdleEvent > {
5450
5551 private static final LogAccessor LOGGER = new LogAccessor (LogFactory .getLog (KafkaConsumerBackoffManager .class ));
56- /**
57- * Internal Back Off Clock Bean Name.
58- */
59- public static final String INTERNAL_BACKOFF_CLOCK_BEAN_NAME = "internalBackOffClock" ;
60-
61- private static final int TIMING_CORRECTION_THRESHOLD = 100 ;
6252
63- private static final int POLL_TIMEOUTS_FOR_CORRECTION_WINDOW = 2 ;
64-
65- private final ListenerContainerRegistry registry ;
53+ private final ListenerContainerRegistry listenerContainerRegistry ;
6654
6755 private final Map <TopicPartition , Context > backOffContexts ;
6856
6957 private final Clock clock ;
7058
71- private final TaskExecutor taskExecutor ;
59+ private final KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster ;
60+
61+ /**
62+ * Constructs an instance with the provided {@link ListenerContainerRegistry} and
63+ * {@link KafkaConsumerTimingAdjuster}.
64+ *
65+ * The ListenerContainerRegistry is used to fetch the {@link MessageListenerContainer}
66+ * that will be backed off / resumed.
67+ *
68+ * The KafkaConsumerTimingAdjuster is used to make timing adjustments
69+ * in the message consumption so that it processes the message closer
70+ * to its due time rather than later.
71+ *
72+ * @param listenerContainerRegistry the listenerContainerRegistry to use.
73+ * @param kafkaConsumerTimingAdjuster the kafkaConsumerTimingAdjuster to use.
74+ */
75+ public KafkaConsumerBackoffManager (ListenerContainerRegistry listenerContainerRegistry ,
76+ KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster ) {
77+
78+ this .listenerContainerRegistry = listenerContainerRegistry ;
79+ this .kafkaConsumerTimingAdjuster = kafkaConsumerTimingAdjuster ;
80+ this .clock = Clock .systemUTC ();
81+ this .backOffContexts = new HashMap <>();
82+ }
83+
84+ /**
85+ * Constructs an instance with the provided {@link ListenerContainerRegistry}
86+ * and with no timing adjustment capabilities.
87+ *
88+ * The ListenerContainerRegistry is used to fetch the {@link MessageListenerContainer}
89+ * that will be backed off / resumed.
90+ *
91+ * @param listenerContainerRegistry the listenerContainerRegistry to use.
92+ */
93+ public KafkaConsumerBackoffManager (ListenerContainerRegistry listenerContainerRegistry ) {
94+
95+ this .listenerContainerRegistry = listenerContainerRegistry ;
96+ this .kafkaConsumerTimingAdjuster = null ;
97+ this .clock = Clock .systemUTC ();
98+ this .backOffContexts = new HashMap <>();
99+ }
72100
73- private final Sleeper sleeper ;
101+ /**
102+ * Creates an instance with the provided {@link ListenerContainerRegistry},
103+ * {@link KafkaConsumerTimingAdjuster} and {@link Clock}.
104+ *
105+ * @param listenerContainerRegistry the listenerContainerRegistry to use.
106+ * @param kafkaConsumerTimingAdjuster the kafkaConsumerTimingAdjuster to use.
107+ * @param clock the clock to use.
108+ */
109+ public KafkaConsumerBackoffManager (ListenerContainerRegistry listenerContainerRegistry ,
110+ KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster ,
111+ Clock clock ) {
112+
113+ this .listenerContainerRegistry = listenerContainerRegistry ;
114+ this .clock = clock ;
115+ this .kafkaConsumerTimingAdjuster = kafkaConsumerTimingAdjuster ;
116+ this .backOffContexts = new HashMap <>();
117+ }
74118
75- public KafkaConsumerBackoffManager (ListenerContainerRegistry registry ,
76- @ Qualifier (INTERNAL_BACKOFF_CLOCK_BEAN_NAME ) Clock clock ,
77- TaskExecutor taskExecutor ,
78- Sleeper sleeper ) {
119+ /**
120+ * Creates an instance with the provided {@link ListenerContainerRegistry}
121+ * and {@link Clock}, with no timing adjustment capabilities.
122+ *
123+ * @param listenerContainerRegistry the listenerContainerRegistry to use.
124+ * @param clock the clock to use.
125+ */
126+ public KafkaConsumerBackoffManager (ListenerContainerRegistry listenerContainerRegistry , Clock clock ) {
79127
80- this .registry = registry ;
128+ this .listenerContainerRegistry = listenerContainerRegistry ;
81129 this .clock = clock ;
82- this .taskExecutor = taskExecutor ;
83- this .sleeper = sleeper ;
130+ this .kafkaConsumerTimingAdjuster = null ;
84131 this .backOffContexts = new HashMap <>();
85132 }
86133
@@ -112,26 +159,27 @@ public void onApplicationEvent(ListenerContainerPartitionIdleEvent partitionIdle
112159 getCurrentMillisFromClock (), partitionIdleEvent .getTopicPartition ()));
113160
114161 Context backOffContext = getBackOffContext (partitionIdleEvent .getTopicPartition ());
115-
116- if (backOffContext == null ) {
117- return ;
118- }
119162 maybeResumeConsumption (backOffContext );
120163 }
121164
122165 private long getCurrentMillisFromClock () {
123166 return Instant .now (this .clock ).toEpochMilli ();
124167 }
125168
126- private void maybeResumeConsumption (Context context ) {
169+ private void maybeResumeConsumption (@ Nullable Context context ) {
170+ if (context == null ) {
171+ return ;
172+ }
127173 long now = getCurrentMillisFromClock ();
128174 long timeUntilDue = context .dueTimestamp - now ;
129175 long pollTimeout = getListenerContainerFromContext (context )
130176 .getContainerProperties ()
131177 .getPollTimeout ();
132178 boolean isDue = timeUntilDue <= pollTimeout ;
133179
134- if (maybeApplyTimingCorrection (context , pollTimeout , timeUntilDue ) || isDue ) {
180+ long adjustedAmount = applyTimingAdjustment (context , timeUntilDue , pollTimeout );
181+
182+ if (adjustedAmount != 0L || isDue ) {
135183 resumePartition (context );
136184 }
137185 else {
@@ -140,52 +188,25 @@ private void maybeResumeConsumption(Context context) {
140188 }
141189 }
142190
191+ private long applyTimingAdjustment (Context context , long timeUntilDue , long pollTimeout ) {
192+ if (this .kafkaConsumerTimingAdjuster == null || context .consumerForTimingAdjustment == null ) {
193+ LOGGER .debug (() -> String .format (
194+ "Skipping timing adjustment for TopicPartition %s." , context .topicPartition ));
195+ return 0L ;
196+ }
197+ return this .kafkaConsumerTimingAdjuster .adjustTiming (
198+ context .consumerForTimingAdjustment , context .topicPartition , pollTimeout , timeUntilDue );
199+ }
200+
143201 private void resumePartition (Context context ) {
144202 MessageListenerContainer container = getListenerContainerFromContext (context );
145203 LOGGER .debug (() -> "Resuming partition at " + getCurrentMillisFromClock ());
146204 container .resumePartition (context .topicPartition );
147205 removeBackoff (context .topicPartition );
148206 }
149207
150- private boolean maybeApplyTimingCorrection (Context context , long pollTimeout , long timeUntilDue ) {
151- // Correction can only be applied to ConsumerAwareMessageListener
152- // listener instances.
153- if (context .consumerForTimingCorrection == null ) {
154- return false ;
155- }
156-
157- boolean isInCorrectionWindow = timeUntilDue > pollTimeout && timeUntilDue <=
158- pollTimeout * POLL_TIMEOUTS_FOR_CORRECTION_WINDOW ;
159-
160- long correctionAmount = timeUntilDue % pollTimeout ;
161- if (isInCorrectionWindow && correctionAmount > TIMING_CORRECTION_THRESHOLD ) {
162- this .taskExecutor .execute (() -> doApplyTimingCorrection (context , correctionAmount ));
163- return true ;
164- }
165- return false ;
166- }
167-
168- private void doApplyTimingCorrection (Context context , long correctionAmount ) {
169- try {
170- LOGGER .debug (() -> String .format ("Applying correction of %s millis at %s for TopicPartition %s" ,
171- correctionAmount , getCurrentMillisFromClock (), context .topicPartition ));
172- this .sleeper .sleep (correctionAmount );
173- LOGGER .debug (() -> "Waking up consumer for partition topic: " + context .topicPartition );
174- context .consumerForTimingCorrection .wakeup ();
175- }
176- catch (InterruptedException e ) {
177- Thread .currentThread ().interrupt ();
178- throw new IllegalStateException ("Interrupted waking up consumer while applying correction " +
179- "for TopicPartition " + context .topicPartition , e );
180- }
181- catch (Exception e ) { // NOSONAR
182- LOGGER .error (e , () -> "Error waking up consumer while applying correction " +
183- "for TopicPartition " + context .topicPartition );
184- }
185- }
186-
187208 private MessageListenerContainer getListenerContainerFromContext (Context context ) {
188- return this .registry .getListenerContainer (context .listenerId );
209+ return this .listenerContainerRegistry .getListenerContainer (context .listenerId );
189210 }
190211
191212 protected void addBackoff (Context context , TopicPartition topicPartition ) {
@@ -194,8 +215,7 @@ protected void addBackoff(Context context, TopicPartition topicPartition) {
194215 }
195216 }
196217
197- @ Nullable
198- protected Context getBackOffContext (TopicPartition topicPartition ) {
218+ protected @ Nullable Context getBackOffContext (TopicPartition topicPartition ) {
199219 synchronized (this .backOffContexts ) {
200220 return this .backOffContexts .get (topicPartition );
201221 }
@@ -208,8 +228,8 @@ protected void removeBackoff(TopicPartition topicPartition) {
208228 }
209229
210230 public Context createContext (long dueTimestamp , String listenerId , TopicPartition topicPartition ,
211- @ Nullable Consumer <?, ?> consumerForTimingCorrection ) {
212- return new Context (dueTimestamp , topicPartition , listenerId , consumerForTimingCorrection );
231+ @ Nullable Consumer <?, ?> consumerForTimingAdjustment ) {
232+ return new Context (dueTimestamp , topicPartition , listenerId , consumerForTimingAdjustment );
213233 }
214234
215235 /**
@@ -237,14 +257,14 @@ public static class Context {
237257 /**
238258 * The consumer of the message, if present.
239259 */
240- private final Consumer <?, ?> consumerForTimingCorrection ; // NOSONAR
260+ private final Consumer <?, ?> consumerForTimingAdjustment ; // NOSONAR
241261
242262 Context (long dueTimestamp , TopicPartition topicPartition , String listenerId ,
243- @ Nullable Consumer <?, ?> consumerForTimingCorrection ) {
263+ @ Nullable Consumer <?, ?> consumerForTimingAdjustment ) {
244264 this .dueTimestamp = dueTimestamp ;
245265 this .listenerId = listenerId ;
246266 this .topicPartition = topicPartition ;
247- this .consumerForTimingCorrection = consumerForTimingCorrection ;
267+ this .consumerForTimingAdjustment = consumerForTimingAdjustment ;
248268 }
249269 }
250270}
0 commit comments