Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@

import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

import javax.sql.DataSource;

Expand All @@ -36,6 +38,7 @@
* @author Gunnar Hillert
* @author Artem Bilan
* @author Gary Russell
* @author Jiandong Ma
*
* @since 2.0
*/
Expand All @@ -53,6 +56,10 @@ public class JdbcOutboundGateway extends AbstractReplyProducingMessageHandler {

private Integer maxRows;

private boolean queryForStream = false;

private Consumer<Object> streamConsumer;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here given Object type for Consumer, user have to do the type cast , which I am not very satisfied with this, because when set RowMapper, they already give return type.
I don't have a good way. I tried to put ?, but looks it can not be applied to Stream<?> .


/**
* Construct an instance based on the provided {@link DataSource} and update SQL.
* @param dataSource the {@link DataSource} for execution.
Expand Down Expand Up @@ -158,7 +165,6 @@ public void setRequestPreparedStatementSetter(MessagePreparedStatementSetter req
}

/**
/**
* Set a {@link SqlParameterSourceFactory} for select query.
* @param sqlParameterSourceFactory the {@link SqlParameterSourceFactory} to use.
*/
Expand All @@ -175,6 +181,26 @@ public void setRowMapper(RowMapper<?> rowMapper) {
this.poller.setRowMapper(rowMapper);
}

/**
* Set whether to use {@code queryForStream} for message polling.
* @param queryForStream the {@code queryForStream} to use.
* @since 7.0.0
*/
public void setQueryForStream(boolean queryForStream) {
this.queryForStream = queryForStream;
}

/**
* Set stream consumer for processing {@code Stream} elements.
* This consumer is only applicable when {@code queryForStream} is set to true.
* @param streamConsumer the {@link Consumer to use}.
* @since 7.0.0
*/
public void setStreamConsumer(Consumer<Object> streamConsumer) {
Assert.notNull(streamConsumer, "'streamConsumer' cannot be null");
this.streamConsumer = streamConsumer;
}

@Override
public String getComponentType() {
return "jdbc:outbound-gateway";
Expand All @@ -186,6 +212,10 @@ protected void doInit() {
Assert.notNull(this.poller, "If you want to set 'maxRows', then you must provide a 'selectQuery'.");
this.poller.setMaxRows(this.maxRows);
}
if (this.queryForStream) {
Assert.notNull(this.streamConsumer, "If you want to use 'queryForStream', then you must provide a 'streamConsumer'.");
this.poller.setMaxRows(0); // select all records in stream mode.
}

BeanFactory beanFactory = getBeanFactory();
if (this.handler != null) {
Expand Down Expand Up @@ -218,7 +248,15 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
sqlQueryParameterSource = this.sqlParameterSourceFactory.createParameterSource(list);
}
}
list = this.poller.doPoll(sqlQueryParameterSource);
if (this.queryForStream) {
try (Stream<?> stream = this.poller.doPollForStream(sqlQueryParameterSource)) {
stream.forEach(this.streamConsumer);
}
return Collections.emptyList();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the effort, but this is not correct.
This is not a messaging way and this is not a goal of Spring Integration channel adapter.
If we go the of callback instead of sending a message to the channel, then there is no reason in using channel adapters.
Simple JdbcTemplate with its streaming API would do exactly the same without extra messaging layer to introduce for nothing.

Having a Stream as a message payload to produce from the channel adapter it totally fine.
However still all those questions are opened: the Stream has to be closed, not clear how to update entries of this stream to satisfy concurrent polling from the table. At the same time how do transactions behave with these opened (and possible updated) streams.

I general I suggest to close this PR since we are not going the way you suggest.
Sorry about that and thank you again for your effort!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh, make sense, thank you for making me think in messaging way, I was wrong from the beginning.

}
else {
list = this.poller.doPoll(sqlQueryParameterSource);
}
}
Object payload = list;
if (list.size() == 1 && (this.maxRows == null || this.maxRows == 1)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,8 +19,10 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

import javax.sql.DataSource;

Expand Down Expand Up @@ -49,6 +51,7 @@
* @author Jonas Partner
* @author Dave Syer
* @author Artem Bilan
* @author Jiandong Ma
*
* @since 2.0
*/
Expand Down Expand Up @@ -194,7 +197,7 @@ public String getComponentType() {
@Override
protected Object doReceive() {
List<?> payload = doPoll(this.sqlQueryParameterSource);
if (payload.size() < 1) {
if (payload.isEmpty()) {
payload = null;
}
if (payload != null && this.updateSql != null) {
Expand Down Expand Up @@ -224,6 +227,20 @@ protected List<?> doPoll(@Nullable SqlParameterSource sqlQueryParameterSource) {
}
}

/**
* Perform a select against provided {@link SqlParameterSource}.
* @param sqlQueryParameterSource the {@link SqlParameterSource} to use. Optional.
* @return the {@link Stream} of the query.
*/
protected Stream<?> doPollForStream(@Nullable SqlParameterSource sqlQueryParameterSource) {
if (sqlQueryParameterSource != null) {
return this.jdbcOperations.queryForStream(this.selectQuery, sqlQueryParameterSource, this.rowMapper);
}
else {
return this.jdbcOperations.queryForStream(this.selectQuery, new HashMap<>(), this.rowMapper);
}
}

private void executeUpdateQuery(Object obj) {
this.jdbcOperations.update(this.updateSql, this.sqlParameterSourceFactory.createParameterSource(obj));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,27 @@

package org.springframework.integration.jdbc;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
Expand All @@ -34,28 +47,23 @@
* @author Gunnar Hillert
* @author Gary Russell
* @author Artem Bilan
* @author Jiandong Ma
*
* @since 2.1
*
*/
@SpringJUnitConfig
@DirtiesContext
public class JdbcOutboundGatewayTests {

private static EmbeddedDatabase dataSource;
@Autowired
private DataSource dataSource;

@BeforeAll
public static void setup() {
dataSource = new EmbeddedDatabaseBuilder().build();
}

@AfterAll
public static void teardown() {
dataSource.shutdown();
}
@Autowired
private JdbcTemplate jdbcTemplate;

@Test
public void testSetMaxRowsPerPollWithoutSelectQuery() {
EmbeddedDatabase dataSource = new EmbeddedDatabaseBuilder().build();

JdbcOutboundGateway jdbcOutboundGateway = new JdbcOutboundGateway(dataSource, "update something");

try {
Expand All @@ -69,8 +77,6 @@ public void testSetMaxRowsPerPollWithoutSelectQuery() {
assertThat(e.getMessage())
.isEqualTo("If you want to set 'maxRows', then you must provide a 'selectQuery'.");
}

dataSource.shutdown();
}

@Test
Expand Down Expand Up @@ -118,4 +124,54 @@ public void testSetMaxRowsPerPoll() {
}
}

@Test
public void testQueryForStream() {
// GIVEN
int rowCnt = 30_000;
for (int i = 0; i < rowCnt; i++) {
jdbcTemplate.update("insert into item values(%s,0)".formatted(i + 1));
}
JdbcOutboundGateway jdbcOutboundGateway = new JdbcOutboundGateway(dataSource, null, "select * from item");
jdbcOutboundGateway.setRowMapper((RowMapper<Item>) (rs, rowNum) -> new Item(rs.getInt(1), rs.getInt(2)));
jdbcOutboundGateway.setQueryForStream(true);
List<Item> resultList = new ArrayList<>();
jdbcOutboundGateway.setStreamConsumer(obj -> {
Item item = (Item) obj;
resultList.add(item);
});
QueueChannel replyChannel = new QueueChannel();
jdbcOutboundGateway.setOutputChannel(replyChannel);
jdbcOutboundGateway.setBeanFactory(mock(BeanFactory.class));
jdbcOutboundGateway.afterPropertiesSet();
// WHEN
jdbcOutboundGateway.handleMessage(MessageBuilder.withPayload("foo").build());
// THEN
assertThat(resultList).hasSize(rowCnt);
for (int i = 0; i < rowCnt; i++) {
assertThat(resultList.get(i).id).isEqualTo(i + 1);
assertThat(resultList.get(i).status).isEqualTo(0);
}
Message<?> replyMessage = replyChannel.receive();
List<?> payload = (List<?>) replyMessage.getPayload();
assertThat(payload).isEmpty();
}

record Item(int id, int status) { }

@Configuration
public static class Config {

@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("classpath:org/springframework/integration/jdbc/jdbcOutboundGatewayTest.sql")
.build();
}

@Bean
public JdbcTemplate jdbcTemplate() {
return new JdbcTemplate(dataSource());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table item(id int,status int);