Skip to content

Commit 3274e24

Browse files
wilkinsonaphilwebb
andcommitted
Polish "Add Spring Integration default poller auto-config"
See gh-27992 Co-authored-by: Phillip Webb <[email protected]>
1 parent b2d1423 commit 3274e24

File tree

2 files changed

+59
-43
lines changed

2 files changed

+59
-43
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
4040
import org.springframework.boot.context.properties.EnableConfigurationProperties;
4141
import org.springframework.boot.context.properties.PropertyMapper;
42+
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
4243
import org.springframework.boot.task.TaskSchedulerBuilder;
4344
import org.springframework.context.annotation.Bean;
4445
import org.springframework.context.annotation.Conditional;
@@ -62,10 +63,10 @@
6263
import org.springframework.messaging.rsocket.RSocketRequester;
6364
import org.springframework.messaging.rsocket.RSocketStrategies;
6465
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
66+
import org.springframework.scheduling.Trigger;
6567
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
6668
import org.springframework.scheduling.support.CronTrigger;
6769
import org.springframework.scheduling.support.PeriodicTrigger;
68-
import org.springframework.util.Assert;
6970
import org.springframework.util.StringUtils;
7071

7172
/**
@@ -118,27 +119,44 @@ protected static class IntegrationConfiguration {
118119

119120
@Bean(PollerMetadata.DEFAULT_POLLER)
120121
@ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER)
121-
public PollerMetadata defaultPoller(IntegrationProperties integrationProperties) {
122+
public PollerMetadata defaultPollerMetadata(IntegrationProperties integrationProperties) {
122123
IntegrationProperties.Poller poller = integrationProperties.getPoller();
123-
int hasCron = poller.getCron() != null ? 1 : 0;
124-
int hasFixedDelay = poller.getFixedDelay() != null ? 1 : 0;
125-
int hasFixedRate = poller.getFixedRate() != null ? 1 : 0;
126-
Assert.isTrue((hasCron + hasFixedDelay + hasFixedRate) <= 1,
127-
"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties.");
124+
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
125+
entries.put("spring.integration.poller.cron",
126+
StringUtils.hasText(poller.getCron()) ? poller.getCron() : null);
127+
entries.put("spring.integration.poller.fixed-delay", poller.getFixedDelay());
128+
entries.put("spring.integration.poller.fixed-rate", poller.getFixedRate());
129+
});
128130
PollerMetadata pollerMetadata = new PollerMetadata();
129131
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
130132
map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll);
131133
map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout);
132-
map.from(poller::getCron).whenHasText().as(CronTrigger::new).to(pollerMetadata::setTrigger);
133-
map.from((poller.getFixedDelay() != null) ? poller.getFixedDelay() : poller.getFixedRate())
134-
.as(Duration::toMillis).as(PeriodicTrigger::new).as((trigger) -> {
135-
map.from(poller::getInitialDelay).as(Duration::toMillis).to(trigger::setInitialDelay);
136-
trigger.setFixedRate(poller.getFixedRate() != null);
137-
return trigger;
138-
}).to(pollerMetadata::setTrigger);
134+
map.from(poller).as(this::asTrigger).to(pollerMetadata::setTrigger);
139135
return pollerMetadata;
140136
}
141137

138+
private Trigger asTrigger(IntegrationProperties.Poller poller) {
139+
if (StringUtils.hasText(poller.getCron())) {
140+
return new CronTrigger(poller.getCron());
141+
}
142+
if (poller.getFixedDelay() != null) {
143+
return createPeriodicTrigger(poller.getFixedDelay(), poller.getInitialDelay(), true);
144+
}
145+
if (poller.getFixedRate() != null) {
146+
return createPeriodicTrigger(poller.getFixedRate(), poller.getInitialDelay(), false);
147+
}
148+
return null;
149+
}
150+
151+
private Trigger createPeriodicTrigger(Duration period, Duration initialDelay, boolean fixedRate) {
152+
PeriodicTrigger trigger = new PeriodicTrigger(period.toMillis());
153+
if (initialDelay != null) {
154+
trigger.setInitialDelay(initialDelay.toMillis());
155+
}
156+
trigger.setFixedRate(fixedRate);
157+
return trigger;
158+
}
159+
142160
}
143161

144162
/**

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@
1818

1919
import java.util.concurrent.BlockingQueue;
2020
import java.util.concurrent.LinkedBlockingQueue;
21-
import java.util.concurrent.TimeUnit;
2221

2322
import javax.management.MBeanServer;
2423
import javax.sql.DataSource;
2524

2625
import io.rsocket.transport.ClientTransport;
2726
import io.rsocket.transport.netty.client.TcpClientTransport;
27+
import org.assertj.core.api.InstanceOfAssertFactories;
2828
import org.junit.jupiter.api.Test;
29-
import reactor.core.publisher.Mono;
3029

3130
import org.springframework.beans.DirectFieldAccessor;
3231
import org.springframework.boot.autoconfigure.AutoConfigurations;
@@ -42,6 +41,7 @@
4241
import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration;
4342
import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration;
4443
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
44+
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
4545
import org.springframework.boot.jdbc.init.DataSourceScriptDatabaseInitializer;
4646
import org.springframework.boot.sql.init.DatabaseInitializationMode;
4747
import org.springframework.boot.sql.init.DatabaseInitializationSettings;
@@ -70,7 +70,6 @@
7070
import org.springframework.messaging.Message;
7171
import org.springframework.messaging.MessageHandler;
7272
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
73-
import org.springframework.messaging.support.GenericMessage;
7473
import org.springframework.scheduling.TaskScheduler;
7574
import org.springframework.scheduling.support.CronTrigger;
7675

@@ -404,49 +403,48 @@ void whenTheUserDefinesTheirOwnDatabaseInitializerThenTheAutoConfiguredIntegrati
404403
@Test
405404
void defaultPoller() {
406405
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> {
407-
assertThat(context).hasSingleBean(PollerMetadata.class).getBean(PollerMetadata.DEFAULT_POLLER)
408-
.hasFieldOrPropertyWithValue("maxMessagesPerPoll", (long) PollerMetadata.MAX_MESSAGES_UNBOUNDED)
409-
.hasFieldOrPropertyWithValue("receiveTimeout", PollerMetadata.DEFAULT_RECEIVE_TIMEOUT)
410-
.hasFieldOrPropertyWithValue("trigger", null);
411-
412-
GenericMessage<String> testMessage = new GenericMessage<>("test");
413-
context.getBean("testChannel", QueueChannel.class).send(testMessage);
414-
@SuppressWarnings("unchecked")
415-
BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
416-
assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
406+
assertThat(context).hasSingleBean(PollerMetadata.class);
407+
PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class);
408+
assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(PollerMetadata.MAX_MESSAGES_UNBOUNDED);
409+
assertThat(metadata.getReceiveTimeout()).isEqualTo(PollerMetadata.DEFAULT_RECEIVE_TIMEOUT);
410+
assertThat(metadata.getTrigger()).isNull();
417411
});
418412
}
419413

420414
@Test
421-
void customPollerProperties() {
415+
void whenCustomPollerPropertiesAreSetThenTheyAreReflectedInPollerMetadata() {
422416
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class)
423417
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
424418
"spring.integration.poller.max-messages-per-poll=1",
425419
"spring.integration.poller.receive-timeout=10s")
426420
.run((context) -> {
427-
assertThat(context).hasSingleBean(PollerMetadata.class)
428-
.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class)
429-
.hasFieldOrPropertyWithValue("maxMessagesPerPoll", 1L)
430-
.hasFieldOrPropertyWithValue("receiveTimeout", 10000L)
431-
.extracting(PollerMetadata::getTrigger).isInstanceOf(CronTrigger.class)
432-
.hasFieldOrPropertyWithValue("expression", "* * * ? * *");
433-
434-
GenericMessage<String> testMessage = new GenericMessage<>("test");
435-
context.getBean("testChannel", QueueChannel.class).send(testMessage);
436-
@SuppressWarnings("unchecked")
437-
BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
438-
assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
421+
assertThat(context).hasSingleBean(PollerMetadata.class);
422+
PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class);
423+
assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(1L);
424+
assertThat(metadata.getReceiveTimeout()).isEqualTo(10000L);
425+
assertThat(metadata.getTrigger()).asInstanceOf(InstanceOfAssertFactories.type(CronTrigger.class))
426+
.satisfies((trigger) -> assertThat(trigger.getExpression()).isEqualTo("* * * ? * *"));
439427
});
440428
}
441429

442430
@Test
443-
void triggerPropertiesAreMutuallyExclusive() {
431+
void whenPollerPropertiesForMultipleTriggerTypesAreSetThenRefreshFails() {
444432
this.contextRunner
445433
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
446434
"spring.integration.poller.fixed-delay=1s")
447435
.run((context) -> assertThat(context).hasFailed().getFailure()
448-
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class).hasMessageContaining(
449-
"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties."));
436+
.hasRootCauseExactlyInstanceOf(MutuallyExclusiveConfigurationPropertiesException.class)
437+
.getRootCause()
438+
.asInstanceOf(
439+
InstanceOfAssertFactories.type(MutuallyExclusiveConfigurationPropertiesException.class))
440+
.satisfies((ex) -> {
441+
assertThat(ex.getConfiguredNames()).containsExactlyInAnyOrder(
442+
"spring.integration.poller.cron", "spring.integration.poller.fixed-delay");
443+
assertThat(ex.getMutuallyExclusiveNames()).containsExactlyInAnyOrder(
444+
"spring.integration.poller.cron", "spring.integration.poller.fixed-delay",
445+
"spring.integration.poller.fixed-rate");
446+
}));
447+
450448
}
451449

452450
@Configuration(proxyBeanMethods = false)

0 commit comments

Comments
 (0)