diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java index 149c0555e3..f2bc812035 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java @@ -15,6 +15,11 @@ */ package org.springframework.batch.core.repository.dao.mongodb; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -23,17 +28,20 @@ import org.springframework.batch.core.job.JobInstance; import org.springframework.batch.core.job.parameters.JobParameters; import org.springframework.batch.core.repository.dao.JobExecutionDao; +import org.springframework.batch.core.repository.persistence.JobParameter; import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.query.Query; import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; +import org.springframework.util.CollectionUtils; import static org.springframework.data.mongodb.core.query.Criteria.where; import static org.springframework.data.mongodb.core.query.Query.query; /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class MongoJobExecutionDao implements JobExecutionDao { @@ -84,13 +92,12 @@ public void updateJobExecution(JobExecution jobExecution) { @Override public List findJobExecutions(JobInstance jobInstance) { - Query query = query(where("jobInstanceId").is(jobInstance.getId())); + Query query = query(where("jobInstanceId").is(jobInstance.getId())) + .with(Sort.by(Sort.Direction.DESC, "jobExecutionId")); List jobExecutions = this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, JOB_EXECUTIONS_COLLECTION_NAME); - return jobExecutions.stream() - .map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance)) - .toList(); + return jobExecutions.stream().map(jobExecution -> convert(jobExecution, jobInstance)).toList(); } @Override @@ -101,7 +108,7 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) { query.with(Sort.by(sortOrder)), org.springframework.batch.core.repository.persistence.JobExecution.class, JOB_EXECUTIONS_COLLECTION_NAME); - return jobExecution != null ? this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance) : null; + return jobExecution != null ? convert(jobExecution, jobInstance) : null; } @Override @@ -115,7 +122,7 @@ public Set findRunningJobExecutions(String jobName) { .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, JOB_EXECUTIONS_COLLECTION_NAME) .stream() - .map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance)) + .map(jobExecution -> convert(jobExecution, jobInstance)) .forEach(runningJobExecutions::add); } return runningJobExecutions; @@ -132,7 +139,7 @@ public JobExecution getJobExecution(long executionId) { } org.springframework.batch.core.job.JobInstance jobInstance = this.jobInstanceDao .getJobInstance(jobExecution.getJobInstanceId()); - return this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance); + return convert(jobExecution, jobInstance); } @Override @@ -146,4 +153,43 @@ public void synchronizeStatus(JobExecution jobExecution) { // synchronizeStatus } + @Override + public void deleteJobExecution(JobExecution jobExecution) { + this.mongoOperations.remove(query(where("jobExecutionId").is(jobExecution.getId())), + JOB_EXECUTIONS_COLLECTION_NAME); + + } + + private JobExecution convert(org.springframework.batch.core.repository.persistence.JobExecution jobExecution, + org.springframework.batch.core.job.JobInstance jobInstance) { + Set> parameters = jobExecution.getJobParameters(); + if (!CollectionUtils.isEmpty(parameters)) { + // MongoDB restore temporal value as Date + Set> converted = new HashSet<>(); + for (JobParameter parameter : parameters) { + if (LocalDate.class.getName().equals(parameter.type()) && parameter.value() instanceof Date date) { + converted.add(new JobParameter<>(parameter.name(), + date.toInstant().atZone(ZoneId.systemDefault()).toLocalDate(), parameter.type(), + parameter.identifying())); + } + else if (LocalTime.class.getName().equals(parameter.type()) && parameter.value() instanceof Date date) { + converted.add(new JobParameter<>(parameter.name(), + date.toInstant().atZone(ZoneId.systemDefault()).toLocalTime(), parameter.type(), + parameter.identifying())); + } + else if (LocalDateTime.class.getName().equals(parameter.type()) + && parameter.value() instanceof Date date) { + converted.add(new JobParameter<>(parameter.name(), + date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(), parameter.type(), + parameter.identifying())); + } + else { + converted.add(parameter); + } + } + jobExecution.setJobParameters(converted); + } + return this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance); + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java index d7d9c564a3..760bdbba55 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java @@ -15,6 +15,7 @@ */ package org.springframework.batch.core.repository.dao.mongodb; +import java.util.Collections; import java.util.List; import org.springframework.batch.core.job.DefaultJobKeyGenerator; @@ -36,6 +37,7 @@ /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class MongoJobInstanceDao implements JobInstanceDao { @@ -117,7 +119,10 @@ public List getJobInstances(String jobName, int start, int count) { org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) .stream() .toList(); - return jobInstances.subList(start, jobInstances.size()) + if (jobInstances.size() <= start) { + return Collections.emptyList(); + } + return jobInstances.subList(start, Math.min(jobInstances.size(), start + jobInstances.size())) .stream() .map(this.jobInstanceConverter::toJobInstance) .limit(count) @@ -198,4 +203,9 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException { return this.mongoOperations.count(query, COLLECTION_NAME); } + @Override + public void deleteJobInstance(JobInstance jobInstance) { + this.mongoOperations.remove(query(where("jobInstanceId").is(jobInstance.getId())), COLLECTION_NAME); + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java index 481e3de370..abb6c964fa 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java @@ -36,6 +36,7 @@ /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class MongoStepExecutionDao implements StepExecutionDao { @@ -95,8 +96,8 @@ public StepExecution getStepExecution(long stepExecutionId) { org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations .findOne(query, org.springframework.batch.core.repository.persistence.StepExecution.class, STEP_EXECUTIONS_COLLECTION_NAME); - JobExecution jobExecution = jobExecutionDao.getJobExecution(stepExecution.getJobExecutionId()); - return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null; + return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, + jobExecutionDao.getJobExecution(stepExecution.getJobExecutionId())) : null; } @Deprecated(since = "6.0", forRemoval = true) @@ -163,24 +164,22 @@ public List getStepExecutions(JobExecution jobExecution) { @Override public long countStepExecutions(JobInstance jobInstance, String stepName) { - long count = 0; - // TODO optimize the count query Query query = query(where("jobInstanceId").is(jobInstance.getId())); List jobExecutions = this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, JOB_EXECUTIONS_COLLECTION_NAME); - for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) { - List stepExecutions = jobExecution - .getStepExecutions(); - for (org.springframework.batch.core.repository.persistence.StepExecution stepExecution : stepExecutions) { - if (stepExecution.getName().equals(stepName)) { - count++; - } - } - } - return count; + return this.mongoOperations.count( + query(where("jobExecutionId").in(jobExecutions.stream() + .map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId) + .toList())), + org.springframework.batch.core.repository.persistence.StepExecution.class, + STEP_EXECUTIONS_COLLECTION_NAME); } - // TODO implement deleteStepExecution(StepExecution stepExecution) + @Override + public void deleteStepExecution(StepExecution stepExecution) { + this.mongoOperations.remove(query(where("stepExecutionId").is(stepExecution.getId())), + STEP_EXECUTIONS_COLLECTION_NAME); + } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/AbstractMongoDBDaoIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/AbstractMongoDBDaoIntegrationTests.java new file mode 100644 index 0000000000..25375257e7 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/AbstractMongoDBDaoIntegrationTests.java @@ -0,0 +1,106 @@ +/* + * Copyright 2008-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.support; + +import org.junit.jupiter.api.BeforeEach; +import org.springframework.batch.core.repository.dao.mongodb.MongoExecutionContextDao; +import org.springframework.batch.core.repository.dao.mongodb.MongoJobExecutionDao; +import org.springframework.batch.core.repository.dao.mongodb.MongoJobInstanceDao; +import org.springframework.batch.core.repository.dao.mongodb.MongoStepExecutionDao; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.stream.Stream; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author Yanming Zhou + */ +@Testcontainers(disabledWithoutDocker = true) +@SpringJUnitConfig({ MongoDBIntegrationTestConfiguration.class, + AbstractMongoDBDaoIntegrationTests.MongoDBDaoConfiguration.class }) +abstract class AbstractMongoDBDaoIntegrationTests { + + @BeforeEach + void setUp(@Autowired MongoTemplate mongoTemplate) throws IOException { + mongoTemplate.dropCollection("BATCH_JOB_INSTANCE"); + mongoTemplate.dropCollection("BATCH_JOB_EXECUTION"); + mongoTemplate.dropCollection("BATCH_STEP_EXECUTION"); + mongoTemplate.createCollection("BATCH_JOB_INSTANCE"); + mongoTemplate.createCollection("BATCH_JOB_EXECUTION"); + mongoTemplate.createCollection("BATCH_STEP_EXECUTION"); + // sequences + mongoTemplate.dropCollection("BATCH_SEQUENCES"); + Resource resource = new FileSystemResource( + "src/main/resources/org/springframework/batch/core/schema-mongodb.jsonl"); + try (Stream lines = Files.lines(resource.getFilePath())) { + lines.forEach(mongoTemplate::executeCommand); + } + } + + protected void assertTemporalEquals(LocalDateTime lhs, LocalDateTime rhs) { + assertEquals(lhs != null ? lhs.truncatedTo(ChronoUnit.MILLIS) : lhs, + rhs != null ? rhs.truncatedTo(ChronoUnit.MILLIS) : null); + } + + protected void assertTemporalEquals(LocalTime lhs, LocalTime rhs) { + assertEquals(lhs != null ? lhs.truncatedTo(ChronoUnit.MILLIS) : lhs, + rhs != null ? rhs.truncatedTo(ChronoUnit.MILLIS) : null); + } + + @Configuration + static class MongoDBDaoConfiguration { + + @Bean + MongoJobInstanceDao jobInstanceDao(MongoOperations mongoOperations) { + return new MongoJobInstanceDao(mongoOperations); + } + + @Bean + MongoJobExecutionDao jobExecutionDao(MongoOperations mongoOperations, MongoJobInstanceDao jobInstanceDao) { + MongoJobExecutionDao jobExecutionDao = new MongoJobExecutionDao(mongoOperations); + jobExecutionDao.setJobInstanceDao(jobInstanceDao); + return jobExecutionDao; + } + + @Bean + MongoStepExecutionDao stepExecutionDao(MongoOperations mongoOperations, MongoJobExecutionDao jobExecutionDao) { + MongoStepExecutionDao stepExecutionDao = new MongoStepExecutionDao(mongoOperations); + stepExecutionDao.setJobExecutionDao(jobExecutionDao); + return stepExecutionDao; + } + + @Bean + MongoExecutionContextDao executionContextDao(MongoOperations mongoOperations) { + return new MongoExecutionContextDao(mongoOperations); + } + + } + +} \ No newline at end of file diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java index 5db103725c..5cb23a97ee 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java @@ -15,11 +15,8 @@ */ package org.springframework.batch.core.repository.support; -import java.io.IOException; -import java.nio.file.Files; import java.time.LocalDateTime; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.batch.core.job.Job; import org.springframework.batch.core.job.JobExecution; @@ -29,19 +26,8 @@ import org.springframework.batch.core.step.StepExecution; import org.springframework.batch.core.launch.JobOperator; import org.springframework.batch.core.repository.dao.ExecutionContextDao; -import org.springframework.batch.core.repository.dao.mongodb.MongoExecutionContextDao; -import org.springframework.batch.core.repository.support.MongoExecutionContextDaoIntegrationTests.ExecutionContextDaoConfiguration; import org.springframework.batch.infrastructure.item.ExecutionContext; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.io.FileSystemResource; -import org.springframework.core.io.Resource; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -51,17 +37,7 @@ * @author Henning Pƶttker * @author Yanming Zhou */ -@DirtiesContext -@Testcontainers(disabledWithoutDocker = true) -@SpringJUnitConfig({ MongoDBIntegrationTestConfiguration.class, ExecutionContextDaoConfiguration.class }) -public class MongoExecutionContextDaoIntegrationTests { - - @BeforeAll - static void setUp(@Autowired MongoTemplate mongoTemplate) throws IOException { - Resource resource = new FileSystemResource( - "src/main/resources/org/springframework/batch/core/schema-mongodb.jsonl"); - Files.lines(resource.getFilePath()).forEach(line -> mongoTemplate.executeCommand(line)); - } +public class MongoExecutionContextDaoIntegrationTests extends AbstractMongoDBDaoIntegrationTests { @Test void testGetJobExecutionWithEmptyResult(@Autowired ExecutionContextDao executionContextDao) { @@ -131,14 +107,4 @@ void testSaveStepExecution(@Autowired JobOperator jobOperator, @Autowired Job jo assertEquals("bar", actual.get("foo")); } - @Configuration - static class ExecutionContextDaoConfiguration { - - @Bean - ExecutionContextDao executionContextDao(MongoOperations mongoOperations) { - return new MongoExecutionContextDao(mongoOperations); - } - - } - } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoJobExecutionDaoIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoJobExecutionDaoIntegrationTests.java new file mode 100644 index 0000000000..584e07ad96 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoJobExecutionDaoIntegrationTests.java @@ -0,0 +1,376 @@ +/* + * Copyright 2008-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.support; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.job.JobExecution; +import org.springframework.batch.core.job.JobInstance; +import org.springframework.batch.core.job.parameters.JobParameters; +import org.springframework.batch.core.job.parameters.JobParametersBuilder; +import org.springframework.batch.core.repository.dao.JobExecutionDao; +import org.springframework.batch.core.repository.dao.JobInstanceDao; +import org.springframework.batch.core.repository.dao.StepExecutionDao; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.OptimisticLockingFailureException; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author Yanming Zhou + */ +class MongoJobExecutionDaoIntegrationTests extends AbstractMongoDBDaoIntegrationTests { + + @Autowired + private JobExecutionDao dao; + + private JobInstance jobInstance; + + private JobParameters jobParameters; + + @BeforeEach + public void setUp(@Autowired JobInstanceDao jobInstanceDao) throws Exception { + jobParameters = new JobParameters(); + jobInstance = jobInstanceDao.createJobInstance("execTestJob", jobParameters); + } + + @Test + void testJobParametersPersistenceRoundTrip() { + // given + Date dateParameter = new Date(); + LocalDate localDateParameter = LocalDate.now(); + LocalTime localTimeParameter = LocalTime.now(); + LocalDateTime localDateTimeParameter = LocalDateTime.now(); + String stringParameter = "foo"; + long longParameter = 1L; + double doubleParameter = 2D; + JobParameters jobParameters = new JobParametersBuilder().addString("string", stringParameter) + .addLong("long", longParameter) + .addDouble("double", doubleParameter) + .addDate("date", dateParameter) + .addLocalDate("localDate", localDateParameter) + .addLocalTime("localTime", localTimeParameter) + .addLocalDateTime("localDateTime", localDateTimeParameter) + .toJobParameters(); + JobExecution execution = dao.createJobExecution(jobInstance, jobParameters); + + // when + execution = dao.getJobExecution(execution.getId()); + assertNotNull(execution); + + // then + JobParameters parameters = execution.getJobParameters(); + assertNotNull(parameters); + assertEquals(dateParameter, parameters.getDate("date")); + assertEquals(localDateParameter, parameters.getLocalDate("localDate")); + assertTemporalEquals(localTimeParameter, parameters.getLocalTime("localTime")); + assertTemporalEquals(localDateTimeParameter, parameters.getLocalDateTime("localDateTime")); + assertEquals(stringParameter, parameters.getString("string")); + assertEquals(longParameter, parameters.getLong("long")); + assertEquals(doubleParameter, parameters.getDouble("double")); + } + + /** + * Save and find a job execution. + */ + @Test + void testSaveAndFind() { + + JobExecution execution = dao.createJobExecution(jobInstance, jobParameters); + execution.setStartTime(LocalDateTime.now()); + execution.setLastUpdated(LocalDateTime.now()); + execution.setExitStatus(ExitStatus.UNKNOWN); + execution.setEndTime(LocalDateTime.now()); + dao.updateJobExecution(execution); + + List executions = dao.findJobExecutions(jobInstance); + assertEquals(1, executions.size()); + assertEquals(execution, executions.get(0)); + assertExecutionsAreEqual(execution, executions.get(0)); + } + + /** + * Executions should be returned in the reverse order they were saved. + */ + @Test + void testFindExecutionsOrdering() { + + List execs = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + JobExecution exec = dao.createJobExecution(jobInstance, jobParameters); + exec.setCreateTime(LocalDateTime.now().plusSeconds(i)); + execs.add(exec); + dao.updateJobExecution(exec); + } + + List retrieved = new ArrayList<>(dao.findJobExecutions(jobInstance)); + Collections.reverse(retrieved); + + for (int i = 0; i < 10; i++) { + assertExecutionsAreEqual(execs.get(i), retrieved.get(i)); + } + + } + + /** + * Save and find a job execution. + */ + @Test + void testFindNonExistentExecutions() { + List executions = dao.findJobExecutions(jobInstance); + assertEquals(0, executions.size()); + } + + /** + * Update and retrieve job execution - check attributes have changed as expected. + */ + @Test + void testUpdateExecution() { + JobExecution execution = dao.createJobExecution(jobInstance, jobParameters); + assertEquals(BatchStatus.STARTING, execution.getStatus()); + + execution.setLastUpdated(LocalDateTime.now()); + execution.setStatus(BatchStatus.COMPLETED); + dao.updateJobExecution(execution); + + JobExecution updated = dao.findJobExecutions(jobInstance).get(0); + assertEquals(execution, updated); + assertEquals(BatchStatus.COMPLETED, updated.getStatus()); + assertExecutionsAreEqual(execution, updated); + } + + /** + * Check the execution with most recent start time is returned + */ + @Test + void testGetLastExecution() throws Exception { + JobExecution exec1 = dao.createJobExecution(jobInstance, jobParameters); + + TimeUnit.MILLISECONDS.sleep(10); + JobExecution exec2 = dao.createJobExecution(jobInstance, jobParameters); + + assertTrue(exec2.getCreateTime().isAfter(exec1.getCreateTime())); + + JobExecution last = dao.getLastJobExecution(jobInstance); + assertEquals(exec2, last); + } + + /** + * Check the execution is returned + */ + @Test + void testGetMissingLastExecution() { + JobExecution value = dao.getLastJobExecution(jobInstance); + assertNull(value); + } + + /** + * Check the execution is returned + */ + @Test + void testFindRunningExecutions(@Autowired StepExecutionDao stepExecutionDao) { + // Normally completed JobExecution as EndTime is populated + JobExecution exec = dao.createJobExecution(jobInstance, jobParameters); + LocalDateTime now = LocalDateTime.now(); + + exec.setStartTime(now.plusSeconds(1)); + exec.setEndTime(now.plusSeconds(2)); + exec.setStatus(BatchStatus.COMPLETED); + exec.setLastUpdated(now.plusSeconds(3)); + dao.updateJobExecution(exec); + + // BATCH-2675 + // Abnormal JobExecution as both StartTime and EndTime are null + // This can occur when TaskExecutorJobLauncher#run() submission to taskExecutor + // throws a TaskRejectedException + exec = dao.createJobExecution(jobInstance, jobParameters); + exec.setLastUpdated(now.plusSeconds(3)); + dao.updateJobExecution(exec); + + // Stopping JobExecution as status is STOPPING + exec = dao.createJobExecution(jobInstance, jobParameters); + exec.setStartTime(now.plusSeconds(6)); + exec.setStatus(BatchStatus.STOPPING); + exec.setLastUpdated(now.plusSeconds(7)); + dao.updateJobExecution(exec); + + // Running JobExecution as StartTime is populated but EndTime is null + exec = dao.createJobExecution(jobInstance, jobParameters); + exec.setStartTime(now.plusSeconds(2)); + exec.setStatus(BatchStatus.STARTED); + exec.setLastUpdated(now.plusSeconds(3)); + exec.addStepExecution(stepExecutionDao.createStepExecution("step", exec)); + dao.updateJobExecution(exec); + + Set values = dao.findRunningJobExecutions(exec.getJobInstance().getJobName()); + + assertEquals(3, values.size()); + Long jobExecutionId = exec.getId(); + JobExecution value = values.stream() + .filter(jobExecution -> jobExecutionId.equals(jobExecution.getId())) + .findFirst() + .orElseThrow(); + assertTemporalEquals(now.plusSeconds(3), value.getLastUpdated()); + + } + + /** + * Check the execution is returned + */ + @Test + void testNoRunningExecutions() { + Set values = dao.findRunningJobExecutions("no-such-job"); + assertEquals(0, values.size()); + } + + /** + * Check the execution is returned + */ + @Test + void testGetExecution(@Autowired StepExecutionDao stepExecutionDao) { + JobExecution exec = dao.createJobExecution(jobInstance, jobParameters); + exec.setCreateTime(LocalDateTime.now()); + exec.addStepExecution(stepExecutionDao.createStepExecution("step", exec)); + + JobExecution value = dao.getJobExecution(exec.getId()); + + assertEquals(exec, value); + // N.B. the job instance is not re-hydrated in the JDBC case... + } + + /** + * Check the execution is returned + */ + @Test + void testGetMissingExecution() { + JobExecution value = dao.getJobExecution(54321L); + assertNull(value); + } + + /** + * Exception should be raised when the version of update argument doesn't match the + * version of persisted entity. + */ + @Disabled("Not supported yet") + @Test + void testConcurrentModificationException() { + + JobExecution exec1 = dao.createJobExecution(jobInstance, jobParameters); + + JobExecution exec2 = dao.getJobExecution(exec1.getId()); + assertNotNull(exec2); + + assertEquals((Integer) 0, exec1.getVersion()); + assertEquals(exec1.getVersion(), exec2.getVersion()); + + dao.updateJobExecution(exec1); + assertEquals((Integer) 1, exec1.getVersion()); + + assertThrows(OptimisticLockingFailureException.class, () -> dao.updateJobExecution(exec2)); + } + + /** + * Successful synchronization from STARTED to STOPPING status. + */ + @Test + void testSynchronizeStatusUpgrade() { + + JobExecution exec1 = dao.createJobExecution(jobInstance, jobParameters); + exec1.setStatus(BatchStatus.STOPPING); + dao.updateJobExecution(exec1); + + JobExecution exec2 = dao.getJobExecution(exec1.getId()); + assertNotNull(exec2); + exec2.setStatus(BatchStatus.STARTED); + // exec2.setVersion(7); + // assertNotSame(exec1.getVersion(), exec2.getVersion()); + assertNotSame(exec1.getStatus(), exec2.getStatus()); + + dao.synchronizeStatus(exec2); + + // assertEquals(exec1.getVersion(), exec2.getVersion()); + assertEquals(exec1.getStatus(), exec2.getStatus()); + } + + /** + * UNKNOWN status won't be changed by synchronizeStatus, because it is the 'largest' + * BatchStatus (will not downgrade). + */ + @Test + void testSynchronizeStatusDowngrade() { + + JobExecution exec1 = dao.createJobExecution(jobInstance, jobParameters); + exec1.setStatus(BatchStatus.STARTED); + dao.updateJobExecution(exec1); + + JobExecution exec2 = dao.getJobExecution(exec1.getId()); + assertNotNull(exec2); + + exec2.setStatus(BatchStatus.UNKNOWN); + // exec2.setVersion(7); + // assertNotSame(exec1.getVersion(), exec2.getVersion()); + assertTrue(exec1.getStatus().isLessThan(exec2.getStatus())); + + dao.synchronizeStatus(exec2); + + // assertEquals(exec1.getVersion(), exec2.getVersion()); + assertEquals(BatchStatus.UNKNOWN, exec2.getStatus()); + } + + @Test + void testDeleteJobExecution() { + // given + JobExecution execution = dao.createJobExecution(jobInstance, new JobParameters()); + + // when + dao.deleteJobExecution(execution); + + // then + assertNull(dao.getJobExecution(execution.getId())); + } + + /* + * Check to make sure the executions are equal. Normally, comparing the id's is + * sufficient. However, for testing purposes, especially of a DAO, we need to make + * sure all the fields are being stored/retrieved correctly. + */ + + private void assertExecutionsAreEqual(JobExecution lhs, JobExecution rhs) { + + assertEquals(lhs.getId(), rhs.getId()); + assertTemporalEquals(lhs.getStartTime(), rhs.getStartTime()); + assertEquals(lhs.getStatus(), rhs.getStatus()); + assertTemporalEquals(lhs.getEndTime(), rhs.getEndTime()); + assertTemporalEquals(lhs.getCreateTime(), rhs.getCreateTime()); + assertTemporalEquals(lhs.getLastUpdated(), rhs.getLastUpdated()); + assertEquals(lhs.getVersion(), rhs.getVersion()); + } + +} \ No newline at end of file diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoJobInstanceDaoIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoJobInstanceDaoIntegrationTests.java new file mode 100644 index 0000000000..fe059bd06e --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoJobInstanceDaoIntegrationTests.java @@ -0,0 +1,315 @@ +/* + * Copyright 2008-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.support; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.batch.core.job.DefaultJobKeyGenerator; +import org.springframework.batch.core.job.JobExecution; +import org.springframework.batch.core.job.JobInstance; +import org.springframework.batch.core.job.JobKeyGenerator; +import org.springframework.batch.core.job.parameters.JobParameters; +import org.springframework.batch.core.job.parameters.JobParametersBuilder; +import org.springframework.batch.core.repository.dao.JobExecutionDao; +import org.springframework.batch.core.repository.dao.JobInstanceDao; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.util.ReflectionTestUtils; + +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Date; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author Yanming Zhou + */ +class MongoJobInstanceDaoIntegrationTests extends AbstractMongoDBDaoIntegrationTests { + + @Autowired + private JobInstanceDao dao; + + private final JobParameters fooParams = new JobParametersBuilder().addString("stringKey", "stringValue") + .addLong("longKey", Long.MAX_VALUE) + .addDouble("doubleKey", Double.MAX_VALUE) + .addDate("dateKey", new Date(DATE)) + .toJobParameters(); + + private static final long DATE = 777; + + private final String fooJob = "foo"; + + @Test + void testFindJobInstanceByExecution(@Autowired JobExecutionDao jobExecutionDao) { + + JobParameters jobParameters = new JobParameters(); + JobInstance jobInstance = dao.createJobInstance("testInstance", jobParameters); + JobExecution jobExecution = jobExecutionDao.createJobExecution(jobInstance, jobParameters); + + JobInstance returnedInstance = dao.getJobInstance(jobExecution); + assertEquals(jobInstance, returnedInstance); + } + + @Test + void testHexing() throws Exception { + MessageDigest digest = MessageDigest.getInstance("MD5"); + byte[] bytes = digest.digest("f78spx".getBytes(StandardCharsets.UTF_8)); + StringBuilder output = new StringBuilder(); + for (byte bite : bytes) { + output.append(String.format("%02x", bite)); + } + assertEquals(32, output.length(), "Wrong hash: " + output); + String value = String.format("%032x", new BigInteger(1, bytes)); + assertEquals(32, value.length(), "Wrong hash: " + value); + assertEquals(value, output.toString()); + } + + @Disabled("Not supported yet") + @Test + void testJobInstanceWildcard() { + dao.createJobInstance("anotherJob", new JobParameters()); + dao.createJobInstance("someJob", new JobParameters()); + + List jobInstances = dao.getJobInstances("*Job", 0, 2); + assertEquals(2, jobInstances.size()); + + for (JobInstance instance : jobInstances) { + assertTrue(instance.getJobName().contains("Job")); + } + + jobInstances = dao.getJobInstances("Job*", 0, 2); + assertTrue(jobInstances.isEmpty()); + } + + @Test + void testDeleteJobInstance() { + // given + JobInstance jobInstance = dao.createJobInstance("someTestInstance", new JobParameters()); + + // when + dao.deleteJobInstance(jobInstance); + + // then + assertNull(dao.getJobInstance(jobInstance.getId())); + } + + @Test + void testDefaultJobKeyGeneratorIsUsed() { + JobKeyGenerator jobKeyGenerator = (JobKeyGenerator) ReflectionTestUtils.getField(dao, "jobKeyGenerator"); + assertNotNull(jobKeyGenerator); + assertEquals(DefaultJobKeyGenerator.class, jobKeyGenerator.getClass()); + } + + /* + * Create and retrieve a job instance. + */ + + @Test + void testCreateAndRetrieve() { + + JobInstance fooInstance = dao.createJobInstance(fooJob, fooParams); + assertEquals(fooJob, fooInstance.getJobName()); + + JobInstance retrievedInstance = dao.getJobInstance(fooJob, fooParams); + assertNotNull(retrievedInstance); + assertEquals(fooInstance, retrievedInstance); + assertEquals(fooJob, retrievedInstance.getJobName()); + } + + /* + * Create and retrieve a job instance. + */ + + @Test + void testCreateAndGetById() { + + JobInstance fooInstance = dao.createJobInstance(fooJob, fooParams); + assertEquals(fooJob, fooInstance.getJobName()); + + JobInstance retrievedInstance = dao.getJobInstance(fooInstance.getId()); + assertNotNull(retrievedInstance); + assertEquals(fooInstance, retrievedInstance); + assertEquals(fooJob, retrievedInstance.getJobName()); + } + + /* + * Create and retrieve a job instance. + */ + + @Test + void testGetMissingById() { + + JobInstance retrievedInstance = dao.getJobInstance(1111111L); + assertNull(retrievedInstance); + + } + + /* + * Create and retrieve a job instance. + */ + + @Test + void testGetJobNames() { + + testCreateAndRetrieve(); + List jobNames = dao.getJobNames(); + assertFalse(jobNames.isEmpty()); + assertTrue(jobNames.contains(fooJob)); + + } + + /** + * Create and retrieve a job instance. + */ + + @Test + void testGetLastInstances() { + + testCreateAndRetrieve(); + + // unrelated job instance that should be ignored by the query + dao.createJobInstance("anotherJob", new JobParameters()); + + // we need two instances of the same job to check ordering + dao.createJobInstance(fooJob, new JobParameters()); + + List jobInstances = dao.getJobInstances(fooJob, 0, 2); + assertEquals(2, jobInstances.size()); + assertEquals(fooJob, jobInstances.get(0).getJobName()); + assertEquals(fooJob, jobInstances.get(1).getJobName()); + // assertEquals(Integer.valueOf(0), jobInstances.get(0).getVersion()); + // assertEquals(Integer.valueOf(0), jobInstances.get(1).getVersion()); + + assertTrue(jobInstances.get(0).getId() > jobInstances.get(1).getId(), + "Last instance should be first on the list"); + + } + + @Test + void testGetLastInstance() { + testCreateAndRetrieve(); + + // unrelated job instance that should be ignored by the query + dao.createJobInstance("anotherJob", new JobParameters()); + + // we need two instances of the same job to check ordering + dao.createJobInstance(fooJob, new JobParameters()); + + List jobInstances = dao.getJobInstances(fooJob, 0, 2); + assertEquals(2, jobInstances.size()); + JobInstance lastJobInstance = dao.getLastJobInstance(fooJob); + assertNotNull(lastJobInstance); + assertEquals(fooJob, lastJobInstance.getJobName()); + assertEquals(jobInstances.get(0), lastJobInstance, "Last instance should be first on the list"); + } + + @Test + void testGetLastInstanceWhenNoInstance() { + JobInstance lastJobInstance = dao.getLastJobInstance("NonExistingJob"); + assertNull(lastJobInstance); + } + + /** + * Create and retrieve a job instance. + */ + + @Test + void testGetLastInstancesPaged() { + + testCreateAndRetrieve(); + + // unrelated job instance that should be ignored by the query + dao.createJobInstance("anotherJob", new JobParameters()); + + // we need multiple instances of the same job to check ordering + String multiInstanceJob = "multiInstanceJob"; + String paramKey = "myID"; + int instanceCount = 6; + for (int i = 1; i <= instanceCount; i++) { + JobParameters params = new JobParametersBuilder().addLong(paramKey, (long) i).toJobParameters(); + dao.createJobInstance(multiInstanceJob, params); + } + + int startIndex = 3; + int queryCount = 2; + List jobInstances = dao.getJobInstances(multiInstanceJob, startIndex, queryCount); + + assertEquals(queryCount, jobInstances.size()); + + for (int i = 0; i < queryCount; i++) { + JobInstance returnedInstance = jobInstances.get(i); + assertEquals(multiInstanceJob, returnedInstance.getJobName()); + // assertEquals(Integer.valueOf(0), returnedInstance.getVersion()); + + // checks the correct instances are returned and the order is descending + // assertEquals(instanceCount - startIndex - i , + // returnedInstance.getJobParameters().getLong(paramKey)); + } + + } + + /** + * Create and retrieve a job instance. + */ + + @Test + void testGetLastInstancesPastEnd() { + + testCreateAndRetrieve(); + + // unrelated job instance that should be ignored by the query + dao.createJobInstance("anotherJob", new JobParameters()); + + // we need two instances of the same job to check ordering + dao.createJobInstance(fooJob, new JobParameters()); + + assertEquals(1, dao.getJobInstances(fooJob, 0, 1).size()); + assertEquals(2, dao.getJobInstances(fooJob, 0, 2).size()); + assertEquals(2, dao.getJobInstances(fooJob, 0, 3).size()); + assertEquals(1, dao.getJobInstances(fooJob, 1, 3).size()); + assertEquals(0, dao.getJobInstances(fooJob, 0, 0).size()); + assertEquals(0, dao.getJobInstances(fooJob, 4, 2).size()); + + } + + /** + * Trying to create instance twice for the same job+parameters causes error + */ + + @Test + void testCreateDuplicateInstance() { + + dao.createJobInstance(fooJob, fooParams); + + assertThrows(IllegalStateException.class, () -> dao.createJobInstance(fooJob, fooParams)); + } + + @Disabled("Version is not persisted") + @Test + void testCreationAddsVersion() { + + JobInstance jobInstance = new JobInstance(1L, "testVersionAndId"); + + assertNull(jobInstance.getVersion()); + + jobInstance = dao.createJobInstance("testVersion", new JobParameters()); + + assertNotNull(jobInstance.getVersion()); + } + +} \ No newline at end of file diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoStepExecutionDaoIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoStepExecutionDaoIntegrationTests.java new file mode 100644 index 0000000000..1d1187a946 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoStepExecutionDaoIntegrationTests.java @@ -0,0 +1,208 @@ +/* + * Copyright 2008-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.support; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.job.JobExecution; +import org.springframework.batch.core.job.JobInstance; +import org.springframework.batch.core.job.parameters.JobParameters; +import org.springframework.batch.core.repository.dao.JobExecutionDao; +import org.springframework.batch.core.repository.dao.JobInstanceDao; +import org.springframework.batch.core.repository.dao.StepExecutionDao; +import org.springframework.batch.core.step.StepExecution; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.OptimisticLockingFailureException; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author Yanming Zhou + */ +class MongoStepExecutionDaoIntegrationTests extends AbstractMongoDBDaoIntegrationTests { + + @Autowired + private StepExecutionDao dao; + + private JobInstance jobInstance; + + private JobExecution jobExecution; + + @BeforeEach + public void setUp(@Autowired JobInstanceDao jobInstanceDao, @Autowired JobExecutionDao jobExecutionDao) + throws Exception { + JobParameters jobParameters = new JobParameters(); + jobInstance = jobInstanceDao.createJobInstance("execTestJob", jobParameters); + jobExecution = jobExecutionDao.createJobExecution(jobInstance, new JobParameters()); + } + + @Test + void testSaveAndGetExecution() { + + StepExecution stepExecution = dao.createStepExecution("step", jobExecution); + + stepExecution.setStatus(BatchStatus.STARTED); + stepExecution.setReadSkipCount(7); + stepExecution.setProcessSkipCount(2); + stepExecution.setWriteSkipCount(5); + stepExecution.setProcessSkipCount(11); + stepExecution.setRollbackCount(3); + stepExecution.setLastUpdated(LocalDateTime.now()); + stepExecution.setReadCount(17); + stepExecution.setFilterCount(15); + stepExecution.setWriteCount(13); + dao.updateStepExecution(stepExecution); + + StepExecution retrieved = dao.getStepExecution(stepExecution.getId()); + assertNotNull(retrieved); + + assertStepExecutionsAreEqual(stepExecution, retrieved); + assertNotNull(retrieved.getJobExecution()); + assertNotNull(retrieved.getJobExecution().getId()); + assertNotNull(retrieved.getJobExecution().getJobInstance()); + + } + + @Test + void testSaveAndGetLastExecution() { + LocalDateTime now = LocalDateTime.now(); + StepExecution stepExecution1 = dao.createStepExecution("step1", jobExecution); + stepExecution1.setStartTime(now); + dao.updateStepExecution(stepExecution1); + + StepExecution stepExecution2 = dao.createStepExecution("step1", jobExecution); + stepExecution2.setStartTime(now.plus(500, ChronoUnit.MILLIS)); + dao.updateStepExecution(stepExecution2); + + StepExecution lastStepExecution = dao.getLastStepExecution(jobInstance, "step1"); + assertNotNull(lastStepExecution); + assertStepExecutionsAreEqual(stepExecution2, lastStepExecution); + } + + @Test + void testSaveAndGetLastExecutionWhenSameStartTime() { + LocalDateTime now = LocalDateTime.now(); + StepExecution stepExecution1 = dao.createStepExecution("step1", jobExecution); + stepExecution1.setStartTime(now); + dao.updateStepExecution(stepExecution1); + + StepExecution stepExecution2 = dao.createStepExecution("step1", jobExecution); + stepExecution2.setStartTime(now); + dao.updateStepExecution(stepExecution2); + + StepExecution lastStepExecution = stepExecution1.getId() > stepExecution2.getId() ? stepExecution1 + : stepExecution2; + StepExecution retrieved = dao.getLastStepExecution(jobInstance, "step1"); + assertNotNull(retrieved); + assertEquals(lastStepExecution.getId(), retrieved.getId()); + } + + @Test + void testGetForNotExistingJobExecution() { + assertNull(dao.getStepExecution(45677L)); + } + + /** + * Update and retrieve updated StepExecution - make sure the update is reflected as + * expected and version number has been incremented + */ + @Test + void testUpdateExecution() { + StepExecution stepExecution = dao.createStepExecution("step1", jobExecution); + + stepExecution.setStatus(BatchStatus.ABANDONED); + stepExecution.setLastUpdated(LocalDateTime.now()); + dao.updateStepExecution(stepExecution); + + StepExecution retrieved = dao.getStepExecution(stepExecution.getId()); + assertNotNull(retrieved); + assertEquals(stepExecution, retrieved); + assertTemporalEquals(stepExecution.getLastUpdated(), retrieved.getLastUpdated()); + assertEquals(BatchStatus.ABANDONED, retrieved.getStatus()); + } + + /** + * Exception should be raised when the version of update argument doesn't match the + * version of persisted entity. + */ + @Disabled("Not supported yet") + @Test + void testConcurrentModificationException() { + + StepExecution exec1 = dao.createStepExecution("step", jobExecution); + + StepExecution exec2 = dao.getStepExecution(exec1.getId()); + assertNotNull(exec2); + + assertEquals(Integer.valueOf(0), exec1.getVersion()); + assertEquals(exec1.getVersion(), exec2.getVersion()); + + dao.updateStepExecution(exec1); + assertEquals(Integer.valueOf(1), exec1.getVersion()); + + assertThrows(OptimisticLockingFailureException.class, () -> dao.updateStepExecution(exec2)); + } + + private void assertStepExecutionsAreEqual(StepExecution expected, StepExecution actual) { + assertEquals(expected.getId(), actual.getId()); + assertTemporalEquals(expected.getStartTime(), actual.getStartTime()); + assertTemporalEquals(expected.getEndTime(), actual.getEndTime()); + assertEquals(expected.getSkipCount(), actual.getSkipCount()); + assertEquals(expected.getCommitCount(), actual.getCommitCount()); + assertEquals(expected.getReadCount(), actual.getReadCount()); + assertEquals(expected.getWriteCount(), actual.getWriteCount()); + assertEquals(expected.getFilterCount(), actual.getFilterCount()); + assertEquals(expected.getWriteSkipCount(), actual.getWriteSkipCount()); + assertEquals(expected.getReadSkipCount(), actual.getReadSkipCount()); + assertEquals(expected.getProcessSkipCount(), actual.getProcessSkipCount()); + assertEquals(expected.getRollbackCount(), actual.getRollbackCount()); + assertEquals(expected.getExitStatus(), actual.getExitStatus()); + assertTemporalEquals(expected.getLastUpdated(), actual.getLastUpdated()); + assertEquals(expected.getExitStatus(), actual.getExitStatus()); + assertEquals(expected.getJobExecutionId(), actual.getJobExecutionId()); + assertTemporalEquals(expected.getCreateTime(), actual.getCreateTime()); + } + + @Test + void testCountStepExecutions() { + // Given + StepExecution stepExecution = dao.createStepExecution("step", jobExecution); + + // When + long result = dao.countStepExecutions(jobInstance, stepExecution.getStepName()); + + // Then + assertEquals(1, result); + } + + @Test + void testDeleteStepExecution() { + // Given + StepExecution stepExecution = dao.createStepExecution("step", jobExecution); + + // When + dao.deleteStepExecution(stepExecution); + + // Then + assertNull(dao.getStepExecution(stepExecution.getId())); + } + +} \ No newline at end of file