Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit 6cb57b3

Browse files
authored
Use custom paging provider for aggregate DAO queries (#5606)
This commit adds a custom paging provider that is used only by the aggregate DAO. This is required because the standard paging provider that ships with Spring Batch 4.x does not properly handle sort key aliases when using nested ROW_NUMBER clauses. * This also sneaks in Mac ARM64 support for DB2. Resolves #5531
1 parent 7bc89c6 commit 6cb57b3

File tree

8 files changed

+347
-9
lines changed

8 files changed

+347
-9
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,19 @@ You can follow the steps in the [MSSQL on Mac ARM64](https://github.com/spring-c
113113
114114
----
115115

116+
## Running Locally w/ IBM DB2
117+
By default, the Dataflow server jar does not include the DB2 database driver dependency.
118+
If you want to use DB2 for development/testing when running locally, you can specify the `local-dev-db2` Maven profile when building.
119+
The following command will include the DB2 driver dependency in the jar:
120+
```
121+
$ ./mvnw -s .settings.xml clean package -Plocal-dev-db2
122+
```
123+
You can follow the steps in the [DB2 on Mac ARM64](https://github.com/spring-cloud/spring-cloud-dataflow/wiki/DB2-on-Mac-ARM64#running-dataflow-locally-against-db2) Wiki to run DB2 locally in Docker with Dataflow pointing at it.
124+
125+
> **NOTE:** If you are not running on Mac ARM64, just skip the steps related to Homebrew and Colima.
126+
127+
----
128+
116129
## Contributing
117130

118131
We welcome contributions! See the [CONTRIBUTING](./CONTRIBUTING.adoc) guide for details.

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java

Lines changed: 206 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.cloud.dataflow.server.repository;
1818

19+
import java.lang.reflect.Field;
1920
import java.sql.ResultSet;
2021
import java.sql.SQLException;
2122
import java.time.Instant;
@@ -47,7 +48,12 @@
4748
import org.springframework.batch.core.repository.dao.JdbcJobExecutionDao;
4849
import org.springframework.batch.item.database.Order;
4950
import org.springframework.batch.item.database.PagingQueryProvider;
51+
import org.springframework.batch.item.database.support.AbstractSqlPagingQueryProvider;
52+
import org.springframework.batch.item.database.support.Db2PagingQueryProvider;
53+
import org.springframework.batch.item.database.support.OraclePagingQueryProvider;
5054
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
55+
import org.springframework.batch.item.database.support.SqlPagingQueryUtils;
56+
import org.springframework.batch.item.database.support.SqlServerPagingQueryProvider;
5157
import org.springframework.cloud.dataflow.core.DataFlowPropertyKeys;
5258
import org.springframework.cloud.dataflow.core.database.support.DatabaseType;
5359
import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions;
@@ -75,6 +81,7 @@
7581
import org.springframework.jdbc.core.RowMapper;
7682
import org.springframework.util.Assert;
7783
import org.springframework.util.ObjectUtils;
84+
import org.springframework.util.ReflectionUtils;
7885
import org.springframework.util.StringUtils;
7986

8087
/**
@@ -802,7 +809,7 @@ private PagingQueryProvider getPagingQueryProvider(String fields, String fromCla
802809
* @throws Exception if page provider is not created.
803810
*/
804811
private PagingQueryProvider getPagingQueryProvider(String fields, String fromClause, String whereClause, Map<String, Order> sortKeys) throws Exception {
805-
SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean();
812+
SqlPagingQueryProviderFactoryBean factory = new SafeSqlPagingQueryProviderFactoryBean();
806813
factory.setDataSource(dataSource);
807814
fromClause = "AGGREGATE_JOB_INSTANCE I JOIN AGGREGATE_JOB_EXECUTION E ON I.JOB_INSTANCE_ID=E.JOB_INSTANCE_ID AND I.SCHEMA_TARGET=E.SCHEMA_TARGET" + (fromClause == null ? "" : " " + fromClause);
808815
factory.setFromClause(fromClause);
@@ -811,7 +818,7 @@ private PagingQueryProvider getPagingQueryProvider(String fields, String fromCla
811818
}
812819
if (fields.contains("E.JOB_EXECUTION_ID") && this.useRowNumberOptimization) {
813820
Order order = sortKeys.get("E.JOB_EXECUTION_ID");
814-
String orderString = Optional.ofNullable(order).map(orderKey -> orderKey == Order.DESCENDING ? "DESC" : "ASC").orElse("DESC");
821+
String orderString = (order == null || order == Order.DESCENDING) ? "DESC" : "ASC";
815822
fields += ", ROW_NUMBER() OVER (PARTITION BY E.JOB_EXECUTION_ID ORDER BY E.JOB_EXECUTION_ID " + orderString + ") as RN";
816823
}
817824
factory.setSelectClause(fields);
@@ -832,4 +839,201 @@ private boolean determineSupportsRowNumberFunction(DataSource dataSource) {
832839
}
833840
return false;
834841
}
842+
843+
/**
844+
* A {@link SqlPagingQueryProviderFactoryBean} specialization that replaces the {@code Oracle}, {@code MSSQL}, and
845+
* {@code DB2} paging providers with implementations that properly handle sort aliases.
846+
* <p><b>NOTE:</b> nested within the aggregate DAO as this is the only place that needs this specialization.
847+
*/
848+
static class SafeSqlPagingQueryProviderFactoryBean extends SqlPagingQueryProviderFactoryBean {
849+
850+
private DataSource dataSource;
851+
852+
@Override
853+
public void setDataSource(DataSource dataSource) {
854+
super.setDataSource(dataSource);
855+
this.dataSource = dataSource;
856+
}
857+
858+
@Override
859+
public PagingQueryProvider getObject() throws Exception {
860+
PagingQueryProvider provider = super.getObject();
861+
if (provider instanceof OraclePagingQueryProvider) {
862+
provider = new SafeOraclePagingQueryProvider((AbstractSqlPagingQueryProvider) provider, this.dataSource);
863+
}
864+
else if (provider instanceof SqlServerPagingQueryProvider) {
865+
provider = new SafeSqlServerPagingQueryProvider((SqlServerPagingQueryProvider) provider, this.dataSource);
866+
}
867+
else if (provider instanceof Db2PagingQueryProvider) {
868+
provider = new SafeDb2PagingQueryProvider((Db2PagingQueryProvider) provider, this.dataSource);
869+
}
870+
return provider;
871+
}
872+
873+
}
874+
875+
/**
876+
* A {@link AbstractSqlPagingQueryProvider paging provider} for {@code Oracle} that works around the fact that the
877+
* Oracle provider in Spring Batch 4.x does not properly handle sort aliases when using nested {@code ROW_NUMBER}
878+
* clauses.
879+
*/
880+
static class SafeOraclePagingQueryProvider extends AbstractSqlPagingQueryProvider {
881+
882+
SafeOraclePagingQueryProvider(AbstractSqlPagingQueryProvider delegate, DataSource dataSource) {
883+
// Have to use reflection to retrieve the provider fields
884+
this.setFromClause(extractField(delegate, "fromClause", String.class));
885+
this.setWhereClause(extractField(delegate, "whereClause", String.class));
886+
this.setSortKeys(extractField(delegate, "sortKeys", Map.class));
887+
this.setSelectClause(extractField(delegate, "selectClause", String.class));
888+
this.setGroupClause(extractField(delegate, "groupClause", String.class));
889+
try {
890+
this.init(dataSource);
891+
}
892+
catch (Exception e) {
893+
throw new RuntimeException(e);
894+
}
895+
}
896+
897+
private <T> T extractField(AbstractSqlPagingQueryProvider target, String fieldName, Class<T> fieldType) {
898+
Field field = ReflectionUtils.findField(AbstractSqlPagingQueryProvider.class, fieldName, fieldType);
899+
ReflectionUtils.makeAccessible(field);
900+
return (T) ReflectionUtils.getField(field, target);
901+
}
902+
903+
@Override
904+
public String generateFirstPageQuery(int pageSize) {
905+
return generateRowNumSqlQuery(false, pageSize);
906+
}
907+
908+
@Override
909+
public String generateRemainingPagesQuery(int pageSize) {
910+
return generateRowNumSqlQuery(true, pageSize);
911+
}
912+
913+
@Override
914+
public String generateJumpToItemQuery(int itemIndex, int pageSize) {
915+
int page = itemIndex / pageSize;
916+
int offset = (page * pageSize);
917+
offset = (offset == 0) ? 1 : offset;
918+
String sortKeyInnerSelect = this.getSortKeySelect(true);
919+
String sortKeyOuterSelect = this.getSortKeySelect(false);
920+
return SqlPagingQueryUtils.generateRowNumSqlQueryWithNesting(this, sortKeyInnerSelect, sortKeyOuterSelect,
921+
false, "TMP_ROW_NUM = " + offset);
922+
}
923+
924+
private String getSortKeySelect(boolean withAliases) {
925+
StringBuilder sql = new StringBuilder();
926+
Map<String, Order> sortKeys = (withAliases) ? this.getSortKeys() : this.getSortKeysWithoutAliases();
927+
sql.append(sortKeys.keySet().stream().collect(Collectors.joining(",")));
928+
return sql.toString();
929+
}
930+
931+
// Taken from SqlPagingQueryUtils.generateRowNumSqlQuery but uses the sort keys without aliases
932+
// for the outer sort condition.
933+
private String generateRowNumSqlQuery(boolean remainingPageQuery, int pageSize) {
934+
StringBuilder sql = new StringBuilder();
935+
sql.append("SELECT * FROM (SELECT ").append(getSelectClause());
936+
sql.append(" FROM ").append(this.getFromClause());
937+
if (StringUtils.hasText(this.getWhereClause())) {
938+
sql.append(" WHERE ").append(this.getWhereClause());
939+
}
940+
if (StringUtils.hasText(this.getGroupClause())) {
941+
sql.append(" GROUP BY ").append(this.getGroupClause());
942+
}
943+
// inner sort by
944+
sql.append(" ORDER BY ").append(SqlPagingQueryUtils.buildSortClause(this));
945+
sql.append(") WHERE ").append("ROWNUM <= " + pageSize);
946+
if (remainingPageQuery) {
947+
sql.append(" AND ");
948+
// For the outer sort we want to use sort keys w/o aliases. However,
949+
// SqlPagingQueryUtils.buildSortConditions does not allow sort keys to be passed in.
950+
// Therefore, we temporarily set the 'sortKeys' for the call to 'buildSortConditions'.
951+
// The alternative is to clone the 'buildSortConditions' method here and allow the sort keys to be
952+
// passed in, BUT that method is gigantic and this approach is the lesser of the two evils.
953+
Map<String, Order> originalSortKeys = this.getSortKeys();
954+
this.setSortKeys(this.getSortKeysWithoutAliases());
955+
try {
956+
SqlPagingQueryUtils.buildSortConditions(this, sql);
957+
}
958+
finally {
959+
this.setSortKeys(originalSortKeys);
960+
}
961+
}
962+
return sql.toString();
963+
}
964+
}
965+
966+
/**
967+
* A {@link SqlServerPagingQueryProvider paging provider} for {@code MSSQL} that works around the fact that the
968+
* MSSQL provider in Spring Batch 4.x does not properly handle sort aliases when generating jump to page queries.
969+
*/
970+
static class SafeSqlServerPagingQueryProvider extends SqlServerPagingQueryProvider {
971+
972+
SafeSqlServerPagingQueryProvider(SqlServerPagingQueryProvider delegate, DataSource dataSource) {
973+
// Have to use reflection to retrieve the provider fields
974+
this.setFromClause(extractField(delegate, "fromClause", String.class));
975+
this.setWhereClause(extractField(delegate, "whereClause", String.class));
976+
this.setSortKeys(extractField(delegate, "sortKeys", Map.class));
977+
this.setSelectClause(extractField(delegate, "selectClause", String.class));
978+
this.setGroupClause(extractField(delegate, "groupClause", String.class));
979+
try {
980+
this.init(dataSource);
981+
}
982+
catch (Exception e) {
983+
throw new RuntimeException(e);
984+
}
985+
}
986+
987+
private <T> T extractField(AbstractSqlPagingQueryProvider target, String fieldName, Class<T> fieldType) {
988+
Field field = ReflectionUtils.findField(AbstractSqlPagingQueryProvider.class, fieldName, fieldType);
989+
ReflectionUtils.makeAccessible(field);
990+
return (T) ReflectionUtils.getField(field, target);
991+
}
992+
993+
@Override
994+
protected String getOverClause() {
995+
// Overrides the parent impl to use 'getSortKeys' instead of 'getSortKeysWithoutAliases'
996+
StringBuilder sql = new StringBuilder();
997+
sql.append(" ORDER BY ").append(SqlPagingQueryUtils.buildSortClause(this.getSortKeys()));
998+
return sql.toString();
999+
}
1000+
1001+
}
1002+
1003+
/**
1004+
* A {@link Db2PagingQueryProvider paging provider} for {@code DB2} that works around the fact that the
1005+
* DB2 provider in Spring Batch 4.x does not properly handle sort aliases when generating jump to page queries.
1006+
*/
1007+
static class SafeDb2PagingQueryProvider extends Db2PagingQueryProvider {
1008+
1009+
SafeDb2PagingQueryProvider(Db2PagingQueryProvider delegate, DataSource dataSource) {
1010+
// Have to use reflection to retrieve the provider fields
1011+
this.setFromClause(extractField(delegate, "fromClause", String.class));
1012+
this.setWhereClause(extractField(delegate, "whereClause", String.class));
1013+
this.setSortKeys(extractField(delegate, "sortKeys", Map.class));
1014+
this.setSelectClause(extractField(delegate, "selectClause", String.class));
1015+
this.setGroupClause(extractField(delegate, "groupClause", String.class));
1016+
try {
1017+
this.init(dataSource);
1018+
}
1019+
catch (Exception e) {
1020+
throw new RuntimeException(e);
1021+
}
1022+
}
1023+
1024+
private <T> T extractField(AbstractSqlPagingQueryProvider target, String fieldName, Class<T> fieldType) {
1025+
Field field = ReflectionUtils.findField(AbstractSqlPagingQueryProvider.class, fieldName, fieldType);
1026+
ReflectionUtils.makeAccessible(field);
1027+
return (T) ReflectionUtils.getField(field, target);
1028+
}
1029+
1030+
@Override
1031+
protected String getOverClause() {
1032+
// Overrides the parent impl to use 'getSortKeys' instead of 'getSortKeysWithoutAliases'
1033+
StringBuilder sql = new StringBuilder();
1034+
sql.append(" ORDER BY ").append(SqlPagingQueryUtils.buildSortClause(this.getSortKeys()));
1035+
return sql.toString();
1036+
}
1037+
1038+
}
8351039
}

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/support/SqlPagingQueryProviderFactoryBean.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class SqlPagingQueryProviderFactoryBean implements FactoryBean<PagingQuer
5252
private final static Map<DatabaseType, AbstractSqlPagingQueryProvider> providers;
5353

5454
static {
55-
Map<DatabaseType, AbstractSqlPagingQueryProvider> providerMap = new HashMap<DatabaseType, AbstractSqlPagingQueryProvider>();
55+
Map<DatabaseType, AbstractSqlPagingQueryProvider> providerMap = new HashMap<DatabaseType, AbstractSqlPagingQueryProvider>();
5656
providerMap.put(DatabaseType.HSQL, new HsqlPagingQueryProvider());
5757
providerMap.put(DatabaseType.H2, new H2PagingQueryProvider());
5858
providerMap.put(DatabaseType.MYSQL, new MySqlPagingQueryProvider());

spring-cloud-dataflow-server/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,5 +408,15 @@
408408
</dependency>
409409
</dependencies>
410410
</profile>
411+
<profile>
412+
<id>local-dev-db2</id>
413+
<dependencies>
414+
<dependency>
415+
<groupId>com.ibm.db2</groupId>
416+
<artifactId>jcc</artifactId>
417+
<version>11.5.8.0</version>
418+
</dependency>
419+
</dependencies>
420+
</profile>
411421
</profiles>
412422
</project>

spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/server/db/migration/AbstractSmokeTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,21 @@ void shouldListJobExecutionsUsingPerformantRowNumberQuery(
144144
createdExecutionIdsBySchemaTarget.add(schemaVersionTarget, execution1.getExecutionId());
145145
TaskExecution execution2 = testUtils.createSampleJob("job2", 3, BatchStatus.COMPLETED, new JobParameters(), schemaVersionTarget);
146146
createdExecutionIdsBySchemaTarget.add(schemaVersionTarget, execution2.getExecutionId());
147+
148+
// Get all executions and verify the count and whether the row number function was used
147149
jobExecutions = taskJobService.listJobExecutionsWithStepCount(Pageable.ofSize(100));
148150
assertThat(jobExecutions).hasSize(originalCount + 4);
149151
String expectedSqlFragment = (this.supportsRowNumberFunction()) ?
150152
"as STEP_COUNT, ROW_NUMBER() OVER (PARTITION" :
151153
"as STEP_COUNT FROM AGGREGATE_JOB_INSTANCE";
152154
Awaitility.waitAtMost(Duration.ofSeconds(5))
153155
.untilAsserted(() -> assertThat(output).contains(expectedSqlFragment));
156+
157+
// Verify that paging works as well
158+
jobExecutions = taskJobService.listJobExecutionsWithStepCount(Pageable.ofSize(2).withPage(0));
159+
assertThat(jobExecutions).hasSize(2);
160+
jobExecutions = taskJobService.listJobExecutionsWithStepCount(Pageable.ofSize(2).withPage(1));
161+
assertThat(jobExecutions).hasSize(2);
154162
}
155163

156164
static Stream<SchemaVersionTarget> schemaVersionTargetsProvider() {

spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/server/db/migration/JobExecutionTestUtils.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,22 @@
2424

2525
import javax.sql.DataSource;
2626

27+
import com.zaxxer.hikari.HikariDataSource;
28+
import org.junit.jupiter.api.Disabled;
29+
import org.junit.jupiter.api.Test;
30+
2731
import org.springframework.batch.core.BatchStatus;
2832
import org.springframework.batch.core.JobExecution;
2933
import org.springframework.batch.core.JobInstance;
3034
import org.springframework.batch.core.JobParameters;
3135
import org.springframework.batch.core.repository.dao.JdbcJobInstanceDao;
3236
import org.springframework.batch.item.database.support.DataFieldMaxValueIncrementerFactory;
37+
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
3338
import org.springframework.cloud.dataflow.core.database.support.DatabaseType;
3439
import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory;
3540
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
41+
import org.springframework.cloud.dataflow.schema.service.SchemaService;
42+
import org.springframework.cloud.dataflow.schema.service.impl.DefaultSchemaService;
3643
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
3744
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
3845
import org.springframework.cloud.task.batch.listener.TaskBatchDao;
@@ -130,4 +137,37 @@ private JobExecution saveJobExecution(JobExecution jobExecution, JdbcTemplate jd
130137
private Timestamp timestampFromDate(Date date) {
131138
return (date != null) ? Timestamp.valueOf(date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime()) : null;
132139
}
140+
141+
142+
/**
143+
* Test utility that generates hundreds of job executions which can be useful when debugging paging issues.
144+
* <p>To run, adjust the datasource properties accordingly and then execute the test manually in your editor.
145+
*/
146+
@Disabled
147+
static class JobExecutionTestDataGenerator {
148+
149+
@Test
150+
void generateJobExecutions() {
151+
// Adjust these properties as necessary to point to your env
152+
DataSourceProperties dataSourceProperties = new DataSourceProperties();
153+
dataSourceProperties.setUrl("jdbc:oracle:thin:@localhost:1521/dataflow");
154+
dataSourceProperties.setUsername("spring");
155+
dataSourceProperties.setPassword("spring");
156+
dataSourceProperties.setDriverClassName("oracle.jdbc.OracleDriver");
157+
158+
DataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
159+
SchemaService schemaService = new DefaultSchemaService();
160+
TaskExecutionDaoContainer taskExecutionDaoContainer = new TaskExecutionDaoContainer(dataSource, schemaService);
161+
TaskBatchDaoContainer taskBatchDaoContainer = new TaskBatchDaoContainer(dataSource, schemaService);
162+
JobExecutionTestUtils generator = new JobExecutionTestUtils(taskExecutionDaoContainer, taskBatchDaoContainer);
163+
generator.createSampleJob(jobName("boot2"), 200, BatchStatus.COMPLETED, new JobParameters(),
164+
schemaService.getTarget("boot2"));
165+
generator.createSampleJob(jobName("boot3"), 200, BatchStatus.COMPLETED, new JobParameters(),
166+
schemaService.getTarget("boot3"));
167+
}
168+
169+
private String jobName(String schemaTarget) {
170+
return schemaTarget + "-job-" + System.currentTimeMillis();
171+
}
172+
}
133173
}

0 commit comments

Comments
 (0)