diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcOutboundGateway.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcOutboundGateway.java index 88ce8b59ac3..699ecd5a3e9 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcOutboundGateway.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcOutboundGateway.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.util.Collections; import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Stream; import javax.sql.DataSource; @@ -36,6 +38,7 @@ * @author Gunnar Hillert * @author Artem Bilan * @author Gary Russell + * @author Jiandong Ma * * @since 2.0 */ @@ -53,6 +56,10 @@ public class JdbcOutboundGateway extends AbstractReplyProducingMessageHandler { private Integer maxRows; + private boolean queryForStream = false; + + private Consumer streamConsumer; + /** * Construct an instance based on the provided {@link DataSource} and update SQL. * @param dataSource the {@link DataSource} for execution. @@ -158,7 +165,6 @@ public void setRequestPreparedStatementSetter(MessagePreparedStatementSetter req } /** - /** * Set a {@link SqlParameterSourceFactory} for select query. * @param sqlParameterSourceFactory the {@link SqlParameterSourceFactory} to use. */ @@ -175,6 +181,26 @@ public void setRowMapper(RowMapper rowMapper) { this.poller.setRowMapper(rowMapper); } + /** + * Set whether to use {@code queryForStream} for message polling. + * @param queryForStream the {@code queryForStream} to use. + * @since 7.0.0 + */ + public void setQueryForStream(boolean queryForStream) { + this.queryForStream = queryForStream; + } + + /** + * Set stream consumer for processing {@code Stream} elements. + * This consumer is only applicable when {@code queryForStream} is set to true. + * @param streamConsumer the {@link Consumer to use}. + * @since 7.0.0 + */ + public void setStreamConsumer(Consumer streamConsumer) { + Assert.notNull(streamConsumer, "'streamConsumer' cannot be null"); + this.streamConsumer = streamConsumer; + } + @Override public String getComponentType() { return "jdbc:outbound-gateway"; @@ -186,6 +212,10 @@ protected void doInit() { Assert.notNull(this.poller, "If you want to set 'maxRows', then you must provide a 'selectQuery'."); this.poller.setMaxRows(this.maxRows); } + if (this.queryForStream) { + Assert.notNull(this.streamConsumer, "If you want to use 'queryForStream', then you must provide a 'streamConsumer'."); + this.poller.setMaxRows(0); // select all records in stream mode. + } BeanFactory beanFactory = getBeanFactory(); if (this.handler != null) { @@ -218,7 +248,15 @@ protected Object handleRequestMessage(Message requestMessage) { sqlQueryParameterSource = this.sqlParameterSourceFactory.createParameterSource(list); } } - list = this.poller.doPoll(sqlQueryParameterSource); + if (this.queryForStream) { + try (Stream stream = this.poller.doPollForStream(sqlQueryParameterSource)) { + stream.forEach(this.streamConsumer); + } + return Collections.emptyList(); + } + else { + list = this.poller.doPoll(sqlQueryParameterSource); + } } Object payload = list; if (list.size() == 1 && (this.maxRows == null || this.maxRows == 1)) { diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcPollingChannelAdapter.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcPollingChannelAdapter.java index a83618f75f3..9747ab22a1c 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcPollingChannelAdapter.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcPollingChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,10 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.HashMap; import java.util.List; import java.util.function.Consumer; +import java.util.stream.Stream; import javax.sql.DataSource; @@ -49,6 +51,7 @@ * @author Jonas Partner * @author Dave Syer * @author Artem Bilan + * @author Jiandong Ma * * @since 2.0 */ @@ -194,7 +197,7 @@ public String getComponentType() { @Override protected Object doReceive() { List payload = doPoll(this.sqlQueryParameterSource); - if (payload.size() < 1) { + if (payload.isEmpty()) { payload = null; } if (payload != null && this.updateSql != null) { @@ -224,6 +227,20 @@ protected List doPoll(@Nullable SqlParameterSource sqlQueryParameterSource) { } } + /** + * Perform a select against provided {@link SqlParameterSource}. + * @param sqlQueryParameterSource the {@link SqlParameterSource} to use. Optional. + * @return the {@link Stream} of the query. + */ + protected Stream doPollForStream(@Nullable SqlParameterSource sqlQueryParameterSource) { + if (sqlQueryParameterSource != null) { + return this.jdbcOperations.queryForStream(this.selectQuery, sqlQueryParameterSource, this.rowMapper); + } + else { + return this.jdbcOperations.queryForStream(this.selectQuery, new HashMap<>(), this.rowMapper); + } + } + private void executeUpdateQuery(Object obj) { this.jdbcOperations.update(this.updateSql, this.sqlParameterSourceFactory.createParameterSource(obj)); } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcOutboundGatewayTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcOutboundGatewayTests.java index 76f80c204bb..199dbad0f8d 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcOutboundGatewayTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcOutboundGatewayTests.java @@ -16,14 +16,27 @@ package org.springframework.integration.jdbc; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import java.util.ArrayList; +import java.util.List; + +import javax.sql.DataSource; + import org.junit.jupiter.api.Test; import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.support.MessageBuilder; import org.springframework.jdbc.core.JdbcOperations; -import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -34,28 +47,23 @@ * @author Gunnar Hillert * @author Gary Russell * @author Artem Bilan + * @author Jiandong Ma * * @since 2.1 * */ +@SpringJUnitConfig +@DirtiesContext public class JdbcOutboundGatewayTests { - private static EmbeddedDatabase dataSource; + @Autowired + private DataSource dataSource; - @BeforeAll - public static void setup() { - dataSource = new EmbeddedDatabaseBuilder().build(); - } - - @AfterAll - public static void teardown() { - dataSource.shutdown(); - } + @Autowired + private JdbcTemplate jdbcTemplate; @Test public void testSetMaxRowsPerPollWithoutSelectQuery() { - EmbeddedDatabase dataSource = new EmbeddedDatabaseBuilder().build(); - JdbcOutboundGateway jdbcOutboundGateway = new JdbcOutboundGateway(dataSource, "update something"); try { @@ -69,8 +77,6 @@ public void testSetMaxRowsPerPollWithoutSelectQuery() { assertThat(e.getMessage()) .isEqualTo("If you want to set 'maxRows', then you must provide a 'selectQuery'."); } - - dataSource.shutdown(); } @Test @@ -118,4 +124,54 @@ public void testSetMaxRowsPerPoll() { } } + @Test + public void testQueryForStream() { + // GIVEN + int rowCnt = 30_000; + for (int i = 0; i < rowCnt; i++) { + jdbcTemplate.update("insert into item values(%s,0)".formatted(i + 1)); + } + JdbcOutboundGateway jdbcOutboundGateway = new JdbcOutboundGateway(dataSource, null, "select * from item"); + jdbcOutboundGateway.setRowMapper((RowMapper) (rs, rowNum) -> new Item(rs.getInt(1), rs.getInt(2))); + jdbcOutboundGateway.setQueryForStream(true); + List resultList = new ArrayList<>(); + jdbcOutboundGateway.setStreamConsumer(obj -> { + Item item = (Item) obj; + resultList.add(item); + }); + QueueChannel replyChannel = new QueueChannel(); + jdbcOutboundGateway.setOutputChannel(replyChannel); + jdbcOutboundGateway.setBeanFactory(mock(BeanFactory.class)); + jdbcOutboundGateway.afterPropertiesSet(); + // WHEN + jdbcOutboundGateway.handleMessage(MessageBuilder.withPayload("foo").build()); + // THEN + assertThat(resultList).hasSize(rowCnt); + for (int i = 0; i < rowCnt; i++) { + assertThat(resultList.get(i).id).isEqualTo(i + 1); + assertThat(resultList.get(i).status).isEqualTo(0); + } + Message replyMessage = replyChannel.receive(); + List payload = (List) replyMessage.getPayload(); + assertThat(payload).isEmpty(); + } + + record Item(int id, int status) { } + + @Configuration + public static class Config { + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.HSQL) + .addScript("classpath:org/springframework/integration/jdbc/jdbcOutboundGatewayTest.sql") + .build(); + } + + @Bean + public JdbcTemplate jdbcTemplate() { + return new JdbcTemplate(dataSource()); + } + } } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/jdbcOutboundGatewayTest.sql b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/jdbcOutboundGatewayTest.sql new file mode 100644 index 00000000000..1f0246d3b8b --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/jdbcOutboundGatewayTest.sql @@ -0,0 +1 @@ +create table item(id int,status int); \ No newline at end of file