diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java index 0baee5fb5c..2cdb4e596c 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java @@ -42,6 +42,7 @@ * Batch in a declarative way through {@link EnableBatchProcessing}. * * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.0 * @see EnableBatchProcessing */ @@ -185,6 +186,11 @@ private void registerMongoJobRepository(BeanDefinitionRegistry registry, beanDefinitionBuilder.addPropertyValue("isolationLevelForCreate", isolationLevelForCreate); } + String collectionPrefix = mongoJobRepositoryAnnotation.collectionPrefix(); + if (collectionPrefix != null) { + beanDefinitionBuilder.addPropertyValue("collectionPrefix", collectionPrefix); + } + String jobKeyGeneratorRef = mongoJobRepositoryAnnotation.jobKeyGeneratorRef(); if (registry.containsBeanDefinition(jobKeyGeneratorRef)) { beanDefinitionBuilder.addPropertyReference("jobKeyGenerator", jobKeyGeneratorRef); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java index 922bca0192..e5f48163d8 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java @@ -98,4 +98,10 @@ */ String stepExecutionIncrementerRef() default "stepExecutionIncrementer"; + /** + * Set the prefix for MongoDB collection names. Defaults to {@literal BATCH_}. + * @return the collection prefix to use + */ + String collectionPrefix() default "BATCH_"; + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java index acbf26e6a6..c2f2d79a2a 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java @@ -24,24 +24,34 @@ import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; +import org.springframework.util.Assert; import static org.springframework.data.mongodb.core.query.Criteria.where; import static org.springframework.data.mongodb.core.query.Query.query; /** * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 */ public class MongoExecutionContextDao implements ExecutionContextDao { - private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION"; + private static final String STEP_EXECUTIONS_COLLECTION_NAME = "STEP_EXECUTION"; - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; + private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION"; private final MongoOperations mongoOperations; - public MongoExecutionContextDao(MongoOperations mongoOperations) { + private final String stepExecutionCollectionName; + + private final String jobExecutionCollectionName; + + public MongoExecutionContextDao(MongoOperations mongoOperations, String collectionPrefix) { + Assert.notNull(mongoOperations, "mongoOperations must not be null."); + Assert.notNull(collectionPrefix, "collectionPrefix must not be null."); this.mongoOperations = mongoOperations; + this.stepExecutionCollectionName = collectionPrefix + STEP_EXECUTIONS_COLLECTION_NAME; + this.jobExecutionCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME; } @Override @@ -49,7 +59,7 @@ public ExecutionContext getExecutionContext(JobExecution jobExecution) { Query query = query(where("jobExecutionId").is(jobExecution.getId())); org.springframework.batch.core.repository.persistence.JobExecution execution = this.mongoOperations.findOne( query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionCollectionName); if (execution == null) { return new ExecutionContext(); } @@ -61,7 +71,7 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) { Query query = query(where("stepExecutionId").is(stepExecution.getId())); org.springframework.batch.core.repository.persistence.StepExecution execution = this.mongoOperations.findOne( query, org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionCollectionName); if (execution == null) { return new ExecutionContext(); } @@ -77,8 +87,7 @@ public void saveExecutionContext(JobExecution jobExecution) { new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(), executionContext.isDirty())); this.mongoOperations.updateFirst(query, update, - org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionCollectionName); } @Override @@ -90,8 +99,7 @@ public void saveExecutionContext(StepExecution stepExecution) { new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(), executionContext.isDirty())); this.mongoOperations.updateFirst(query, update, - org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.StepExecution.class, stepExecutionCollectionName); } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java index 149c0555e3..04f72cf1df 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java @@ -34,25 +34,30 @@ /** * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 */ public class MongoJobExecutionDao implements JobExecutionDao { - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; + private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION"; - private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ"; + private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "JOB_EXECUTION_SEQ"; private final MongoOperations mongoOperations; + private final String jobExecutionsCollectionName; + private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter(); private DataFieldMaxValueIncrementer jobExecutionIncrementer; private MongoJobInstanceDao jobInstanceDao; - public MongoJobExecutionDao(MongoOperations mongoOperations) { + public MongoJobExecutionDao(MongoOperations mongoOperations, String collectionPrefix) { this.mongoOperations = mongoOperations; - this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME); + this.jobExecutionsCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME; + this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, + collectionPrefix + JOB_EXECUTIONS_SEQUENCE_NAME); } public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) { @@ -69,7 +74,7 @@ public JobExecution createJobExecution(JobInstance jobInstance, JobParameters jo org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToSave = this.jobExecutionConverter .fromJobExecution(jobExecution); - this.mongoOperations.insert(jobExecutionToSave, JOB_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.insert(jobExecutionToSave, jobExecutionsCollectionName); return jobExecution; } @@ -79,7 +84,7 @@ public void updateJobExecution(JobExecution jobExecution) { Query query = query(where("jobExecutionId").is(jobExecution.getId())); org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter .fromJobExecution(jobExecution); - this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, jobExecutionsCollectionName); } @Override @@ -87,7 +92,7 @@ public List findJobExecutions(JobInstance jobInstance) { Query query = query(where("jobInstanceId").is(jobInstance.getId())); List jobExecutions = this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionsCollectionName); return jobExecutions.stream() .map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance)) .toList(); @@ -99,8 +104,7 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) { Sort.Order sortOrder = Sort.Order.desc("jobExecutionId"); org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne( query.with(Sort.by(sortOrder)), - org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionsCollectionName); return jobExecution != null ? this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance) : null; } @@ -113,7 +117,7 @@ public Set findRunningJobExecutions(String jobName) { where("jobInstanceId").is(jobInstance.getId()).and("status").in("STARTING", "STARTED", "STOPPING")); this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME) + jobExecutionsCollectionName) .stream() .map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance)) .forEach(runningJobExecutions::add); @@ -126,7 +130,7 @@ public JobExecution getJobExecution(long executionId) { Query jobExecutionQuery = query(where("jobExecutionId").is(executionId)); org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne( jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionsCollectionName); if (jobExecution == null) { return null; } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java index d7d9c564a3..5c5d474c9c 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java @@ -36,26 +36,31 @@ /** * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 */ public class MongoJobInstanceDao implements JobInstanceDao { - private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE"; + private static final String COLLECTION_NAME = "JOB_INSTANCE"; - private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ"; + private static final String SEQUENCE_NAME = "JOB_INSTANCE_SEQ"; private final MongoOperations mongoOperations; + private final String collectionName; + private DataFieldMaxValueIncrementer jobInstanceIncrementer; private JobKeyGenerator jobKeyGenerator = new DefaultJobKeyGenerator(); private final JobInstanceConverter jobInstanceConverter = new JobInstanceConverter(); - public MongoJobInstanceDao(MongoOperations mongoOperations) { + public MongoJobInstanceDao(MongoOperations mongoOperations, String collectionPrefix) { Assert.notNull(mongoOperations, "mongoOperations must not be null."); + Assert.notNull(collectionPrefix, "collectionPrefix must not be null."); this.mongoOperations = mongoOperations; - this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME); + this.collectionName = collectionPrefix + COLLECTION_NAME; + this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, collectionPrefix + SEQUENCE_NAME); } public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) { @@ -79,7 +84,7 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters jobInstanceToSave.setJobKey(key); long instanceId = jobInstanceIncrementer.nextLongValue(); jobInstanceToSave.setJobInstanceId(instanceId); - this.mongoOperations.insert(jobInstanceToSave, COLLECTION_NAME); + this.mongoOperations.insert(jobInstanceToSave, this.collectionName); JobInstance jobInstance = new JobInstance(instanceId, jobName); jobInstance.incrementVersion(); // TODO is this needed? @@ -90,16 +95,16 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters public JobInstance getJobInstance(String jobName, JobParameters jobParameters) { String key = this.jobKeyGenerator.generateKey(jobParameters); Query query = query(where("jobName").is(jobName).and("jobKey").is(key)); - org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations - .findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne( + query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName); return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; } @Override public JobInstance getJobInstance(long instanceId) { Query query = query(where("jobInstanceId").is(instanceId)); - org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations - .findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne( + query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName); return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; } @@ -114,7 +119,7 @@ public List getJobInstances(String jobName, int start, int count) { Sort.Order sortOrder = Sort.Order.desc("jobInstanceId"); List jobInstances = this.mongoOperations .find(query.with(Sort.by(sortOrder)), - org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .toList(); return jobInstances.subList(start, jobInstances.size()) @@ -134,7 +139,7 @@ public List getJobInstances(String jobName, int start, int count) { public List getJobInstances(String jobName) { Query query = query(where("jobName").is(jobName)); return this.mongoOperations - .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .map(this.jobInstanceConverter::toJobInstance) .toList(); @@ -144,7 +149,7 @@ public List getJobInstances(String jobName) { public List getJobInstanceIds(String jobName) { Query query = query(where("jobName").is(jobName)); return this.mongoOperations - .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .map(org.springframework.batch.core.repository.persistence.JobInstance::getJobInstanceId) .toList(); @@ -153,7 +158,7 @@ public List getJobInstanceIds(String jobName) { public List findJobInstancesByName(String jobName) { Query query = query(where("jobName").is(jobName)); return this.mongoOperations - .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .map(this.jobInstanceConverter::toJobInstance) .toList(); @@ -165,14 +170,14 @@ public JobInstance getLastJobInstance(String jobName) { Sort.Order sortOrder = Sort.Order.desc("jobInstanceId"); org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne( query.with(Sort.by(sortOrder)), org.springframework.batch.core.repository.persistence.JobInstance.class, - COLLECTION_NAME); + this.collectionName); return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; } @Override public List getJobNames() { return this.mongoOperations - .findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + .findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .map(org.springframework.batch.core.repository.persistence.JobInstance::getJobName) .toList(); @@ -195,7 +200,7 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException { throw new NoSuchJobException("Job not found " + jobName); } Query query = query(where("jobName").is(jobName)); - return this.mongoOperations.count(query, COLLECTION_NAME); + return this.mongoOperations.count(query, this.collectionName); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java index d7ed32b519..371b66cfa8 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java @@ -37,15 +37,20 @@ /** * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 */ public class MongoStepExecutionDao implements StepExecutionDao { - private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION"; + private static final String STEP_EXECUTIONS_COLLECTION_NAME = "STEP_EXECUTION"; - private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "BATCH_STEP_EXECUTION_SEQ"; + private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "STEP_EXECUTION_SEQ"; - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; + private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION"; + + private final String stepExecutionCollectionName; + + private final String jobExecutionCollectionName; private final StepExecutionConverter stepExecutionConverter = new StepExecutionConverter(); @@ -57,9 +62,12 @@ public class MongoStepExecutionDao implements StepExecutionDao { MongoJobExecutionDao jobExecutionDao; - public MongoStepExecutionDao(MongoOperations mongoOperations) { + public MongoStepExecutionDao(MongoOperations mongoOperations, String collectionPrefix) { this.mongoOperations = mongoOperations; - this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, STEP_EXECUTIONS_SEQUENCE_NAME); + this.stepExecutionCollectionName = collectionPrefix + STEP_EXECUTIONS_COLLECTION_NAME; + this.jobExecutionCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME; + this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, + collectionPrefix + STEP_EXECUTIONS_SEQUENCE_NAME); } public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) { @@ -76,7 +84,7 @@ public StepExecution createStepExecution(String stepName, JobExecution jobExecut StepExecution stepExecution = new StepExecution(id, stepName, jobExecution); org.springframework.batch.core.repository.persistence.StepExecution stepExecutionToSave = this.stepExecutionConverter .fromStepExecution(stepExecution); - this.mongoOperations.insert(stepExecutionToSave, STEP_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.insert(stepExecutionToSave, stepExecutionCollectionName); return stepExecution; } @@ -86,7 +94,7 @@ public void updateStepExecution(StepExecution stepExecution) { Query query = query(where("stepExecutionId").is(stepExecution.getId())); org.springframework.batch.core.repository.persistence.StepExecution stepExecutionToUpdate = this.stepExecutionConverter .fromStepExecution(stepExecution); - this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, STEP_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, stepExecutionCollectionName); } @Nullable @@ -95,7 +103,7 @@ public StepExecution getStepExecution(long stepExecutionId) { Query query = query(where("stepExecutionId").is(stepExecutionId)); org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations .findOne(query, org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionCollectionName); JobExecution jobExecution = jobExecutionDao.getJobExecution(stepExecution.getJobExecutionId()); return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null; } @@ -106,7 +114,7 @@ public StepExecution getStepExecution(JobExecution jobExecution, long stepExecut Query query = query(where("stepExecutionId").is(stepExecutionId)); org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations .findOne(query, org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionCollectionName); return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null; } @@ -118,7 +126,7 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa Query query = query(where("jobInstanceId").is(jobInstance.getId())); List jobExecutions = this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionCollectionName); for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) { stepExecutions.addAll(jobExecution.getStepExecutions()); } @@ -154,7 +162,7 @@ public List getStepExecutions(JobExecution jobExecution) { Query query = query(where("jobExecutionId").is(jobExecution.getId())); return this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME) + stepExecutionCollectionName) .stream() .map(stepExecution -> this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution)) .toList(); @@ -167,7 +175,7 @@ public long countStepExecutions(JobInstance jobInstance, String stepName) { Query query = query(where("jobInstanceId").is(jobInstance.getId())); List jobExecutions = this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionCollectionName); for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) { List stepExecutions = jobExecution .getStepExecutions(); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/MongoJobExplorerFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/MongoJobExplorerFactoryBean.java index 13997a0fd7..415fd2474b 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/MongoJobExplorerFactoryBean.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/MongoJobExplorerFactoryBean.java @@ -38,6 +38,7 @@ * "batch.version") * * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 * @deprecated since 6.0 in favor of {@link MongoJobRepositoryFactoryBean}. Scheduled for * removal in 6.2 or later. @@ -47,28 +48,34 @@ public class MongoJobExplorerFactoryBean extends AbstractJobExplorerFactoryBean private MongoOperations mongoOperations; + private String collectionPrefix; + public void setMongoOperations(MongoOperations mongoOperations) { this.mongoOperations = mongoOperations; } + public void setCollectionPrefix(String collectionPrefix) { + this.collectionPrefix = collectionPrefix; + } + @Override protected JobInstanceDao createJobInstanceDao() { - return new MongoJobInstanceDao(this.mongoOperations); + return new MongoJobInstanceDao(this.mongoOperations, this.collectionPrefix); } @Override protected JobExecutionDao createJobExecutionDao() { - return new MongoJobExecutionDao(this.mongoOperations); + return new MongoJobExecutionDao(this.mongoOperations, this.collectionPrefix); } @Override protected StepExecutionDao createStepExecutionDao() { - return new MongoStepExecutionDao(this.mongoOperations); + return new MongoStepExecutionDao(this.mongoOperations, this.collectionPrefix); } @Override protected ExecutionContextDao createExecutionContextDao() { - return new MongoExecutionContextDao(this.mongoOperations); + return new MongoExecutionContextDao(this.mongoOperations, this.collectionPrefix); } @Override diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java index 75e309f70b..df28b4d370 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java @@ -37,6 +37,7 @@ * "batch.version") * * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 */ public class MongoJobRepositoryFactoryBean extends AbstractJobRepositoryFactoryBean implements InitializingBean { @@ -49,6 +50,8 @@ public class MongoJobRepositoryFactoryBean extends AbstractJobRepositoryFactoryB private @Nullable DataFieldMaxValueIncrementer stepExecutionIncrementer; + private @Nullable String collectionPrefix; + public void setMongoOperations(MongoOperations mongoOperations) { this.mongoOperations = mongoOperations; } @@ -65,6 +68,10 @@ public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecuti this.stepExecutionIncrementer = stepExecutionIncrementer; } + public void setCollectionPrefix(String collectionPrefix) { + this.collectionPrefix = collectionPrefix; + } + @Override protected Object getTarget() throws Exception { MongoJobInstanceDao jobInstanceDao = createJobInstanceDao(); @@ -78,7 +85,7 @@ protected Object getTarget() throws Exception { @Override protected MongoJobInstanceDao createJobInstanceDao() { - MongoJobInstanceDao mongoJobInstanceDao = new MongoJobInstanceDao(this.mongoOperations); + MongoJobInstanceDao mongoJobInstanceDao = new MongoJobInstanceDao(this.mongoOperations, this.collectionPrefix); mongoJobInstanceDao.setJobKeyGenerator(this.jobKeyGenerator); mongoJobInstanceDao.setJobInstanceIncrementer(this.jobInstanceIncrementer); return mongoJobInstanceDao; @@ -86,21 +93,23 @@ protected MongoJobInstanceDao createJobInstanceDao() { @Override protected MongoJobExecutionDao createJobExecutionDao() { - MongoJobExecutionDao mongoJobExecutionDao = new MongoJobExecutionDao(this.mongoOperations); + MongoJobExecutionDao mongoJobExecutionDao = new MongoJobExecutionDao(this.mongoOperations, + this.collectionPrefix); mongoJobExecutionDao.setJobExecutionIncrementer(this.jobExecutionIncrementer); return mongoJobExecutionDao; } @Override protected MongoStepExecutionDao createStepExecutionDao() { - MongoStepExecutionDao mongoStepExecutionDao = new MongoStepExecutionDao(this.mongoOperations); + MongoStepExecutionDao mongoStepExecutionDao = new MongoStepExecutionDao(this.mongoOperations, + this.collectionPrefix); mongoStepExecutionDao.setStepExecutionIncrementer(this.stepExecutionIncrementer); return mongoStepExecutionDao; } @Override protected MongoExecutionContextDao createExecutionContextDao() { - return new MongoExecutionContextDao(this.mongoOperations); + return new MongoExecutionContextDao(this.mongoOperations, this.collectionPrefix); } @Override diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBCollectionPrefixTestConfiguration.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBCollectionPrefixTestConfiguration.java new file mode 100644 index 0000000000..a6933a1dab --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBCollectionPrefixTestConfiguration.java @@ -0,0 +1,95 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.support; + +import org.springframework.batch.core.job.Job; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.EnableMongoJobRepository; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.MongoDatabaseFactory; +import org.springframework.data.mongodb.MongoTransactionManager; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Test configuration for MongoDB collection prefix functionality + * + * @author Myeongha Shin + */ +@Configuration +@EnableBatchProcessing +@EnableMongoJobRepository(collectionPrefix = "TEST_COLLECTION_PREFIX_") +class MongoDBCollectionPrefixTestConfiguration { + + private static final DockerImageName MONGODB_IMAGE = DockerImageName.parse("mongo:8.0.11"); + + @Bean(initMethod = "start") + public MongoDBContainer mongoDBContainer() { + return new MongoDBContainer(MONGODB_IMAGE); + } + + @Bean + public MongoDatabaseFactory mongoDatabaseFactory(MongoDBContainer mongoDBContainer) { + return new SimpleMongoClientDatabaseFactory(mongoDBContainer.getConnectionString() + "/test"); + } + + @Bean + public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransactionManager transactionManager) + throws Exception { + MongoJobRepositoryFactoryBean jobRepositoryFactoryBean = new MongoJobRepositoryFactoryBean(); + jobRepositoryFactoryBean.setMongoOperations(mongoTemplate); + jobRepositoryFactoryBean.setTransactionManager(transactionManager); + jobRepositoryFactoryBean.afterPropertiesSet(); + return jobRepositoryFactoryBean.getObject(); + } + + @Bean + public MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) { + MongoTemplate template = new MongoTemplate(mongoDatabaseFactory); + MappingMongoConverter converter = (MappingMongoConverter) template.getConverter(); + converter.setMapKeyDotReplacement("."); + return template; + } + + @Bean + public MongoTransactionManager transactionManager(MongoDatabaseFactory mongoDatabaseFactory) { + MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(); + mongoTransactionManager.setDatabaseFactory(mongoDatabaseFactory); + mongoTransactionManager.afterPropertiesSet(); + return mongoTransactionManager; + } + + @Bean + public Job job(JobRepository jobRepository, MongoTransactionManager transactionManager) { + return new JobBuilder("job", jobRepository) + .start(new StepBuilder("step1", jobRepository) + .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager) + .build()) + .next(new StepBuilder("step2", jobRepository) + .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager) + .build()) + .build(); + } + +} \ No newline at end of file diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryCollectionPrefixTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryCollectionPrefixTests.java new file mode 100644 index 0000000000..24ea729a8d --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryCollectionPrefixTests.java @@ -0,0 +1,101 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.support; + +import java.time.LocalDateTime; +import java.util.Map; + +import com.mongodb.client.MongoCollection; +import org.bson.Document; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.job.Job; +import org.springframework.batch.core.job.JobExecution; +import org.springframework.batch.core.job.parameters.JobParameters; +import org.springframework.batch.core.job.parameters.JobParametersBuilder; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.core.MongoTemplate; + +/** + * Test for MongoDB collection prefix functionality + * + * @author Myeongha Shin + */ +@DirtiesContext +@Testcontainers(disabledWithoutDocker = true) +@SpringJUnitConfig(MongoDBCollectionPrefixTestConfiguration.class) +public class MongoDBJobRepositoryCollectionPrefixTests { + + @Autowired + private MongoTemplate mongoTemplate; + + private static final String PREFIX = "TEST_COLLECTION_PREFIX_"; + + @BeforeEach + public void setUp() { + // collections with custom prefix + mongoTemplate.createCollection(PREFIX + "JOB_INSTANCE"); + mongoTemplate.createCollection(PREFIX + "JOB_EXECUTION"); + mongoTemplate.createCollection(PREFIX + "STEP_EXECUTION"); + // sequences + mongoTemplate.createCollection("BATCH_SEQUENCES"); + mongoTemplate.getCollection("BATCH_SEQUENCES") + .insertOne(new Document(Map.of("_id", PREFIX + "JOB_INSTANCE_SEQ", "count", 0L))); + mongoTemplate.getCollection("BATCH_SEQUENCES") + .insertOne(new Document(Map.of("_id", PREFIX + "JOB_EXECUTION_SEQ", "count", 0L))); + mongoTemplate.getCollection("BATCH_SEQUENCES") + .insertOne(new Document(Map.of("_id", PREFIX + "STEP_EXECUTION_SEQ", "count", 0L))); + } + + @Test + void testJobExecutionWithCustomPrefix(@Autowired JobOperator jobOperator, @Autowired Job job) throws Exception { + // given + JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo") + .addLocalDateTime("runtime", LocalDateTime.now()) + .toJobParameters(); + + // when + JobExecution jobExecution = jobOperator.start(job, jobParameters); + + // then + Assertions.assertNotNull(jobExecution); + Assertions.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); + + // Verify data is stored in collections with custom prefix + MongoCollection jobInstancesCollection = mongoTemplate.getCollection(PREFIX + "JOB_INSTANCE"); + MongoCollection jobExecutionsCollection = mongoTemplate.getCollection(PREFIX + "JOB_EXECUTION"); + MongoCollection stepExecutionsCollection = mongoTemplate.getCollection(PREFIX + "STEP_EXECUTION"); + + Assertions.assertEquals(1, jobInstancesCollection.countDocuments()); + Assertions.assertEquals(1, jobExecutionsCollection.countDocuments()); + Assertions.assertEquals(2, stepExecutionsCollection.countDocuments()); + + // Verify default collections are empty + Assertions.assertFalse(mongoTemplate.collectionExists("BATCH_JOB_INSTANCE")); + Assertions.assertFalse(mongoTemplate.collectionExists("BATCH_JOB_EXECUTION")); + Assertions.assertFalse(mongoTemplate.collectionExists("BATCH_STEP_EXECUTION")); + + System.out.println("✅ Collection prefix test passed! Data stored in: " + PREFIX + "* collections"); + } + +} \ No newline at end of file diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java index 5db103725c..b7a4d805b6 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java @@ -136,7 +136,7 @@ static class ExecutionContextDaoConfiguration { @Bean ExecutionContextDao executionContextDao(MongoOperations mongoOperations) { - return new MongoExecutionContextDao(mongoOperations); + return new MongoExecutionContextDao(mongoOperations, "BATCH_"); } }