Skip to content

Commit 5bd34be

Browse files
committed
Merge pull request #27992 from artembilan
* pr/27992: Polish "Add Spring Integration default poller auto-config" Add Spring Integration default poller auto-config Closes gh-27992
2 parents 5e42639 + 3274e24 commit 5bd34be

File tree

4 files changed

+217
-0
lines changed

4 files changed

+217
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.boot.autoconfigure.integration;
1818

19+
import java.time.Duration;
20+
1921
import javax.management.MBeanServer;
2022
import javax.sql.DataSource;
2123

@@ -37,6 +39,7 @@
3739
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
3840
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3941
import org.springframework.boot.context.properties.PropertyMapper;
42+
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
4043
import org.springframework.boot.task.TaskSchedulerBuilder;
4144
import org.springframework.context.annotation.Bean;
4245
import org.springframework.context.annotation.Conditional;
@@ -56,10 +59,14 @@
5659
import org.springframework.integration.rsocket.ServerRSocketConnector;
5760
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
5861
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
62+
import org.springframework.integration.scheduling.PollerMetadata;
5963
import org.springframework.messaging.rsocket.RSocketRequester;
6064
import org.springframework.messaging.rsocket.RSocketStrategies;
6165
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
66+
import org.springframework.scheduling.Trigger;
6267
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
68+
import org.springframework.scheduling.support.CronTrigger;
69+
import org.springframework.scheduling.support.PeriodicTrigger;
6370
import org.springframework.util.StringUtils;
6471

6572
/**
@@ -110,6 +117,46 @@ public static org.springframework.integration.context.IntegrationProperties inte
110117
@EnableIntegration
111118
protected static class IntegrationConfiguration {
112119

120+
@Bean(PollerMetadata.DEFAULT_POLLER)
121+
@ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER)
122+
public PollerMetadata defaultPollerMetadata(IntegrationProperties integrationProperties) {
123+
IntegrationProperties.Poller poller = integrationProperties.getPoller();
124+
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
125+
entries.put("spring.integration.poller.cron",
126+
StringUtils.hasText(poller.getCron()) ? poller.getCron() : null);
127+
entries.put("spring.integration.poller.fixed-delay", poller.getFixedDelay());
128+
entries.put("spring.integration.poller.fixed-rate", poller.getFixedRate());
129+
});
130+
PollerMetadata pollerMetadata = new PollerMetadata();
131+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
132+
map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll);
133+
map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout);
134+
map.from(poller).as(this::asTrigger).to(pollerMetadata::setTrigger);
135+
return pollerMetadata;
136+
}
137+
138+
private Trigger asTrigger(IntegrationProperties.Poller poller) {
139+
if (StringUtils.hasText(poller.getCron())) {
140+
return new CronTrigger(poller.getCron());
141+
}
142+
if (poller.getFixedDelay() != null) {
143+
return createPeriodicTrigger(poller.getFixedDelay(), poller.getInitialDelay(), true);
144+
}
145+
if (poller.getFixedRate() != null) {
146+
return createPeriodicTrigger(poller.getFixedRate(), poller.getInitialDelay(), false);
147+
}
148+
return null;
149+
}
150+
151+
private Trigger createPeriodicTrigger(Duration period, Duration initialDelay, boolean fixedRate) {
152+
PeriodicTrigger trigger = new PeriodicTrigger(period.toMillis());
153+
if (initialDelay != null) {
154+
trigger.setInitialDelay(initialDelay.toMillis());
155+
}
156+
trigger.setFixedRate(fixedRate);
157+
return trigger;
158+
}
159+
113160
}
114161

115162
/**

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.boot.autoconfigure.integration;
1818

1919
import java.net.URI;
20+
import java.time.Duration;
2021
import java.util.ArrayList;
2122
import java.util.List;
2223

@@ -44,6 +45,8 @@ public class IntegrationProperties {
4445

4546
private final RSocket rsocket = new RSocket();
4647

48+
private final Poller poller = new Poller();
49+
4750
public Channel getChannel() {
4851
return this.channel;
4952
}
@@ -64,6 +67,10 @@ public RSocket getRsocket() {
6467
return this.rsocket;
6568
}
6669

70+
public Poller getPoller() {
71+
return this.poller;
72+
}
73+
6774
public static class Channel {
6875

6976
/**
@@ -295,4 +302,88 @@ public void setMessageMappingEnabled(boolean messageMappingEnabled) {
295302

296303
}
297304

305+
public static class Poller {
306+
307+
/**
308+
* Maximum of messages to poll per polling cycle.
309+
*/
310+
private int maxMessagesPerPoll = Integer.MIN_VALUE; // PollerMetadata.MAX_MESSAGES_UNBOUNDED
311+
312+
/**
313+
* How long to wait for messages on poll.
314+
*/
315+
private Duration receiveTimeout = Duration.ofSeconds(1); // PollerMetadata.DEFAULT_RECEIVE_TIMEOUT
316+
317+
/**
318+
* Polling delay period. Mutually explusive with 'cron' and 'fixedRate'.
319+
*/
320+
private Duration fixedDelay;
321+
322+
/**
323+
* Polling rate period. Mutually explusive with 'fixedDelay' and 'cron'.
324+
*/
325+
private Duration fixedRate;
326+
327+
/**
328+
* Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for
329+
* 'cron'.
330+
*/
331+
private Duration initialDelay;
332+
333+
/**
334+
* Cron expression for polling. Mutually explusive with 'fixedDelay' and
335+
* 'fixedRate'.
336+
*/
337+
private String cron;
338+
339+
public int getMaxMessagesPerPoll() {
340+
return this.maxMessagesPerPoll;
341+
}
342+
343+
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
344+
this.maxMessagesPerPoll = maxMessagesPerPoll;
345+
}
346+
347+
public Duration getReceiveTimeout() {
348+
return this.receiveTimeout;
349+
}
350+
351+
public void setReceiveTimeout(Duration receiveTimeout) {
352+
this.receiveTimeout = receiveTimeout;
353+
}
354+
355+
public Duration getFixedDelay() {
356+
return this.fixedDelay;
357+
}
358+
359+
public void setFixedDelay(Duration fixedDelay) {
360+
this.fixedDelay = fixedDelay;
361+
}
362+
363+
public Duration getFixedRate() {
364+
return this.fixedRate;
365+
}
366+
367+
public void setFixedRate(Duration fixedRate) {
368+
this.fixedRate = fixedRate;
369+
}
370+
371+
public Duration getInitialDelay() {
372+
return this.initialDelay;
373+
}
374+
375+
public void setInitialDelay(Duration initialDelay) {
376+
this.initialDelay = initialDelay;
377+
}
378+
379+
public String getCron() {
380+
return this.cron;
381+
}
382+
383+
public void setCron(String cron) {
384+
this.cron = cron;
385+
}
386+
387+
}
388+
298389
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616

1717
package org.springframework.boot.autoconfigure.integration;
1818

19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.LinkedBlockingQueue;
21+
1922
import javax.management.MBeanServer;
2023
import javax.sql.DataSource;
2124

2225
import io.rsocket.transport.ClientTransport;
2326
import io.rsocket.transport.netty.client.TcpClientTransport;
27+
import org.assertj.core.api.InstanceOfAssertFactories;
2428
import org.junit.jupiter.api.Test;
2529

2630
import org.springframework.beans.DirectFieldAccessor;
@@ -37,6 +41,7 @@
3741
import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration;
3842
import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration;
3943
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
44+
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
4045
import org.springframework.boot.jdbc.init.DataSourceScriptDatabaseInitializer;
4146
import org.springframework.boot.sql.init.DatabaseInitializationMode;
4247
import org.springframework.boot.sql.init.DatabaseInitializationSettings;
@@ -47,6 +52,8 @@
4752
import org.springframework.core.io.ResourceLoader;
4853
import org.springframework.integration.annotation.IntegrationComponentScan;
4954
import org.springframework.integration.annotation.MessagingGateway;
55+
import org.springframework.integration.annotation.ServiceActivator;
56+
import org.springframework.integration.channel.QueueChannel;
5057
import org.springframework.integration.config.IntegrationManagementConfigurer;
5158
import org.springframework.integration.context.IntegrationContextUtils;
5259
import org.springframework.integration.endpoint.MessageProcessorMessageSource;
@@ -55,13 +62,16 @@
5562
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
5663
import org.springframework.integration.rsocket.ServerRSocketConnector;
5764
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
65+
import org.springframework.integration.scheduling.PollerMetadata;
5866
import org.springframework.integration.support.channel.HeaderChannelRegistry;
5967
import org.springframework.jdbc.BadSqlGrammarException;
6068
import org.springframework.jdbc.core.JdbcOperations;
6169
import org.springframework.jmx.export.MBeanExporter;
6270
import org.springframework.messaging.Message;
71+
import org.springframework.messaging.MessageHandler;
6372
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
6473
import org.springframework.scheduling.TaskScheduler;
74+
import org.springframework.scheduling.support.CronTrigger;
6575

6676
import static org.assertj.core.api.Assertions.assertThat;
6777
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -390,6 +400,53 @@ void whenTheUserDefinesTheirOwnDatabaseInitializerThenTheAutoConfiguredIntegrati
390400
.hasBean("customInitializer"));
391401
}
392402

403+
@Test
404+
void defaultPoller() {
405+
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> {
406+
assertThat(context).hasSingleBean(PollerMetadata.class);
407+
PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class);
408+
assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(PollerMetadata.MAX_MESSAGES_UNBOUNDED);
409+
assertThat(metadata.getReceiveTimeout()).isEqualTo(PollerMetadata.DEFAULT_RECEIVE_TIMEOUT);
410+
assertThat(metadata.getTrigger()).isNull();
411+
});
412+
}
413+
414+
@Test
415+
void whenCustomPollerPropertiesAreSetThenTheyAreReflectedInPollerMetadata() {
416+
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class)
417+
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
418+
"spring.integration.poller.max-messages-per-poll=1",
419+
"spring.integration.poller.receive-timeout=10s")
420+
.run((context) -> {
421+
assertThat(context).hasSingleBean(PollerMetadata.class);
422+
PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class);
423+
assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(1L);
424+
assertThat(metadata.getReceiveTimeout()).isEqualTo(10000L);
425+
assertThat(metadata.getTrigger()).asInstanceOf(InstanceOfAssertFactories.type(CronTrigger.class))
426+
.satisfies((trigger) -> assertThat(trigger.getExpression()).isEqualTo("* * * ? * *"));
427+
});
428+
}
429+
430+
@Test
431+
void whenPollerPropertiesForMultipleTriggerTypesAreSetThenRefreshFails() {
432+
this.contextRunner
433+
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
434+
"spring.integration.poller.fixed-delay=1s")
435+
.run((context) -> assertThat(context).hasFailed().getFailure()
436+
.hasRootCauseExactlyInstanceOf(MutuallyExclusiveConfigurationPropertiesException.class)
437+
.getRootCause()
438+
.asInstanceOf(
439+
InstanceOfAssertFactories.type(MutuallyExclusiveConfigurationPropertiesException.class))
440+
.satisfies((ex) -> {
441+
assertThat(ex.getConfiguredNames()).containsExactlyInAnyOrder(
442+
"spring.integration.poller.cron", "spring.integration.poller.fixed-delay");
443+
assertThat(ex.getMutuallyExclusiveNames()).containsExactlyInAnyOrder(
444+
"spring.integration.poller.cron", "spring.integration.poller.fixed-delay",
445+
"spring.integration.poller.fixed-rate");
446+
}));
447+
448+
}
449+
393450
@Configuration(proxyBeanMethods = false)
394451
static class CustomMBeanExporter {
395452

@@ -478,4 +535,25 @@ IntegrationDataSourceInitializer customInitializer(DataSource dataSource, Resour
478535

479536
}
480537

538+
@Configuration(proxyBeanMethods = false)
539+
static class PollingConsumerConfiguration {
540+
541+
@Bean
542+
QueueChannel testChannel() {
543+
return new QueueChannel();
544+
}
545+
546+
@Bean
547+
BlockingQueue<Message<?>> sink() {
548+
return new LinkedBlockingQueue<>();
549+
}
550+
551+
@ServiceActivator(inputChannel = "testChannel")
552+
@Bean
553+
MessageHandler handler(BlockingQueue<Message<?>> sink) {
554+
return sink::add;
555+
}
556+
557+
}
558+
481559
}

spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport
55
If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation.
66

77
Spring Integration polling logic relies <<features#features.task-execution-and-scheduling,on the auto-configured `TaskScheduler`>>.
8+
The default `PollerMetadata` (poll unbounded number of messages every second) can be customized with `spring.integration.poller.*` configuration properties.
89

910
Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules.
1011
If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX.

0 commit comments

Comments
 (0)