MongoJobExecutionDao.java
@@ -15,6 +15,11 @@
*/
package org.springframework.batch.core.repository.dao.mongodb;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -23,17 +28,20 @@
import org.springframework.batch.core.job.JobInstance;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.repository.dao.JobExecutionDao;
import org.springframework.batch.core.repository.persistence.JobParameter;
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
import org.springframework.util.CollectionUtils;

import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;

/**
* @author Mahmoud Ben Hassine
* @author Yanming Zhou
* @since 5.2.0
*/
public class MongoJobExecutionDao implements JobExecutionDao {
@@ -84,13 +92,12 @@ public void updateJobExecution(JobExecution jobExecution) {

@Override
public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
Query query = query(where("jobInstanceId").is(jobInstance.getId()))
.with(Sort.by(Sort.Direction.DESC, "jobExecutionId"));
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
return jobExecutions.stream()
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
.toList();
return jobExecutions.stream().map(jobExecution -> convert(jobExecution, jobInstance)).toList();
}

@Override
@@ -101,7 +108,7 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) {
query.with(Sort.by(sortOrder)),
org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
return jobExecution != null ? this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance) : null;
return jobExecution != null ? convert(jobExecution, jobInstance) : null;
}

@Override
@@ -115,7 +122,7 @@ public Set<JobExecution> findRunningJobExecutions(String jobName) {
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME)
.stream()
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
.map(jobExecution -> convert(jobExecution, jobInstance))
.forEach(runningJobExecutions::add);
}
return runningJobExecutions;
@@ -132,7 +139,7 @@ public JobExecution getJobExecution(long executionId) {
}
org.springframework.batch.core.job.JobInstance jobInstance = this.jobInstanceDao
.getJobInstance(jobExecution.getJobInstanceId());
return this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance);
return convert(jobExecution, jobInstance);
}

@Override
@@ -146,4 +153,43 @@ public void synchronizeStatus(JobExecution jobExecution) {
// synchronizeStatus
}

@Override
public void deleteJobExecution(JobExecution jobExecution) {
this.mongoOperations.remove(query(where("jobExecutionId").is(jobExecution.getId())),
JOB_EXECUTIONS_COLLECTION_NAME);

}

private JobExecution convert(org.springframework.batch.core.repository.persistence.JobExecution jobExecution,
org.springframework.batch.core.job.JobInstance jobInstance) {
Set<JobParameter<?>> parameters = jobExecution.getJobParameters();
if (!CollectionUtils.isEmpty(parameters)) {
// MongoDB restores temporal values as java.util.Date, so convert them back to their declared types
Set<JobParameter<?>> converted = new HashSet<>();
for (JobParameter<?> parameter : parameters) {
if (LocalDate.class.getName().equals(parameter.type()) && parameter.value() instanceof Date date) {
converted.add(new JobParameter<>(parameter.name(),
date.toInstant().atZone(ZoneId.systemDefault()).toLocalDate(), parameter.type(),
parameter.identifying()));
}
else if (LocalTime.class.getName().equals(parameter.type()) && parameter.value() instanceof Date date) {
converted.add(new JobParameter<>(parameter.name(),
date.toInstant().atZone(ZoneId.systemDefault()).toLocalTime(), parameter.type(),
parameter.identifying()));
}
else if (LocalDateTime.class.getName().equals(parameter.type())
&& parameter.value() instanceof Date date) {
converted.add(new JobParameter<>(parameter.name(),
date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(), parameter.type(),
parameter.identifying()));
}
else {
converted.add(parameter);
}
}
jobExecution.setJobParameters(converted);
}
return this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance);
}

}
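Note on the convert(...) helper added above: MongoDB stores LocalDate, LocalTime and LocalDateTime job parameters as BSON dates and reads them back as java.util.Date, so the DAO rebuilds each JobParameter with its declared type. A minimal, stand-alone sketch of that round trip (illustrative only, not part of the diff; the class and method names are made up, and it assumes the system default time zone like the DAO does):

// Illustrative sketch: restoring a LocalDateTime parameter that MongoDB handed back as java.util.Date.
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

class TemporalRoundTripSketch {

	static Object restore(String declaredType, Object storedValue) {
		if (LocalDateTime.class.getName().equals(declaredType) && storedValue instanceof Date date) {
			// Same conversion the DAO applies before rebuilding the JobParameter
			return date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
		}
		return storedValue; // non-temporal values pass through unchanged
	}

	public static void main(String[] args) {
		LocalDateTime original = LocalDateTime.of(2025, 1, 1, 12, 30);
		Date stored = Date.from(original.atZone(ZoneId.systemDefault()).toInstant());
		System.out.println(restore(LocalDateTime.class.getName(), stored)); // prints 2025-01-01T12:30
	}

}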
MongoJobInstanceDao.java
@@ -15,6 +15,7 @@
*/
package org.springframework.batch.core.repository.dao.mongodb;

import java.util.Collections;
import java.util.List;

import org.springframework.batch.core.job.DefaultJobKeyGenerator;
@@ -36,6 +37,7 @@

/**
* @author Mahmoud Ben Hassine
* @author Yanming Zhou
* @since 5.2.0
*/
public class MongoJobInstanceDao implements JobInstanceDao {
@@ -117,7 +119,10 @@ public List<JobInstance> getJobInstances(String jobName, int start, int count) {
org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
.stream()
.toList();
return jobInstances.subList(start, jobInstances.size())
if (jobInstances.size() <= start) {
return Collections.emptyList();
}
return jobInstances.subList(start, Math.min(jobInstances.size(), start + count))
.stream()
.map(this.jobInstanceConverter::toJobInstance)
.limit(count)
Expand Down Expand Up @@ -198,4 +203,9 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException {
return this.mongoOperations.count(query, COLLECTION_NAME);
}

@Override
public void deleteJobInstance(JobInstance jobInstance) {
this.mongoOperations.remove(query(where("jobInstanceId").is(jobInstance.getId())), COLLECTION_NAME);
}

}
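The getJobInstances change above guards the start offset before calling subList, which would otherwise throw IndexOutOfBoundsException when start is past the end of the result list. A small stand-alone sketch of the same windowing rule (hypothetical helper, not part of the PR):

// Illustrative paging helper: an out-of-range start yields an empty page, otherwise at most 'count' items.
import java.util.Collections;
import java.util.List;

class PagingSketch {

	static <T> List<T> page(List<T> items, int start, int count) {
		if (items.size() <= start) {
			return Collections.emptyList();
		}
		return items.subList(start, Math.min(items.size(), start + count));
	}

	public static void main(String[] args) {
		List<String> names = List.of("a", "b", "c", "d");
		System.out.println(page(names, 1, 2)); // [b, c]
		System.out.println(page(names, 5, 2)); // []
	}

}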
MongoStepExecutionDao.java
@@ -36,6 +36,7 @@

/**
* @author Mahmoud Ben Hassine
* @author Yanming Zhou
* @since 5.2.0
*/
public class MongoStepExecutionDao implements StepExecutionDao {
@@ -95,8 +96,8 @@ public StepExecution getStepExecution(long stepExecutionId) {
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations
.findOne(query, org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
JobExecution jobExecution = jobExecutionDao.getJobExecution(stepExecution.getJobExecutionId());
return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null;
return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution,
jobExecutionDao.getJobExecution(stepExecution.getJobExecutionId())) : null;
}

@Deprecated(since = "6.0", forRemoval = true)
@@ -163,24 +164,22 @@ public List<StepExecution> getStepExecutions(JobExecution jobExecution) {

@Override
public long countStepExecutions(JobInstance jobInstance, String stepName) {
long count = 0;
// TODO optimize the count query
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = jobExecution
.getStepExecutions();
for (org.springframework.batch.core.repository.persistence.StepExecution stepExecution : stepExecutions) {
if (stepExecution.getName().equals(stepName)) {
count++;
}
}
}
return count;
return this.mongoOperations.count(
query(where("jobExecutionId").in(jobExecutions.stream()
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
.toList()).and("name").is(stepName)),
org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
}

// TODO implement deleteStepExecution(StepExecution stepExecution)
@Override
public void deleteStepExecution(StepExecution stepExecution) {
this.mongoOperations.remove(query(where("stepExecutionId").is(stepExecution.getId())),
STEP_EXECUTIONS_COLLECTION_NAME);
}

}
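The countStepExecutions rewrite above replaces the in-memory loop with a single server-side count over the step executions collection, keyed by the instance's job execution ids and the requested step name. A hedged sketch of how that criteria composes with Spring Data MongoDB (StepExecutionCountSketch and countByStepName are illustrative names; the field and collection names are the ones used by the DAOs and tests in this PR):

// Illustrative criteria composition for the server-side count.
import java.util.List;

import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

class StepExecutionCountSketch {

	static long countByStepName(MongoOperations mongoOperations, List<Long> jobExecutionIds, String stepName) {
		// Match step executions belonging to the given job executions and carrying the given step name
		Criteria criteria = Criteria.where("jobExecutionId").in(jobExecutionIds).and("name").is(stepName);
		// Counting on the server avoids materializing every document in memory
		return mongoOperations.count(new Query(criteria), "BATCH_STEP_EXECUTION");
	}

}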
AbstractMongoDBDaoIntegrationTests.java (new file)
@@ -0,0 +1,106 @@
/*
* Copyright 2008-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.repository.support;

import org.junit.jupiter.api.BeforeEach;
import org.springframework.batch.core.repository.dao.mongodb.MongoExecutionContextDao;
import org.springframework.batch.core.repository.dao.mongodb.MongoJobExecutionDao;
import org.springframework.batch.core.repository.dao.mongodb.MongoJobInstanceDao;
import org.springframework.batch.core.repository.dao.mongodb.MongoStepExecutionDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.file.Files;
import java.util.stream.Stream;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

import static org.junit.jupiter.api.Assertions.*;

/**
* @author Yanming Zhou
*/
@Testcontainers(disabledWithoutDocker = true)
@SpringJUnitConfig({ MongoDBIntegrationTestConfiguration.class,
AbstractMongoDBDaoIntegrationTests.MongoDBDaoConfiguration.class })
abstract class AbstractMongoDBDaoIntegrationTests {

@BeforeEach
void setUp(@Autowired MongoTemplate mongoTemplate) throws IOException {
mongoTemplate.dropCollection("BATCH_JOB_INSTANCE");
mongoTemplate.dropCollection("BATCH_JOB_EXECUTION");
mongoTemplate.dropCollection("BATCH_STEP_EXECUTION");
mongoTemplate.createCollection("BATCH_JOB_INSTANCE");
mongoTemplate.createCollection("BATCH_JOB_EXECUTION");
mongoTemplate.createCollection("BATCH_STEP_EXECUTION");
// sequences
mongoTemplate.dropCollection("BATCH_SEQUENCES");
Resource resource = new FileSystemResource(
"src/main/resources/org/springframework/batch/core/schema-mongodb.jsonl");
try (Stream<String> lines = Files.lines(resource.getFile().toPath())) {
lines.forEach(mongoTemplate::executeCommand);
}
}

protected void assertTemporalEquals(LocalDateTime lhs, LocalDateTime rhs) {
assertEquals(lhs != null ? lhs.truncatedTo(ChronoUnit.MILLIS) : lhs,
rhs != null ? rhs.truncatedTo(ChronoUnit.MILLIS) : null);
}

protected void assertTemporalEquals(LocalTime lhs, LocalTime rhs) {
assertEquals(lhs != null ? lhs.truncatedTo(ChronoUnit.MILLIS) : lhs,
rhs != null ? rhs.truncatedTo(ChronoUnit.MILLIS) : null);
}

@Configuration
static class MongoDBDaoConfiguration {

@Bean
MongoJobInstanceDao jobInstanceDao(MongoOperations mongoOperations) {
return new MongoJobInstanceDao(mongoOperations);
}

@Bean
MongoJobExecutionDao jobExecutionDao(MongoOperations mongoOperations, MongoJobInstanceDao jobInstanceDao) {
MongoJobExecutionDao jobExecutionDao = new MongoJobExecutionDao(mongoOperations);
jobExecutionDao.setJobInstanceDao(jobInstanceDao);
return jobExecutionDao;
}

@Bean
MongoStepExecutionDao stepExecutionDao(MongoOperations mongoOperations, MongoJobExecutionDao jobExecutionDao) {
MongoStepExecutionDao stepExecutionDao = new MongoStepExecutionDao(mongoOperations);
stepExecutionDao.setJobExecutionDao(jobExecutionDao);
return stepExecutionDao;
}

@Bean
MongoExecutionContextDao executionContextDao(MongoOperations mongoOperations) {
return new MongoExecutionContextDao(mongoOperations);
}

}

}
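Two notes on the new base class: assertTemporalEquals truncates both sides to milliseconds because BSON dates carry millisecond precision while java.time values carry nanoseconds, and the @BeforeEach setUp resets the collections so every test starts from a clean schema. A hypothetical subclass showing how the base class is meant to be used (class and test names are illustrative, not from the PR):

// Hypothetical subclass: inherits the per-test collection reset and the millisecond-tolerant assertions.
package org.springframework.batch.core.repository.support;

import java.time.LocalDateTime;

import org.junit.jupiter.api.Test;

class SomeMongoDaoIntegrationTests extends AbstractMongoDBDaoIntegrationTests {

	@Test
	void truncationToleratesSubMillisecondLoss() {
		LocalDateTime now = LocalDateTime.now();
		// Simulate the precision loss of a BSON round trip by truncating to whole milliseconds
		LocalDateTime roundTripped = now.withNano((now.getNano() / 1_000_000) * 1_000_000);
		assertTemporalEquals(now, roundTripped);
	}

}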
MongoExecutionContextDaoIntegrationTests.java
@@ -15,11 +15,8 @@
*/
package org.springframework.batch.core.repository.support;

import java.io.IOException;
import java.nio.file.Files;
import java.time.LocalDateTime;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.JobExecution;
@@ -29,19 +26,8 @@
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.dao.ExecutionContextDao;
import org.springframework.batch.core.repository.dao.mongodb.MongoExecutionContextDao;
import org.springframework.batch.core.repository.support.MongoExecutionContextDaoIntegrationTests.ExecutionContextDaoConfiguration;
import org.springframework.batch.infrastructure.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.testcontainers.junit.jupiter.Testcontainers;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -51,17 +37,7 @@
* @author Henning Pöttker
* @author Yanming Zhou
*/
@DirtiesContext
@Testcontainers(disabledWithoutDocker = true)
@SpringJUnitConfig({ MongoDBIntegrationTestConfiguration.class, ExecutionContextDaoConfiguration.class })
public class MongoExecutionContextDaoIntegrationTests {

@BeforeAll
static void setUp(@Autowired MongoTemplate mongoTemplate) throws IOException {
Resource resource = new FileSystemResource(
"src/main/resources/org/springframework/batch/core/schema-mongodb.jsonl");
Files.lines(resource.getFilePath()).forEach(line -> mongoTemplate.executeCommand(line));
}
public class MongoExecutionContextDaoIntegrationTests extends AbstractMongoDBDaoIntegrationTests {

@Test
void testGetJobExecutionWithEmptyResult(@Autowired ExecutionContextDao executionContextDao) {
@@ -131,14 +107,4 @@ void testSaveStepExecution(@Autowired JobOperator jobOperator, @Autowired Job jo
assertEquals("bar", actual.get("foo"));
}

@Configuration
static class ExecutionContextDaoConfiguration {

@Bean
ExecutionContextDao executionContextDao(MongoOperations mongoOperations) {
return new MongoExecutionContextDao(mongoOperations);
}

}

}