diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java index a7bac8ce26..0f1d06708e 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java @@ -15,11 +15,8 @@ */ package org.springframework.batch.core.repository.dao.mongodb; -import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.List; -import java.util.Optional; import org.springframework.batch.core.job.JobExecution; import org.springframework.batch.core.job.JobInstance; @@ -27,6 +24,7 @@ import org.springframework.batch.core.repository.dao.StepExecutionDao; import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter; import org.springframework.batch.core.repository.persistence.converter.StepExecutionConverter; +import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.query.Query; import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; @@ -36,6 +34,7 @@ /** * @author Mahmoud Ben Hassine + * @author Jinwoo Bae * @since 5.2.0 */ public class MongoStepExecutionDao implements StepExecutionDao { @@ -100,34 +99,42 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut @Override public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { // TODO optimize the query - // get all step executions - List stepExecutions = new ArrayList<>(); - Query query = query(where("jobInstanceId").is(jobInstance.getId())); + Query jobExecutionQuery = query(where("jobInstanceId").is(jobInstance.getId())); List jobExecutions = this.mongoOperations - .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, + .find(jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class, JOB_EXECUTIONS_COLLECTION_NAME); - for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) { - stepExecutions.addAll(jobExecution.getStepExecutions()); - } - // sort step executions by creation date then id (see contract) and return the - // first one - Optional lastStepExecution = stepExecutions - .stream() - .filter(stepExecution -> stepExecution.getName().equals(stepName)) - .min(Comparator - .comparing(org.springframework.batch.core.repository.persistence.StepExecution::getCreateTime) - .thenComparing(org.springframework.batch.core.repository.persistence.StepExecution::getId)); - if (lastStepExecution.isPresent()) { - org.springframework.batch.core.repository.persistence.StepExecution stepExecution = lastStepExecution.get(); - JobExecution jobExecution = this.jobExecutionConverter.toJobExecution(jobExecutions.stream() - .filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId())) - .findFirst() - .get(), jobInstance); - return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution); + + if (jobExecutions.isEmpty()) { + return null; } - else { + + List jobExecutionIds = jobExecutions.stream() + .map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId) + .toList(); + + Query stepExecutionQuery = query(where("name").is(stepName).and("jobExecutionId").in(jobExecutionIds)) + .with(Sort.by(Sort.Direction.DESC, "createTime", "stepExecutionId")) + .limit(1); + + org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations + .findOne(stepExecutionQuery, org.springframework.batch.core.repository.persistence.StepExecution.class, + STEP_EXECUTIONS_COLLECTION_NAME); + + if (stepExecution == null) { return null; } + + org.springframework.batch.core.repository.persistence.JobExecution jobExecution = jobExecutions.stream() + .filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId())) + .findFirst() + .orElse(null); + + if (jobExecution != null) { + JobExecution jobExecutionDomain = this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance); + return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecutionDomain); + } + + return null; } @Override @@ -144,22 +151,23 @@ public void addStepExecutions(JobExecution jobExecution) { @Override public long countStepExecutions(JobInstance jobInstance, String stepName) { - long count = 0; - // TODO optimize the count query - Query query = query(where("jobInstanceId").is(jobInstance.getId())); - List jobExecutions = this.mongoOperations - .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); - for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) { - List stepExecutions = jobExecution - .getStepExecutions(); - for (org.springframework.batch.core.repository.persistence.StepExecution stepExecution : stepExecutions) { - if (stepExecution.getName().equals(stepName)) { - count++; - } - } + Query jobExecutionQuery = query(where("jobInstanceId").is(jobInstance.getId())); + List jobExecutionIds = this.mongoOperations + .find(jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class, + JOB_EXECUTIONS_COLLECTION_NAME) + .stream() + .map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId) + .toList(); + + if (jobExecutionIds.isEmpty()) { + return 0; } - return count; + + // Count step executions directly from BATCH_STEP_EXECUTION collection + Query stepQuery = query(where("name").is(stepName).and("jobExecutionId").in(jobExecutionIds)); + return this.mongoOperations.count(stepQuery, + org.springframework.batch.core.repository.persistence.StepExecution.class, + STEP_EXECUTIONS_COLLECTION_NAME); } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBIntegrationTestConfiguration.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBIntegrationTestConfiguration.java index 93d2be8a57..b7d890a57e 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBIntegrationTestConfiguration.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBIntegrationTestConfiguration.java @@ -19,6 +19,7 @@ import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.dao.StepExecutionDao; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; @@ -56,6 +57,11 @@ public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransaction return jobRepositoryFactoryBean.getObject(); } + @Bean + public StepExecutionDao stepExecutionDao(MongoTemplate mongoTemplate) { + return new org.springframework.batch.core.repository.dao.mongodb.MongoStepExecutionDao(mongoTemplate); + } + @Bean public MongoDatabaseFactory mongoDatabaseFactory(MongoDBContainer mongoDBContainer) { return new SimpleMongoClientDatabaseFactory(mongoDBContainer.getConnectionString() + "/test"); diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java index 9e72075de4..0c36ed290c 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java @@ -16,6 +16,7 @@ package org.springframework.batch.core.repository.support; import java.time.LocalDateTime; +import java.util.Collections; import java.util.Map; import com.mongodb.client.MongoCollection; @@ -29,17 +30,26 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.testcontainers.junit.jupiter.Testcontainers; +import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.job.Job; import org.springframework.batch.core.job.JobExecution; +import org.springframework.batch.core.job.JobInstance; import org.springframework.batch.core.job.parameters.JobParameters; import org.springframework.batch.core.job.parameters.JobParametersBuilder; import org.springframework.batch.core.launch.JobOperator; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.dao.StepExecutionDao; +import org.springframework.batch.core.step.StepExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; /** * @author Mahmoud Ben Hassine + * @author Jinwoo Bae * @author Yanming Zhou */ @DirtiesContext @@ -53,18 +63,25 @@ public class MongoDBJobRepositoryIntegrationTests { @SuppressWarnings("removal") @BeforeEach public void setUp() { - // collections + // Clear existing collections to ensure clean state + mongoTemplate.getCollection("BATCH_JOB_INSTANCE").drop(); + mongoTemplate.getCollection("BATCH_JOB_EXECUTION").drop(); + mongoTemplate.getCollection("BATCH_STEP_EXECUTION").drop(); + mongoTemplate.getCollection("BATCH_SEQUENCES").drop(); + + // sequences mongoTemplate.createCollection("BATCH_JOB_INSTANCE"); mongoTemplate.createCollection("BATCH_JOB_EXECUTION"); mongoTemplate.createCollection("BATCH_STEP_EXECUTION"); - // sequences mongoTemplate.createCollection("BATCH_SEQUENCES"); + mongoTemplate.getCollection("BATCH_SEQUENCES") .insertOne(new Document(Map.of("_id", "BATCH_JOB_INSTANCE_SEQ", "count", 0L))); mongoTemplate.getCollection("BATCH_SEQUENCES") .insertOne(new Document(Map.of("_id", "BATCH_JOB_EXECUTION_SEQ", "count", 0L))); mongoTemplate.getCollection("BATCH_SEQUENCES") .insertOne(new Document(Map.of("_id", "BATCH_STEP_EXECUTION_SEQ", "count", 0L))); + // indices mongoTemplate.indexOps("BATCH_JOB_INSTANCE") .ensureIndex(new Index().on("jobName", Sort.Direction.ASC).named("job_name_idx")); @@ -112,6 +129,58 @@ void testJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) th dump(stepExecutionsCollection, "step execution = "); } + /** + * Test for GitHub issue #4943: getLastStepExecution should work when JobExecution's + * embedded stepExecutions array is empty. + * + *

+ * This can happen after abrupt shutdown when the embedded stepExecutions array is not + * synchronized, but BATCH_STEP_EXECUTION collection still contains the data. + * + */ + @Test + void testGetLastStepExecutionWithEmptyEmbeddedArray(@Autowired JobOperator jobOperator, @Autowired Job job, + @Autowired StepExecutionDao stepExecutionDao) throws Exception { + // Step 1: Run job normally + JobParameters jobParameters = new JobParametersBuilder().addString("name", "emptyArrayTest") + .addLocalDateTime("runtime", LocalDateTime.now()) + .toJobParameters(); + + JobExecution jobExecution = jobOperator.start(job, jobParameters); + JobInstance jobInstance = jobExecution.getJobInstance(); + + // Verify job completed successfully + Assertions.assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()); + + // Step 2: Simulate the core issue - clear embedded stepExecutions array + // while keeping BATCH_STEP_EXECUTION collection intact + Query jobQuery = new Query(Criteria.where("jobExecutionId").is(jobExecution.getId())); + Update jobUpdate = new Update().set("stepExecutions", Collections.emptyList()); + mongoTemplate.updateFirst(jobQuery, jobUpdate, "BATCH_JOB_EXECUTION"); + + // Step 3: Verify embedded array is empty but collection still has data + MongoCollection jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION"); + MongoCollection stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION"); + + Document jobDoc = jobExecutionsCollection.find(new Document("jobExecutionId", jobExecution.getId())).first(); + Assertions.assertTrue(jobDoc.getList("stepExecutions", Document.class).isEmpty(), + "Embedded stepExecutions array should be empty"); + Assertions.assertEquals(2, stepExecutionsCollection.countDocuments(), + "BATCH_STEP_EXECUTION collection should still contain data"); + + // Step 4: Test the fix - getLastStepExecution should work despite empty embedded + // array + StepExecution lastStepExecution = stepExecutionDao.getLastStepExecution(jobInstance, "step1"); + Assertions.assertNotNull(lastStepExecution, + "getLastStepExecution should find step execution even with empty embedded array"); + Assertions.assertEquals("step1", lastStepExecution.getStepName()); + Assertions.assertEquals(BatchStatus.COMPLETED, lastStepExecution.getStatus()); + + // Step 5: Test countStepExecutions also works + long stepCount = stepExecutionDao.countStepExecutions(jobInstance, "step1"); + Assertions.assertEquals(1L, stepCount, "countStepExecutions should work despite empty embedded array"); + } + private static void dump(MongoCollection collection, String prefix) { for (Document document : collection.find()) { System.out.println(prefix + document.toJson());