diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/AbstractJdbcBatchMetadataDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/AbstractJdbcBatchMetadataDao.java index 846bfe094a..4b8c016d66 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/AbstractJdbcBatchMetadataDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/AbstractJdbcBatchMetadataDao.java @@ -32,6 +32,7 @@ import org.springframework.core.convert.support.ConfigurableConversionService; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.jdbc.core.JdbcOperations; +import org.springframework.jdbc.core.simple.JdbcClient; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -41,6 +42,7 @@ * * @author Robert Kasanicky * @author Mahmoud Ben Hassine + * @author Yanming Zhou */ public abstract class AbstractJdbcBatchMetadataDao implements InitializingBean { @@ -61,6 +63,8 @@ public abstract class AbstractJdbcBatchMetadataDao implements InitializingBean { private @Nullable ConfigurableConversionService conversionService; + private @Nullable JdbcClient jdbcClient; + protected String getQuery(String base) { return StringUtils.replace(base, "%PREFIX%", tablePrefix); } @@ -80,12 +84,17 @@ public void setTablePrefix(String tablePrefix) { public void setJdbcTemplate(JdbcOperations jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; + this.jdbcClient = JdbcClient.create(jdbcTemplate); } @Nullable protected JdbcOperations getJdbcTemplate() { return jdbcTemplate; } + @Nullable protected JdbcClient getJdbcClient() { + return jdbcClient; + } + public int getClobTypeToUse() { return clobTypeToUse; } @@ -110,6 +119,7 @@ public void setConversionService(ConfigurableConversionService conversionService @Override public void afterPropertiesSet() throws Exception { Assert.state(jdbcTemplate != null, "JdbcOperations is required"); + Assert.state(jdbcClient != null, "JdbcClient is required"); if (this.conversionService == null) { DefaultConversionService conversionService = new DefaultConversionService(); conversionService.addConverter(new DateToStringConverter()); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java index 1feaf76ef7..d5a9d8e9fb 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java @@ -24,10 +24,10 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.locks.Lock; @@ -64,7 +64,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem private static final String FIND_JOB_EXECUTION_CONTEXT = """ SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT FROM %PREFIX%JOB_EXECUTION_CONTEXT - WHERE JOB_EXECUTION_ID = ? + WHERE JOB_EXECUTION_ID = :jobExecutionId """; private static final String INSERT_JOB_EXECUTION_CONTEXT = """ @@ -81,7 +81,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem private static final String FIND_STEP_EXECUTION_CONTEXT = """ SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT FROM %PREFIX%STEP_EXECUTION_CONTEXT - WHERE STEP_EXECUTION_ID = ? + WHERE STEP_EXECUTION_ID = :stepExecutionId """; private static final String INSERT_STEP_EXECUTION_CONTEXT = """ @@ -97,12 +97,12 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem private static final String DELETE_STEP_EXECUTION_CONTEXT = """ DELETE FROM %PREFIX%STEP_EXECUTION_CONTEXT - WHERE STEP_EXECUTION_ID = ? + WHERE STEP_EXECUTION_ID = :stepExecutionId """; private static final String DELETE_JOB_EXECUTION_CONTEXT = """ DELETE FROM %PREFIX%JOB_EXECUTION_CONTEXT - WHERE JOB_EXECUTION_ID = ? + WHERE JOB_EXECUTION_ID = :jobExecutionId """; private Charset charset = StandardCharsets.UTF_8; @@ -153,14 +153,12 @@ public ExecutionContext getExecutionContext(JobExecution jobExecution) { Long executionId = jobExecution.getId(); Assert.notNull(executionId, "ExecutionId must not be null."); - List results = getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTION_CONTEXT), - new ExecutionContextRowMapper(), executionId); - if (!results.isEmpty()) { - return results.get(0); - } - else { - return new ExecutionContext(); - } + return getJdbcClient().sql(getQuery(FIND_JOB_EXECUTION_CONTEXT)) + .param("jobExecutionId", executionId) + .query(new ExecutionContextRowMapper()) + .optional() + .orElseGet(ExecutionContext::new); + } @Override @@ -168,14 +166,11 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) { Long executionId = stepExecution.getId(); Assert.notNull(executionId, "ExecutionId must not be null."); - List results = getJdbcTemplate().query(getQuery(FIND_STEP_EXECUTION_CONTEXT), - new ExecutionContextRowMapper(), executionId); - if (results.size() > 0) { - return results.get(0); - } - else { - return new ExecutionContext(); - } + return getJdbcClient().sql(getQuery(FIND_STEP_EXECUTION_CONTEXT)) + .param("stepExecutionId", executionId) + .query(new ExecutionContextRowMapper()) + .optional() + .orElseGet(ExecutionContext::new); } @Override @@ -255,7 +250,9 @@ public void saveExecutionContexts(Collection stepExecutions) { */ @Override public void deleteExecutionContext(JobExecution jobExecution) { - getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION_CONTEXT), jobExecution.getId()); + getJdbcClient().sql(getQuery(DELETE_JOB_EXECUTION_CONTEXT)) + .param("jobExecutionId", jobExecution.getId()) + .update(); } /** @@ -264,7 +261,9 @@ public void deleteExecutionContext(JobExecution jobExecution) { */ @Override public void deleteExecutionContext(StepExecution stepExecution) { - getJdbcTemplate().update(getQuery(DELETE_STEP_EXECUTION_CONTEXT), stepExecution.getId()); + getJdbcClient().sql(getQuery(DELETE_STEP_EXECUTION_CONTEXT)) + .param("stepExecutionId", stepExecution.getId()) + .update(); } @Override @@ -293,16 +292,13 @@ private void persistSerializedContext(Long executionId, String serializedContext longContext = null; } - getJdbcTemplate().update(getQuery(sql), ps -> { - ps.setString(1, shortContext); - if (longContext != null) { - ps.setString(2, longContext); - } - else { - ps.setNull(2, getClobTypeToUse()); - } - ps.setLong(3, executionId); - }); + getJdbcClient().sql(getQuery(sql)) + // @formatter:off + .param(1, shortContext, Types.VARCHAR) + .param(2, longContext, getClobTypeToUse()) + .param(3, executionId, Types.BIGINT) + // @formatter:on + .update(); } /** diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java index 1bfb5a3027..e68120c56b 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java @@ -75,13 +75,13 @@ public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements private static final String CHECK_JOB_EXECUTION_EXISTS = """ SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION - WHERE JOB_EXECUTION_ID = ? + WHERE JOB_EXECUTION_ID = :jobExecutionId """; private static final String GET_STATUS = """ SELECT STATUS FROM %PREFIX%JOB_EXECUTION - WHERE JOB_EXECUTION_ID = ? + WHERE JOB_EXECUTION_ID = :jobExecutionId """; private static final String UPDATE_JOB_EXECUTION = """ @@ -98,27 +98,27 @@ SELECT COUNT(*) private static final String GET_LAST_JOB_EXECUTION_ID = """ SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION - WHERE JOB_INSTANCE_ID = ? AND JOB_EXECUTION_ID IN (SELECT MAX(JOB_EXECUTION_ID) FROM %PREFIX%JOB_EXECUTION E2 WHERE E2.JOB_INSTANCE_ID = ?) + WHERE JOB_INSTANCE_ID = :jobInstanceId AND JOB_EXECUTION_ID IN (SELECT MAX(JOB_EXECUTION_ID) FROM %PREFIX%JOB_EXECUTION E2 WHERE E2.JOB_INSTANCE_ID = :jobInstanceId) """; - private static final String GET_EXECUTION_BY_ID = GET_JOB_EXECUTIONS + " WHERE JOB_EXECUTION_ID = ?"; + private static final String GET_EXECUTION_BY_ID = GET_JOB_EXECUTIONS + " WHERE JOB_EXECUTION_ID = :jobExecutionId"; private static final String GET_RUNNING_EXECUTION_FOR_INSTANCE = """ SELECT E.JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I - WHERE E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID AND I.JOB_INSTANCE_ID=? AND E.STATUS IN ('STARTING', 'STARTED', 'STOPPING') + WHERE E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID AND I.JOB_INSTANCE_ID=:jobInstanceId AND E.STATUS IN ('STARTING', 'STARTED', 'STOPPING') """; private static final String CURRENT_VERSION_JOB_EXECUTION = """ SELECT VERSION FROM %PREFIX%JOB_EXECUTION - WHERE JOB_EXECUTION_ID=? + WHERE JOB_EXECUTION_ID= :jobExecutionId """; private static final String FIND_PARAMS_FROM_ID = """ SELECT JOB_EXECUTION_ID, PARAMETER_NAME, PARAMETER_TYPE, PARAMETER_VALUE, IDENTIFYING FROM %PREFIX%JOB_EXECUTION_PARAMS - WHERE JOB_EXECUTION_ID = ? + WHERE JOB_EXECUTION_ID = :jobExecutionId """; private static final String CREATE_JOB_PARAMETERS = """ @@ -128,22 +128,22 @@ SELECT COUNT(*) private static final String DELETE_JOB_EXECUTION = """ DELETE FROM %PREFIX%JOB_EXECUTION - WHERE JOB_EXECUTION_ID = ? AND VERSION = ? + WHERE JOB_EXECUTION_ID = :jobExecutionId AND VERSION = :version """; private static final String DELETE_JOB_EXECUTION_PARAMETERS = """ DELETE FROM %PREFIX%JOB_EXECUTION_PARAMS - WHERE JOB_EXECUTION_ID = ? + WHERE JOB_EXECUTION_ID = :jobExecutionId """; private static final String GET_JOB_INSTANCE_ID_FROM_JOB_EXECUTION_ID = """ SELECT JI.JOB_INSTANCE_ID FROM %PREFIX%JOB_INSTANCE JI, %PREFIX%JOB_EXECUTION JE - WHERE JOB_EXECUTION_ID = ? AND JI.JOB_INSTANCE_ID = JE.JOB_INSTANCE_ID + WHERE JOB_EXECUTION_ID = :jobExecutionId AND JI.JOB_INSTANCE_ID = JE.JOB_INSTANCE_ID """; private static final String GET_JOB_EXECUTION_IDS_BY_INSTANCE_ID = """ - SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE JOB_INSTANCE_ID = ? + SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE JOB_INSTANCE_ID = :jobInstanceId """; JdbcJobInstanceDao jobInstanceDao; @@ -202,9 +202,20 @@ public JobExecution createJobExecution(JobInstance jobInstance, JobParameters jo Object[] parameters = new Object[] { jobExecution.getId(), jobInstance.getId(), startTime, endTime, jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(), jobExecution.getExitStatus().getExitDescription(), jobExecution.getVersion(), createTime, lastUpdated }; - getJdbcTemplate().update(getQuery(SAVE_JOB_EXECUTION), parameters, - new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP }); + getJdbcClient().sql(getQuery(SAVE_JOB_EXECUTION)) + // @formatter:off + .param(1, jobExecution.getId(), Types.BIGINT) + .param(2, jobExecution.getJobInstanceId(), Types.BIGINT) + .param(3, startTime, Types.TIMESTAMP) + .param(4, endTime, Types.TIMESTAMP) + .param(5, jobExecution.getStatus().toString(), Types.VARCHAR) + .param(6, jobExecution.getExitStatus().getExitCode(), Types.VARCHAR) + .param(7, jobExecution.getExitStatus().getExitDescription(), Types.VARCHAR) + .param(8, jobExecution.getVersion(), Types.INTEGER) + .param(9, createTime, Types.TIMESTAMP) + .param(10, lastUpdated, Types.TIMESTAMP) + // @formatter:on + .update(); insertJobParameters(jobExecution.getId(), jobExecution.getJobParameters()); @@ -217,8 +228,10 @@ public List findJobExecutions(final JobInstance jobInstance) { Assert.notNull(jobInstance, "Job instance cannot be null."); long jobInstanceId = jobInstance.getId(); // TODO optimize to a single query with a join if possible - List jobExecutionIds = getJdbcTemplate().queryForList(getQuery(GET_JOB_EXECUTION_IDS_BY_INSTANCE_ID), - Long.class, jobInstanceId); + List jobExecutionIds = getJdbcClient().sql(getQuery(GET_JOB_EXECUTION_IDS_BY_INSTANCE_ID)) + .param("jobInstanceId", jobInstanceId) + .query(Long.class) + .list(); List jobExecutions = new ArrayList<>(jobExecutionIds.size()); for (Long jobExecutionId : jobExecutionIds) { jobExecutions.add(getJobExecution(jobExecutionId)); @@ -272,9 +285,6 @@ public void updateJobExecution(JobExecution jobExecution) { : Timestamp.valueOf(jobExecution.getCreateTime()); Timestamp lastUpdated = jobExecution.getLastUpdated() == null ? null : Timestamp.valueOf(jobExecution.getLastUpdated()); - Object[] parameters = new Object[] { startTime, endTime, jobExecution.getStatus().toString(), - jobExecution.getExitStatus().getExitCode(), exitDescription, createTime, lastUpdated, - jobExecution.getId(), jobExecution.getVersion() }; // TODO review this check, it's too late to check for the existence of the job // execution here @@ -282,19 +292,33 @@ public void updateJobExecution(JobExecution jobExecution) { // it // is invalid and // an exception should be thrown. - if (getJdbcTemplate().queryForObject(getQuery(CHECK_JOB_EXECUTION_EXISTS), Integer.class, - new Object[] { jobExecution.getId() }) != 1) { + if (getJdbcClient().sql(getQuery(CHECK_JOB_EXECUTION_EXISTS)) + .param("jobExecutionId", jobExecution.getId()) + .query(Integer.class) + .single() != 1) { throw new RuntimeException("Invalid JobExecution, ID " + jobExecution.getId() + " not found."); } - int count = getJdbcTemplate().update(getQuery(UPDATE_JOB_EXECUTION), parameters, - new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.TIMESTAMP, Types.TIMESTAMP, Types.BIGINT, Types.INTEGER }); + int count = getJdbcClient().sql(getQuery(UPDATE_JOB_EXECUTION)) + // @formatter:off + .param(1, startTime, Types.TIMESTAMP) + .param(2, endTime, Types.TIMESTAMP) + .param(3, jobExecution.getStatus().toString(), Types.VARCHAR) + .param(4, jobExecution.getExitStatus().getExitCode(), Types.VARCHAR) + .param(5, exitDescription, Types.VARCHAR) + .param(6, createTime, Types.TIMESTAMP) + .param(7, lastUpdated, Types.TIMESTAMP) + .param(8, jobExecution.getId(), Types.BIGINT) + .param(9, jobExecution.getVersion(), Types.INTEGER) + // @formatter:on + .update(); // Avoid concurrent modifications... if (count == 0) { - int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_JOB_EXECUTION), - Integer.class, new Object[] { jobExecution.getId() }); + int currentVersion = getJdbcClient().sql(getQuery(CURRENT_VERSION_JOB_EXECUTION)) + .param("jobExecutionId", jobExecution.getId()) + .query(Integer.class) + .single(); throw new OptimisticLockingFailureException( "Attempt to update job execution id=" + jobExecution.getId() + " with wrong version (" + jobExecution.getVersion() + "), where current version is " + currentVersion); @@ -312,10 +336,12 @@ public void updateJobExecution(JobExecution jobExecution) { public JobExecution getLastJobExecution(JobInstance jobInstance) { long jobInstanceId = jobInstance.getId(); - Long lastJobExecutionId = getJdbcTemplate().queryForObject(getQuery(GET_LAST_JOB_EXECUTION_ID), Long.class, - jobInstanceId, jobInstanceId); - - return lastJobExecutionId != null ? getJobExecution(lastJobExecutionId) : null; + return getJdbcClient().sql(getQuery(GET_LAST_JOB_EXECUTION_ID)) + .param("jobInstanceId", jobInstanceId) + .query(Long.class) + .optional() + .map(this::getJobExecution) + .orElse(null); } @Override @@ -323,18 +349,18 @@ public JobExecution getJobExecution(long jobExecutionId) { long jobInstanceId = getJobInstanceId(jobExecutionId); JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); JobParameters jobParameters = getJobParameters(jobExecutionId); - try { - return getJdbcTemplate().queryForObject(getQuery(GET_EXECUTION_BY_ID), - new JobExecutionRowMapper(jobInstance, jobParameters), jobExecutionId); - } - catch (EmptyResultDataAccessException e) { - return null; - } + return getJdbcClient().sql(getQuery(GET_EXECUTION_BY_ID)) + .param("jobExecutionId", jobExecutionId) + .query(new JobExecutionRowMapper(jobInstance, jobParameters)) + .optional() + .orElse(null); } private long getJobInstanceId(long jobExecutionId) { - return getJdbcTemplate().queryForObject(getQuery(GET_JOB_INSTANCE_ID_FROM_JOB_EXECUTION_ID), Long.class, - jobExecutionId); + return getJdbcClient().sql(getQuery(GET_JOB_INSTANCE_ID_FROM_JOB_EXECUTION_ID)) + .param("jobExecutionId", jobExecutionId) + .query(Long.class) + .single(); } @Override @@ -342,8 +368,10 @@ public Set findRunningJobExecutions(String jobName) { final Set result = new HashSet<>(); List jobInstanceIds = this.jobInstanceDao.getJobInstanceIds(jobName); for (long jobInstanceId : jobInstanceIds) { - long runningJobExecutionId = getJdbcTemplate().queryForObject(getQuery(GET_RUNNING_EXECUTION_FOR_INSTANCE), - Long.class, jobInstanceId); + long runningJobExecutionId = getJdbcClient().sql(getQuery(GET_RUNNING_EXECUTION_FOR_INSTANCE)) + .param("jobInstanceId", jobInstanceId) + .query(Long.class) + .single(); JobExecution runningJobExecution = getJobExecution(runningJobExecutionId); result.add(runningJobExecution); } @@ -352,11 +380,16 @@ public Set findRunningJobExecutions(String jobName) { @Override public void synchronizeStatus(JobExecution jobExecution) { - int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_JOB_EXECUTION), Integer.class, - jobExecution.getId()); + int currentVersion = getJdbcClient().sql(getQuery(CURRENT_VERSION_JOB_EXECUTION)) + .param("jobExecutionId", jobExecution.getId()) + .query(Integer.class) + .single(); if (currentVersion != jobExecution.getVersion()) { - String status = getJdbcTemplate().queryForObject(getQuery(GET_STATUS), String.class, jobExecution.getId()); + String status = getJdbcClient().sql(getQuery(GET_STATUS)) + .param("jobExecutionId", jobExecution.getId()) + .query(String.class) + .single(); jobExecution.upgradeStatus(BatchStatus.valueOf(status)); jobExecution.setVersion(currentVersion); } @@ -368,8 +401,10 @@ public void synchronizeStatus(JobExecution jobExecution) { */ @Override public void deleteJobExecution(JobExecution jobExecution) { - int count = getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION), jobExecution.getId(), - jobExecution.getVersion()); + int count = getJdbcClient().sql(getQuery(DELETE_JOB_EXECUTION)) + .param("jobExecutionId", jobExecution.getId()) + .param("version", jobExecution.getVersion()) + .update(); if (count == 0) { throw new OptimisticLockingFailureException("Attempt to delete job execution id=" + jobExecution.getId() @@ -385,7 +420,9 @@ public void deleteJobExecution(JobExecution jobExecution) { */ @Override public void deleteJobExecutionParameters(JobExecution jobExecution) { - getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION_PARAMETERS), jobExecution.getId()); + getJdbcClient().sql(getQuery(DELETE_JOB_EXECUTION_PARAMETERS)) + .param("jobExecutionId", jobExecution.getId()) + .update(); } /** @@ -450,7 +487,7 @@ public JobParameters getJobParameters(Long executionId) { jobParameters.add(jobParameter); }; - getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), handler, executionId); + getJdbcClient().sql(getQuery(FIND_PARAMS_FROM_ID)).param("jobExecutionId", executionId).query(handler); return new JobParameters(jobParameters); } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobInstanceDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobInstanceDao.java index 4bf34bdc86..502d3c89a2 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobInstanceDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobInstanceDao.java @@ -32,7 +32,6 @@ import org.springframework.batch.core.repository.dao.JobInstanceDao; import org.springframework.beans.factory.InitializingBean; import org.springframework.dao.DataAccessException; -import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.jdbc.core.ResultSetExtractor; import org.springframework.jdbc.core.RowMapper; @@ -74,30 +73,30 @@ public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements private static final String FIND_JOBS_WITH_NAME = """ SELECT JOB_INSTANCE_ID, JOB_NAME FROM %PREFIX%JOB_INSTANCE - WHERE JOB_NAME = ? + WHERE JOB_NAME = :jobName """; - private static final String FIND_JOBS_WITH_KEY = FIND_JOBS_WITH_NAME + " AND JOB_KEY = ?"; + private static final String FIND_JOBS_WITH_KEY = FIND_JOBS_WITH_NAME + " AND JOB_KEY = :jobKey"; private static final String COUNT_JOBS_WITH_NAME = """ SELECT COUNT(*) FROM %PREFIX%JOB_INSTANCE - WHERE JOB_NAME = ? + WHERE JOB_NAME = :jobName """; private static final String FIND_JOBS_WITH_EMPTY_KEY = FIND_JOBS_WITH_NAME - + " AND (JOB_KEY = ? OR JOB_KEY IS NULL)"; + + " AND (JOB_KEY = :jobKey OR JOB_KEY IS NULL)"; private static final String GET_JOB_FROM_ID = """ SELECT JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION FROM %PREFIX%JOB_INSTANCE - WHERE JOB_INSTANCE_ID = ? + WHERE JOB_INSTANCE_ID = :jobInstanceId """; private static final String GET_JOB_FROM_EXECUTION_ID = """ SELECT JI.JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, JI.VERSION FROM %PREFIX%JOB_INSTANCE JI, %PREFIX%JOB_EXECUTION JE - WHERE JOB_EXECUTION_ID = ? AND JI.JOB_INSTANCE_ID = JE.JOB_INSTANCE_ID + WHERE JOB_EXECUTION_ID = :jobExecutionId AND JI.JOB_INSTANCE_ID = JE.JOB_INSTANCE_ID """; private static final String FIND_JOB_NAMES = """ @@ -109,7 +108,7 @@ SELECT COUNT(*) private static final String FIND_LAST_JOBS_BY_NAME = """ SELECT JOB_INSTANCE_ID, JOB_NAME FROM %PREFIX%JOB_INSTANCE - WHERE JOB_NAME LIKE ? + WHERE JOB_NAME LIKE :jobName ORDER BY JOB_INSTANCE_ID DESC """; @@ -122,12 +121,12 @@ SELECT COUNT(*) private static final String FIND_LAST_JOB_INSTANCE_BY_JOB_NAME = """ SELECT JOB_INSTANCE_ID, JOB_NAME FROM %PREFIX%JOB_INSTANCE I1 - WHERE I1.JOB_NAME = ? AND I1.JOB_INSTANCE_ID = (SELECT MAX(I2.JOB_INSTANCE_ID) FROM %PREFIX%JOB_INSTANCE I2 WHERE I2.JOB_NAME = ?) + WHERE I1.JOB_NAME = :jobName AND I1.JOB_INSTANCE_ID = (SELECT MAX(I2.JOB_INSTANCE_ID) FROM %PREFIX%JOB_INSTANCE I2 WHERE I2.JOB_NAME = :jobName) """; private static final String DELETE_JOB_INSTANCE = """ DELETE FROM %PREFIX%JOB_INSTANCE - WHERE JOB_INSTANCE_ID = ? AND VERSION = ? + WHERE JOB_INSTANCE_ID = :jobInstanceId AND VERSION = :version """; private static final String GET_JOB_INSTANCE_IDS_BY_JOB_NAME = """ @@ -159,10 +158,14 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters JobInstance jobInstance = new JobInstance(jobInstanceId, jobName); jobInstance.incrementVersion(); - Object[] parameters = new Object[] { jobInstanceId, jobName, jobKeyGenerator.generateKey(jobParameters), - jobInstance.getVersion() }; - getJdbcTemplate().update(getQuery(CREATE_JOB_INSTANCE), parameters, - new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.INTEGER }); + getJdbcClient().sql(getQuery(CREATE_JOB_INSTANCE)) + // @formatter:off + .param(1, jobInstanceId, Types.BIGINT) + .param(2, jobName, Types.VARCHAR) + .param(3, jobKeyGenerator.generateKey(jobParameters), Types.VARCHAR) + .param(4, jobInstance.getVersion(), Types.INTEGER) + // @formatter:on + .update(); return jobInstance; } @@ -185,13 +188,12 @@ public JobInstance getJobInstance(final String jobName, final JobParameters jobP RowMapper rowMapper = new JobInstanceRowMapper(); - List instances; - if (StringUtils.hasLength(jobKey)) { - instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_KEY), rowMapper, jobName, jobKey); - } - else { - instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_EMPTY_KEY), rowMapper, jobName, jobKey); - } + List instances = getJdbcClient() + .sql(getQuery(StringUtils.hasLength(jobKey) ? FIND_JOBS_WITH_KEY : FIND_JOBS_WITH_EMPTY_KEY)) + .param("jobName", jobName) + .param("jobKey", jobKey) + .query(rowMapper) + .list(); if (instances.isEmpty()) { return null; @@ -205,19 +207,16 @@ public JobInstance getJobInstance(final String jobName, final JobParameters jobP @Override @Nullable public JobInstance getJobInstance(long instanceId) { - - try { - return getJdbcTemplate().queryForObject(getQuery(GET_JOB_FROM_ID), new JobInstanceRowMapper(), instanceId); - } - catch (EmptyResultDataAccessException e) { - return null; - } - + return getJdbcClient().sql(getQuery(GET_JOB_FROM_ID)) + .param("jobInstanceId", instanceId) + .query(new JobInstanceRowMapper()) + .optional() + .orElse(null); } @Override public List getJobNames() { - return getJdbcTemplate().query(getQuery(FIND_JOB_NAMES), (rs, rowNum) -> rs.getString(1)); + return getJdbcClient().sql(getQuery(FIND_JOB_NAMES)).query(String.class).list(); } @Override @@ -247,7 +246,7 @@ public List extractData(ResultSet rs) throws SQLException, DataAcce jobName = jobName.replaceAll("\\" + STAR_WILDCARD, SQL_WILDCARD); } - return getJdbcTemplate().query(getQuery(FIND_LAST_JOBS_BY_NAME), extractor, jobName); + return getJdbcClient().sql(getQuery(FIND_LAST_JOBS_BY_NAME)).param("jobName", jobName).query(extractor); } /** @@ -269,13 +268,11 @@ public List getJobInstanceIds(String jobName) { @Override @Nullable public JobInstance getLastJobInstance(String jobName) { - try { - return getJdbcTemplate().queryForObject(getQuery(FIND_LAST_JOB_INSTANCE_BY_JOB_NAME), - new JobInstanceRowMapper(), jobName, jobName); - } - catch (EmptyResultDataAccessException e) { - return null; - } + return getJdbcClient().sql(getQuery(FIND_LAST_JOB_INSTANCE_BY_JOB_NAME)) + .param("jobName", jobName) + .query(new JobInstanceRowMapper()) + .optional() + .orElse(null); } @Override @@ -284,14 +281,11 @@ public JobInstance getLastJobInstance(String jobName) { // TODO clients should use // JobExecutionDao.getJobExecution(jobExecutionId).getJobInstance() instead public JobInstance getJobInstance(JobExecution jobExecution) { - - try { - return getJdbcTemplate().queryForObject(getQuery(GET_JOB_FROM_EXECUTION_ID), new JobInstanceRowMapper(), - jobExecution.getId()); - } - catch (EmptyResultDataAccessException e) { - return null; - } + return getJdbcClient().sql(getQuery(GET_JOB_FROM_EXECUTION_ID)) + .param("jobExecutionId", jobExecution.getId()) + .query(new JobInstanceRowMapper()) + .optional() + .orElse(null); } @Override @@ -299,7 +293,7 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException { if (!getJobNames().contains(jobName)) { throw new NoSuchJobException("No job instances were found for job name " + jobName); } - return getJdbcTemplate().queryForObject(getQuery(COUNT_JOBS_WITH_NAME), Long.class, jobName); + return getJdbcClient().sql(getQuery(COUNT_JOBS_WITH_NAME)).param("jobName", jobName).query(Long.class).single(); } /** @@ -308,8 +302,10 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException { */ @Override public void deleteJobInstance(JobInstance jobInstance) { - int count = getJdbcTemplate().update(getQuery(DELETE_JOB_INSTANCE), jobInstance.getId(), - jobInstance.getVersion()); + int count = getJdbcClient().sql(getQuery(DELETE_JOB_INSTANCE)) + .param("jobInstanceId", jobInstance.getId()) + .param("version", jobInstance.getVersion()) + .update(); if (count == 0) { throw new OptimisticLockingFailureException("Attempt to delete job instance id=" + jobInstance.getId() diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java index a2133f699d..ca6caf81e6 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java @@ -16,15 +16,11 @@ package org.springframework.batch.core.repository.dao.jdbc; -import java.sql.PreparedStatement; -import java.sql.SQLException; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -41,7 +37,7 @@ import org.springframework.batch.core.repository.dao.StepExecutionDao; import org.springframework.beans.factory.InitializingBean; import org.springframework.dao.OptimisticLockingFailureException; -import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.simple.JdbcClient; import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -90,38 +86,39 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement """; private static final String GET_STEP_EXECUTIONS = GET_RAW_STEP_EXECUTIONS - + " WHERE JOB_EXECUTION_ID = ? ORDER BY STEP_EXECUTION_ID"; + + " WHERE JOB_EXECUTION_ID = :jobExecutionId ORDER BY STEP_EXECUTION_ID"; - private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " WHERE STEP_EXECUTION_ID = ?"; + private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + + " WHERE STEP_EXECUTION_ID = :stepExecutionId"; private static final String GET_LAST_STEP_EXECUTION = """ SELECT SE.STEP_EXECUTION_ID, SE.STEP_NAME, SE.START_TIME, SE.END_TIME, SE.STATUS, SE.COMMIT_COUNT, SE.READ_COUNT, SE.FILTER_COUNT, SE.WRITE_COUNT, SE.EXIT_CODE, SE.EXIT_MESSAGE, SE.READ_SKIP_COUNT, SE.WRITE_SKIP_COUNT, SE.PROCESS_SKIP_COUNT, SE.ROLLBACK_COUNT, SE.LAST_UPDATED, SE.VERSION, SE.CREATE_TIME, JE.JOB_EXECUTION_ID, JE.START_TIME, JE.END_TIME, JE.STATUS, JE.EXIT_CODE, JE.EXIT_MESSAGE, JE.CREATE_TIME, JE.LAST_UPDATED, JE.VERSION FROM %PREFIX%JOB_EXECUTION JE JOIN %PREFIX%STEP_EXECUTION SE ON SE.JOB_EXECUTION_ID = JE.JOB_EXECUTION_ID - WHERE JE.JOB_INSTANCE_ID = ? AND SE.STEP_NAME = ? + WHERE JE.JOB_INSTANCE_ID = :jobInstanceId AND SE.STEP_NAME = :stepName """; private static final String CURRENT_VERSION_STEP_EXECUTION = """ SELECT VERSION FROM %PREFIX%STEP_EXECUTION - WHERE STEP_EXECUTION_ID=? + WHERE STEP_EXECUTION_ID = :stepExecutionId """; private static final String COUNT_STEP_EXECUTIONS = """ SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION JE JOIN %PREFIX%STEP_EXECUTION SE ON SE.JOB_EXECUTION_ID = JE.JOB_EXECUTION_ID - WHERE JE.JOB_INSTANCE_ID = ? AND SE.STEP_NAME = ? + WHERE JE.JOB_INSTANCE_ID = :jobInstanceId AND SE.STEP_NAME = :stepName """; private static final String DELETE_STEP_EXECUTION = """ DELETE FROM %PREFIX%STEP_EXECUTION - WHERE STEP_EXECUTION_ID = ? and VERSION = ? + WHERE STEP_EXECUTION_ID = :stepExecutionId and VERSION = :version """; private static final String GET_JOB_EXECUTION_ID_FROM_STEP_EXECUTION_ID = """ SELECT JE.JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION JE, %PREFIX%STEP_EXECUTION SE - WHERE SE.STEP_EXECUTION_ID = ? AND JE.JOB_EXECUTION_ID = SE.JOB_EXECUTION_ID + WHERE SE.STEP_EXECUTION_ID = :stepExecutionId AND JE.JOB_EXECUTION_ID = SE.JOB_EXECUTION_ID """; private static final Comparator BY_CREATE_TIME_DESC_ID_DESC = Comparator @@ -174,7 +171,11 @@ public StepExecution createStepExecution(String stepName, JobExecution jobExecut parameterTypes[i] = (Integer) parameters.get(1)[i]; } - getJdbcTemplate().update(getQuery(SAVE_STEP_EXECUTION), parameterValues, parameterTypes); + JdbcClient.StatementSpec statement = getJdbcClient().sql(getQuery(SAVE_STEP_EXECUTION)); + for (int i = 0; i < parameterTypes.length; i++) { + statement.param(i + 1, parameterValues[i], parameterTypes[i]); + } + statement.update(); return stepExecution; } @@ -241,21 +242,34 @@ public void updateStepExecution(StepExecution stepExecution) { : Timestamp.valueOf(stepExecution.getEndTime()); Timestamp lastUpdated = stepExecution.getLastUpdated() == null ? null : Timestamp.valueOf(stepExecution.getLastUpdated()); - Object[] parameters = new Object[] { startTime, endTime, stepExecution.getStatus().toString(), - stepExecution.getCommitCount(), stepExecution.getReadCount(), stepExecution.getFilterCount(), - stepExecution.getWriteCount(), stepExecution.getExitStatus().getExitCode(), exitDescription, - stepExecution.getReadSkipCount(), stepExecution.getProcessSkipCount(), - stepExecution.getWriteSkipCount(), stepExecution.getRollbackCount(), lastUpdated, - stepExecution.getId(), stepExecution.getVersion() }; - int count = getJdbcTemplate().update(getQuery(UPDATE_STEP_EXECUTION), parameters, - new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.BIGINT, Types.BIGINT, - Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, - Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.BIGINT, Types.INTEGER }); + + int count = getJdbcClient().sql(getQuery(UPDATE_STEP_EXECUTION)) + // @formatter:off + .param(1, startTime, Types.TIMESTAMP) + .param(2, endTime, Types.TIMESTAMP) + .param(3, stepExecution.getStatus().toString(), Types.VARCHAR) + .param(4, stepExecution.getCommitCount(), Types.BIGINT) + .param(5, stepExecution.getReadCount(), Types.BIGINT) + .param(6, stepExecution.getFilterCount(), Types.BIGINT) + .param(7, stepExecution.getWriteCount(), Types.BIGINT) + .param(8, stepExecution.getExitStatus().getExitCode(), Types.VARCHAR) + .param(9, exitDescription, Types.VARCHAR) + .param(10, stepExecution.getReadSkipCount(), Types.BIGINT) + .param(11, stepExecution.getProcessSkipCount(), Types.BIGINT) + .param(12, stepExecution.getWriteSkipCount(), Types.BIGINT) + .param(13, stepExecution.getRollbackCount(), Types.BIGINT) + .param(14, lastUpdated, Types.TIMESTAMP) + .param(15, stepExecution.getId(), Types.BIGINT) + .param(16, stepExecution.getVersion(), Types.INTEGER) + // @formatter:on + .update(); // Avoid concurrent modifications... if (count == 0) { - int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_STEP_EXECUTION), - Integer.class, stepExecution.getId()); + int currentVersion = getJdbcClient().sql(getQuery(CURRENT_VERSION_STEP_EXECUTION)) + .param("stepExecutionId", stepExecution.getId()) + .query(Integer.class) + .single(); throw new OptimisticLockingFailureException( "Attempt to update step execution id=" + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion() + "), where current version is " + currentVersion); @@ -297,16 +311,20 @@ public StepExecution getStepExecution(long stepExecutionId) { } private long getJobExecutionId(long stepExecutionId) { - return getJdbcTemplate().queryForObject(getQuery(GET_JOB_EXECUTION_ID_FROM_STEP_EXECUTION_ID), Long.class, - stepExecutionId); + return getJdbcClient().sql(getQuery(GET_JOB_EXECUTION_ID_FROM_STEP_EXECUTION_ID)) + .param("stepExecutionId", stepExecutionId) + .query(Long.class) + .single(); } @Override @Nullable @Deprecated(since = "6.0", forRemoval = true) public StepExecution getStepExecution(JobExecution jobExecution, long stepExecutionId) { - List executions = getJdbcTemplate().query(getQuery(GET_STEP_EXECUTION), - new StepExecutionRowMapper(jobExecution), stepExecutionId); + List executions = getJdbcClient().sql(getQuery(GET_STEP_EXECUTION)) + .param("stepExecutionId", stepExecutionId) + .query(new StepExecutionRowMapper(jobExecution)) + .list(); Assert.state(executions.size() <= 1, "There can be at most one step execution with given name for single job execution"); @@ -320,19 +338,23 @@ public StepExecution getStepExecution(JobExecution jobExecution, long stepExecut @Override public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { - List executions = getJdbcTemplate().query(getQuery(GET_LAST_STEP_EXECUTION), (rs, rowNum) -> { - long jobExecutionId = rs.getLong(19); - JobExecution jobExecution = new JobExecution(jobExecutionId, jobInstance, - jobExecutionDao.getJobParameters(jobExecutionId)); - jobExecution.setStartTime(rs.getTimestamp(20) == null ? null : rs.getTimestamp(20).toLocalDateTime()); - jobExecution.setEndTime(rs.getTimestamp(21) == null ? null : rs.getTimestamp(21).toLocalDateTime()); - jobExecution.setStatus(BatchStatus.valueOf(rs.getString(22))); - jobExecution.setExitStatus(new ExitStatus(rs.getString(23), rs.getString(24))); - jobExecution.setCreateTime(rs.getTimestamp(25) == null ? null : rs.getTimestamp(25).toLocalDateTime()); - jobExecution.setLastUpdated(rs.getTimestamp(26) == null ? null : rs.getTimestamp(26).toLocalDateTime()); - jobExecution.setVersion(rs.getInt(27)); - return new StepExecutionRowMapper(jobExecution).mapRow(rs, rowNum); - }, jobInstance.getInstanceId(), stepName); + List executions = getJdbcClient().sql(getQuery(GET_LAST_STEP_EXECUTION)) + .param("jobInstanceId", jobInstance.getInstanceId()) + .param("stepName", stepName) + .query((rs, rowNum) -> { + Long jobExecutionId = rs.getLong(19); + JobExecution jobExecution = new JobExecution(jobExecutionId, jobInstance, + jobExecutionDao.getJobParameters(jobExecutionId)); + jobExecution.setStartTime(rs.getTimestamp(20) == null ? null : rs.getTimestamp(20).toLocalDateTime()); + jobExecution.setEndTime(rs.getTimestamp(21) == null ? null : rs.getTimestamp(21).toLocalDateTime()); + jobExecution.setStatus(BatchStatus.valueOf(rs.getString(22))); + jobExecution.setExitStatus(new ExitStatus(rs.getString(23), rs.getString(24))); + jobExecution.setCreateTime(rs.getTimestamp(25) == null ? null : rs.getTimestamp(25).toLocalDateTime()); + jobExecution.setLastUpdated(rs.getTimestamp(26) == null ? null : rs.getTimestamp(26).toLocalDateTime()); + jobExecution.setVersion(rs.getInt(27)); + return new StepExecutionRowMapper(jobExecution).mapRow(rs, rowNum); + }) + .list(); executions.sort(BY_CREATE_TIME_DESC_ID_DESC); if (executions.isEmpty()) { return null; @@ -352,14 +374,19 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa */ @Override public List getStepExecutions(JobExecution jobExecution) { - return getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution), - jobExecution.getId()); + return getJdbcClient().sql(getQuery(GET_STEP_EXECUTIONS)) + .param("jobExecutionId", jobExecution.getId()) + .query(new StepExecutionRowMapper(jobExecution)) + .list(); } @Override public long countStepExecutions(JobInstance jobInstance, String stepName) { - return getJdbcTemplate().queryForObject(getQuery(COUNT_STEP_EXECUTIONS), Long.class, - jobInstance.getInstanceId(), stepName); + return getJdbcClient().sql(getQuery(COUNT_STEP_EXECUTIONS)) + .param("jobInstanceId", jobInstance.getInstanceId()) + .param("stepName", stepName) + .query(Long.class) + .single(); } /** @@ -368,8 +395,10 @@ public long countStepExecutions(JobInstance jobInstance, String stepName) { */ @Override public void deleteStepExecution(StepExecution stepExecution) { - int count = getJdbcTemplate().update(getQuery(DELETE_STEP_EXECUTION), stepExecution.getId(), - stepExecution.getVersion()); + int count = getJdbcClient().sql(getQuery(DELETE_STEP_EXECUTION)) + .param("stepExecutionId", stepExecution.getId()) + .param("version", stepExecution.getVersion()) + .update(); if (count == 0) { throw new OptimisticLockingFailureException("Attempt to delete step execution id=" + stepExecution.getId()