Skip to content

Commit b2d1423

Browse files
artembilanphilwebb
authored andcommitted
Add Spring Integration default poller auto-config
When polling consumers or source polling channel adapters are used in Spring Integration applications, they require some polling policy to be configured. This comment auto-configures a PollerMetadata bean which customized via newly added `spring.integration.poller.*` configuration properties or overriden completely be user-defined bean. See gh-27992
1 parent 5e42639 commit b2d1423

File tree

4 files changed

+201
-0
lines changed

4 files changed

+201
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.boot.autoconfigure.integration;
1818

19+
import java.time.Duration;
20+
1921
import javax.management.MBeanServer;
2022
import javax.sql.DataSource;
2123

@@ -56,10 +58,14 @@
5658
import org.springframework.integration.rsocket.ServerRSocketConnector;
5759
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
5860
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
61+
import org.springframework.integration.scheduling.PollerMetadata;
5962
import org.springframework.messaging.rsocket.RSocketRequester;
6063
import org.springframework.messaging.rsocket.RSocketStrategies;
6164
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
6265
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
66+
import org.springframework.scheduling.support.CronTrigger;
67+
import org.springframework.scheduling.support.PeriodicTrigger;
68+
import org.springframework.util.Assert;
6369
import org.springframework.util.StringUtils;
6470

6571
/**
@@ -110,6 +116,29 @@ public static org.springframework.integration.context.IntegrationProperties inte
110116
@EnableIntegration
111117
protected static class IntegrationConfiguration {
112118

119+
@Bean(PollerMetadata.DEFAULT_POLLER)
120+
@ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER)
121+
public PollerMetadata defaultPoller(IntegrationProperties integrationProperties) {
122+
IntegrationProperties.Poller poller = integrationProperties.getPoller();
123+
int hasCron = poller.getCron() != null ? 1 : 0;
124+
int hasFixedDelay = poller.getFixedDelay() != null ? 1 : 0;
125+
int hasFixedRate = poller.getFixedRate() != null ? 1 : 0;
126+
Assert.isTrue((hasCron + hasFixedDelay + hasFixedRate) <= 1,
127+
"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties.");
128+
PollerMetadata pollerMetadata = new PollerMetadata();
129+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
130+
map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll);
131+
map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout);
132+
map.from(poller::getCron).whenHasText().as(CronTrigger::new).to(pollerMetadata::setTrigger);
133+
map.from((poller.getFixedDelay() != null) ? poller.getFixedDelay() : poller.getFixedRate())
134+
.as(Duration::toMillis).as(PeriodicTrigger::new).as((trigger) -> {
135+
map.from(poller::getInitialDelay).as(Duration::toMillis).to(trigger::setInitialDelay);
136+
trigger.setFixedRate(poller.getFixedRate() != null);
137+
return trigger;
138+
}).to(pollerMetadata::setTrigger);
139+
return pollerMetadata;
140+
}
141+
113142
}
114143

115144
/**

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.boot.autoconfigure.integration;
1818

1919
import java.net.URI;
20+
import java.time.Duration;
2021
import java.util.ArrayList;
2122
import java.util.List;
2223

@@ -44,6 +45,8 @@ public class IntegrationProperties {
4445

4546
private final RSocket rsocket = new RSocket();
4647

48+
private final Poller poller = new Poller();
49+
4750
public Channel getChannel() {
4851
return this.channel;
4952
}
@@ -64,6 +67,10 @@ public RSocket getRsocket() {
6467
return this.rsocket;
6568
}
6669

70+
public Poller getPoller() {
71+
return this.poller;
72+
}
73+
6774
public static class Channel {
6875

6976
/**
@@ -295,4 +302,88 @@ public void setMessageMappingEnabled(boolean messageMappingEnabled) {
295302

296303
}
297304

305+
public static class Poller {
306+
307+
/**
308+
* Maximum of messages to poll per polling cycle.
309+
*/
310+
private int maxMessagesPerPoll = Integer.MIN_VALUE; // PollerMetadata.MAX_MESSAGES_UNBOUNDED
311+
312+
/**
313+
* How long to wait for messages on poll.
314+
*/
315+
private Duration receiveTimeout = Duration.ofSeconds(1); // PollerMetadata.DEFAULT_RECEIVE_TIMEOUT
316+
317+
/**
318+
* Polling delay period. Mutually explusive with 'cron' and 'fixedRate'.
319+
*/
320+
private Duration fixedDelay;
321+
322+
/**
323+
* Polling rate period. Mutually explusive with 'fixedDelay' and 'cron'.
324+
*/
325+
private Duration fixedRate;
326+
327+
/**
328+
* Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for
329+
* 'cron'.
330+
*/
331+
private Duration initialDelay;
332+
333+
/**
334+
* Cron expression for polling. Mutually explusive with 'fixedDelay' and
335+
* 'fixedRate'.
336+
*/
337+
private String cron;
338+
339+
public int getMaxMessagesPerPoll() {
340+
return this.maxMessagesPerPoll;
341+
}
342+
343+
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
344+
this.maxMessagesPerPoll = maxMessagesPerPoll;
345+
}
346+
347+
public Duration getReceiveTimeout() {
348+
return this.receiveTimeout;
349+
}
350+
351+
public void setReceiveTimeout(Duration receiveTimeout) {
352+
this.receiveTimeout = receiveTimeout;
353+
}
354+
355+
public Duration getFixedDelay() {
356+
return this.fixedDelay;
357+
}
358+
359+
public void setFixedDelay(Duration fixedDelay) {
360+
this.fixedDelay = fixedDelay;
361+
}
362+
363+
public Duration getFixedRate() {
364+
return this.fixedRate;
365+
}
366+
367+
public void setFixedRate(Duration fixedRate) {
368+
this.fixedRate = fixedRate;
369+
}
370+
371+
public Duration getInitialDelay() {
372+
return this.initialDelay;
373+
}
374+
375+
public void setInitialDelay(Duration initialDelay) {
376+
this.initialDelay = initialDelay;
377+
}
378+
379+
public String getCron() {
380+
return this.cron;
381+
}
382+
383+
public void setCron(String cron) {
384+
this.cron = cron;
385+
}
386+
387+
}
388+
298389
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616

1717
package org.springframework.boot.autoconfigure.integration;
1818

19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.concurrent.TimeUnit;
22+
1923
import javax.management.MBeanServer;
2024
import javax.sql.DataSource;
2125

2226
import io.rsocket.transport.ClientTransport;
2327
import io.rsocket.transport.netty.client.TcpClientTransport;
2428
import org.junit.jupiter.api.Test;
29+
import reactor.core.publisher.Mono;
2530

2631
import org.springframework.beans.DirectFieldAccessor;
2732
import org.springframework.boot.autoconfigure.AutoConfigurations;
@@ -47,6 +52,8 @@
4752
import org.springframework.core.io.ResourceLoader;
4853
import org.springframework.integration.annotation.IntegrationComponentScan;
4954
import org.springframework.integration.annotation.MessagingGateway;
55+
import org.springframework.integration.annotation.ServiceActivator;
56+
import org.springframework.integration.channel.QueueChannel;
5057
import org.springframework.integration.config.IntegrationManagementConfigurer;
5158
import org.springframework.integration.context.IntegrationContextUtils;
5259
import org.springframework.integration.endpoint.MessageProcessorMessageSource;
@@ -55,13 +62,17 @@
5562
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
5663
import org.springframework.integration.rsocket.ServerRSocketConnector;
5764
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
65+
import org.springframework.integration.scheduling.PollerMetadata;
5866
import org.springframework.integration.support.channel.HeaderChannelRegistry;
5967
import org.springframework.jdbc.BadSqlGrammarException;
6068
import org.springframework.jdbc.core.JdbcOperations;
6169
import org.springframework.jmx.export.MBeanExporter;
6270
import org.springframework.messaging.Message;
71+
import org.springframework.messaging.MessageHandler;
6372
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
73+
import org.springframework.messaging.support.GenericMessage;
6474
import org.springframework.scheduling.TaskScheduler;
75+
import org.springframework.scheduling.support.CronTrigger;
6576

6677
import static org.assertj.core.api.Assertions.assertThat;
6778
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -390,6 +401,54 @@ void whenTheUserDefinesTheirOwnDatabaseInitializerThenTheAutoConfiguredIntegrati
390401
.hasBean("customInitializer"));
391402
}
392403

404+
@Test
405+
void defaultPoller() {
406+
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> {
407+
assertThat(context).hasSingleBean(PollerMetadata.class).getBean(PollerMetadata.DEFAULT_POLLER)
408+
.hasFieldOrPropertyWithValue("maxMessagesPerPoll", (long) PollerMetadata.MAX_MESSAGES_UNBOUNDED)
409+
.hasFieldOrPropertyWithValue("receiveTimeout", PollerMetadata.DEFAULT_RECEIVE_TIMEOUT)
410+
.hasFieldOrPropertyWithValue("trigger", null);
411+
412+
GenericMessage<String> testMessage = new GenericMessage<>("test");
413+
context.getBean("testChannel", QueueChannel.class).send(testMessage);
414+
@SuppressWarnings("unchecked")
415+
BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
416+
assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
417+
});
418+
}
419+
420+
@Test
421+
void customPollerProperties() {
422+
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class)
423+
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
424+
"spring.integration.poller.max-messages-per-poll=1",
425+
"spring.integration.poller.receive-timeout=10s")
426+
.run((context) -> {
427+
assertThat(context).hasSingleBean(PollerMetadata.class)
428+
.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class)
429+
.hasFieldOrPropertyWithValue("maxMessagesPerPoll", 1L)
430+
.hasFieldOrPropertyWithValue("receiveTimeout", 10000L)
431+
.extracting(PollerMetadata::getTrigger).isInstanceOf(CronTrigger.class)
432+
.hasFieldOrPropertyWithValue("expression", "* * * ? * *");
433+
434+
GenericMessage<String> testMessage = new GenericMessage<>("test");
435+
context.getBean("testChannel", QueueChannel.class).send(testMessage);
436+
@SuppressWarnings("unchecked")
437+
BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
438+
assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
439+
});
440+
}
441+
442+
@Test
443+
void triggerPropertiesAreMutuallyExclusive() {
444+
this.contextRunner
445+
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
446+
"spring.integration.poller.fixed-delay=1s")
447+
.run((context) -> assertThat(context).hasFailed().getFailure()
448+
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class).hasMessageContaining(
449+
"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties."));
450+
}
451+
393452
@Configuration(proxyBeanMethods = false)
394453
static class CustomMBeanExporter {
395454

@@ -478,4 +537,25 @@ IntegrationDataSourceInitializer customInitializer(DataSource dataSource, Resour
478537

479538
}
480539

540+
@Configuration(proxyBeanMethods = false)
541+
static class PollingConsumerConfiguration {
542+
543+
@Bean
544+
QueueChannel testChannel() {
545+
return new QueueChannel();
546+
}
547+
548+
@Bean
549+
BlockingQueue<Message<?>> sink() {
550+
return new LinkedBlockingQueue<>();
551+
}
552+
553+
@ServiceActivator(inputChannel = "testChannel")
554+
@Bean
555+
MessageHandler handler(BlockingQueue<Message<?>> sink) {
556+
return sink::add;
557+
}
558+
559+
}
560+
481561
}

spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport
55
If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation.
66

77
Spring Integration polling logic relies <<features#features.task-execution-and-scheduling,on the auto-configured `TaskScheduler`>>.
8+
The default `PollerMetadata` (poll unbounded number of messages every second) can be customized with `spring.integration.poller.*` configuration properties.
89

910
Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules.
1011
If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX.

0 commit comments

Comments
 (0)