Skip to content

Commit 99ff5cf

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-1762: Extract KafkaConsumerBackoffManager
Resolves #1762 Remove unnecessary NOSONAR comments.
1 parent d0471e5 commit 99ff5cf

File tree

9 files changed

+301
-245
lines changed

9 files changed

+301
-245
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java

Lines changed: 32 additions & 207 deletions
Original file line numberDiff line numberDiff line change
@@ -16,255 +16,80 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.time.Clock;
20-
import java.time.Instant;
21-
import java.util.HashMap;
22-
import java.util.Map;
23-
24-
import org.apache.commons.logging.LogFactory;
2519
import org.apache.kafka.clients.consumer.Consumer;
2620
import org.apache.kafka.common.TopicPartition;
2721

28-
import org.springframework.context.ApplicationListener;
29-
import org.springframework.core.log.LogAccessor;
30-
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
3122
import org.springframework.lang.Nullable;
3223

3324
/**
34-
*
35-
* A manager that backs off consumption for a given topic if the timestamp provided is not
36-
* due. Use with {@link SeekToCurrentErrorHandler} to guarantee that the message is read
37-
* again after partition consumption is resumed (or seek it manually by other means).
38-
* It's also necessary to set a {@link ContainerProperties#setIdlePartitionEventInterval(Long)}
39-
* so the Manager can resume the partition consumption.
40-
*
41-
* Note that when a record backs off the partition consumption gets paused for
42-
* approximately that amount of time, so you must have a fixed backoff value per partition.
25+
* Interface for backing off a {@link MessageListenerContainer}
26+
* until a given dueTimestamp, if such timestamp is in the future.
4327
*
4428
* @author Tomaz Fernandes
4529
* @author Gary Russell
4630
* @since 2.7
47-
* @see SeekToCurrentErrorHandler
4831
*/
49-
public class KafkaConsumerBackoffManager implements ApplicationListener<ListenerContainerPartitionIdleEvent> {
50-
51-
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));
52-
53-
private final ListenerContainerRegistry listenerContainerRegistry;
32+
public interface KafkaConsumerBackoffManager {
5433

55-
private final Map<TopicPartition, Context> backOffContexts;
56-
57-
private final Clock clock;
58-
59-
private final KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster;
60-
61-
/**
62-
* Constructs an instance with the provided {@link ListenerContainerRegistry} and
63-
* {@link KafkaConsumerTimingAdjuster}.
64-
*
65-
* The ListenerContainerRegistry is used to fetch the {@link MessageListenerContainer}
66-
* that will be backed off / resumed.
67-
*
68-
* The KafkaConsumerTimingAdjuster is used to make timing adjustments
69-
* in the message consumption so that it processes the message closer
70-
* to its due time rather than later.
71-
*
72-
* @param listenerContainerRegistry the listenerContainerRegistry to use.
73-
* @param kafkaConsumerTimingAdjuster the kafkaConsumerTimingAdjuster to use.
74-
*/
75-
public KafkaConsumerBackoffManager(ListenerContainerRegistry listenerContainerRegistry,
76-
KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster) {
34+
void backOffIfNecessary(Context context);
7735

78-
this.listenerContainerRegistry = listenerContainerRegistry;
79-
this.kafkaConsumerTimingAdjuster = kafkaConsumerTimingAdjuster;
80-
this.clock = Clock.systemUTC();
81-
this.backOffContexts = new HashMap<>();
82-
}
83-
84-
/**
85-
* Constructs an instance with the provided {@link ListenerContainerRegistry}
86-
* and with no timing adjustment capabilities.
87-
*
88-
* The ListenerContainerRegistry is used to fetch the {@link MessageListenerContainer}
89-
* that will be backed off / resumed.
90-
*
91-
* @param listenerContainerRegistry the listenerContainerRegistry to use.
92-
*/
93-
public KafkaConsumerBackoffManager(ListenerContainerRegistry listenerContainerRegistry) {
94-
95-
this.listenerContainerRegistry = listenerContainerRegistry;
96-
this.kafkaConsumerTimingAdjuster = null;
97-
this.clock = Clock.systemUTC();
98-
this.backOffContexts = new HashMap<>();
99-
}
100-
101-
/**
102-
* Creates an instance with the provided {@link ListenerContainerRegistry},
103-
* {@link KafkaConsumerTimingAdjuster} and {@link Clock}.
104-
*
105-
* @param listenerContainerRegistry the listenerContainerRegistry to use.
106-
* @param kafkaConsumerTimingAdjuster the kafkaConsumerTimingAdjuster to use.
107-
* @param clock the clock to use.
108-
*/
109-
public KafkaConsumerBackoffManager(ListenerContainerRegistry listenerContainerRegistry,
110-
KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster,
111-
Clock clock) {
112-
113-
this.listenerContainerRegistry = listenerContainerRegistry;
114-
this.clock = clock;
115-
this.kafkaConsumerTimingAdjuster = kafkaConsumerTimingAdjuster;
116-
this.backOffContexts = new HashMap<>();
117-
}
118-
119-
/**
120-
* Creates an instance with the provided {@link ListenerContainerRegistry}
121-
* and {@link Clock}, with no timing adjustment capabilities.
122-
*
123-
* @param listenerContainerRegistry the listenerContainerRegistry to use.
124-
* @param clock the clock to use.
125-
*/
126-
public KafkaConsumerBackoffManager(ListenerContainerRegistry listenerContainerRegistry, Clock clock) {
127-
128-
this.listenerContainerRegistry = listenerContainerRegistry;
129-
this.clock = clock;
130-
this.kafkaConsumerTimingAdjuster = null;
131-
this.backOffContexts = new HashMap<>();
132-
}
133-
134-
/**
135-
* Backs off if the current time is before the dueTimestamp provided
136-
* in the {@link Context} object.
137-
* @param context the back off context for this execution.
138-
*/
139-
public void maybeBackoff(Context context) {
140-
long backoffTime = context.dueTimestamp - getCurrentMillisFromClock();
141-
if (backoffTime > 0) {
142-
pauseConsumptionAndThrow(context, backoffTime);
143-
}
144-
}
145-
146-
private void pauseConsumptionAndThrow(Context context, Long backOffTime) throws KafkaBackoffException {
147-
TopicPartition topicPartition = context.topicPartition;
148-
getListenerContainerFromContext(context).pausePartition(topicPartition);
149-
addBackoff(context, topicPartition);
150-
throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " +
151-
"backing off for approx. %s millis.", context.topicPartition.partition(),
152-
context.topicPartition.topic(), backOffTime),
153-
topicPartition, context.listenerId, context.dueTimestamp);
154-
}
155-
156-
@Override
157-
public void onApplicationEvent(ListenerContainerPartitionIdleEvent partitionIdleEvent) {
158-
LOGGER.debug(() -> String.format("partitionIdleEvent received at %s. Partition: %s",
159-
getCurrentMillisFromClock(), partitionIdleEvent.getTopicPartition()));
160-
161-
Context backOffContext = getBackOffContext(partitionIdleEvent.getTopicPartition());
162-
maybeResumeConsumption(backOffContext);
163-
}
164-
165-
private long getCurrentMillisFromClock() {
166-
return Instant.now(this.clock).toEpochMilli();
167-
}
168-
169-
private void maybeResumeConsumption(@Nullable Context context) {
170-
if (context == null) {
171-
return;
172-
}
173-
long now = getCurrentMillisFromClock();
174-
long timeUntilDue = context.dueTimestamp - now;
175-
long pollTimeout = getListenerContainerFromContext(context)
176-
.getContainerProperties()
177-
.getPollTimeout();
178-
boolean isDue = timeUntilDue <= pollTimeout;
179-
180-
long adjustedAmount = applyTimingAdjustment(context, timeUntilDue, pollTimeout);
181-
182-
if (adjustedAmount != 0L || isDue) {
183-
resumePartition(context);
184-
}
185-
else {
186-
LOGGER.debug(() -> String.format("TopicPartition %s not due. DueTimestamp: %s Now: %s ",
187-
context.topicPartition, context.dueTimestamp, now));
188-
}
189-
}
190-
191-
private long applyTimingAdjustment(Context context, long timeUntilDue, long pollTimeout) {
192-
if (this.kafkaConsumerTimingAdjuster == null || context.consumerForTimingAdjustment == null) {
193-
LOGGER.debug(() -> String.format(
194-
"Skipping timing adjustment for TopicPartition %s.", context.topicPartition));
195-
return 0L;
196-
}
197-
return this.kafkaConsumerTimingAdjuster.adjustTiming(
198-
context.consumerForTimingAdjustment, context.topicPartition, pollTimeout, timeUntilDue);
199-
}
200-
201-
private void resumePartition(Context context) {
202-
MessageListenerContainer container = getListenerContainerFromContext(context);
203-
LOGGER.debug(() -> "Resuming partition at " + getCurrentMillisFromClock());
204-
container.resumePartition(context.topicPartition);
205-
removeBackoff(context.topicPartition);
206-
}
207-
208-
private MessageListenerContainer getListenerContainerFromContext(Context context) {
209-
return this.listenerContainerRegistry.getListenerContainer(context.listenerId);
210-
}
211-
212-
protected void addBackoff(Context context, TopicPartition topicPartition) {
213-
synchronized (this.backOffContexts) {
214-
this.backOffContexts.put(topicPartition, context);
215-
}
216-
}
217-
218-
protected @Nullable Context getBackOffContext(TopicPartition topicPartition) {
219-
synchronized (this.backOffContexts) {
220-
return this.backOffContexts.get(topicPartition);
221-
}
222-
}
223-
224-
protected void removeBackoff(TopicPartition topicPartition) {
225-
synchronized (this.backOffContexts) {
226-
this.backOffContexts.remove(topicPartition);
227-
}
228-
}
229-
230-
public Context createContext(long dueTimestamp, String listenerId, TopicPartition topicPartition,
231-
@Nullable Consumer<?, ?> consumerForTimingAdjustment) {
232-
return new Context(dueTimestamp, topicPartition, listenerId, consumerForTimingAdjustment);
36+
default Context createContext(long dueTimestamp, String listenerId, TopicPartition topicPartition,
37+
@Nullable Consumer<?, ?> messageConsumer) {
38+
return new Context(dueTimestamp, topicPartition, listenerId, messageConsumer);
23339
}
23440

23541
/**
23642
* Provides the state that will be used for backing off.
23743
* @since 2.7
23844
*/
239-
public static class Context {
45+
class Context {
24046

24147
/**
24248
* The time after which the message should be processed,
24349
* in milliseconds since epoch.
24450
*/
245-
private final long dueTimestamp; // NOSONAR
51+
private final long dueTimestamp;
24652

24753
/**
24854
* The id for the listener that should be paused.
24955
*/
250-
private final String listenerId; // NOSONAR
56+
private final String listenerId;
25157

25258
/**
25359
* The topic that contains the partition to be paused.
25460
*/
255-
private final TopicPartition topicPartition; // NOSONAR
61+
private final TopicPartition topicPartition;
25662

25763
/**
25864
* The consumer of the message, if present.
25965
*/
260-
private final Consumer<?, ?> consumerForTimingAdjustment; // NOSONAR
66+
private final Consumer<?, ?> consumerForTimingAdjustment;
26167

26268
Context(long dueTimestamp, TopicPartition topicPartition, String listenerId,
263-
@Nullable Consumer<?, ?> consumerForTimingAdjustment) {
69+
@Nullable Consumer<?, ?> consumerForTimingAdjustment) {
70+
26471
this.dueTimestamp = dueTimestamp;
26572
this.listenerId = listenerId;
26673
this.topicPartition = topicPartition;
26774
this.consumerForTimingAdjustment = consumerForTimingAdjustment;
26875
}
76+
77+
public long getDueTimestamp() {
78+
return this.dueTimestamp;
79+
}
80+
81+
public String getListenerId() {
82+
return this.listenerId;
83+
}
84+
85+
public TopicPartition getTopicPartition() {
86+
return this.topicPartition;
87+
}
88+
89+
public @Nullable Consumer<?, ?> getConsumerForTimingAdjustment() {
90+
return this.consumerForTimingAdjustment;
91+
}
92+
26993
}
94+
27095
}
Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* @author Tomaz Fernandes
3333
* @since 2.7
3434
*/
35-
public class TimingAdjustingKafkaBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {
35+
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {
3636

3737
private boolean timingAdjustmentEnabled = true;
3838

@@ -48,7 +48,7 @@ public class TimingAdjustingKafkaBackOffManagerFactory extends AbstractKafkaBack
4848
*
4949
* @param timingAdjustmentManager the {@link KafkaConsumerTimingAdjuster} to be used.
5050
*/
51-
public TimingAdjustingKafkaBackOffManagerFactory(KafkaConsumerTimingAdjuster timingAdjustmentManager) {
51+
public PartitionPausingBackOffManagerFactory(KafkaConsumerTimingAdjuster timingAdjustmentManager) {
5252
this.clock = getDefaultClock();
5353
setTimingAdjustmentManager(timingAdjustmentManager);
5454
}
@@ -59,7 +59,7 @@ public TimingAdjustingKafkaBackOffManagerFactory(KafkaConsumerTimingAdjuster tim
5959
*
6060
* @param timingAdjustmentManagerTaskExecutor the {@link TaskExecutor} to be used.
6161
*/
62-
public TimingAdjustingKafkaBackOffManagerFactory(TaskExecutor timingAdjustmentManagerTaskExecutor) {
62+
public PartitionPausingBackOffManagerFactory(TaskExecutor timingAdjustmentManagerTaskExecutor) {
6363
this.clock = getDefaultClock();
6464
setTaskExecutor(timingAdjustmentManagerTaskExecutor);
6565
}
@@ -70,7 +70,7 @@ public TimingAdjustingKafkaBackOffManagerFactory(TaskExecutor timingAdjustmentMa
7070
*
7171
* @param timingAdjustmentEnabled the {@link KafkaConsumerTimingAdjuster} to be used.
7272
*/
73-
public TimingAdjustingKafkaBackOffManagerFactory(boolean timingAdjustmentEnabled) {
73+
public PartitionPausingBackOffManagerFactory(boolean timingAdjustmentEnabled) {
7474
this.clock = getDefaultClock();
7575
setTimingAdjustmentEnabled(timingAdjustmentEnabled);
7676
}
@@ -80,15 +80,15 @@ public TimingAdjustingKafkaBackOffManagerFactory(boolean timingAdjustmentEnabled
8080
*
8181
* @param listenerContainerRegistry the {@link ListenerContainerRegistry} to be used.
8282
*/
83-
public TimingAdjustingKafkaBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
83+
public PartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
8484
super(listenerContainerRegistry);
8585
this.clock = getDefaultClock();
8686
}
8787

8888
/**
8989
* Constructs a factory instance with default dependencies.
9090
*/
91-
public TimingAdjustingKafkaBackOffManagerFactory() {
91+
public PartitionPausingBackOffManagerFactory() {
9292
this.clock = getDefaultClock();
9393
}
9494

@@ -97,7 +97,7 @@ public TimingAdjustingKafkaBackOffManagerFactory() {
9797
* with the provided {@link Clock}.
9898
* @param clock the clock instance to be used.
9999
*/
100-
public TimingAdjustingKafkaBackOffManagerFactory(Clock clock) {
100+
public PartitionPausingBackOffManagerFactory(Clock clock) {
101101
this.clock = clock;
102102
}
103103

@@ -133,7 +133,7 @@ public final void setTaskExecutor(TaskExecutor taskExecutor) {
133133

134134
@Override
135135
protected KafkaConsumerBackoffManager doCreateManager(ListenerContainerRegistry registry) {
136-
KafkaConsumerBackoffManager kafkaConsumerBackoffManager = getKafkaConsumerBackoffManager(registry);
136+
PartitionPausingBackoffManager kafkaConsumerBackoffManager = getKafkaConsumerBackoffManager(registry);
137137
super.addApplicationListener(kafkaConsumerBackoffManager);
138138
return kafkaConsumerBackoffManager;
139139
}
@@ -142,10 +142,10 @@ protected final Clock getDefaultClock() {
142142
return Clock.systemUTC();
143143
}
144144

145-
private KafkaConsumerBackoffManager getKafkaConsumerBackoffManager(ListenerContainerRegistry registry) {
145+
private PartitionPausingBackoffManager getKafkaConsumerBackoffManager(ListenerContainerRegistry registry) {
146146
return this.timingAdjustmentEnabled
147-
? new KafkaConsumerBackoffManager(registry, getOrCreateBackOffTimingAdjustmentManager(), this.clock)
148-
: new KafkaConsumerBackoffManager(registry, this.clock);
147+
? new PartitionPausingBackoffManager(registry, getOrCreateBackOffTimingAdjustmentManager(), this.clock)
148+
: new PartitionPausingBackoffManager(registry, this.clock);
149149
}
150150

151151
private KafkaConsumerTimingAdjuster getOrCreateBackOffTimingAdjustmentManager() {

0 commit comments

Comments
 (0)