diff --git a/spring-cloud-task-core/pom.xml b/spring-cloud-task-core/pom.xml index 7cfa0fcbb..88221a387 100755 --- a/spring-cloud-task-core/pom.xml +++ b/spring-cloud-task-core/pom.xml @@ -85,6 +85,16 @@ spring-integration-jdbc true + + org.mongodb + mongodb-driver-sync + true + + + org.springframework.data + spring-data-mongodb + true + jakarta.platform jakarta.jakartaee-api @@ -135,6 +145,16 @@ micrometer-test test + + org.testcontainers + mongodb + test + + + org.testcontainers + junit-jupiter + test + io.micrometer micrometer-observation-test @@ -180,11 +200,6 @@ testcontainers test - - org.testcontainers - junit-jupiter - test - org.testcontainers mariadb diff --git a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/configuration/MongoTaskAutoConfiguration.java b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/configuration/MongoTaskAutoConfiguration.java new file mode 100644 index 000000000..a508dc86d --- /dev/null +++ b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/configuration/MongoTaskAutoConfiguration.java @@ -0,0 +1,104 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.configuration; + +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.task.repository.dao.MongoLockRepository; +import org.springframework.cloud.task.repository.support.MongoTaskRepositoryInitializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * Auto-configuration for MongoDB-based Task repository. + * This configuration is activated when MongoDB is available and configured, + * and when the appropriate properties are set to enable MongoDB task storage. + * + * @author JongJun Kim + */ +@AutoConfiguration +@ConditionalOnClass({ MongoOperations.class }) +@ConditionalOnBean(MongoOperations.class) +@ConditionalOnProperty(prefix = "spring.cloud.task", name = "repository-type", havingValue = "mongodb", matchIfMissing = false) +@EnableConfigurationProperties(TaskProperties.class) +public class MongoTaskAutoConfiguration { + + /** + * MongoDB Repository Initializer configuration. + */ + @Bean + @ConditionalOnMissingBean(MongoTaskRepositoryInitializer.class) + public MongoTaskRepositoryInitializer mongoTaskRepositoryInitializer(MongoOperations mongoOperations, + TaskProperties taskProperties) { + return new MongoTaskRepositoryInitializer(mongoOperations, taskProperties); + } + + /** + * MongoDB Lock Registry configuration for single instance task execution. + */ + @Bean + @ConditionalOnMissingBean(LockRegistry.class) + @ConditionalOnProperty(prefix = "spring.cloud.task", name = "single-instance-enabled", havingValue = "true") + public LockRegistry mongoLockRegistry(MongoOperations mongoOperations, TaskProperties taskProperties) { + return new MongoLockRepository(mongoOperations, taskProperties); + } + + /** + * Configuration for MongoDB Task components when no custom {@link TaskConfigurer} is provided. + */ + @Configuration + @ConditionalOnMissingBean(TaskConfigurer.class) + public static class MongoTaskConfigurerConfiguration { + + /** + * Creates a {@link MongoTaskConfigurer} with transaction manager when MongoDB is + * the configured repository type, no custom {@link TaskConfigurer} bean exists, + * and a {@link PlatformTransactionManager} is available. + * @param mongoOperations the {@link MongoOperations} instance + * @param taskProperties the {@link TaskProperties} configuration + * @param transactionManager the {@link PlatformTransactionManager} for transaction support + * @return a configured {@link MongoTaskConfigurer} with transaction support + */ + @Bean + @ConditionalOnMissingBean + @ConditionalOnBean(PlatformTransactionManager.class) + public TaskConfigurer taskConfigurer(MongoOperations mongoOperations, + TaskProperties taskProperties, PlatformTransactionManager transactionManager) { + return new MongoTaskConfigurer(mongoOperations, taskProperties, transactionManager); + } + + /** + * Creates a {@link MongoTaskConfigurer} when MongoDB is the configured repository type + * and no custom {@link TaskConfigurer} bean exists. + * @param mongoOperations the {@link MongoOperations} instance + * @param taskProperties the {@link TaskProperties} configuration + * @return a configured {@link MongoTaskConfigurer} + */ + @Bean + @ConditionalOnMissingBean + public TaskConfigurer taskConfigurerWithoutTransactionManager(MongoOperations mongoOperations, TaskProperties taskProperties) { + return new MongoTaskConfigurer(mongoOperations, taskProperties); + } + } +} diff --git a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/configuration/MongoTaskConfigurer.java b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/configuration/MongoTaskConfigurer.java new file mode 100644 index 000000000..510b6e27e --- /dev/null +++ b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/configuration/MongoTaskConfigurer.java @@ -0,0 +1,106 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.configuration; + +import javax.sql.DataSource; + +import org.springframework.cloud.task.repository.TaskExplorer; +import org.springframework.cloud.task.repository.TaskNameResolver; +import org.springframework.cloud.task.repository.TaskRepository; +import org.springframework.cloud.task.repository.support.SimpleTaskExplorer; +import org.springframework.cloud.task.repository.support.SimpleTaskNameResolver; +import org.springframework.cloud.task.repository.support.SimpleTaskRepository; +import org.springframework.cloud.task.repository.support.TaskExecutionDaoFactoryBean; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.util.Assert; + +/** + * A {@link TaskConfigurer} implementation that uses MongoDB for storing Task metadata. + * This configurer uses MongoDB collections to store task execution information instead + * of relational database tables. + * + * @author JongJun Kim + */ +public class MongoTaskConfigurer implements TaskConfigurer { + + private final MongoOperations mongoOperations; + + private final TaskProperties taskProperties; + + private TaskRepository taskRepository; + + private TaskExplorer taskExplorer; + + private PlatformTransactionManager transactionManager; + + /** + * Create a new {@link MongoTaskConfigurer} with the provided {@link MongoOperations}. + * @param mongoOperations the {@link MongoOperations} to use for task metadata storage + * @param taskProperties the {@link TaskProperties} for configuration + */ + public MongoTaskConfigurer(MongoOperations mongoOperations, TaskProperties taskProperties) { + Assert.notNull(mongoOperations, "mongoOperations must not be null"); + Assert.notNull(taskProperties, "taskProperties must not be null"); + this.mongoOperations = mongoOperations; + this.taskProperties = taskProperties; + } + + /** + * Create a new {@link MongoTaskConfigurer} with the provided {@link MongoOperations} + * and {@link PlatformTransactionManager}. + * @param mongoOperations the {@link MongoOperations} to use for task metadata storage + * @param taskProperties the {@link TaskProperties} for configuration + * @param transactionManager the {@link PlatformTransactionManager} for transaction management + */ + public MongoTaskConfigurer(MongoOperations mongoOperations, TaskProperties taskProperties, + PlatformTransactionManager transactionManager) { + this(mongoOperations, taskProperties); + this.transactionManager = transactionManager; + } + + @Override + public TaskRepository getTaskRepository() { + if (this.taskRepository == null) { + this.taskRepository = new SimpleTaskRepository(new TaskExecutionDaoFactoryBean(this.mongoOperations, this.taskProperties)); + } + return this.taskRepository; + } + + @Override + public TaskExplorer getTaskExplorer() { + if (this.taskExplorer == null) { + this.taskExplorer = new SimpleTaskExplorer(new TaskExecutionDaoFactoryBean(this.mongoOperations, this.taskProperties)); + } + return this.taskExplorer; + } + + @Override + public PlatformTransactionManager getTransactionManager() { + return this.transactionManager; + } + + @Override + public TaskNameResolver getTaskNameResolver() { + return new SimpleTaskNameResolver(); + } + + @Override + public DataSource getTaskDataSource() { + return null; + } +} diff --git a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/dao/MongoLockRepository.java b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/dao/MongoLockRepository.java new file mode 100644 index 000000000..4b20fe6e2 --- /dev/null +++ b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/dao/MongoLockRepository.java @@ -0,0 +1,311 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.repository.dao; + +import java.time.LocalDateTime; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import org.springframework.cloud.task.configuration.TaskProperties; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.data.annotation.Id; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.index.Index; +import org.springframework.data.mongodb.core.mapping.Document; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.util.Assert; + +/** + * MongoDB-based implementation of {@link LockRegistry} for single instance task execution. + * This implementation provides distributed locking mechanism using MongoDB collections + * to ensure only one instance of a task can run at a time. + * + * @author JongJun Kim + */ +public class MongoLockRepository implements LockRegistry { + + private static final String TASK_LOCK_COLLECTION = "task_locks"; + + private final MongoOperations mongoOperations; + + private final String tablePrefix; + + private final long timeToLive; + + private final String clientId; + + /** + * Constructor for MongoDB Lock Repository. + * @param mongoOperations MongoDB operations template + * @param taskProperties Task configuration properties + */ + public MongoLockRepository(MongoOperations mongoOperations, TaskProperties taskProperties) { + Assert.notNull(mongoOperations, "mongoOperations must not be null"); + Assert.notNull(taskProperties, "taskProperties must not be null"); + + this.mongoOperations = mongoOperations; + this.tablePrefix = taskProperties.getTablePrefix(); + this.timeToLive = taskProperties.getSingleInstanceLockTtl(); + this.clientId = UUID.randomUUID().toString(); + + initializeCollection(); + } + + /** + * Initialize MongoDB collection for task locks. + * This ensures the collection and indexes exist even if MongoTaskRepositoryInitializer + * has not run yet or is disabled. + */ + private void initializeCollection() { + String collectionName = getCollectionName(TASK_LOCK_COLLECTION); + if (!mongoOperations.collectionExists(collectionName)) { + mongoOperations.createCollection(collectionName); + } + + // Ensure the compound unique index exists + // This is idempotent - MongoDB will not create duplicate indexes + Index compoundIndex = new Index() + .on("lockKey", Sort.Direction.ASC) + .on("region", Sort.Direction.ASC) + .unique() + .named("idx_lock_key_region"); + + var indexOps = mongoOperations.indexOps(collectionName); + try { + indexOps.createIndex(compoundIndex); + } + catch (Exception e) { + // Index may already exist - this is acceptable + // MongoDB will throw an error if index exists with same name but different spec + } + } + + /** + * Get collection name with table prefix. + * @param baseName base collection name + * @return prefixed collection name + */ + private String getCollectionName(String baseName) { + return tablePrefix + baseName; + } + + @Override + public Lock obtain(Object lockKey) { + Assert.notNull(lockKey, "lockKey must not be null"); + return new MongoLock(lockKey.toString()); + } + + /** + * MongoDB-based lock implementation. + */ + private class MongoLock implements Lock { + + + private final String lockKey; + + private final String region = "default"; + + private volatile boolean locked = false; + + MongoLock(String lockKey) { + this.lockKey = lockKey; + } + + @Override + public void lock() { + try { + lockInterruptibly(); + } + catch (InterruptedException e) { + Thread.currentThread() + .interrupt(); + throw new IllegalStateException("Interrupted while acquiring lock", e); + } + } + + @Override + public void lockInterruptibly() throws InterruptedException { + while (!tryLock()) { + if (Thread.currentThread() + .isInterrupted()) { + throw new InterruptedException(); + } + Thread.sleep(100); // Short sleep before retry + } + } + + @Override + public boolean tryLock() { + return tryLock(0, TimeUnit.MILLISECONDS); + } + + @Override + public boolean tryLock(long time, TimeUnit unit) { + long timeoutMillis = unit.toMillis(time); + long startTime = System.currentTimeMillis(); + long retryInterval = 100; // 100ms between retries + + while (true) { + if (tryLockOnce()) { + return true; + } + + // Check if we've exceeded the timeout + long elapsed = System.currentTimeMillis() - startTime; + if (elapsed >= timeoutMillis) { + return false; + } + + // Sleep before retry, but not longer than remaining timeout + long remainingTime = timeoutMillis - elapsed; + long sleepTime = Math.min(retryInterval, remainingTime); + + if (sleepTime > 0) { + try { + Thread.sleep(sleepTime); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + } + } + + /** + * Attempts to acquire the lock once without retrying. + * @return true if lock was acquired, false otherwise + */ + private boolean tryLockOnce() { + cleanupExpiredLocks(); + LocalDateTime now = LocalDateTime.now(); + LocalDateTime expirationTime = now.plusSeconds(timeToLive / 1000); + try { + TaskLockDocument lockDoc = new TaskLockDocument(); + lockDoc.id = this.lockKey + ":" + this.region; // Composite ID for uniqueness + lockDoc.lockKey = this.lockKey; + lockDoc.region = this.region; + lockDoc.clientId = MongoLockRepository.this.clientId; + lockDoc.createdDate = now; + lockDoc.expirationDate = expirationTime; + mongoOperations.insert(lockDoc, getCollectionName(TASK_LOCK_COLLECTION)); + this.locked = true; + return true; + } + catch (DuplicateKeyException e) { + Query query = Query.query( + Criteria.where("lockKey") + .is(this.lockKey) + .and("region") + .is(this.region) + .and("expirationDate") + .lt(now) + ); + Update update = Update.update("expirationDate", expirationTime) + .set("clientId", MongoLockRepository.this.clientId) + .set("createdDate", now); + var result = mongoOperations.updateFirst(query, update, TaskLockDocument.class, getCollectionName(TASK_LOCK_COLLECTION)); + if (result.getModifiedCount() > 0) { + this.locked = true; + return true; + } + Query ownerQuery = Query.query( + Criteria.where("lockKey") + .is(this.lockKey) + .and("region") + .is(this.region) + .and("clientId") + .is(MongoLockRepository.this.clientId) + ); + Update ownerUpdate = Update.update("expirationDate", expirationTime); + var ownerResult = mongoOperations.updateFirst(ownerQuery, ownerUpdate, TaskLockDocument.class, getCollectionName(TASK_LOCK_COLLECTION)); + if (ownerResult.getModifiedCount() > 0) { + this.locked = true; + return true; + } + } + return false; + } + + @Override + public void unlock() { + if (this.locked) { + Query query = Query.query( + Criteria.where("lockKey") + .is(this.lockKey) + .and("region") + .is(this.region) + .and("clientId") + .is(MongoLockRepository.this.clientId) + ); + + mongoOperations.remove(query, TaskLockDocument.class, + getCollectionName(TASK_LOCK_COLLECTION)); + this.locked = false; + } + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException("Conditions are not supported"); + } + + private void cleanupExpiredLocks() { + LocalDateTime now = LocalDateTime.now(); + Query expiredQuery = Query.query( + Criteria.where("expirationDate") + .lt(now) + ); + + mongoOperations.remove(expiredQuery, TaskLockDocument.class, + getCollectionName(TASK_LOCK_COLLECTION)); + } + + } + + /** + * MongoDB document representing a task lock. + */ + @Document + static class TaskLockDocument { + + @Id + String id; + + String lockKey; + + String region; + + String clientId; + + LocalDateTime createdDate; + + LocalDateTime expirationDate; + + TaskLockDocument() { + // ID will be set as composite of lockKey:region for uniqueness + } + + } + +} diff --git a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/dao/MongoTaskExecutionDao.java b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/dao/MongoTaskExecutionDao.java new file mode 100644 index 000000000..e4a25cafa --- /dev/null +++ b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/dao/MongoTaskExecutionDao.java @@ -0,0 +1,648 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.repository.dao; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.springframework.cloud.task.configuration.TaskProperties; +import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageImpl; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.FindAndModifyOptions; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * Data Access Object for task executions using MongoDB. + * + * @author JongJun Kim + */ +public class MongoTaskExecutionDao implements TaskExecutionDao { + + private static final String TASK_EXECUTION_COLLECTION = "task_executions"; + private static final String TASK_EXECUTION_PARAMS_COLLECTION = "task_execution_parameters"; + private static final String TASK_BATCH_COLLECTION = "task_batch_associations"; + private static final String TASK_SEQUENCE_COLLECTION = "task_sequence"; + + private static final String TASK_EXECUTION_ID_KEY = "taskExecutionId"; + private static final String TASK_NAME_KEY = "taskName"; + private static final String START_TIME_KEY = "startTime"; + private static final String END_TIME_KEY = "endTime"; + private static final String EXIT_CODE_KEY = "exitCode"; + private static final String EXIT_MESSAGE_KEY = "exitMessage"; + private static final String ERROR_MESSAGE_KEY = "errorMessage"; + private static final String LAST_UPDATED_KEY = "lastUpdated"; + private static final String EXTERNAL_EXECUTION_ID_KEY = "externalExecutionId"; + private static final String PARENT_EXECUTION_ID_KEY = "parentExecutionId"; + private static final String TASK_PARAM_KEY = "taskParam"; + private static final String JOB_EXECUTION_ID_KEY = "jobExecutionId"; + + private final MongoOperations mongoOperations; + + private final String tablePrefix; + + public MongoTaskExecutionDao(MongoOperations mongoOperations, TaskProperties taskProperties) { + Assert.notNull(mongoOperations, "mongoOperations must not be null"); + Assert.notNull(taskProperties, "taskProperties must not be null"); + this.mongoOperations = mongoOperations; + this.tablePrefix = taskProperties.getTablePrefix(); + } + + /** + * Constructor that accepts Object type to maintain backward compatibility with + * {@link org.springframework.cloud.task.repository.support.TaskExecutionDaoFactoryBean}. + *

+ * This constructor exists because TaskExecutionDaoFactoryBean is in the core module + * and cannot have a compile-time dependency on MongoDB (which is optional). + * The factory uses reflection to instantiate this class only when MongoDB is available. + *

+ * Note: Direct users should prefer the type-safe constructor + * {@link #MongoTaskExecutionDao(MongoOperations, TaskProperties)}. + * + * @param mongoOperations the MongoOperations instance (passed as Object to avoid compile-time dependency) + * @param taskProperties the task properties + * @throws IllegalArgumentException if mongoOperations is not a MongoOperations instance + */ + public MongoTaskExecutionDao(Object mongoOperations, TaskProperties taskProperties) { + Assert.notNull(mongoOperations, "mongoOperations must not be null"); + Assert.notNull(taskProperties, "taskProperties must not be null"); + + if (!(mongoOperations instanceof MongoOperations)) { + throw new IllegalArgumentException( + String.format("Provided object is not a MongoOperations instance. Found: %s", + mongoOperations.getClass().getName())); + } + + this.mongoOperations = (MongoOperations) mongoOperations; + this.tablePrefix = taskProperties.getTablePrefix(); + } + + private String getCollectionName(String collectionName) { + return this.tablePrefix + collectionName; + } + + @Override + public TaskExecution createTaskExecution(String taskName, LocalDateTime startTime, List arguments, + String externalExecutionId) { + return createTaskExecution(taskName, startTime, arguments, externalExecutionId, null); + } + + @Override + public TaskExecution createTaskExecution(String taskName, LocalDateTime startTime, List arguments, + String externalExecutionId, Long parentExecutionId) { + Assert.hasText(taskName, "taskName must not be null or empty"); + Assert.notNull(startTime, "startTime must not be null"); + + long taskExecutionId = getNextExecutionId(); + + TaskExecutionDocument document = new TaskExecutionDocument(); + document.setTaskExecutionId(taskExecutionId); + document.setTaskName(taskName); + document.setStartTime(startTime); + document.setExternalExecutionId(externalExecutionId); + document.setParentExecutionId(parentExecutionId); + document.setLastUpdated(LocalDateTime.now()); + + mongoOperations.save(document, getCollectionName(TASK_EXECUTION_COLLECTION)); + + // Save parameters + if (arguments != null) { + for (String argument : arguments) { + if (argument != null) { + TaskExecutionParameterDocument paramDoc = new TaskExecutionParameterDocument(); + paramDoc.setTaskExecutionId(taskExecutionId); + paramDoc.setTaskParam(argument); + mongoOperations.save(paramDoc, getCollectionName(TASK_EXECUTION_PARAMS_COLLECTION)); + } + } + } + + return convertToTaskExecution(document, arguments); + } + + @Override + public TaskExecution startTaskExecution(long executionId, String taskName, LocalDateTime startTime, + List arguments, String externalExecutionId) { + return startTaskExecution(executionId, taskName, startTime, arguments, externalExecutionId, null); + } + + @Override + public TaskExecution startTaskExecution(long executionId, String taskName, LocalDateTime startTime, + List arguments, String externalExecutionId, Long parentExecutionId) { + Assert.hasText(taskName, "taskName must not be null or empty"); + Assert.notNull(startTime, "startTime must not be null"); + + // Create new task execution document + TaskExecutionDocument document = new TaskExecutionDocument(); + document.setTaskExecutionId(executionId); + document.setTaskName(taskName); + document.setStartTime(startTime); + document.setExternalExecutionId(externalExecutionId); + document.setParentExecutionId(parentExecutionId); + document.setLastUpdated(LocalDateTime.now()); + + mongoOperations.save(document, getCollectionName(TASK_EXECUTION_COLLECTION)); + + // Save parameters + if (arguments != null) { + for (String argument : arguments) { + if (argument != null) { + TaskExecutionParameterDocument paramDoc = new TaskExecutionParameterDocument(); + paramDoc.setTaskExecutionId(executionId); + paramDoc.setTaskParam(argument); + mongoOperations.save(paramDoc, getCollectionName(TASK_EXECUTION_PARAMS_COLLECTION)); + } + } + } + + return convertToTaskExecution(document, arguments); + } + + @Override + public void completeTaskExecution(long executionId, Integer exitCode, LocalDateTime endTime, String exitMessage, + String errorMessage) { + Assert.notNull(endTime, "endTime must not be null"); + + // Check if task execution exists first + TaskExecution taskExecution = getTaskExecution(executionId); + if (taskExecution == null) { + throw new IllegalStateException("Invalid TaskExecution, ID " + executionId + " not found."); + } + + Query query = new Query(Criteria.where(TASK_EXECUTION_ID_KEY).is(executionId)); + Update update = new Update() + .set(EXIT_CODE_KEY, exitCode) + .set(END_TIME_KEY, endTime) + .set(EXIT_MESSAGE_KEY, exitMessage) + .set(ERROR_MESSAGE_KEY, errorMessage) + .set(LAST_UPDATED_KEY, LocalDateTime.now()); + + mongoOperations.updateFirst(query, update, getCollectionName(TASK_EXECUTION_COLLECTION)); + } + + @Override + public void completeTaskExecution(long executionId, Integer exitCode, LocalDateTime endTime, String exitMessage) { + completeTaskExecution(executionId, exitCode, endTime, exitMessage, null); + } + + @Override + public TaskExecution getTaskExecution(long executionId) { + Query query = new Query(Criteria.where(TASK_EXECUTION_ID_KEY).is(executionId)); + TaskExecutionDocument document = mongoOperations.findOne(query, TaskExecutionDocument.class, + getCollectionName(TASK_EXECUTION_COLLECTION)); + + if (document == null) { + return null; + } + + List arguments = getTaskExecutionParameters(executionId); + return convertToTaskExecution(document, arguments); + } + + @Override + public long getTaskExecutionCountByTaskName(String taskName) { + Assert.hasText(taskName, "taskName must not be null or empty"); + Query query = new Query(Criteria.where(TASK_NAME_KEY).is(taskName)); + return mongoOperations.count(query, getCollectionName(TASK_EXECUTION_COLLECTION)); + } + + @Override + public long getRunningTaskExecutionCountByTaskName(String taskName) { + Assert.hasText(taskName, "taskName must not be null or empty"); + Query query = new Query(Criteria.where(TASK_NAME_KEY).is(taskName).and(END_TIME_KEY).isNull()); + return mongoOperations.count(query, getCollectionName(TASK_EXECUTION_COLLECTION)); + } + + @Override + public long getRunningTaskExecutionCount() { + Query query = new Query(Criteria.where(END_TIME_KEY).isNull()); + return mongoOperations.count(query, getCollectionName(TASK_EXECUTION_COLLECTION)); + } + + @Override + public long getTaskExecutionCount() { + return mongoOperations.count(new Query(), getCollectionName(TASK_EXECUTION_COLLECTION)); + } + + @Override + public Page findRunningTaskExecutions(String taskName, Pageable pageable) { + Assert.hasText(taskName, "taskName must not be null or empty"); + Assert.notNull(pageable, "pageable must not be null"); + + Query query = new Query(Criteria.where(TASK_NAME_KEY).is(taskName).and(END_TIME_KEY).isNull()) + .with(pageable); + + List documents = mongoOperations.find(query, TaskExecutionDocument.class, + getCollectionName(TASK_EXECUTION_COLLECTION)); + + long total = getRunningTaskExecutionCountByTaskName(taskName); + List taskExecutions = convertToTaskExecutions(documents); + + return new PageImpl<>(taskExecutions, pageable, total); + } + + @Override + public Page findTaskExecutionsByExternalExecutionId(String externalExecutionId, Pageable pageable) { + Assert.hasText(externalExecutionId, "externalExecutionId must not be null or empty"); + Assert.notNull(pageable, "pageable must not be null"); + + Query query = new Query(Criteria.where(EXTERNAL_EXECUTION_ID_KEY).is(externalExecutionId)) + .with(pageable); + + List documents = mongoOperations.find(query, TaskExecutionDocument.class, + getCollectionName(TASK_EXECUTION_COLLECTION)); + + long total = getTaskExecutionCountByExternalExecutionId(externalExecutionId); + List taskExecutions = convertToTaskExecutions(documents); + + return new PageImpl<>(taskExecutions, pageable, total); + } + + @Override + public long getTaskExecutionCountByExternalExecutionId(String externalExecutionId) { + Assert.hasText(externalExecutionId, "externalExecutionId must not be null or empty"); + Query query = new Query(Criteria.where(EXTERNAL_EXECUTION_ID_KEY).is(externalExecutionId)); + return mongoOperations.count(query, getCollectionName(TASK_EXECUTION_COLLECTION)); + } + + @Override + public Page findTaskExecutionsByName(String taskName, Pageable pageable) { + Assert.hasText(taskName, "taskName must not be null or empty"); + Assert.notNull(pageable, "pageable must not be null"); + + Query query = new Query(Criteria.where(TASK_NAME_KEY).is(taskName)) + .with(pageable); + + List documents = mongoOperations.find(query, TaskExecutionDocument.class, + getCollectionName(TASK_EXECUTION_COLLECTION)); + + long total = getTaskExecutionCountByTaskName(taskName); + List taskExecutions = convertToTaskExecutions(documents); + + return new PageImpl<>(taskExecutions, pageable, total); + } + + @Override + public List getTaskNames() { + Query query = new Query(); + return mongoOperations.findDistinct(query, TASK_NAME_KEY, getCollectionName(TASK_EXECUTION_COLLECTION), String.class); + } + + @Override + public Page findAll(Pageable pageable) { + Assert.notNull(pageable, "pageable must not be null"); + + Query query = new Query().with(pageable); + + List documents = mongoOperations.find(query, TaskExecutionDocument.class, + getCollectionName(TASK_EXECUTION_COLLECTION)); + + long total = getTaskExecutionCount(); + List taskExecutions = convertToTaskExecutions(documents); + + return new PageImpl<>(taskExecutions, pageable, total); + } + + @Override + public long getNextExecutionId() { + Query query = new Query(Criteria.where("_id").is("task_seq")); + Update update = new Update().inc("sequence", 1); + FindAndModifyOptions options = new FindAndModifyOptions().upsert(true).returnNew(true); + TaskSequence sequence = mongoOperations.findAndModify(query, update, options, TaskSequence.class, getCollectionName(TASK_SEQUENCE_COLLECTION)); + return sequence != null ? sequence.getSequence() : 1L; + } + + @Override + public Long getTaskExecutionIdByJobExecutionId(long jobExecutionId) { + Query query = new Query(Criteria.where(JOB_EXECUTION_ID_KEY).is(jobExecutionId)); + TaskBatchAssociationDocument association = mongoOperations.findOne(query, + TaskBatchAssociationDocument.class, getCollectionName(TASK_BATCH_COLLECTION)); + + return association != null ? association.getTaskExecutionId() : null; + } + + @Override + public Set getJobExecutionIdsByTaskExecutionId(long taskExecutionId) { + Query query = new Query(Criteria.where(TASK_EXECUTION_ID_KEY).is(taskExecutionId)); + List associations = mongoOperations.find(query, + TaskBatchAssociationDocument.class, getCollectionName(TASK_BATCH_COLLECTION)); + + Set jobExecutionIds = new HashSet<>(); + for (TaskBatchAssociationDocument association : associations) { + jobExecutionIds.add(association.getJobExecutionId()); + } + + return jobExecutionIds; + } + + @Override + public void updateExternalExecutionId(long taskExecutionId, String externalExecutionId) { + // Check if task execution exists first + TaskExecution taskExecution = getTaskExecution(taskExecutionId); + Assert.notNull(taskExecution, "Invalid TaskExecution, ID " + taskExecutionId + " not found."); + + Query query = new Query(Criteria.where(TASK_EXECUTION_ID_KEY).is(taskExecutionId)); + Update update = new Update() + .set(EXTERNAL_EXECUTION_ID_KEY, externalExecutionId) + .set(LAST_UPDATED_KEY, LocalDateTime.now()); + + mongoOperations.updateFirst(query, update, getCollectionName(TASK_EXECUTION_COLLECTION)); + } + + /** + * Create a batch-task association. + * @param taskExecutionId The task execution ID + * @param jobExecutionId The batch job execution ID + */ + public void createTaskBatchAssociation(long taskExecutionId, long jobExecutionId) { + TaskBatchAssociationDocument association = new TaskBatchAssociationDocument(); + association.setTaskExecutionId(taskExecutionId); + association.setJobExecutionId(jobExecutionId); + + mongoOperations.save(association, getCollectionName(TASK_BATCH_COLLECTION)); + } + + /** + * Delete batch-task association by task execution ID. + * @param taskExecutionId The task execution ID + */ + public void deleteTaskBatchAssociationsByTaskExecutionId(long taskExecutionId) { + Query query = new Query(Criteria.where(TASK_EXECUTION_ID_KEY).is(taskExecutionId)); + mongoOperations.remove(query, TaskBatchAssociationDocument.class, + getCollectionName(TASK_BATCH_COLLECTION)); + } + + /** + * Delete batch-task association by job execution ID. + * @param jobExecutionId The batch job execution ID + */ + public void deleteTaskBatchAssociationsByJobExecutionId(long jobExecutionId) { + Query query = new Query(Criteria.where(JOB_EXECUTION_ID_KEY).is(jobExecutionId)); + mongoOperations.remove(query, TaskBatchAssociationDocument.class, + getCollectionName(TASK_BATCH_COLLECTION)); + } + + @Override + public List getLatestTaskExecutionsByTaskNames(String... taskNames) { + Assert.notEmpty(taskNames, "At least 1 task name must be provided."); + + final List taskNamesAsList = new ArrayList<>(); + + for (String taskName : taskNames) { + if (StringUtils.hasText(taskName)) { + taskNamesAsList.add(taskName); + } + } + + Assert.isTrue(taskNamesAsList.size() == taskNames.length, + String.format("Task names must not contain any empty elements but %s of %s were empty or null.", + taskNames.length - taskNamesAsList.size(), taskNames.length)); + + List result = new ArrayList<>(); + for (String taskName : taskNamesAsList) { + TaskExecution latest = getLatestTaskExecutionForTaskName(taskName); + if (latest != null) { + result.add(latest); + } + } + + // Sort by startTime DESC, then by executionId DESC (latest first) + result.sort((a, b) -> { + int timeCompare = b.getStartTime().compareTo(a.getStartTime()); + if (timeCompare != 0) { + return timeCompare; + } + return Long.compare(b.getExecutionId(), a.getExecutionId()); + }); + + return result; + } + + @Override + public TaskExecution getLatestTaskExecutionForTaskName(String taskName) { + Assert.hasText(taskName, "The task name must not be empty."); + + Query query = new Query(Criteria.where(TASK_NAME_KEY).is(taskName)) + .with(Sort.by(Sort.Direction.DESC, START_TIME_KEY, TASK_EXECUTION_ID_KEY)) + .limit(1); + + TaskExecutionDocument document = mongoOperations.findOne(query, TaskExecutionDocument.class, + getCollectionName(TASK_EXECUTION_COLLECTION)); + + if (document == null) { + return null; + } + + List arguments = getTaskExecutionParameters(document.getTaskExecutionId()); + return convertToTaskExecution(document, arguments); + } + + private List getTaskExecutionParameters(long taskExecutionId) { + Query query = new Query(Criteria.where(TASK_EXECUTION_ID_KEY).is(taskExecutionId)); + List paramDocs = mongoOperations.find(query, + TaskExecutionParameterDocument.class, getCollectionName(TASK_EXECUTION_PARAMS_COLLECTION)); + + List arguments = new ArrayList<>(); + for (TaskExecutionParameterDocument paramDoc : paramDocs) { + arguments.add(paramDoc.getTaskParam()); + } + + return arguments; + } + + private List convertToTaskExecutions(List documents) { + List taskExecutions = new ArrayList<>(); + + for (TaskExecutionDocument document : documents) { + List arguments = getTaskExecutionParameters(document.getTaskExecutionId()); + taskExecutions.add(convertToTaskExecution(document, arguments)); + } + + return taskExecutions; + } + + private TaskExecution convertToTaskExecution(TaskExecutionDocument document, List arguments) { + if (document == null) { + return null; + } + + return new TaskExecution( + document.getTaskExecutionId(), + document.getExitCode(), + document.getTaskName(), + document.getStartTime(), + document.getEndTime(), + document.getExitMessage(), + arguments != null ? arguments : Collections.emptyList(), + document.getErrorMessage(), + document.getExternalExecutionId(), + document.getParentExecutionId() + ); + } + + // MongoDB Document Classes + // Note: These classes use simple POJOs without @Document annotations because + // collections are specified dynamically at runtime with table prefixes. + // The mongoOperations.save(object, collectionName) pattern is used instead. + public static class TaskExecutionDocument { + private Long taskExecutionId; + private LocalDateTime startTime; + private LocalDateTime endTime; + private String taskName; + private Integer exitCode; + private String exitMessage; + private String errorMessage; + private LocalDateTime lastUpdated; + private String externalExecutionId; + private Long parentExecutionId; + + // Getters and setters + public Long getTaskExecutionId() { + return taskExecutionId; + } + public void setTaskExecutionId(Long taskExecutionId) { + this.taskExecutionId = taskExecutionId; + } + public LocalDateTime getStartTime() { + return startTime; + } + public void setStartTime(LocalDateTime startTime) { + this.startTime = startTime; + } + public LocalDateTime getEndTime() { + return endTime; + } + public void setEndTime(LocalDateTime endTime) { + this.endTime = endTime; + } + public String getTaskName() { + return taskName; + } + public void setTaskName(String taskName) { + this.taskName = taskName; + } + public Integer getExitCode() { + return exitCode; + } + public void setExitCode(Integer exitCode) { + this.exitCode = exitCode; + } + public String getExitMessage() { + return exitMessage; + } + public void setExitMessage(String exitMessage) { + this.exitMessage = exitMessage; + } + public String getErrorMessage() { + return errorMessage; + } + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + public LocalDateTime getLastUpdated() { + return lastUpdated; + } + public void setLastUpdated(LocalDateTime lastUpdated) { + this.lastUpdated = lastUpdated; + } + public String getExternalExecutionId() { + return externalExecutionId; + } + public void setExternalExecutionId(String externalExecutionId) { + this.externalExecutionId = externalExecutionId; + } + public Long getParentExecutionId() { + return parentExecutionId; + } + public void setParentExecutionId(Long parentExecutionId) { + this.parentExecutionId = parentExecutionId; + } + } + + public static class TaskExecutionParameterDocument { + private Long taskExecutionId; + private String taskParam; + + public Long getTaskExecutionId() { + return taskExecutionId; + } + public void setTaskExecutionId(Long taskExecutionId) { + this.taskExecutionId = taskExecutionId; + } + public String getTaskParam() { + return taskParam; + } + public void setTaskParam(String taskParam) { + this.taskParam = taskParam; + } + } + + public static class TaskBatchAssociationDocument { + private Long taskExecutionId; + private Long jobExecutionId; + + public Long getTaskExecutionId() { + return taskExecutionId; + } + public void setTaskExecutionId(Long taskExecutionId) { + this.taskExecutionId = taskExecutionId; + } + public Long getJobExecutionId() { + return jobExecutionId; + } + public void setJobExecutionId(Long jobExecutionId) { + this.jobExecutionId = jobExecutionId; + } + } + + public static class TaskSequence { + private String id; + private Long sequence; + + public TaskSequence() { + } + + public TaskSequence(String id, Long sequence) { + this.id = id; + this.sequence = sequence; + } + + public String getId() { + return id; + } + public void setId(String id) { + this.id = id; + } + public Long getSequence() { + return sequence; + } + public void setSequence(Long sequence) { + this.sequence = sequence; + } + } +} diff --git a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/DatabaseType.java b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/DatabaseType.java index 009665290..7e0272da5 100644 --- a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/DatabaseType.java +++ b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/DatabaseType.java @@ -90,7 +90,12 @@ public enum DatabaseType { /** * DB2AS400 DB. */ - DB2AS400("DB2AS400"); + DB2AS400("DB2AS400"), + + /** + * MongoDB. + */ + MONGODB("MongoDB"); private static final Map dbNameMap; @@ -172,6 +177,23 @@ public static DatabaseType fromProductName(String productName) { } } + /** + * Determine if the provided object represents a MongoDB data source or template. + * This method checks for MongoDB-specific classes to identify MongoDB usage. + * @param dataSourceOrTemplate the data source or MongoDB template object + * @return true if MongoDB is detected, false otherwise + */ + public static boolean isMongoDB(Object dataSourceOrTemplate) { + if (dataSourceOrTemplate == null) { + return false; + } + String className = dataSourceOrTemplate.getClass().getName(); + return className.contains("org.springframework.data.mongodb") || + className.contains("com.mongodb") || + className.contains("MongoTemplate") || + className.contains("MongoOperations"); + } + private String getProductName() { return this.productName; } diff --git a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/MongoTaskRepositoryInitializer.java b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/MongoTaskRepositoryInitializer.java new file mode 100644 index 000000000..80e331634 --- /dev/null +++ b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/MongoTaskRepositoryInitializer.java @@ -0,0 +1,238 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.repository.support; + +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.task.configuration.TaskProperties; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.index.Index; +import org.springframework.data.mongodb.core.index.IndexOperations; +import org.springframework.util.Assert; + +/** + * Utility for initializing the MongoDB Task Repository collections and indexes. + * This class creates the necessary collections and indexes for Spring Cloud Task + * to function properly with MongoDB. + * + * Initialization can be disabled by setting the property + * {@code spring.cloud.task.initialize-enabled} to false. + * + * @author JongJun Kim + */ +public final class MongoTaskRepositoryInitializer implements InitializingBean { + + private static final Log logger = LogFactory.getLog(MongoTaskRepositoryInitializer.class); + + private final MongoOperations mongoOperations; + + private final TaskProperties taskProperties; + + @Value("${spring.cloud.task.initialize.enable:true}") + private boolean taskInitializationEnabled; + + private static final List COLLECTION_NAMES = Arrays.asList( + "task_executions", + "task_execution_parameters", + "task_batch_associations", + "task_sequence", + "task_locks" + ); + + /** + * Constructor for MongoDB Task Repository Initializer. + * @param mongoOperations MongoDB operations template + * @param taskProperties Task configuration properties + */ + public MongoTaskRepositoryInitializer(MongoOperations mongoOperations, TaskProperties taskProperties) { + Assert.notNull(mongoOperations, "mongoOperations must not be null"); + Assert.notNull(taskProperties, "taskProperties must not be null"); + + this.mongoOperations = mongoOperations; + this.taskProperties = taskProperties; + } + + @Override + public void afterPropertiesSet() throws Exception { + boolean isInitializeEnabled = (this.taskProperties.isInitializeEnabled() != null) + ? this.taskProperties.isInitializeEnabled() : this.taskInitializationEnabled; + + if (isInitializeEnabled) { + logger.debug("Initializing MongoDB Task collections and indexes"); + initializeCollections(); + createIndexes(); + initializeSequence(); + } + } + + /** + * Create all necessary MongoDB collections for Spring Cloud Task. + */ + private void initializeCollections() { + String tablePrefix = this.taskProperties.getTablePrefix(); + + for (String collectionName : COLLECTION_NAMES) { + String prefixedName = tablePrefix + collectionName; + + if (!mongoOperations.collectionExists(prefixedName)) { + mongoOperations.createCollection(prefixedName); + logger.debug(String.format("Created collection: %s", prefixedName)); + } + } + } + + /** + * Create indexes for optimal query performance. + */ + private void createIndexes() { + String tablePrefix = this.taskProperties.getTablePrefix(); + + // Task Executions indexes + createTaskExecutionIndexes(tablePrefix + "task_executions"); + + // Task Execution Parameters indexes + createTaskParameterIndexes(tablePrefix + "task_execution_parameters"); + + // Task Batch Association indexes + createBatchAssociationIndexes(tablePrefix + "task_batch_associations"); + + // Task Locks indexes + createLockIndexes(tablePrefix + "task_locks"); + } + + /** + * Create indexes for task executions collection. + */ + private void createTaskExecutionIndexes(String collectionName) { + IndexOperations indexOps = mongoOperations.indexOps(collectionName); + + // Index on task name for faster lookups + indexOps.createIndex(new Index().on("taskName", Sort.Direction.ASC) + .named("idx_task_name")); + + // Index on start time for ordering + indexOps.createIndex(new Index().on("startTime", Sort.Direction.DESC) + .named("idx_start_time")); + + // Index on end time for running task queries + indexOps.createIndex(new Index().on("endTime", Sort.Direction.ASC) + .named("idx_end_time")); + + // Index on external execution ID + indexOps.createIndex(new Index().on("externalExecutionId", Sort.Direction.ASC) + .named("idx_external_execution_id")); + + // Compound index for running tasks by name + indexOps.createIndex(new Index() + .on("taskName", Sort.Direction.ASC) + .on("endTime", Sort.Direction.ASC) + .named("idx_task_name_end_time")); + + // Index on parent execution ID + indexOps.createIndex(new Index().on("parentExecutionId", Sort.Direction.ASC) + .named("idx_parent_execution_id")); + + logger.debug("Created indexes for task executions collection: " + collectionName); + } + + /** + * Create indexes for task parameters collection. + */ + private void createTaskParameterIndexes(String collectionName) { + IndexOperations indexOps = mongoOperations.indexOps(collectionName); + + // Index on task execution ID for parameter lookups + indexOps.createIndex(new Index().on("taskExecutionId", Sort.Direction.ASC) + .named("idx_task_execution_id")); + + logger.debug("Created indexes for task parameters collection: " + collectionName); + } + + /** + * Create indexes for batch association collection. + */ + private void createBatchAssociationIndexes(String collectionName) { + IndexOperations indexOps = mongoOperations.indexOps(collectionName); + + // Index on task execution ID + indexOps.createIndex(new Index().on("taskExecutionId", Sort.Direction.ASC) + .named("idx_task_execution_id")); + + // Index on job execution ID + indexOps.createIndex(new Index().on("jobExecutionId", Sort.Direction.ASC) + .named("idx_job_execution_id")); + + logger.debug("Created indexes for batch association collection: " + collectionName); + } + + /** + * Create indexes for locks collection. + */ + private void createLockIndexes(String collectionName) { + IndexOperations indexOps = mongoOperations.indexOps(collectionName); + + // Unique compound index on lock key and region + indexOps.createIndex(new Index() + .on("lockKey", Sort.Direction.ASC) + .on("region", Sort.Direction.ASC) + .unique() + .named("idx_lock_key_region")); + + // Index on expiration date for cleanup + indexOps.createIndex(new Index().on("expirationDate", Sort.Direction.ASC) + .named("idx_expiration_date")); + + // Index on client ID + indexOps.createIndex(new Index().on("clientId", Sort.Direction.ASC) + .named("idx_client_id")); + + logger.debug("Created indexes for locks collection: " + collectionName); + } + + /** + * Initialize the sequence counter for task execution IDs. + * The sequence starts at 0, and the first call to getNextExecutionId() will return 1. + * This matches the behavior of database sequences in the JDBC implementation. + */ + private void initializeSequence() { + String sequenceCollectionName = this.taskProperties.getTablePrefix() + "task_sequence"; + + // Check if sequence document exists, if not create it + org.springframework.data.mongodb.core.query.Query sequenceQuery = + org.springframework.data.mongodb.core.query.Query.query( + org.springframework.data.mongodb.core.query.Criteria.where("_id").is("task_seq") + ); + + if (!mongoOperations.exists(sequenceQuery, sequenceCollectionName)) { + // Create sequence document with initial value of 0 + // The first getNextExecutionId() call will increment and return 1 + java.util.Map sequenceDoc = new java.util.HashMap<>(); + sequenceDoc.put("_id", "task_seq"); + sequenceDoc.put("sequence", 0L); + + mongoOperations.save(sequenceDoc, sequenceCollectionName); + logger.debug("Initialized sequence counter for task execution IDs"); + } + } +} diff --git a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/TaskExecutionDaoFactoryBean.java b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/TaskExecutionDaoFactoryBean.java index f5d3f0db7..22c21aaaf 100644 --- a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/TaskExecutionDaoFactoryBean.java +++ b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/support/TaskExecutionDaoFactoryBean.java @@ -42,15 +42,20 @@ * * @author Michael Minella * @author Glenn Renfro + * @author JongJun Kim */ public class TaskExecutionDaoFactoryBean implements FactoryBean { private DataSource dataSource; + private Object mongoOperations; + private TaskExecutionDao dao = null; private String tablePrefix = TaskProperties.DEFAULT_TABLE_PREFIX; + private TaskProperties taskProperties; + /** * Default constructor will result in a Map based TaskExecutionDao. This is only * intended for testing purposes. @@ -79,10 +84,30 @@ public TaskExecutionDaoFactoryBean(DataSource dataSource) { this.dataSource = dataSource; } + /** + * MongoOperations to be used for MongoDB storage. + * @param mongoOperations MongoOperations instance to be used + * @param taskProperties the task properties for configuration + */ + public TaskExecutionDaoFactoryBean(Object mongoOperations, TaskProperties taskProperties) { + Assert.notNull(mongoOperations, "MongoOperations is required"); + Assert.notNull(taskProperties, "TaskProperties is required"); + + if (!DatabaseType.isMongoDB(mongoOperations)) { + throw new IllegalArgumentException("Provided object is not a MongoDB operations instance"); + } + + this.mongoOperations = mongoOperations; + this.taskProperties = taskProperties; + } + @Override public TaskExecutionDao getObject() throws Exception { if (this.dao == null) { - if (this.dataSource != null) { + if (this.mongoOperations != null) { + buildMongoTaskExecutionDao(); + } + else if (this.dataSource != null) { buildTaskExecutionDao(this.dataSource); } else { @@ -140,6 +165,40 @@ private void buildTaskExecutionDao(DataSource dataSource) { .setTaskIncrementer(incrementerFactory.getIncrementer(databaseType, this.tablePrefix + "SEQ")); } + private void buildMongoTaskExecutionDao() { + if (DatabaseType.isMongoDB(this.mongoOperations)) { + try { + // Verify MongoDB classes are available at runtime + Class.forName("org.springframework.data.mongodb.core.MongoOperations"); + + // Load MongoTaskExecutionDao class via reflection to avoid compile-time dependency + Class daoClass = Class.forName( + "org.springframework.cloud.task.repository.dao.MongoTaskExecutionDao"); + + // Instantiate using the Object-type constructor + // (mongoOperations is kept as Object to avoid compile-time MongoDB dependency) + this.dao = (TaskExecutionDao) daoClass + .getConstructor(Object.class, TaskProperties.class) + .newInstance(this.mongoOperations, this.taskProperties); + } + catch (ClassNotFoundException e) { + throw new IllegalStateException( + "MongoDB support requires spring-data-mongodb dependency. " + + "Add org.springframework.boot:spring-boot-starter-data-mongodb to your dependencies.", e); + } + catch (Exception e) { + throw new IllegalStateException( + "Failed to create MongoTaskExecutionDao. " + + "Ensure MongoDB is properly configured.", e); + } + } + else { + throw new IllegalArgumentException( + String.format("Provided object is not a valid MongoOperations instance. Found: %s", + this.mongoOperations.getClass().getName())); + } + } + private boolean isSqlServerTableSequenceAvailable(String incrementerName) { boolean result = false; DatabaseMetaData metaData = null; diff --git a/spring-cloud-task-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-task-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 3196ed996..154a3bf69 100644 --- a/spring-cloud-task-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-task-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,3 +1,4 @@ org.springframework.cloud.task.configuration.SingleTaskConfiguration org.springframework.cloud.task.configuration.SimpleTaskAutoConfiguration +org.springframework.cloud.task.configuration.MongoTaskAutoConfiguration org.springframework.cloud.task.configuration.observation.ObservationTaskAutoConfiguration diff --git a/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/configuration/MongoTaskAutoConfigurationTests.java b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/configuration/MongoTaskAutoConfigurationTests.java new file mode 100644 index 000000000..aded0962f --- /dev/null +++ b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/configuration/MongoTaskAutoConfigurationTests.java @@ -0,0 +1,195 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.configuration; + +import javax.sql.DataSource; + +import com.mongodb.client.MongoClients; + +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.task.repository.TaskExplorer; +import org.springframework.cloud.task.repository.TaskNameResolver; +import org.springframework.cloud.task.repository.TaskRepository; +import org.springframework.cloud.task.repository.support.MongoTaskRepositoryInitializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.transaction.PlatformTransactionManager; + +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link MongoTaskAutoConfiguration}. + * + * @author JongJun Kim + */ +@Testcontainers +public class MongoTaskAutoConfigurationTests { + + @Container + static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:6.0") + .withExposedPorts(27017); + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of( + MongoTaskAutoConfiguration.class)) + .withUserConfiguration(MongoTestConfiguration.class); + + @Test + public void testMongoTaskAutoConfigurationDisabledByDefault() { + contextRunner + .run(context -> { + assertThat(context).doesNotHaveBean(MongoTaskRepositoryInitializer.class); + assertThat(context).doesNotHaveBean(LockRegistry.class); + assertThat(context).doesNotHaveBean(TaskConfigurer.class); + }); + } + + @Test + public void testMongoTaskAutoConfigurationEnabled() { + contextRunner + .withPropertyValues("spring.cloud.task.repository-type=mongodb") + .run(context -> { + assertThat(context).hasSingleBean(MongoTaskRepositoryInitializer.class); + assertThat(context).hasSingleBean(TaskConfigurer.class); + assertThat(context).getBean(TaskConfigurer.class).isInstanceOf(MongoTaskConfigurer.class); + }); + } + + @Test + public void testMongoLockRegistryEnabled() { + contextRunner + .withPropertyValues( + "spring.cloud.task.repository-type=mongodb", + "spring.cloud.task.single-instance-enabled=true") + .run(context -> { + assertThat(context).hasSingleBean(MongoTaskRepositoryInitializer.class); + assertThat(context).hasSingleBean(LockRegistry.class); + assertThat(context).hasSingleBean(TaskConfigurer.class); + }); + } + + @Test + public void testMongoLockRegistryDisabledByDefault() { + contextRunner + .withPropertyValues("spring.cloud.task.repository-type=mongodb") + .run(context -> { + assertThat(context).hasSingleBean(MongoTaskRepositoryInitializer.class); + assertThat(context).doesNotHaveBean(LockRegistry.class); + assertThat(context).hasSingleBean(TaskConfigurer.class); + }); + } + + @Test + public void testCustomTaskConfigurerPreventsAutoConfiguration() { + contextRunner + .withPropertyValues("spring.cloud.task.repository-type=mongodb") + .withUserConfiguration(CustomTaskConfigurerConfiguration.class) + .run(context -> { + assertThat(context).hasSingleBean(MongoTaskRepositoryInitializer.class); + assertThat(context).hasSingleBean(TaskConfigurer.class); + assertThat(context).getBean(TaskConfigurer.class).isInstanceOf(CustomTaskConfigurer.class); + }); + } + + @Test + public void testMongoAutoConfigurationWithoutMongoOperations() { + new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(MongoTaskAutoConfiguration.class)) + .withPropertyValues("spring.cloud.task.repository-type=mongodb") + .run(context -> { + assertThat(context).doesNotHaveBean(MongoTaskRepositoryInitializer.class); + assertThat(context).doesNotHaveBean(TaskConfigurer.class); + }); + } + + @Test + public void testCustomTablePrefix() { + contextRunner + .withPropertyValues( + "spring.cloud.task.repository-type=mongodb", + "spring.cloud.task.table-prefix=CUSTOM_") + .run(context -> { + assertThat(context).hasSingleBean(MongoTaskRepositoryInitializer.class); + assertThat(context).hasSingleBean(TaskConfigurer.class); + + TaskProperties taskProperties = context.getBean(TaskProperties.class); + assertThat(taskProperties.getTablePrefix()).isEqualTo("CUSTOM_"); + }); + } + + @Configuration + static class MongoTestConfiguration { + + @Bean + public MongoOperations mongoOperations() { + return new MongoTemplate( + new SimpleMongoClientDatabaseFactory( + MongoClients.create(mongoDBContainer.getConnectionString()), + "test-task-config-db" + ) + ); + } + } + + @Configuration + static class CustomTaskConfigurerConfiguration { + + @Bean + public TaskConfigurer customTaskConfigurer() { + return new CustomTaskConfigurer(); + } + } + + static class CustomTaskConfigurer implements TaskConfigurer { + + @Override + public TaskRepository getTaskRepository() { + return null; + } + + @Override + public TaskExplorer getTaskExplorer() { + return null; + } + + @Override + public PlatformTransactionManager getTransactionManager() { + return null; + } + + @Override + public DataSource getTaskDataSource() { + return null; + } + + @Override + public TaskNameResolver getTaskNameResolver() { + return null; + } + } +} diff --git a/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/configuration/MongoTaskConfigurerTests.java b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/configuration/MongoTaskConfigurerTests.java new file mode 100644 index 000000000..c6edeeb52 --- /dev/null +++ b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/configuration/MongoTaskConfigurerTests.java @@ -0,0 +1,223 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.configuration; + +import javax.sql.DataSource; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import org.springframework.cloud.task.repository.TaskExplorer; +import org.springframework.cloud.task.repository.TaskNameResolver; +import org.springframework.cloud.task.repository.TaskRepository; +import org.springframework.cloud.task.repository.support.SimpleTaskExplorer; +import org.springframework.cloud.task.repository.support.SimpleTaskRepository; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; +import org.springframework.transaction.PlatformTransactionManager; + +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link MongoTaskConfigurer}. + * + * @author JongJun Kim + */ +@Testcontainers +public class MongoTaskConfigurerTests { + + private static final String DATABASE_NAME = "test-task-configurer-db"; + + @Container + static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:6.0") + .withExposedPorts(27017); + + private MongoClient mongoClient; + private MongoOperations mongoOperations; + private TaskProperties taskProperties; + + @Mock + private PlatformTransactionManager transactionManager; + + @BeforeEach + public void setup() { + mongoClient = MongoClients.create(mongoDBContainer.getConnectionString()); + mongoOperations = new MongoTemplate(new SimpleMongoClientDatabaseFactory(mongoClient, DATABASE_NAME)); + taskProperties = new TaskProperties(); + taskProperties.setTablePrefix("TASK_"); + } + + @AfterEach + public void tearDown() { + if (mongoClient != null) { + mongoClient.close(); + } + } + + @Test + public void testMongoTaskConfigurerConstruction() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties); + + assertThat(configurer).isNotNull(); + assertThat(configurer.getTaskRepository()).isInstanceOf(SimpleTaskRepository.class); + assertThat(configurer.getTaskExplorer()).isInstanceOf(SimpleTaskExplorer.class); + } + + @Test + public void testMongoTaskConfigurerWithTransactionManager() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties, transactionManager); + + assertThat(configurer).isNotNull(); + assertThat(configurer.getTaskRepository()).isInstanceOf(SimpleTaskRepository.class); + assertThat(configurer.getTaskExplorer()).isInstanceOf(SimpleTaskExplorer.class); + assertThat(configurer.getTransactionManager()).isSameAs(transactionManager); + } + + @Test + public void testGetTaskDataSource() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties); + DataSource dataSource = configurer.getTaskDataSource(); + + // MongoDB configurer doesn't use DataSource, should return null + assertThat(dataSource).isNull(); + } + + @Test + public void testGetTaskNameResolver() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties); + TaskNameResolver nameResolver = configurer.getTaskNameResolver(); + + assertThat(nameResolver).isNotNull(); + } + + @Test + public void testGetTaskRepository() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties); + TaskRepository repository = configurer.getTaskRepository(); + + assertThat(repository).isNotNull(); + assertThat(repository).isInstanceOf(SimpleTaskRepository.class); + } + + @Test + public void testGetTaskExplorer() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties); + TaskExplorer explorer = configurer.getTaskExplorer(); + + assertThat(explorer).isNotNull(); + assertThat(explorer).isInstanceOf(SimpleTaskExplorer.class); + } + + @Test + public void testGetTransactionManager() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties, transactionManager); + PlatformTransactionManager retrievedManager = configurer.getTransactionManager(); + + assertThat(retrievedManager).isSameAs(transactionManager); + } + + @Test + public void testGetTransactionManagerWithoutTransactionManager() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties); + PlatformTransactionManager retrievedManager = configurer.getTransactionManager(); + + assertThat(retrievedManager).isNull(); + } + + @Test + public void testConstructorWithNullMongoOperations() { + assertThatThrownBy(() -> new MongoTaskConfigurer(null, taskProperties)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("mongoOperations must not be null"); + } + + @Test + public void testConstructorWithNullTaskProperties() { + assertThatThrownBy(() -> new MongoTaskConfigurer(mongoOperations, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("taskProperties must not be null"); + } + + @Test + public void testConstructorWithTransactionManagerAndNullMongoOperations() { + assertThatThrownBy(() -> new MongoTaskConfigurer(null, taskProperties, transactionManager)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("mongoOperations must not be null"); + } + + @Test + public void testConstructorWithTransactionManagerAndNullTaskProperties() { + assertThatThrownBy(() -> new MongoTaskConfigurer(mongoOperations, null, transactionManager)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("taskProperties must not be null"); + } + + @Test + public void testRepositoryAndExplorerConsistency() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties); + + TaskRepository repository = configurer.getTaskRepository(); + TaskExplorer explorer = configurer.getTaskExplorer(); + + // Both should be created and of correct type + assertThat(repository).isInstanceOf(SimpleTaskRepository.class); + assertThat(explorer).isInstanceOf(SimpleTaskExplorer.class); + + // Both should be non-null and functional + assertThat(repository).isNotNull(); + assertThat(explorer).isNotNull(); + } + + @Test + public void testCustomTablePrefix() { + TaskProperties customProperties = new TaskProperties(); + customProperties.setTablePrefix("CUSTOM_PREFIX_"); + + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, customProperties); + TaskRepository repository = configurer.getTaskRepository(); + TaskExplorer explorer = configurer.getTaskExplorer(); + + // Verify components are created with custom properties + assertThat(repository).isNotNull(); + assertThat(explorer).isNotNull(); + } + + @Test + public void testMultipleCallsReturnSameInstance() { + MongoTaskConfigurer configurer = new MongoTaskConfigurer(mongoOperations, taskProperties); + + // Multiple calls should return the same instances (singleton behavior) + TaskRepository repository1 = configurer.getTaskRepository(); + TaskRepository repository2 = configurer.getTaskRepository(); + assertThat(repository1).isSameAs(repository2); + + TaskExplorer explorer1 = configurer.getTaskExplorer(); + TaskExplorer explorer2 = configurer.getTaskExplorer(); + assertThat(explorer1).isSameAs(explorer2); + } +} diff --git a/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/dao/MongoLockRepositoryTests.java b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/dao/MongoLockRepositoryTests.java new file mode 100644 index 000000000..4c89d16b0 --- /dev/null +++ b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/dao/MongoLockRepositoryTests.java @@ -0,0 +1,409 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.repository.dao; + +import java.time.LocalDateTime; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.cloud.task.configuration.TaskProperties; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link MongoLockRepository}. + * + * @author JongJun Kim + */ +@Testcontainers +public class MongoLockRepositoryTests { + + private static final String DATABASE_NAME = "test-lock-db"; + + @Container + static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:6.0") + .withExposedPorts(27017); + + private MongoClient mongoClient; + private MongoOperations mongoOperations; + private TaskProperties taskProperties; + private MongoLockRepository lockRepository; + + @BeforeEach + public void setup() { + mongoClient = MongoClients.create(mongoDBContainer.getConnectionString()); + mongoOperations = new MongoTemplate(new SimpleMongoClientDatabaseFactory(mongoClient, DATABASE_NAME)); + taskProperties = new TaskProperties(); + taskProperties.setTablePrefix("TASK_"); + taskProperties.setSingleInstanceLockTtl(30000); // 30 seconds + + // Clean up any existing collections + cleanupCollections(); + + lockRepository = new MongoLockRepository(mongoOperations, taskProperties); + } + + @AfterEach + public void tearDown() { + if (mongoClient != null) { + cleanupCollections(); + mongoClient.close(); + } + } + + private void cleanupCollections() { + if (mongoOperations.collectionExists("TASK_task_locks")) { + mongoOperations.getCollection("TASK_task_locks").drop(); + } + } + + @Test + public void testConstructorWithValidParameters() { + MongoLockRepository repository = new MongoLockRepository(mongoOperations, taskProperties); + assertThat(repository).isNotNull(); + + // Verify collection was created + assertThat(mongoOperations.collectionExists("TASK_task_locks")).isTrue(); + } + + @Test + public void testConstructorWithNullMongoOperations() { + assertThatThrownBy(() -> new MongoLockRepository(null, taskProperties)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("mongoOperations must not be null"); + } + + @Test + public void testConstructorWithNullTaskProperties() { + assertThatThrownBy(() -> new MongoLockRepository(mongoOperations, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("taskProperties must not be null"); + } + + @Test + public void testObtainLock() { + Lock lock = lockRepository.obtain("testKey"); + assertThat(lock).isNotNull(); + } + + @Test + public void testObtainLockWithNullKey() { + assertThatThrownBy(() -> lockRepository.obtain(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("lockKey must not be null"); + } + + @Test + public void testTryLockSuccess() { + Lock lock = lockRepository.obtain("testKey1"); + boolean acquired = lock.tryLock(); + assertThat(acquired).isTrue(); + + // Verify lock document exists in MongoDB + Query lockQuery = Query.query( + Criteria.where("lockKey").is("testKey1") + .and("region").is("default") + ); + boolean lockExists = mongoOperations.exists(lockQuery, "TASK_task_locks"); + assertThat(lockExists).isTrue(); + + lock.unlock(); + } + + @Test + public void testTryLockFailureWhenAlreadyLocked() throws InterruptedException { + // Use different repository instances to simulate different clients + MongoLockRepository lockRepository1 = new MongoLockRepository(mongoOperations, taskProperties); + MongoLockRepository lockRepository2 = new MongoLockRepository(mongoOperations, taskProperties); + + Lock lock1 = lockRepository1.obtain("testKey2"); + Lock lock2 = lockRepository2.obtain("testKey2"); + + boolean acquired1 = lock1.tryLock(); + assertThat(acquired1).isTrue(); + + // Small delay to ensure lock is persisted + Thread.sleep(100); + + boolean acquired2 = lock2.tryLock(); + assertThat(acquired2).isFalse(); + + lock1.unlock(); + } + + @Test + public void testTryLockWithTimeout() throws InterruptedException { + Lock lock = lockRepository.obtain("testKey3"); + boolean acquired = lock.tryLock(100, TimeUnit.MILLISECONDS); + assertThat(acquired).isTrue(); + + lock.unlock(); + } + + @Test + public void testUnlock() { + Lock lock = lockRepository.obtain("testKey4"); + lock.tryLock(); + + // Verify lock exists + Query lockQuery = Query.query( + Criteria.where("lockKey").is("testKey4") + .and("region").is("default") + ); + boolean lockExists = mongoOperations.exists(lockQuery, "TASK_task_locks"); + assertThat(lockExists).isTrue(); + + lock.unlock(); + + // Verify lock was removed + boolean lockExistsAfterUnlock = mongoOperations.exists(lockQuery, "TASK_task_locks"); + assertThat(lockExistsAfterUnlock).isFalse(); + } + + @Test + public void testUnlockWhenNotLocked() { + Lock lock = lockRepository.obtain("testKey5"); + // Should not throw exception when unlocking a lock that was never acquired + lock.unlock(); + } + + + @Test + public void testLockExpirationWithDifferentClients() { + // Create a lock with very short TTL + TaskProperties shortTtlProperties = new TaskProperties(); + shortTtlProperties.setTablePrefix("TASK_"); + shortTtlProperties.setSingleInstanceLockTtl(100); // 0.1 second + + // Test that a lock can be obtained by different client after expiration + // This simulates the most realistic scenario + MongoLockRepository.TaskLockDocument expiredLock = new MongoLockRepository.TaskLockDocument(); + expiredLock.id = "expiredKey2:default"; + expiredLock.lockKey = "expiredKey2"; + expiredLock.region = "default"; + expiredLock.clientId = "different-expired-client"; + expiredLock.createdDate = LocalDateTime.now().minusMinutes(5); + expiredLock.expirationDate = LocalDateTime.now().minusMinutes(1); + + mongoOperations.save(expiredLock, "TASK_task_locks"); + + // New repository with different client ID should be able to acquire the expired lock + MongoLockRepository newRepository = new MongoLockRepository(mongoOperations, taskProperties); + Lock lock = newRepository.obtain("expiredKey2"); + boolean acquired = lock.tryLock(); + assertThat(acquired).isTrue(); + + lock.unlock(); + } + + @Test + public void testSameClientCanExtendLock() { + Lock lock = lockRepository.obtain("testKey7"); + boolean acquired1 = lock.tryLock(); + assertThat(acquired1).isTrue(); + + // Same lock instance from same repository should be able to extend the lock + boolean acquired2 = lock.tryLock(); + assertThat(acquired2).isTrue(); + + lock.unlock(); + } + + @Test + public void testSameRepositoryDifferentLocks() { + // Same repository instance, different lock keys - should both succeed + Lock lock1 = lockRepository.obtain("testKey7a"); + Lock lock2 = lockRepository.obtain("testKey7b"); + + boolean acquired1 = lock1.tryLock(); + assertThat(acquired1).isTrue(); + + boolean acquired2 = lock2.tryLock(); + assertThat(acquired2).isTrue(); + + lock1.unlock(); + lock2.unlock(); + } + + @Test + public void testConcurrentLockAttempts() throws InterruptedException { + final String lockKey = "concurrentTestKey"; + final int threadCount = 3; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch endLatch = new CountDownLatch(threadCount); + final AtomicBoolean[] results = new AtomicBoolean[threadCount]; + + for (int i = 0; i < threadCount; i++) { + final int index = i; + results[i] = new AtomicBoolean(false); + + new Thread(() -> { + try { + // Create separate repository instances to simulate different clients + MongoLockRepository repository = new MongoLockRepository(mongoOperations, taskProperties); + Lock lock = repository.obtain(lockKey); + + // Wait for all threads to be ready + startLatch.await(); + + boolean acquired = lock.tryLock(); + results[index].set(acquired); + if (acquired) { + // Hold the lock for a short time + Thread.sleep(50); + lock.unlock(); + } + } + catch (Exception e) { + // Test failure - ignore silently for this test + } + finally { + endLatch.countDown(); + } + }).start(); + } + + // Start all threads simultaneously + startLatch.countDown(); + + // Wait for all threads to complete + boolean completed = endLatch.await(10, TimeUnit.SECONDS); + assertThat(completed).isTrue(); + + // Only one thread should have successfully acquired the lock initially + int successCount = 0; + for (AtomicBoolean result : results) { + if (result.get()) { + successCount++; + } + } + // At least 1 should succeed (may be more due to sequential access after unlock) + assertThat(successCount).isGreaterThanOrEqualTo(1); + } + + @Test + public void testLockInterruptibly() throws InterruptedException { + Lock lock = lockRepository.obtain("interruptibleTestKey"); + lock.lockInterruptibly(); + + // Verify lock was acquired + Query lockQuery = Query.query( + Criteria.where("lockKey").is("interruptibleTestKey") + .and("region").is("default") + ); + boolean lockExists = mongoOperations.exists(lockQuery, "TASK_task_locks"); + assertThat(lockExists).isTrue(); + + lock.unlock(); + } + + @Test + public void testLockMethod() { + Lock lock = lockRepository.obtain("lockMethodTestKey"); + lock.lock(); + + // Verify lock was acquired + Query lockQuery = Query.query( + Criteria.where("lockKey").is("lockMethodTestKey") + .and("region").is("default") + ); + boolean lockExists = mongoOperations.exists(lockQuery, "TASK_task_locks"); + assertThat(lockExists).isTrue(); + + lock.unlock(); + } + + @Test + public void testNewConditionThrowsException() { + Lock lock = lockRepository.obtain("conditionTestKey"); + assertThatThrownBy(lock::newCondition) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Conditions are not supported"); + } + + @Test + public void testCustomTablePrefix() { + TaskProperties customProperties = new TaskProperties(); + customProperties.setTablePrefix("CUSTOM_"); + customProperties.setSingleInstanceLockTtl(30000); + + MongoLockRepository customRepository = new MongoLockRepository(mongoOperations, customProperties); + + // Verify collection was created with custom prefix + assertThat(mongoOperations.collectionExists("CUSTOM_task_locks")).isTrue(); + + Lock lock = customRepository.obtain("customPrefixTestKey"); + lock.tryLock(); + + // Verify lock document exists in custom collection + Query lockQuery = Query.query( + Criteria.where("lockKey").is("customPrefixTestKey") + .and("region").is("default") + ); + boolean lockExists = mongoOperations.exists(lockQuery, "CUSTOM_task_locks"); + assertThat(lockExists).isTrue(); + + lock.unlock(); + + // Clean up custom collection + mongoOperations.getCollection("CUSTOM_task_locks").drop(); + } + + @Test + public void testExpiredLockCleanup() { + // Create expired lock manually + MongoLockRepository.TaskLockDocument expiredLock = new MongoLockRepository.TaskLockDocument(); + expiredLock.id = "expiredKey:default"; + expiredLock.lockKey = "expiredKey"; + expiredLock.region = "default"; + expiredLock.clientId = "expired-client"; + expiredLock.createdDate = LocalDateTime.now().minusHours(1); + expiredLock.expirationDate = LocalDateTime.now().minusMinutes(1); + + mongoOperations.save(expiredLock, "TASK_task_locks"); + + // Verify expired lock exists + Query expiredQuery = Query.query(Criteria.where("lockKey").is("expiredKey")); + boolean expiredExists = mongoOperations.exists(expiredQuery, "TASK_task_locks"); + assertThat(expiredExists).isTrue(); + + // Try to acquire lock - this should trigger cleanup and succeed + Lock lock = lockRepository.obtain("expiredKey"); + boolean acquired = lock.tryLock(); + assertThat(acquired).isTrue(); + + lock.unlock(); + } +} diff --git a/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/dao/MongoTaskExecutionDaoTests.java b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/dao/MongoTaskExecutionDaoTests.java new file mode 100644 index 000000000..2e80d8f58 --- /dev/null +++ b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/dao/MongoTaskExecutionDaoTests.java @@ -0,0 +1,569 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.repository.dao; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import static org.assertj.core.api.Assertions.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.cloud.task.configuration.TaskProperties; +import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; + +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; + +/** + * Tests for {@link MongoTaskExecutionDao}. + * + * @author JongJun Kim + */ +@Testcontainers +public class MongoTaskExecutionDaoTests extends BaseTaskExecutionDaoTestCases { + + private static final String DATABASE_NAME = "test-task-db"; + + @Container + static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:6.0") + .withExposedPorts(27017); + + private MongoClient mongoClient; + private MongoOperations mongoOperations; + private MongoTaskExecutionDao mongoDao; + private TaskProperties taskProperties; + + @BeforeEach + public void setup() { + mongoClient = MongoClients.create(mongoDBContainer.getConnectionString()); + mongoOperations = new MongoTemplate(new SimpleMongoClientDatabaseFactory(mongoClient, DATABASE_NAME)); + taskProperties = new TaskProperties(); + taskProperties.setTablePrefix("TASK_"); + mongoDao = new MongoTaskExecutionDao(mongoOperations, taskProperties); + super.dao = mongoDao; + + // Clean up collections before each test + cleanupCollections(); + } + + @AfterEach + public void tearDown() { + if (mongoClient != null) { + cleanupCollections(); + mongoClient.close(); + } + } + + private void cleanupCollections() { + mongoOperations.getCollection("TASK_task_executions").drop(); + mongoOperations.getCollection("TASK_task_execution_parameters").drop(); + mongoOperations.getCollection("TASK_task_batch_associations").drop(); + mongoOperations.getCollection("TASK_task_sequence").drop(); + } + + @Test + public void testCreateTaskExecution() { + String taskName = "testTask"; + LocalDateTime startTime = LocalDateTime.now(); + List arguments = Arrays.asList("arg1", "arg2", "arg3"); + String externalExecutionId = "ext123"; + + TaskExecution taskExecution = mongoDao.createTaskExecution(taskName, startTime, arguments, externalExecutionId); + + assertThat(taskExecution).isNotNull(); + assertThat(taskExecution.getExecutionId()).isGreaterThan(0); + assertThat(taskExecution.getTaskName()).isEqualTo(taskName); + assertThat(taskExecution.getStartTime()).isEqualTo(startTime); + assertThat(taskExecution.getArguments()).hasSize(3); + assertThat(taskExecution.getArguments()).containsExactlyInAnyOrder("arg1", "arg2", "arg3"); + assertThat(taskExecution.getExternalExecutionId()).isEqualTo(externalExecutionId); + } + + @Test + public void testCreateTaskExecutionWithParentId() { + String taskName = "parentTask"; + LocalDateTime startTime = LocalDateTime.now(); + List arguments = Arrays.asList("arg1"); + String externalExecutionId = "ext123"; + Long parentExecutionId = 100L; + + TaskExecution taskExecution = mongoDao.createTaskExecution(taskName, startTime, arguments, externalExecutionId, parentExecutionId); + + assertThat(taskExecution).isNotNull(); + assertThat(taskExecution.getParentExecutionId()).isEqualTo(parentExecutionId); + assertThat(taskExecution.getTaskName()).isEqualTo(taskName); + assertThat(taskExecution.getExternalExecutionId()).isEqualTo(externalExecutionId); + } + + @Test + public void testCreateTaskExecutionWithNullArguments() { + String taskName = "testTask"; + LocalDateTime startTime = LocalDateTime.now(); + String externalExecutionId = "ext123"; + + TaskExecution taskExecution = mongoDao.createTaskExecution(taskName, startTime, null, externalExecutionId); + + assertThat(taskExecution).isNotNull(); + assertThat(taskExecution.getArguments()).isEmpty(); + } + + @Test + public void testStartTaskExecution() { + long executionId = mongoDao.getNextExecutionId(); + String taskName = "startTask"; + LocalDateTime startTime = LocalDateTime.now(); + List arguments = Arrays.asList("startArg1", "startArg2"); + String externalExecutionId = "startExt123"; + + TaskExecution taskExecution = mongoDao.startTaskExecution(executionId, taskName, startTime, arguments, externalExecutionId); + + assertThat(taskExecution).isNotNull(); + assertThat(taskExecution.getExecutionId()).isEqualTo(executionId); + assertThat(taskExecution.getTaskName()).isEqualTo(taskName); + assertThat(taskExecution.getArguments()).hasSize(2); + assertThat(taskExecution.getExternalExecutionId()).isEqualTo(externalExecutionId); + } + + @Test + public void testCompleteTaskExecution() { + TaskExecution taskExecution = mongoDao.createTaskExecution("completeTask", LocalDateTime.now(), + Arrays.asList("arg1"), "ext123"); + + Integer exitCode = 0; + LocalDateTime endTime = LocalDateTime.now(); + String exitMessage = "Task completed successfully"; + String errorMessage = null; + + mongoDao.completeTaskExecution(taskExecution.getExecutionId(), exitCode, endTime, exitMessage, errorMessage); + + TaskExecution completedTask = mongoDao.getTaskExecution(taskExecution.getExecutionId()); + assertThat(completedTask.getExitCode()).isEqualTo(exitCode); + assertThat(completedTask.getEndTime()).isCloseTo(endTime, within(1, ChronoUnit.SECONDS)); + assertThat(completedTask.getExitMessage()).isEqualTo(exitMessage); + assertThat(completedTask.getErrorMessage()).isNull(); + } + + @Test + public void testCompleteTaskExecutionWithError() { + TaskExecution taskExecution = mongoDao.createTaskExecution("errorTask", LocalDateTime.now(), + Arrays.asList("arg1"), "ext123"); + + Integer exitCode = 1; + LocalDateTime endTime = LocalDateTime.now(); + String exitMessage = "Task failed"; + String errorMessage = "Error occurred during execution"; + + mongoDao.completeTaskExecution(taskExecution.getExecutionId(), exitCode, endTime, exitMessage, errorMessage); + + TaskExecution completedTask = mongoDao.getTaskExecution(taskExecution.getExecutionId()); + assertThat(completedTask.getExitCode()).isEqualTo(exitCode); + assertThat(completedTask.getEndTime()).isCloseTo(endTime, within(1, ChronoUnit.SECONDS)); + assertThat(completedTask.getExitMessage()).isEqualTo(exitMessage); + assertThat(completedTask.getErrorMessage()).isEqualTo(errorMessage); + } + + @Test + public void testGetTaskExecution() { + TaskExecution original = mongoDao.createTaskExecution("getTask", LocalDateTime.now(), + Arrays.asList("arg1", "arg2"), "ext123"); + + TaskExecution retrieved = mongoDao.getTaskExecution(original.getExecutionId()); + + assertThat(retrieved).isNotNull(); + assertThat(retrieved.getExecutionId()).isEqualTo(original.getExecutionId()); + assertThat(retrieved.getTaskName()).isEqualTo(original.getTaskName()); + assertThat(retrieved.getArguments()).hasSize(2); + assertThat(retrieved.getExternalExecutionId()).isEqualTo(original.getExternalExecutionId()); + } + + @Test + public void testGetTaskExecutionNonExistent() { + TaskExecution retrieved = mongoDao.getTaskExecution(999L); + assertThat(retrieved).isNull(); + } + + @Test + public void testGetTaskExecutionCountByTaskName() { + String taskName = "countTask"; + mongoDao.createTaskExecution(taskName, LocalDateTime.now(), Collections.emptyList(), "ext1"); + mongoDao.createTaskExecution(taskName, LocalDateTime.now(), Collections.emptyList(), "ext2"); + mongoDao.createTaskExecution("otherTask", LocalDateTime.now(), Collections.emptyList(), "ext3"); + + long count = mongoDao.getTaskExecutionCountByTaskName(taskName); + assertThat(count).isEqualTo(2); + } + + @Test + public void testGetRunningTaskExecutionCountByTaskName() { + String taskName = "runningTask"; + TaskExecution task1 = mongoDao.createTaskExecution(taskName, LocalDateTime.now(), Collections.emptyList(), "ext1"); + TaskExecution task2 = mongoDao.createTaskExecution(taskName, LocalDateTime.now(), Collections.emptyList(), "ext2"); + mongoDao.createTaskExecution("otherTask", LocalDateTime.now(), Collections.emptyList(), "ext3"); + + // Complete one task + mongoDao.completeTaskExecution(task2.getExecutionId(), 0, LocalDateTime.now(), "completed"); + + long runningCount = mongoDao.getRunningTaskExecutionCountByTaskName(taskName); + assertThat(runningCount).isEqualTo(1); + } + + @Test + public void testGetRunningTaskExecutionCount() { + TaskExecution task1 = mongoDao.createTaskExecution("task1", LocalDateTime.now(), Collections.emptyList(), "ext1"); + TaskExecution task2 = mongoDao.createTaskExecution("task2", LocalDateTime.now(), Collections.emptyList(), "ext2"); + TaskExecution task3 = mongoDao.createTaskExecution("task3", LocalDateTime.now(), Collections.emptyList(), "ext3"); + + // Complete one task + mongoDao.completeTaskExecution(task2.getExecutionId(), 0, LocalDateTime.now(), "completed"); + + long runningCount = mongoDao.getRunningTaskExecutionCount(); + assertThat(runningCount).isEqualTo(2); + } + + @Test + public void testGetTaskExecutionCount() { + mongoDao.createTaskExecution("task1", LocalDateTime.now(), Collections.emptyList(), "ext1"); + mongoDao.createTaskExecution("task2", LocalDateTime.now(), Collections.emptyList(), "ext2"); + mongoDao.createTaskExecution("task3", LocalDateTime.now(), Collections.emptyList(), "ext3"); + + long totalCount = mongoDao.getTaskExecutionCount(); + assertThat(totalCount).isEqualTo(3); + } + + @Test + public void testFindRunningTaskExecutions() { + String taskName = "runningFindTask"; + mongoDao.createTaskExecution(taskName, LocalDateTime.now().minusHours(2), Collections.emptyList(), "ext1"); + TaskExecution task2 = mongoDao.createTaskExecution(taskName, LocalDateTime.now().minusHours(1), Collections.emptyList(), "ext2"); + mongoDao.createTaskExecution(taskName, LocalDateTime.now(), Collections.emptyList(), "ext3"); + + // Complete one task + mongoDao.completeTaskExecution(task2.getExecutionId(), 0, LocalDateTime.now(), "completed"); + + Pageable pageable = PageRequest.of(0, 10); + Page runningTasks = mongoDao.findRunningTaskExecutions(taskName, pageable); + + assertThat(runningTasks.getTotalElements()).isEqualTo(2); + assertThat(runningTasks.getContent()).hasSize(2); + } + + @Test + public void testFindTaskExecutionsByExternalExecutionId() { + String externalId = "external123"; + mongoDao.createTaskExecution("task1", LocalDateTime.now(), Collections.emptyList(), externalId); + mongoDao.createTaskExecution("task2", LocalDateTime.now(), Collections.emptyList(), externalId); + mongoDao.createTaskExecution("task3", LocalDateTime.now(), Collections.emptyList(), "other"); + + Pageable pageable = PageRequest.of(0, 10); + Page tasks = mongoDao.findTaskExecutionsByExternalExecutionId(externalId, pageable); + + assertThat(tasks.getTotalElements()).isEqualTo(2); + assertThat(tasks.getContent()).hasSize(2); + tasks.getContent().forEach(task -> + assertThat(task.getExternalExecutionId()).isEqualTo(externalId)); + } + + @Test + public void testGetTaskExecutionCountByExternalExecutionId() { + String externalId = "external123"; + mongoDao.createTaskExecution("task1", LocalDateTime.now(), Collections.emptyList(), externalId); + mongoDao.createTaskExecution("task2", LocalDateTime.now(), Collections.emptyList(), externalId); + mongoDao.createTaskExecution("task3", LocalDateTime.now(), Collections.emptyList(), "other"); + + long count = mongoDao.getTaskExecutionCountByExternalExecutionId(externalId); + assertThat(count).isEqualTo(2); + } + + @Test + public void testFindTaskExecutionsByName() { + String taskName = "findByNameTask"; + mongoDao.createTaskExecution(taskName, LocalDateTime.now(), Collections.emptyList(), "ext1"); + mongoDao.createTaskExecution(taskName, LocalDateTime.now(), Collections.emptyList(), "ext2"); + mongoDao.createTaskExecution("otherTask", LocalDateTime.now(), Collections.emptyList(), "ext3"); + + Pageable pageable = PageRequest.of(0, 10); + Page tasks = mongoDao.findTaskExecutionsByName(taskName, pageable); + + assertThat(tasks.getTotalElements()).isEqualTo(2); + assertThat(tasks.getContent()).hasSize(2); + tasks.getContent().forEach(task -> + assertThat(task.getTaskName()).isEqualTo(taskName)); + } + + @Test + public void testGetTaskNames() { + mongoDao.createTaskExecution("taskA", LocalDateTime.now(), Collections.emptyList(), "ext1"); + mongoDao.createTaskExecution("taskB", LocalDateTime.now(), Collections.emptyList(), "ext2"); + mongoDao.createTaskExecution("taskA", LocalDateTime.now(), Collections.emptyList(), "ext3"); + + List taskNames = mongoDao.getTaskNames(); + assertThat(taskNames).hasSize(2); + assertThat(taskNames).containsExactlyInAnyOrder("taskA", "taskB"); + } + + @Test + public void testFindAllPaginated() { + mongoDao.createTaskExecution("task1", LocalDateTime.now(), Collections.emptyList(), "ext1"); + mongoDao.createTaskExecution("task2", LocalDateTime.now(), Collections.emptyList(), "ext2"); + mongoDao.createTaskExecution("task3", LocalDateTime.now(), Collections.emptyList(), "ext3"); + + Pageable pageable = PageRequest.of(0, 2); + Page tasks = mongoDao.findAll(pageable); + + assertThat(tasks.getTotalElements()).isEqualTo(3); + assertThat(tasks.getContent()).hasSize(2); + assertThat(tasks.getTotalPages()).isEqualTo(2); + } + + @Test + public void testGetNextExecutionId() { + long id1 = mongoDao.getNextExecutionId(); + long id2 = mongoDao.getNextExecutionId(); + long id3 = mongoDao.getNextExecutionId(); + + assertThat(id1).isGreaterThan(0); + assertThat(id2).isGreaterThan(id1); + assertThat(id3).isGreaterThan(id2); + } + + @Test + public void testTaskBatchAssociations() { + long taskExecutionId = mongoDao.getNextExecutionId(); + long jobExecutionId1 = 100L; + long jobExecutionId2 = 200L; + + // Create associations + mongoDao.createTaskBatchAssociation(taskExecutionId, jobExecutionId1); + mongoDao.createTaskBatchAssociation(taskExecutionId, jobExecutionId2); + + // Test getJobExecutionIdsByTaskExecutionId + Set jobExecutionIds = mongoDao.getJobExecutionIdsByTaskExecutionId(taskExecutionId); + assertThat(jobExecutionIds).hasSize(2); + assertThat(jobExecutionIds).contains(jobExecutionId1, jobExecutionId2); + + // Test getTaskExecutionIdByJobExecutionId + Long retrievedTaskExecutionId1 = mongoDao.getTaskExecutionIdByJobExecutionId(jobExecutionId1); + assertThat(retrievedTaskExecutionId1).isEqualTo(taskExecutionId); + + Long retrievedTaskExecutionId2 = mongoDao.getTaskExecutionIdByJobExecutionId(jobExecutionId2); + assertThat(retrievedTaskExecutionId2).isEqualTo(taskExecutionId); + } + + @Test + public void testDeleteTaskBatchAssociationsByTaskExecutionId() { + long taskExecutionId = mongoDao.getNextExecutionId(); + long jobExecutionId1 = 100L; + long jobExecutionId2 = 200L; + + mongoDao.createTaskBatchAssociation(taskExecutionId, jobExecutionId1); + mongoDao.createTaskBatchAssociation(taskExecutionId, jobExecutionId2); + + mongoDao.deleteTaskBatchAssociationsByTaskExecutionId(taskExecutionId); + + Set jobExecutionIds = mongoDao.getJobExecutionIdsByTaskExecutionId(taskExecutionId); + assertThat(jobExecutionIds).isEmpty(); + } + + @Test + public void testDeleteTaskBatchAssociationsByJobExecutionId() { + long taskExecutionId = mongoDao.getNextExecutionId(); + long jobExecutionId = 100L; + + mongoDao.createTaskBatchAssociation(taskExecutionId, jobExecutionId); + + mongoDao.deleteTaskBatchAssociationsByJobExecutionId(jobExecutionId); + + Long retrievedTaskExecutionId = mongoDao.getTaskExecutionIdByJobExecutionId(jobExecutionId); + assertThat(retrievedTaskExecutionId).isNull(); + } + + @Test + public void testUpdateExternalExecutionId() { + TaskExecution taskExecution = mongoDao.createTaskExecution("updateTask", LocalDateTime.now(), + Collections.emptyList(), "original"); + + String newExternalId = "updated"; + mongoDao.updateExternalExecutionId(taskExecution.getExecutionId(), newExternalId); + + TaskExecution updatedTask = mongoDao.getTaskExecution(taskExecution.getExecutionId()); + assertThat(updatedTask.getExternalExecutionId()).isEqualTo(newExternalId); + } + + @Test + public void testGetLatestTaskExecutionForTaskName() { + String taskName = "latestTask"; + mongoDao.createTaskExecution(taskName, LocalDateTime.now().minusHours(3), Collections.emptyList(), "ext1"); + TaskExecution latest = mongoDao.createTaskExecution(taskName, LocalDateTime.now().minusHours(1), Collections.emptyList(), "ext2"); + mongoDao.createTaskExecution(taskName, LocalDateTime.now().minusHours(2), Collections.emptyList(), "ext3"); + + TaskExecution retrievedLatest = mongoDao.getLatestTaskExecutionForTaskName(taskName); + assertThat(retrievedLatest).isNotNull(); + assertThat(retrievedLatest.getExecutionId()).isEqualTo(latest.getExecutionId()); + assertThat(retrievedLatest.getExternalExecutionId()).isEqualTo("ext2"); + } + + @Test + public void testGetLatestTaskExecutionsByTaskNames() { + String taskName1 = "latestTask1"; + String taskName2 = "latestTask2"; + + mongoDao.createTaskExecution(taskName1, LocalDateTime.now().minusHours(2), Collections.emptyList(), "ext1"); + TaskExecution latest1 = mongoDao.createTaskExecution(taskName1, LocalDateTime.now().minusHours(1), Collections.emptyList(), "ext2"); + + TaskExecution latest2 = mongoDao.createTaskExecution(taskName2, LocalDateTime.now().minusHours(1), Collections.emptyList(), "ext3"); + + List latestExecutions = mongoDao.getLatestTaskExecutionsByTaskNames(taskName1, taskName2); + assertThat(latestExecutions).hasSize(2); + + TaskExecution retrieved1 = latestExecutions.stream() + .filter(te -> taskName1.equals(te.getTaskName())) + .findFirst() + .orElse(null); + assertThat(retrieved1).isNotNull(); + assertThat(retrieved1.getExecutionId()).isEqualTo(latest1.getExecutionId()); + + TaskExecution retrieved2 = latestExecutions.stream() + .filter(te -> taskName2.equals(te.getTaskName())) + .findFirst() + .orElse(null); + assertThat(retrieved2).isNotNull(); + assertThat(retrieved2.getExecutionId()).isEqualTo(latest2.getExecutionId()); + } + + @Test + public void testConstructorWithInvalidObject() { + assertThatThrownBy(() -> new MongoTaskExecutionDao("not a MongoOperations", taskProperties)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Provided object is not a MongoOperations instance"); + } + + @Test + public void testConstructorWithNullMongoOperations() { + assertThatThrownBy(() -> new MongoTaskExecutionDao((MongoOperations) null, taskProperties)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("mongoOperations must not be null"); + } + + @Test + public void testConstructorWithNullTaskProperties() { + assertThatThrownBy(() -> new MongoTaskExecutionDao(mongoOperations, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("taskProperties must not be null"); + } + + @Test + public void testTablePrefixInCollectionNames() { + TaskProperties customTaskProperties = new TaskProperties(); + customTaskProperties.setTablePrefix("CUSTOM_"); + MongoTaskExecutionDao customDao = new MongoTaskExecutionDao(mongoOperations, customTaskProperties); + + TaskExecution taskExecution = customDao.createTaskExecution("prefixTest", LocalDateTime.now(), + Collections.emptyList(), "ext123"); + + // Verify that collections are created with custom prefix + boolean executionCollectionExists = mongoOperations.collectionExists("CUSTOM_task_executions"); + boolean paramsCollectionExists = mongoOperations.collectionExists("CUSTOM_task_execution_parameters"); + boolean sequenceCollectionExists = mongoOperations.collectionExists("CUSTOM_task_sequence"); + + assertThat(executionCollectionExists).isTrue(); + assertThat(sequenceCollectionExists).isTrue(); + + // Clean up custom collections + mongoOperations.getCollection("CUSTOM_task_executions").drop(); + mongoOperations.getCollection("CUSTOM_task_execution_parameters").drop(); + mongoOperations.getCollection("CUSTOM_task_sequence").drop(); + } + + // Validation tests for improved input checking + @Test + public void testCreateTaskExecutionWithNullTaskName() { + assertThatThrownBy(() -> mongoDao.createTaskExecution(null, LocalDateTime.now(), + Collections.emptyList(), "ext123")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("taskName must not be null or empty"); + } + + @Test + public void testCreateTaskExecutionWithEmptyTaskName() { + assertThatThrownBy(() -> mongoDao.createTaskExecution("", LocalDateTime.now(), + Collections.emptyList(), "ext123")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("taskName must not be null or empty"); + } + + @Test + public void testCreateTaskExecutionWithNullStartTime() { + assertThatThrownBy(() -> mongoDao.createTaskExecution("testTask", null, + Collections.emptyList(), "ext123")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("startTime must not be null"); + } + + @Test + public void testCompleteTaskExecutionWithNullEndTime() { + TaskExecution task = mongoDao.createTaskExecution("testTask", LocalDateTime.now(), + Collections.emptyList(), "ext123"); + + assertThatThrownBy(() -> mongoDao.completeTaskExecution(task.getExecutionId(), 0, null, "completed")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("endTime must not be null"); + } + + @Test + public void testCompleteTaskExecutionWithNonExistentId() { + assertThatThrownBy(() -> mongoDao.completeTaskExecution(999L, 0, LocalDateTime.now(), "completed")) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid TaskExecution, ID 999 not found."); + } + + @Test + public void testUpdateExternalExecutionIdWithNonExistentId() { + assertThatThrownBy(() -> mongoDao.updateExternalExecutionId(999L, "newExt")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid TaskExecution, ID 999 not found."); + } + + @Test + public void testGetLatestTaskExecutionsByTaskNamesWithEmptyArray() { + assertThatThrownBy(() -> mongoDao.getLatestTaskExecutionsByTaskNames()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("At least 1 task name must be provided."); + } + + @Test + public void testGetLatestTaskExecutionsByTaskNamesWithNullElements() { + assertThatThrownBy(() -> mongoDao.getLatestTaskExecutionsByTaskNames("validTask", null, "anotherValidTask")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Task names must not contain any empty elements"); + } +} diff --git a/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/support/MongoTaskRepositoryInitializerTests.java b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/support/MongoTaskRepositoryInitializerTests.java new file mode 100644 index 000000000..fc3c23084 --- /dev/null +++ b/spring-cloud-task-core/src/test/java/org/springframework/cloud/task/repository/support/MongoTaskRepositoryInitializerTests.java @@ -0,0 +1,322 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.repository.support; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.cloud.task.configuration.TaskProperties; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; +import org.springframework.data.mongodb.core.index.IndexInfo; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.test.util.ReflectionTestUtils; + +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link MongoTaskRepositoryInitializer}. + * + * @author JongJun Kim + */ +@Testcontainers +public class MongoTaskRepositoryInitializerTests { + + private static final String DATABASE_NAME = "test-task-init-db"; + + @Container + static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:6.0") + .withExposedPorts(27017); + + private MongoClient mongoClient; + private MongoOperations mongoOperations; + private TaskProperties taskProperties; + private MongoTaskRepositoryInitializer initializer; + + @BeforeEach + public void setup() { + mongoClient = MongoClients.create(mongoDBContainer.getConnectionString()); + mongoOperations = new MongoTemplate(new SimpleMongoClientDatabaseFactory(mongoClient, DATABASE_NAME)); + taskProperties = new TaskProperties(); + taskProperties.setTablePrefix("TASK_"); + + // Clean up any existing collections + cleanupCollections(); + + initializer = new MongoTaskRepositoryInitializer(mongoOperations, taskProperties); + } + + @AfterEach + public void tearDown() { + if (mongoClient != null) { + cleanupCollections(); + mongoClient.close(); + } + } + + private void cleanupCollections() { + List collectionNames = Arrays.asList( + "TASK_task_executions", + "TASK_task_execution_parameters", + "TASK_task_batch_associations", + "TASK_task_sequence", + "TASK_task_locks", + "CUSTOM_task_executions", + "CUSTOM_task_execution_parameters", + "CUSTOM_task_batch_associations", + "CUSTOM_task_sequence", + "CUSTOM_task_locks" + ); + + for (String collectionName : collectionNames) { + if (mongoOperations.collectionExists(collectionName)) { + mongoOperations.getCollection(collectionName).drop(); + } + } + } + + @Test + public void testConstructorWithValidParameters() { + MongoTaskRepositoryInitializer init = new MongoTaskRepositoryInitializer(mongoOperations, taskProperties); + assertThat(init).isNotNull(); + } + + @Test + public void testConstructorWithNullMongoOperations() { + assertThatThrownBy(() -> new MongoTaskRepositoryInitializer(null, taskProperties)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("mongoOperations must not be null"); + } + + @Test + public void testConstructorWithNullTaskProperties() { + assertThatThrownBy(() -> new MongoTaskRepositoryInitializer(mongoOperations, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("taskProperties must not be null"); + } + + @Test + public void testAfterPropertiesSetCreatesCollections() throws Exception { + // Set initialization enabled + ReflectionTestUtils.setField(initializer, "taskInitializationEnabled", true); + + // Verify collections don't exist initially + assertThat(mongoOperations.collectionExists("TASK_task_executions")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_execution_parameters")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_batch_associations")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_sequence")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_locks")).isFalse(); + + initializer.afterPropertiesSet(); + + // Verify collections were created + assertThat(mongoOperations.collectionExists("TASK_task_executions")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_execution_parameters")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_batch_associations")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_sequence")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_locks")).isTrue(); + } + + @Test + public void testAfterPropertiesSetCreatesIndexes() throws Exception { + ReflectionTestUtils.setField(initializer, "taskInitializationEnabled", true); + + initializer.afterPropertiesSet(); + + // Verify task executions indexes + List taskExecutionIndexes = mongoOperations.indexOps("TASK_task_executions").getIndexInfo(); + assertThat(taskExecutionIndexes.size()).isGreaterThan(1); // Should have multiple indexes beyond the default _id index + + // Check for specific indexes + boolean hasTaskNameIndex = taskExecutionIndexes.stream() + .anyMatch(index -> "idx_task_name".equals(index.getName())); + assertThat(hasTaskNameIndex).isTrue(); + + boolean hasStartTimeIndex = taskExecutionIndexes.stream() + .anyMatch(index -> "idx_start_time".equals(index.getName())); + assertThat(hasStartTimeIndex).isTrue(); + + boolean hasEndTimeIndex = taskExecutionIndexes.stream() + .anyMatch(index -> "idx_end_time".equals(index.getName())); + assertThat(hasEndTimeIndex).isTrue(); + + boolean hasExternalExecutionIdIndex = taskExecutionIndexes.stream() + .anyMatch(index -> "idx_external_execution_id".equals(index.getName())); + assertThat(hasExternalExecutionIdIndex).isTrue(); + + // Verify task parameters indexes + List taskParameterIndexes = mongoOperations.indexOps("TASK_task_execution_parameters").getIndexInfo(); + boolean hasTaskExecutionIdIndex = taskParameterIndexes.stream() + .anyMatch(index -> "idx_task_execution_id".equals(index.getName())); + assertThat(hasTaskExecutionIdIndex).isTrue(); + + // Verify batch association indexes + List batchAssociationIndexes = mongoOperations.indexOps("TASK_task_batch_associations").getIndexInfo(); + boolean hasTaskExecIdIndex = batchAssociationIndexes.stream() + .anyMatch(index -> "idx_task_execution_id".equals(index.getName())); + assertThat(hasTaskExecIdIndex).isTrue(); + + boolean hasJobExecIdIndex = batchAssociationIndexes.stream() + .anyMatch(index -> "idx_job_execution_id".equals(index.getName())); + assertThat(hasJobExecIdIndex).isTrue(); + + // Verify lock indexes + List lockIndexes = mongoOperations.indexOps("TASK_task_locks").getIndexInfo(); + boolean hasLockKeyRegionIndex = lockIndexes.stream() + .anyMatch(index -> "idx_lock_key_region".equals(index.getName())); + assertThat(hasLockKeyRegionIndex).isTrue(); + } + + @Test + public void testAfterPropertiesSetInitializesSequence() throws Exception { + ReflectionTestUtils.setField(initializer, "taskInitializationEnabled", true); + + initializer.afterPropertiesSet(); + + // Verify sequence document was created + Query sequenceQuery = Query.query(Criteria.where("_id").is("task_seq")); + boolean sequenceExists = mongoOperations.exists(sequenceQuery, "TASK_task_sequence"); + assertThat(sequenceExists).isTrue(); + } + + @Test + public void testAfterPropertiesSetWithInitializationDisabled() throws Exception { + ReflectionTestUtils.setField(initializer, "taskInitializationEnabled", false); + + initializer.afterPropertiesSet(); + + // Verify collections were not created + assertThat(mongoOperations.collectionExists("TASK_task_executions")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_execution_parameters")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_batch_associations")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_sequence")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_locks")).isFalse(); + } + + @Test + public void testAfterPropertiesSetWithTaskPropertiesDisabled() throws Exception { + taskProperties.setInitializeEnabled(false); + ReflectionTestUtils.setField(initializer, "taskInitializationEnabled", true); + + initializer.afterPropertiesSet(); + + // Verify collections were not created because task properties disabled it + assertThat(mongoOperations.collectionExists("TASK_task_executions")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_execution_parameters")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_batch_associations")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_sequence")).isFalse(); + assertThat(mongoOperations.collectionExists("TASK_task_locks")).isFalse(); + } + + @Test + public void testAfterPropertiesSetWithTaskPropertiesEnabled() throws Exception { + taskProperties.setInitializeEnabled(true); + ReflectionTestUtils.setField(initializer, "taskInitializationEnabled", false); + + initializer.afterPropertiesSet(); + + // Verify collections were created because task properties enabled it (overrides field) + assertThat(mongoOperations.collectionExists("TASK_task_executions")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_execution_parameters")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_batch_associations")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_sequence")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_locks")).isTrue(); + } + + @Test + public void testCustomTablePrefix() throws Exception { + TaskProperties customTaskProperties = new TaskProperties(); + customTaskProperties.setTablePrefix("CUSTOM_"); + + MongoTaskRepositoryInitializer customInitializer = + new MongoTaskRepositoryInitializer(mongoOperations, customTaskProperties); + ReflectionTestUtils.setField(customInitializer, "taskInitializationEnabled", true); + + customInitializer.afterPropertiesSet(); + + // Verify collections were created with custom prefix + assertThat(mongoOperations.collectionExists("CUSTOM_task_executions")).isTrue(); + assertThat(mongoOperations.collectionExists("CUSTOM_task_execution_parameters")).isTrue(); + assertThat(mongoOperations.collectionExists("CUSTOM_task_batch_associations")).isTrue(); + assertThat(mongoOperations.collectionExists("CUSTOM_task_sequence")).isTrue(); + assertThat(mongoOperations.collectionExists("CUSTOM_task_locks")).isTrue(); + + // Verify sequence document was created with custom prefix + Query sequenceQuery = Query.query(Criteria.where("_id").is("task_seq")); + boolean sequenceExists = mongoOperations.exists(sequenceQuery, "CUSTOM_task_sequence"); + assertThat(sequenceExists).isTrue(); + } + + @Test + public void testIdempotentInitialization() throws Exception { + ReflectionTestUtils.setField(initializer, "taskInitializationEnabled", true); + + // Run initialization twice + initializer.afterPropertiesSet(); + initializer.afterPropertiesSet(); + + // Verify collections still exist and no errors occurred + assertThat(mongoOperations.collectionExists("TASK_task_executions")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_execution_parameters")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_batch_associations")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_sequence")).isTrue(); + assertThat(mongoOperations.collectionExists("TASK_task_locks")).isTrue(); + } + + @Test + public void testSequenceInitializationDoesNotOverwrite() throws Exception { + ReflectionTestUtils.setField(initializer, "taskInitializationEnabled", true); + + // Initialize first time + initializer.afterPropertiesSet(); + + // Manually update the sequence + Query sequenceQuery = Query.query(Criteria.where("_id").is("task_seq")); + Object sequenceDoc = mongoOperations.findOne(sequenceQuery, Object.class, "TASK_task_sequence"); + assertThat(sequenceDoc).isNotNull(); + + // Update sequence value + java.util.Map updatedDoc = new java.util.HashMap<>(); + updatedDoc.put("_id", "task_seq"); + updatedDoc.put("sequence", 100L); + mongoOperations.save(updatedDoc, "TASK_task_sequence"); + + // Initialize again + initializer.afterPropertiesSet(); + + // Verify sequence wasn't reset + Object finalSequenceDoc = mongoOperations.findOne(sequenceQuery, Map.class, "TASK_task_sequence"); + @SuppressWarnings("unchecked") + java.util.Map sequenceMap = (Map) finalSequenceDoc; + assertThat(sequenceMap.get("sequence")).isEqualTo(100L); + } +}