Skip to content

Commit ca0120f

Browse files
committed
Migrate Kafka Binder (& Streams) to Core Retry
The Spring Retry project is approaching its sunset; therefore, a goal for the whole portfolio is to get rid of its dependency while that project is put into maintenance mode. Use the Spring Core `RetryTemplate` API instead. * Fix `@StreamRetryTemplate` docs to talk about the Core `RetryTemplate`, and fix the respective tests to use the new import * Migrate `AbstractBinder` to Core Retry * Migrate `DefaultPollableMessageSource` to Core Retry and the supporting retry API from Spring Integration * Migrate `KafkaMessageChannelBinder`, including the respective tests * Fix typos in Javadocs * Fix some Spring Boot 4.0 breaking changes to make the project build at least at some level * Migrate the Kafka Streams module to Core Retry * Fix the parent POM to not use `-local` repositories, to avoid authentication * Remove the redundant `repositories` section from the Kafka binder, as it is inherited from the parent * Comment out the `spring-cloud-stream-integration-tests` module since it fails with unrelated problems * The Rabbit Binder will be fixed separately when Spring Boot is ready. While this is a breaking change internally, it does not affect the end-user API much. Moreover, the rest of the Spring projects are already making the same breaking-change migration, so we aim for `spring-retry` removal in the end anyway. The `spring-cloud-stream-schema-registry-server` fails because of some Spring Boot incompatibility, or because of my outdated local SNAPSHOTs.
1 parent 86d4010 commit ca0120f

File tree

29 files changed

+333
-383
lines changed

29 files changed

+333
-383
lines changed

binders/kafka-binder/pom.xml

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -123,46 +123,6 @@
123123
</build>
124124
</profile>
125125
</profiles>
126-
<repositories>
127-
<repository>
128-
<id>spring-snapshots</id>
129-
<name>Spring Snapshots</name>
130-
<url>https://repo.spring.io/libs-snapshot-local</url>
131-
</repository>
132-
<repository>
133-
<id>spring-milestones</id>
134-
<name>Spring milestones</name>
135-
<url>https://repo.spring.io/libs-milestone-local</url>
136-
</repository>
137-
<repository>
138-
<id>spring-releases</id>
139-
<name>Spring Releases</name>
140-
<url>https://repo.spring.io/release</url>
141-
</repository>
142-
</repositories>
143-
<pluginRepositories>
144-
<pluginRepository>
145-
<id>spring-snapshots</id>
146-
<name>Spring Snapshots</name>
147-
<url>https://repo.spring.io/snapshot</url>
148-
<snapshots>
149-
<enabled>true</enabled>
150-
</snapshots>
151-
</pluginRepository>
152-
<pluginRepository>
153-
<id>spring-milestones</id>
154-
<name>Spring Milestones</name>
155-
<url>https://repo.spring.io/milestone</url>
156-
<snapshots>
157-
<enabled>false</enabled>
158-
</snapshots>
159-
</pluginRepository>
160-
<pluginRepository>
161-
<id>spring-releases</id>
162-
<name>Spring Releases</name>
163-
<url>https://repo.spring.io/release</url>
164-
</pluginRepository>
165-
</pluginRepositories>
166126
<reporting>
167127
<plugins>
168128
<plugin>

binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.cloud.stream.binder.kafka.provisioning;
1818

19+
import java.time.Duration;
1920
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collection;
@@ -70,12 +71,12 @@
7071
import org.springframework.cloud.stream.provisioning.ProducerDestination;
7172
import org.springframework.cloud.stream.provisioning.ProvisioningException;
7273
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
74+
import org.springframework.core.retry.RetryException;
75+
import org.springframework.core.retry.RetryPolicy;
7376
import org.springframework.kafka.core.ConsumerFactory;
7477
import org.springframework.kafka.core.ProducerFactory;
75-
import org.springframework.retry.RetryOperations;
76-
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
77-
import org.springframework.retry.policy.SimpleRetryPolicy;
78-
import org.springframework.retry.support.RetryTemplate;
78+
import org.springframework.core.retry.RetryOperations;
79+
import org.springframework.core.retry.RetryTemplate;
7980
import org.springframework.util.Assert;
8081
import org.springframework.util.CollectionUtils;
8182
import org.springframework.util.ObjectUtils;
@@ -94,6 +95,7 @@
9495
* @author Omer Celik
9596
* @author Byungjun You
9697
* @author Roman Akentev
98+
* @author Artem Bilan
9799
*/
98100
public class KafkaTopicProvisioner implements
99101
// @checkstyle:off
@@ -211,18 +213,13 @@ public void setMetadataRetryOperations(RetryOperations metadataRetryOperations)
211213
@Override
212214
public void afterPropertiesSet() {
213215
if (this.metadataRetryOperations == null) {
214-
RetryTemplate retryTemplate = new RetryTemplate();
215-
216-
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
217-
simpleRetryPolicy.setMaxAttempts(10);
218-
retryTemplate.setRetryPolicy(simpleRetryPolicy);
219-
220-
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
221-
backOffPolicy.setInitialInterval(100);
222-
backOffPolicy.setMultiplier(2);
223-
backOffPolicy.setMaxInterval(1000);
224-
retryTemplate.setBackOffPolicy(backOffPolicy);
225-
this.metadataRetryOperations = retryTemplate;
216+
RetryPolicy retryPolicy = RetryPolicy.builder()
217+
.maxAttempts(10)
218+
.delay(Duration.ofMillis(100))
219+
.multiplier(2)
220+
.maxDelay(Duration.ofSeconds(1))
221+
.build();
222+
this.metadataRetryOperations = new RetryTemplate(retryPolicy);
226223
}
227224
}
228225

@@ -311,21 +308,21 @@ private int getPartitionsForTopic(String topicName, AdminClient adminClient) {
311308
}
312309

313310
private Map<String, TopicDescription> retrieveTopicDescriptions(String topicName, AdminClient adminClient) {
314-
return this.metadataRetryOperations.execute(context -> {
315-
try {
311+
try {
312+
return this.metadataRetryOperations.execute(() -> {
313+
316314
if (logger.isDebugEnabled()) {
317315
logger.debug("Attempting to retrieve the description for the topic: " + topicName);
318316
}
319317
DescribeTopicsResult describeTopicsResult = adminClient
320318
.describeTopics(Collections.singletonList(topicName));
321-
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult
322-
.allTopicNames();
319+
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.allTopicNames();
323320
return all.get(this.operationTimeout, TimeUnit.SECONDS);
324-
}
325-
catch (Exception ex) {
326-
throw new ProvisioningException("Problems encountered with partitions finding for: " + topicName, ex);
327-
}
328-
});
321+
});
322+
}
323+
catch (RetryException ex) {
324+
throw new ProvisioningException("Problems encountered with partitions finding for: " + topicName, ex);
325+
}
329326
}
330327

331328
AdminClient createAdminClient() {
@@ -505,7 +502,7 @@ else if (tolerateLowerPartitionsOnBroker) {
505502
// always consider minPartitionCount for topic creation
506503
final int effectivePartitionCount = Math.max(
507504
this.configurationProperties.getMinPartitionCount(), partitionCount);
508-
this.metadataRetryOperations.execute((context) -> {
505+
this.metadataRetryOperations.execute(() -> {
509506

510507
NewTopic newTopic;
511508
Map<Integer, List<Integer>> replicasAssignments = topicProperties
@@ -660,7 +657,7 @@ public Collection<PartitionInfo> getPartitionsForTopic(final int partitionCount,
660657
final boolean tolerateLowerPartitionsOnBroker,
661658
final Callable<Collection<PartitionInfo>> callable, final String topicName) {
662659
try {
663-
return this.metadataRetryOperations.execute((context) -> {
660+
return this.metadataRetryOperations.execute(() -> {
664661
Collection<PartitionInfo> partitions = Collections.emptyList();
665662

666663
try {

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
3333
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
3434
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
35+
import org.springframework.core.retry.RetryTemplate;
3536
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
36-
import org.springframework.retry.support.RetryTemplate;
3737
import org.springframework.util.StringUtils;
3838

3939
/**
@@ -75,7 +75,6 @@ public GlobalKTableBinder(
7575
}
7676

7777
@Override
78-
@SuppressWarnings("unchecked")
7978
protected Binding<GlobalKTable<Object, Object>> doBindConsumer(String name,
8079
String group, GlobalKTable<Object, Object> inputTarget,
8180
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java

Lines changed: 83 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.cloud.stream.binder.kafka.streams;
1818

19+
import java.time.Duration;
1920
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
@@ -38,23 +39,24 @@
3839
import org.apache.kafka.streams.state.QueryableStoreType;
3940

4041
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
41-
import org.springframework.retry.RetryPolicy;
42-
import org.springframework.retry.backoff.FixedBackOffPolicy;
43-
import org.springframework.retry.policy.SimpleRetryPolicy;
44-
import org.springframework.retry.support.RetryTemplate;
42+
import org.springframework.core.retry.RetryException;
43+
import org.springframework.core.retry.RetryPolicy;
44+
import org.springframework.core.retry.RetryTemplate;
45+
import org.springframework.util.ReflectionUtils;
4546
import org.springframework.util.StringUtils;
4647

4748
/**
4849
* Services pertinent to the interactive query capabilities of Kafka Streams. This class
4950
* provides services such as querying for a particular store, which instance is hosting a
50-
* particular store etc. This is part of the public API of the kafka streams binder and
51+
* particular store etc. This is part of the public API of the kafka streams binder, and
5152
* the users can inject this service in their applications to make use of it.
5253
*
5354
* @author Soby Chacko
5455
* @author Renwei Han
5556
* @author Serhii Siryi
5657
* @author Nico Pommerening
5758
* @author Chris Bono
59+
* @author Artem Bilan
5860
* @since 2.1.0
5961
*/
6062
public class InteractiveQueryService {
@@ -100,58 +102,64 @@ public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType
100102

101103
AtomicReference<StoreQueryParameters<T>> storeQueryParametersAtomicReference = new AtomicReference<>(storeQueryParams);
102104

103-
return getRetryTemplate().execute(context -> {
104-
T store = null;
105-
Throwable throwable = null;
106-
if (contextSpecificKafkaStreams != null) {
107-
try {
108-
store = contextSpecificKafkaStreams.store(storeQueryParametersAtomicReference.get());
109-
}
110-
catch (InvalidStateStoreException e) {
111-
throwable = e;
105+
try {
106+
return getRetryTemplate().execute(() -> {
107+
T store = null;
108+
Throwable throwable = null;
109+
if (contextSpecificKafkaStreams != null) {
110+
try {
111+
store = contextSpecificKafkaStreams.store(storeQueryParametersAtomicReference.get());
112+
}
113+
catch (InvalidStateStoreException e) {
114+
throwable = e;
115+
}
112116
}
113-
}
114-
if (store != null) {
115-
return store;
116-
}
117-
if (contextSpecificKafkaStreams != null) {
118-
LOG.warn("Store (" + storeName + ") could not be found in Streams context, falling back to all known Streams instances");
119-
}
120-
121-
// Find all apps that know about the store
122-
Map<KafkaStreams, T> candidateStores = new HashMap<>();
123-
for (KafkaStreams kafkaStreamApp : kafkaStreamsRegistry.getKafkaStreams()) {
124-
try {
125-
candidateStores.put(kafkaStreamApp, kafkaStreamApp.store(storeQueryParametersAtomicReference.get()));
117+
if (store != null) {
118+
return store;
126119
}
127-
catch (Exception ex) {
128-
throwable = ex;
120+
if (contextSpecificKafkaStreams != null) {
121+
LOG.warn("Store (" + storeName + ") could not be found in Streams context, falling back to all known Streams instances");
129122
}
130-
}
131-
132-
// Store exists in a single app - no further resolution required
133-
if (candidateStores.size() == 1) {
134-
return candidateStores.values().stream().findFirst().get();
135-
}
136123

137-
// If the store is in multiple streams apps - discard any apps that do not actually have the store
138-
if (candidateStores.size() > 1) {
139-
140-
candidateStores = candidateStores.entrySet().stream()
141-
.filter((e) -> this.topologyInfoFacade.streamsAppActuallyHasStore(e.getKey(), storeName))
142-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
124+
// Find all apps that know about the store
125+
Map<KafkaStreams, T> candidateStores = new HashMap<>();
126+
for (KafkaStreams kafkaStreamApp : kafkaStreamsRegistry.getKafkaStreams()) {
127+
try {
128+
candidateStores.put(kafkaStreamApp, kafkaStreamApp.store(storeQueryParametersAtomicReference.get()));
129+
}
130+
catch (Exception ex) {
131+
throwable = ex;
132+
}
133+
}
143134

135+
// Store exists in a single app - no further resolution required
144136
if (candidateStores.size() == 1) {
145137
return candidateStores.values().stream().findFirst().get();
146138
}
147139

148-
throwable = (candidateStores.size() == 0) ?
149-
new UnknownStateStoreException("Store (" + storeName + ") not available to Streams instance") :
150-
new InvalidStateStoreException("Store (" + storeName + ") available to more than one Streams instance");
140+
// If the store is in multiple streams apps - discard any apps that do not actually have the store
141+
if (candidateStores.size() > 1) {
142+
143+
candidateStores = candidateStores.entrySet().stream()
144+
.filter((e) -> this.topologyInfoFacade.streamsAppActuallyHasStore(e.getKey(), storeName))
145+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
146+
147+
if (candidateStores.size() == 1) {
148+
return candidateStores.values().stream().findFirst().get();
149+
}
151150

152-
}
153-
throw new IllegalStateException("Error retrieving state store: " + storeName, throwable);
154-
});
151+
throwable = (candidateStores.isEmpty()) ?
152+
new UnknownStateStoreException("Store (" + storeName + ") not available to Streams instance") :
153+
new InvalidStateStoreException("Store (" + storeName + ") available to more than one Streams instance");
154+
155+
}
156+
throw new IllegalStateException("Error retrieving state store: " + storeName, throwable);
157+
});
158+
}
159+
catch (RetryException ex) {
160+
ReflectionUtils.rethrowRuntimeException(ex.getCause());
161+
return null;
162+
}
155163
}
156164

157165
/**
@@ -218,38 +226,40 @@ public HostInfo getCurrentHostInfo() {
218226
public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
219227
final RetryTemplate retryTemplate = getRetryTemplate();
220228

221-
222-
return retryTemplate.execute(context -> {
223-
Throwable throwable = null;
224-
try {
225-
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
226-
.stream()
227-
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
228-
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
229-
if (keyQueryMetadata != null) {
230-
return keyQueryMetadata.activeHost();
229+
try {
230+
return retryTemplate.execute(() -> {
231+
Throwable throwable = null;
232+
try {
233+
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
234+
.stream()
235+
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
236+
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
237+
if (keyQueryMetadata != null) {
238+
return keyQueryMetadata.activeHost();
239+
}
240+
}
241+
catch (Exception e) {
242+
throwable = e;
231243
}
232-
}
233-
catch (Exception e) {
234-
throwable = e;
235-
}
236-
throw new IllegalStateException(
237-
"Error when retrieving state store.", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
238-
});
244+
throw new IllegalStateException(
245+
"Error when retrieving state store.", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
246+
});
247+
}
248+
catch (RetryException ex) {
249+
ReflectionUtils.rethrowRuntimeException(ex.getCause());
250+
return null;
251+
}
239252
}
240253

241254
private RetryTemplate getRetryTemplate() {
242-
RetryTemplate retryTemplate = new RetryTemplate();
243-
244-
KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
245-
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
246-
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
247-
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
248255

249-
retryTemplate.setBackOffPolicy(backOffPolicy);
250-
retryTemplate.setRetryPolicy(retryPolicy);
256+
var stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
257+
RetryPolicy retryPolicy = RetryPolicy.builder()
258+
.maxAttempts(stateStoreRetry.getMaxAttempts())
259+
.delay(Duration.ofMillis(stateStoreRetry.getBackoffPeriod()))
260+
.build();
251261

252-
return retryTemplate;
262+
return new RetryTemplate(retryPolicy);
253263
}
254264

255265
/**

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
4545
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
4646
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
47-
import org.springframework.retry.support.RetryTemplate;
47+
import org.springframework.core.retry.RetryTemplate;
4848
import org.springframework.util.StringUtils;
4949

5050
/**

0 commit comments

Comments
 (0)