diff --git a/backend/services/data-collection-service/pom.xml b/backend/services/data-collection-service/pom.xml index 1ea9d3c4..6600bf11 100644 --- a/backend/services/data-collection-service/pom.xml +++ b/backend/services/data-collection-service/pom.xml @@ -127,35 +127,6 @@ - - - org.openapitools - openapi-generator-maven-plugin - 6.6.0 - - - - generate - - - ${project.basedir}/../../openapi/specs/data-collection.yaml - spring - ${project.build.directory}/generated-sources/openapi - com.datamate.collection.interfaces.api - com.datamate.collection.interfaces.dto - - true - true - true - springdoc - java8-localdatetime - true - - - - - - org.springframework.boot spring-boot-maven-plugin diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java new file mode 100644 index 00000000..80bb8e75 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java @@ -0,0 +1,79 @@ +package com.datamate.collection.application; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.datamate.collection.domain.model.entity.TaskExecution; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.domain.repository.CollectionTaskRepository; +import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery; +import com.datamate.collection.common.enums.SyncMode; +import com.datamate.common.domain.utils.ChunksSaver; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import 
org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Objects; + +@Slf4j +@Service +@RequiredArgsConstructor +public class CollectionTaskService { + private final TaskExecutionService taskExecutionService; + private final CollectionTaskRepository collectionTaskRepository; + + @Transactional + public CollectionTask create(CollectionTask task) { + task.setStatus(TaskStatus.READY); + task.setCreatedAt(LocalDateTime.now()); + task.setUpdatedAt(LocalDateTime.now()); + collectionTaskRepository.save(task); + executeTaskNow(task); + return task; + } + + private void executeTaskNow(CollectionTask task) { + if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) { + TaskExecution exec = taskExecutionService.createExecution(task); + int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds(); + taskExecutionService.runAsync(task, exec.getId(), timeout); + log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId()); + } + } + + @Transactional + public CollectionTask update(CollectionTask task) { + task.setUpdatedAt(LocalDateTime.now()); + collectionTaskRepository.updateById(task); + return task; + } + + @Transactional + public void delete(String id) { + CollectionTask task = collectionTaskRepository.getById(id); + if (task != null) { + ChunksSaver.deleteFolder("/dataset/local/" + task.getId()); + } + collectionTaskRepository.removeById(id); + } + + public CollectionTask get(String id) { + return collectionTaskRepository.getById(id); + } + + public IPage getTasks(CollectionTaskPagingQuery query) { + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus()) + .like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName()); + return collectionTaskRepository.page(new 
Page<>(query.getPage(), query.getSize()), wrapper); + } + + public List selectActiveTasks() { + return collectionTaskRepository.selectActiveTasks(); + } +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java new file mode 100644 index 00000000..58455154 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java @@ -0,0 +1,57 @@ +package com.datamate.collection.application; + +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.datamate.collection.domain.model.entity.TaskExecution; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.domain.process.ProcessRunner; +import com.datamate.collection.domain.repository.CollectionTaskRepository; +import com.datamate.collection.domain.repository.TaskExecutionRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; + +@Slf4j +@Service +@RequiredArgsConstructor +public class TaskExecutionService { + private final ProcessRunner processRunner; + private final TaskExecutionRepository executionRepository; + private final CollectionTaskRepository collectionTaskRepository; + + + @Transactional + public TaskExecution createExecution(CollectionTask task) { + TaskExecution exec = TaskExecution.initTaskExecution(); + exec.setTaskId(task.getId()); + exec.setTaskName(task.getName()); + executionRepository.save(exec); + collectionTaskRepository.updateLastExecution(task.getId(), exec.getId()); + collectionTaskRepository.updateStatus(task.getId(), TaskStatus.RUNNING.name()); + 
return exec; + } + + public TaskExecution selectLatestByTaskId(String taskId) { + return executionRepository.selectLatestByTaskId(taskId); + } + + @Async + public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) { + try { + int code = processRunner.runJob(task, executionId, timeoutSeconds); + log.info("DataX finished with code {} for execution {}", code, executionId); + // 简化:成功即完成 + executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(), + 0, 0L, 0L, 0L, null); + collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name()); + } catch (Exception e) { + log.error("DataX execution failed", e); + executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(), + 0, 0L, 0L, 0L, e.getMessage()); + collectionTaskRepository.updateStatus(task.getId(), TaskStatus.FAILED.name()); + } + } +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/CollectionTaskService.java deleted file mode 100644 index d2b6e21a..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/CollectionTaskService.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.datamate.collection.application.service; - -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.domain.model.TaskStatus; -import com.datamate.collection.domain.model.DataxTemplate; -import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; -import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; -import com.datamate.collection.interfaces.dto.SyncMode; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; 
-import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -@Slf4j -@Service -@RequiredArgsConstructor -public class CollectionTaskService { - private final CollectionTaskMapper taskMapper; - private final TaskExecutionMapper executionMapper; - private final DataxExecutionService dataxExecutionService; - - @Transactional - public CollectionTask create(CollectionTask task) { - task.setStatus(TaskStatus.READY); - task.setCreatedAt(LocalDateTime.now()); - task.setUpdatedAt(LocalDateTime.now()); - taskMapper.insert(task); - executeTaskNow(task); - return task; - } - - private void executeTaskNow(CollectionTask task) { - if (Objects.equals(task.getSyncMode(), SyncMode.ONCE.getValue())) { - TaskExecution exec = dataxExecutionService.createExecution(task); - int timeout = task.getTimeoutSeconds() == null ? 
3600 : task.getTimeoutSeconds(); - dataxExecutionService.runAsync(task, exec.getId(), timeout); - log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId()); - } - } - - @Transactional - public CollectionTask update(CollectionTask task) { - task.setUpdatedAt(LocalDateTime.now()); - taskMapper.update(task); - return task; - } - - @Transactional - public void delete(String id) { taskMapper.deleteById(id); } - - public CollectionTask get(String id) { return taskMapper.selectById(id); } - - public List list(Integer page, Integer size, String status, String name) { - Map p = new HashMap<>(); - p.put("status", status); - p.put("name", name); - if (page != null && size != null) { - p.put("offset", page * size); - p.put("limit", size); - } - return taskMapper.selectAll(p); - } - - @Transactional - public TaskExecution startExecution(CollectionTask task) { - return dataxExecutionService.createExecution(task); - } - - // ---- Template related merged methods ---- - public List listTemplates(String sourceType, String targetType, int page, int size) { - int offset = page * size; - return taskMapper.selectList(sourceType, targetType, offset, size); - } - - public int countTemplates(String sourceType, String targetType) { - return taskMapper.countTemplates(sourceType, targetType); - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/DataxExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/DataxExecutionService.java deleted file mode 100644 index fc2eb9e4..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/DataxExecutionService.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.datamate.collection.application.service; - -import com.datamate.collection.domain.model.CollectionTask; -import 
com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.domain.model.TaskStatus; -import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; -import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; -import com.datamate.collection.infrastructure.runtime.datax.DataxJobBuilder; -import com.datamate.collection.infrastructure.runtime.datax.DataxProcessRunner; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.nio.file.Path; -import java.time.Duration; -import java.time.LocalDateTime; - -@Slf4j -@Service -@RequiredArgsConstructor -public class DataxExecutionService { - - private final DataxJobBuilder jobBuilder; - private final DataxProcessRunner processRunner; - private final TaskExecutionMapper executionMapper; - private final CollectionTaskMapper taskMapper; - - - @Transactional - public TaskExecution createExecution(CollectionTask task) { - TaskExecution exec = TaskExecution.initTaskExecution(); - exec.setTaskId(task.getId()); - exec.setTaskName(task.getName()); - executionMapper.insert(exec); - taskMapper.updateLastExecution(task.getId(), exec.getId()); - taskMapper.updateStatus(task.getId(), TaskStatus.RUNNING.name()); - return exec; - } - - @Async - public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) { - try { - Path job = jobBuilder.buildJobFile(task); - - int code = processRunner.runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds)); - log.info("DataX finished with code {} for execution {}", code, executionId); - // 简化:成功即完成 - executionMapper.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(), - 0, 0L, 0L, 0L, null, null); - taskMapper.updateStatus(task.getId(), TaskStatus.SUCCESS.name()); - } catch 
(Exception e) { - log.error("DataX execution failed", e); - executionMapper.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(), - 0, 0L, 0L, 0L, e.getMessage(), null); - taskMapper.updateStatus(task.getId(), TaskStatus.FAILED.name()); - } - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/TaskExecutionService.java deleted file mode 100644 index 68f9e3b6..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/TaskExecutionService.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.datamate.collection.application.service; - -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.domain.model.TaskStatus; -import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; -import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Service -@RequiredArgsConstructor -public class TaskExecutionService { - private final TaskExecutionMapper executionMapper; - private final CollectionTaskMapper taskMapper; - - public List list(String taskId, String status, LocalDateTime startDate, - LocalDateTime endDate, Integer page, Integer size) { - Map p = new HashMap<>(); - p.put("taskId", taskId); - p.put("status", status); - p.put("startDate", startDate); - p.put("endDate", endDate); - if (page != null && size != null) { - p.put("offset", page * size); - p.put("limit", size); - } - 
return executionMapper.selectAll(p); - } - - public long count(String taskId, String status, LocalDateTime startDate, LocalDateTime endDate) { - Map p = new HashMap<>(); - p.put("taskId", taskId); - p.put("status", status); - p.put("startDate", startDate); - p.put("endDate", endDate); - return executionMapper.count(p); - } - - // --- Added convenience methods --- - public TaskExecution get(String id) { return executionMapper.selectById(id); } - public TaskExecution getLatestByTaskId(String taskId) { return executionMapper.selectLatestByTaskId(taskId); } - - @Transactional - public void complete(String executionId, boolean success, long successCount, long failedCount, - long dataSizeBytes, String errorMessage, String resultJson) { - LocalDateTime now = LocalDateTime.now(); - TaskExecution exec = executionMapper.selectById(executionId); - if (exec == null) { return; } - int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds(); - executionMapper.completeExecution(executionId, success ? TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name(), - now, duration, successCount, failedCount, dataSizeBytes, errorMessage, resultJson); - CollectionTask task = taskMapper.selectById(exec.getTaskId()); - if (task != null) { - taskMapper.updateStatus(task.getId(), success ? 
TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name()); - } - } - - @Transactional - public void stop(String executionId) { - TaskExecution exec = executionMapper.selectById(executionId); - if (exec == null || exec.getStatus() != TaskStatus.RUNNING) { return; } - LocalDateTime now = LocalDateTime.now(); - int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds(); - // Reuse completeExecution to persist STOPPED status and timing info - executionMapper.completeExecution(exec.getId(), TaskStatus.STOPPED.name(), now, duration, - exec.getRecordsSuccess(), exec.getRecordsFailed(), exec.getDataSizeBytes(), null, exec.getResult()); - taskMapper.updateStatus(exec.getTaskId(), TaskStatus.STOPPED.name()); - } - - @Transactional - public void stopLatestByTaskId(String taskId) { - TaskExecution latest = executionMapper.selectLatestByTaskId(taskId); - if (latest != null) { stop(latest.getId()); } - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java new file mode 100644 index 00000000..28399a27 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java @@ -0,0 +1,12 @@ +package com.datamate.collection.common.enums; + +/** + * 同步方式:一次性(ONCE) 或 定时(SCHEDULED) + */ +public enum SyncMode { + /** 一次性(ONCE) */ + ONCE, + /** 定时(SCHEDULED) */ + SCHEDULED; +} + diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskStatus.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TaskStatus.java similarity index 59% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskStatus.java rename to 
backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TaskStatus.java index c5273d28..ed9585ab 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskStatus.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TaskStatus.java @@ -1,7 +1,8 @@ -package com.datamate.collection.domain.model; +package com.datamate.collection.common.enums; /** * 统一的任务和执行状态枚举 + * 任务和执行状态枚举: - DRAFT: 草稿状态 - READY: 就绪状态 - RUNNING: 运行中 - SUCCESS: 执行成功 (对应原来的COMPLETED/SUCCESS) - FAILED: 执行失败 - STOPPED: 已停止 * * @author Data Mate Platform Team */ diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/CollectionTask.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java similarity index 66% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/CollectionTask.java rename to backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java index f40afa7e..5bbdb2a2 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/CollectionTask.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java @@ -1,32 +1,36 @@ -package com.datamate.collection.domain.model; +package com.datamate.collection.domain.model.entity; +import com.baomidou.mybatisplus.annotation.TableName; +import com.datamate.collection.common.enums.SyncMode; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.common.domain.model.base.BaseEntity; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.Data; +import 
lombok.Getter; +import lombok.Setter; -import java.time.LocalDateTime; import java.util.Collections; import java.util.Map; -@Data -public class CollectionTask { - private String id; +/** + * 数据采集任务实体(与数据库表 t_dc_collection_tasks 对齐) + */ +@Getter +@Setter +@TableName(value = "t_dc_collection_tasks", autoResultMap = true) +public class CollectionTask extends BaseEntity { private String name; private String description; private String config; // DataX JSON 配置,包含源端和目标端配置信息 private TaskStatus status; - private String syncMode; // ONCE / SCHEDULED + private SyncMode syncMode; // ONCE / SCHEDULED private String scheduleExpression; private Integer retryCount; private Integer timeoutSeconds; private Long maxRecords; private String sortField; private String lastExecutionId; - private LocalDateTime createdAt; - private LocalDateTime updatedAt; - private String createdBy; - private String updatedBy; public void addPath() { try { diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/DataxTemplate.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/DataxTemplate.java similarity index 95% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/DataxTemplate.java rename to backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/DataxTemplate.java index c537a67f..54792fdc 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/DataxTemplate.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/DataxTemplate.java @@ -1,4 +1,4 @@ -package com.datamate.collection.domain.model; +package com.datamate.collection.domain.model.entity; import lombok.Data; import lombok.EqualsAndHashCode; diff --git 
a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskExecution.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/TaskExecution.java similarity index 70% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskExecution.java rename to backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/TaskExecution.java index 05f1fe89..6009957e 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskExecution.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/TaskExecution.java @@ -1,13 +1,19 @@ -package com.datamate.collection.domain.model; +package com.datamate.collection.domain.model.entity; +import com.baomidou.mybatisplus.annotation.TableName; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.common.domain.model.base.BaseEntity; import lombok.Data; +import lombok.Getter; +import lombok.Setter; import java.time.LocalDateTime; import java.util.UUID; -@Data -public class TaskExecution { - private String id; +@Getter +@Setter +@TableName(value = "t_dc_task_executions", autoResultMap = true) +public class TaskExecution extends BaseEntity { private String taskId; private String taskName; private TaskStatus status; @@ -25,7 +31,6 @@ public class TaskExecution { private String dataxJobId; private String config; private String result; - private LocalDateTime createdAt; public static TaskExecution initTaskExecution() { TaskExecution exec = new TaskExecution(); diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/process/ProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/process/ProcessRunner.java new file mode 100644 index 00000000..73d904e2 --- 
/dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/process/ProcessRunner.java @@ -0,0 +1,21 @@ +package com.datamate.collection.domain.process; + +import com.datamate.collection.domain.model.entity.CollectionTask; + +/** + * 归集执行器接口 + * + * @since 2025/10/23 + */ +public interface ProcessRunner { + /** + * 执行归集任务 + * + * @param task 任务 + * @param executionId 执行ID + * @param timeoutSeconds 超时时间(秒) + * @return 执行结果 + * @throws Exception 执行异常 + */ + int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception; +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/CollectionTaskRepository.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/CollectionTaskRepository.java new file mode 100644 index 00000000..21b0d169 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/CollectionTaskRepository.java @@ -0,0 +1,19 @@ +package com.datamate.collection.domain.repository; + +import com.baomidou.mybatisplus.extension.repository.IRepository; +import com.datamate.collection.domain.model.entity.CollectionTask; + +import java.util.List; + +/** + * 归集任务仓储层 + * + * @since 2025/10/23 + */ +public interface CollectionTaskRepository extends IRepository { + List selectActiveTasks(); + + void updateStatus(String id, String status); + + void updateLastExecution(String id, String lastExecutionId); +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/TaskExecutionRepository.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/TaskExecutionRepository.java new file mode 100644 index 00000000..9b0c059d --- /dev/null +++ 
b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/TaskExecutionRepository.java @@ -0,0 +1,19 @@ +package com.datamate.collection.domain.repository; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.datamate.collection.domain.model.entity.TaskExecution; + +import java.time.LocalDateTime; + +/** + * TaskExecutionRepository + * + * @since 2025/10/23 + */ +public interface TaskExecutionRepository extends IService { + TaskExecution selectLatestByTaskId(String taskId); + + void completeExecution(String executionId, String status, LocalDateTime completedAt, + Integer recordsProcessed, Long recordsTotal, + Long recordsSuccess, Long recordsFailed, String errorMessage); +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java new file mode 100644 index 00000000..29ffcd23 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java @@ -0,0 +1,124 @@ +package com.datamate.collection.infrastructure.datax; + +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.datamate.collection.domain.process.ProcessRunner; +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.exec.*; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import 
java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Slf4j +@Component +@RequiredArgsConstructor +public class DataxProcessRunner implements ProcessRunner { + + private final DataxProperties props; + + @Override + public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception { + Path job = buildJobFile(task); + return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds)); + } + + private int runJob(File jobFile, String executionId, Duration timeout) throws Exception { + File logFile = new File(props.getLogPath(), String.format("datax-%s.log", executionId)); + String python = props.getPythonPath(); + String dataxPy = props.getHomePath() + File.separator + "bin" + File.separator + "datax.py"; + String cmd = String.format("%s %s %s", python, dataxPy, jobFile.getAbsolutePath()); + + log.info("Execute DataX: {}", cmd); + + CommandLine cl = CommandLine.parse(cmd); + DefaultExecutor executor = getExecutor(timeout, logFile); + + return executor.execute(cl); + } + + private static DefaultExecutor getExecutor(Duration timeout, File logFile) throws FileNotFoundException { + DefaultExecutor executor = new DefaultExecutor(); + + // 将日志追加输出到文件 + File parent = logFile.getParentFile(); + if (!parent.exists()) { + parent.mkdirs(); + } + + ExecuteStreamHandler streamHandler = new PumpStreamHandler( + new org.apache.commons.io.output.TeeOutputStream( + new java.io.FileOutputStream(logFile, true), System.out), + new org.apache.commons.io.output.TeeOutputStream( + new java.io.FileOutputStream(logFile, true), System.err) + ); + executor.setStreamHandler(streamHandler); + + ExecuteWatchdog watchdog = new ExecuteWatchdog(timeout.toMillis()); + executor.setWatchdog(watchdog); + return executor; + } + + private Path buildJobFile(CollectionTask task) throws IOException { + 
Files.createDirectories(Paths.get(props.getJobConfigPath())); + String fileName = String.format("datax-job-%s.json", task.getId()); + Path path = Paths.get(props.getJobConfigPath(), fileName); + // 简化:直接将任务中的 config 字段作为 DataX 作业 JSON + try (FileWriter fw = new FileWriter(path.toFile())) { + if (StringUtils.isBlank(task.getConfig())) { + throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR); + } + String json = getJobConfig(task); + log.info("Job config: {}", json); + fw.write(json); + } + return path; + } + + private String getJobConfig(CollectionTask task) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + Map parameter = objectMapper.readValue( + task.getConfig(), + new TypeReference<>() { + } + ); + Map job = new HashMap<>(); + Map content = new HashMap<>(); + Map reader = new HashMap<>(); + reader.put("name", "nfsreader"); + reader.put("parameter", parameter); + content.put("reader", reader); + Map writer = new HashMap<>(); + writer.put("name", "nfswriter"); + writer.put("parameter", parameter); + content.put("writer", writer); + job.put("content", List.of(content)); + Map setting = new HashMap<>(); + Map channel = new HashMap<>(); + channel.put("channel", 2); + setting.put("speed", channel); + job.put("setting", setting); + Map jobConfig = new HashMap<>(); + jobConfig.put("job", job); + return objectMapper.writeValueAsString(jobConfig); + } catch (Exception e) { + log.error("Failed to parse task config", e); + throw new RuntimeException("Failed to parse task config", e); + } + } +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProperties.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProperties.java similarity index 90% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProperties.java rename to 
backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProperties.java index e194444e..8a3644ce 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProperties.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProperties.java @@ -1,4 +1,4 @@ -package com.datamate.collection.infrastructure.runtime.datax; +package com.datamate.collection.infrastructure.datax; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/mapper/CollectionTaskMapper.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/mapper/CollectionTaskMapper.java index 9359cc99..7c65b475 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/mapper/CollectionTaskMapper.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/mapper/CollectionTaskMapper.java @@ -1,47 +1,15 @@ package com.datamate.collection.infrastructure.persistence.mapper; -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.DataxTemplate; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.datamate.collection.domain.model.entity.CollectionTask; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import java.util.List; -import java.util.Map; @Mapper -public interface CollectionTaskMapper { - int insert(CollectionTask entity); - int update(CollectionTask entity); - int deleteById(@Param("id") String id); - CollectionTask selectById(@Param("id") String id); - CollectionTask selectByName(@Param("name") String 
name); - List selectByStatus(@Param("status") String status); - List selectAll(Map params); +public interface CollectionTaskMapper extends BaseMapper { int updateStatus(@Param("id") String id, @Param("status") String status); int updateLastExecution(@Param("id") String id, @Param("lastExecutionId") String lastExecutionId); List selectActiveTasks(); - - /** - * 查询模板列表 - * - * @param sourceType 源数据源类型(可选) - * @param targetType 目标数据源类型(可选) - * @param offset 偏移量 - * @param limit 限制数量 - * @return 模板列表 - */ - List selectList(@Param("sourceType") String sourceType, - @Param("targetType") String targetType, - @Param("offset") int offset, - @Param("limit") int limit); - - /** - * 统计模板数量 - * - * @param sourceType 源数据源类型(可选) - * @param targetType 目标数据源类型(可选) - * @return 模板总数 - */ - int countTemplates(@Param("sourceType") String sourceType, - @Param("targetType") String targetType); } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/mapper/TaskExecutionMapper.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/mapper/TaskExecutionMapper.java index 645d885e..2c224267 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/mapper/TaskExecutionMapper.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/mapper/TaskExecutionMapper.java @@ -1,38 +1,22 @@ package com.datamate.collection.infrastructure.persistence.mapper; -import com.datamate.collection.domain.model.TaskExecution; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.datamate.collection.domain.model.entity.TaskExecution; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import java.time.LocalDateTime; -import java.util.List; -import java.util.Map; @Mapper -public interface TaskExecutionMapper { - int 
insert(TaskExecution entity); - int update(TaskExecution entity); - int deleteById(@Param("id") String id); - TaskExecution selectById(@Param("id") String id); - List selectByTaskId(@Param("taskId") String taskId, @Param("limit") Integer limit); - List selectByStatus(@Param("status") String status); - List selectAll(Map params); - long count(Map params); - int updateProgress(@Param("id") String id, - @Param("status") String status, - @Param("progress") Double progress, - @Param("recordsProcessed") Long recordsProcessed, - @Param("throughput") Double throughput); - int completeExecution(@Param("id") String id, - @Param("status") String status, - @Param("completedAt") LocalDateTime completedAt, - @Param("durationSeconds") Integer durationSeconds, - @Param("recordsSuccess") Long recordsSuccess, - @Param("recordsFailed") Long recordsFailed, - @Param("dataSizeBytes") Long dataSizeBytes, - @Param("errorMessage") String errorMessage, - @Param("result") String result); - List selectRunningExecutions(); +public interface TaskExecutionMapper extends BaseMapper { TaskExecution selectLatestByTaskId(@Param("taskId") String taskId); - int deleteOldExecutions(@Param("beforeDate") LocalDateTime beforeDate); + + void completeExecution(@Param("executionId") String executionId, + @Param("status") String status, + @Param("completedAt") LocalDateTime completedAt, + @Param("recordsProcessed") Integer recordsProcessed, + @Param("recordsTotal") Long recordsTotal, + @Param("recordsSuccess") Long recordsSuccess, + @Param("recordsFailed") Long recordsFailed, + @Param("errorMessage") String errorMessage); } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/repository/CollectionTaskRepositoryImpl.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/repository/CollectionTaskRepositoryImpl.java new file mode 100644 index 00000000..c17e1853 --- /dev/null +++ 
b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/repository/CollectionTaskRepositoryImpl.java @@ -0,0 +1,36 @@ +package com.datamate.collection.infrastructure.persistence.repository; + +import com.baomidou.mybatisplus.extension.repository.CrudRepository; +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.datamate.collection.domain.repository.CollectionTaskRepository; +import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; + +import java.util.List; + +/** + * CollectionTaskRepositoryImpl + * + * @since 2025/10/23 + */ +@Repository +@RequiredArgsConstructor +public class CollectionTaskRepositoryImpl extends CrudRepository implements CollectionTaskRepository { + private final CollectionTaskMapper collectionTaskMapper; + + @Override + public List selectActiveTasks() { + return collectionTaskMapper.selectActiveTasks(); + } + + @Override + public void updateStatus(String id, String status) { + collectionTaskMapper.updateStatus(id, status); + } + + @Override + public void updateLastExecution(String id, String lastExecutionId) { + collectionTaskMapper.updateLastExecution(id, lastExecutionId); + } +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/repository/TaskExecutionRepositoryImpl.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/repository/TaskExecutionRepositoryImpl.java new file mode 100644 index 00000000..2263fc98 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/repository/TaskExecutionRepositoryImpl.java @@ -0,0 +1,37 @@ +package com.datamate.collection.infrastructure.persistence.repository; + +import 
com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.datamate.collection.domain.model.entity.TaskExecution; +import com.datamate.collection.domain.repository.TaskExecutionRepository; +import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; + +import java.time.LocalDateTime; + +/** + * TaskExecutionRepositoryImpl + * + * @since 2025/10/23 + */ +@Repository +@RequiredArgsConstructor +public class TaskExecutionRepositoryImpl extends ServiceImpl + implements TaskExecutionRepository { + + private final TaskExecutionMapper taskExecutionMapper; + + @Override + public TaskExecution selectLatestByTaskId(String taskId) { + return taskExecutionMapper.selectLatestByTaskId(taskId); + } + + @Override + public void completeExecution(String executionId, String status, LocalDateTime completedAt, + Integer recordsProcessed, Long recordsTotal, + Long recordsSuccess, Long recordsFailed, String errorMessage) { + taskExecutionMapper.completeExecution(executionId, status, completedAt, + recordsProcessed, recordsTotal, + recordsSuccess, recordsFailed, errorMessage); + } +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/typehandler/TaskStatusTypeHandler.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/typehandler/TaskStatusTypeHandler.java deleted file mode 100644 index e69de29b..00000000 diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxJobBuilder.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxJobBuilder.java deleted file mode 100644 index db57d2ae..00000000 --- 
a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxJobBuilder.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.datamate.collection.infrastructure.runtime.datax; - -import com.datamate.collection.domain.model.CollectionTask; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Component; - -import java.io.FileWriter; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * 根据任务配置拼装 DataX 作业 JSON 文件 - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class DataxJobBuilder { - - private final DataxProperties props; - - public Path buildJobFile(CollectionTask task) throws IOException { - Files.createDirectories(Paths.get(props.getJobConfigPath())); - String fileName = String.format("datax-job-%s.json", task.getId()); - Path path = Paths.get(props.getJobConfigPath(), fileName); - // 简化:直接将任务中的 config 字段作为 DataX 作业 JSON - try (FileWriter fw = new FileWriter(path.toFile())) { - String json = task.getConfig() == null || task.getConfig().isEmpty() ? 
- defaultJobJson() : task.getConfig(); - if (StringUtils.isNotBlank(task.getConfig())) { - json = getJobConfig(task); - } - log.info("Job config: {}", json); - fw.write(json); - } - return path; - } - - private String getJobConfig(CollectionTask task) { - try { - ObjectMapper objectMapper = new ObjectMapper(); - Map parameter = objectMapper.readValue( - task.getConfig(), - new TypeReference<>() {} - ); - Map job = new HashMap<>(); - Map content = new HashMap<>(); - Map reader = new HashMap<>(); - reader.put("name", "nfsreader"); - reader.put("parameter", parameter); - content.put("reader", reader); - Map writer = new HashMap<>(); - writer.put("name", "nfswriter"); - writer.put("parameter", parameter); - content.put("writer", writer); - job.put("content", List.of(content)); - Map setting = new HashMap<>(); - Map channel = new HashMap<>(); - channel.put("channel", 2); - setting.put("speed", channel); - job.put("setting", setting); - Map jobConfig = new HashMap<>(); - jobConfig.put("job", job); - return objectMapper.writeValueAsString(jobConfig); - } catch (Exception e) { - log.error("Failed to parse task config", e); - throw new RuntimeException("Failed to parse task config", e); - } - } - - private String defaultJobJson() { - // 提供一个最小可运行的空 job,实际会被具体任务覆盖 - return "{\n \"job\": {\n \"setting\": {\n \"speed\": {\n \"channel\": 1\n }\n },\n \"content\": []\n }\n}"; - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProcessRunner.java deleted file mode 100644 index fda00a83..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProcessRunner.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.datamate.collection.infrastructure.runtime.datax; - -import lombok.RequiredArgsConstructor; 
-import lombok.extern.slf4j.Slf4j; -import org.apache.commons.exec.*; -import org.springframework.stereotype.Component; - -import java.io.File; -import java.time.Duration; - -@Slf4j -@Component -@RequiredArgsConstructor -public class DataxProcessRunner { - - private final DataxProperties props; - - public int runJob(File jobFile, String executionId, Duration timeout) throws Exception { - File logFile = new File(props.getLogPath(), String.format("datax-%s.log", executionId)); - String python = props.getPythonPath(); - String dataxPy = props.getHomePath() + File.separator + "bin" + File.separator + "datax.py"; - String cmd = String.format("%s %s %s", python, dataxPy, jobFile.getAbsolutePath()); - - log.info("Execute DataX: {}", cmd); - - CommandLine cl = CommandLine.parse(cmd); - DefaultExecutor executor = new DefaultExecutor(); - - // 将日志追加输出到文件 - File parent = logFile.getParentFile(); - if (!parent.exists()) parent.mkdirs(); - - ExecuteStreamHandler streamHandler = new PumpStreamHandler( - new org.apache.commons.io.output.TeeOutputStream( - new java.io.FileOutputStream(logFile, true), System.out), - new org.apache.commons.io.output.TeeOutputStream( - new java.io.FileOutputStream(logFile, true), System.err) - ); - executor.setStreamHandler(streamHandler); - - ExecuteWatchdog watchdog = new ExecuteWatchdog(timeout.toMillis()); - executor.setWatchdog(watchdog); - - return executor.execute(cl); - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java index 1cbdde3c..2b453839 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java @@ 
-1,16 +1,18 @@ package com.datamate.collection.interfaces.converter; -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.DataxTemplate; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.interfaces.dto.*; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.common.interfaces.PagedResponse; import com.fasterxml.jackson.databind.ObjectMapper; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Named; import org.mapstruct.factory.Mappers; +import java.util.List; import java.util.Map; @Mapper @@ -20,9 +22,7 @@ public interface CollectionTaskConverter { @Mapping(source = "config", target = "config", qualifiedByName = "parseJsonToMap") CollectionTaskResponse toResponse(CollectionTask task); - CollectionTaskSummary toSummary(CollectionTask task); - - DataxTemplateSummary toTemplateSummary(DataxTemplate template); + List toResponse(List tasks); @Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString") CollectionTask toCollectionTask(CreateCollectionTaskRequest request); @@ -30,11 +30,19 @@ public interface CollectionTaskConverter { @Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString") CollectionTask toCollectionTask(UpdateCollectionTaskRequest request); + @Mapping(source = "current", target = "page") + @Mapping(source = "size", target = "size") + @Mapping(source = "total", target = "totalElements") + @Mapping(source = "pages", target = "totalPages") + @Mapping(source = "records", target = "content") + PagedResponse toResponse(IPage tasks); + @Named("parseJsonToMap") default Map parseJsonToMap(String json) { try { ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(json, Map.class); + return + 
objectMapper.readValue(json, Map.class); } catch (Exception e) { throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER); } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskPagingQuery.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskPagingQuery.java new file mode 100644 index 00000000..024d96d8 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskPagingQuery.java @@ -0,0 +1,25 @@ +package com.datamate.collection.interfaces.dto; + +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.common.interfaces.PagingQuery; +import lombok.Getter; +import lombok.Setter; + +/** + * 归集任务分页查询参数 + * + * @since 2025/10/23 + */ +@Getter +@Setter +public class CollectionTaskPagingQuery extends PagingQuery { + /** + * 任务状态 + */ + private TaskStatus status; + + /** + * 任务名称 + */ + private String name; +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java new file mode 100644 index 00000000..08f61034 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java @@ -0,0 +1,48 @@ +package com.datamate.collection.interfaces.dto; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.common.enums.SyncMode; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.springframework.format.annotation.DateTimeFormat; +import jakarta.validation.Valid; + +/** + * CollectionTaskResponse + 
*/ +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class CollectionTaskResponse { + + private String id; + + private String name; + + private String description; + + @Valid + private Map config = new HashMap<>(); + + private TaskStatus status; + + private SyncMode syncMode; + + private String scheduleExpression; + + private String lastExecutionId; + + @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) + private LocalDateTime createdAt; + + @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) + private LocalDateTime updatedAt; +} + diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java new file mode 100644 index 00000000..f2f029f0 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java @@ -0,0 +1,53 @@ +package com.datamate.collection.interfaces.dto; + +import com.datamate.collection.common.enums.SyncMode; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.*; +import io.swagger.v3.oas.annotations.media.Schema; + +/** + * CreateCollectionTaskRequest + */ +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class CreateCollectionTaskRequest { + @NotNull + @Size(min = 1, max = 100) + @Schema(name = "name", description = "任务名称", requiredMode = Schema.RequiredMode.REQUIRED) + @JsonProperty("name") + private String name; + + @Size(max = 500) + @Schema(name = "description", description = "任务描述", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("description") + private String 
description; + + @Valid + @NotNull + @Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.REQUIRED) + @JsonProperty("config") + private Map config = new HashMap<>(); + + @NotNull + @Valid + @Schema(name = "syncMode", requiredMode = Schema.RequiredMode.REQUIRED) + @JsonProperty("syncMode") + private SyncMode syncMode; + + @Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("scheduleExpression") + private String scheduleExpression; +} + diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java new file mode 100644 index 00000000..9321be80 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java @@ -0,0 +1,50 @@ +package com.datamate.collection.interfaces.dto; + +import com.datamate.collection.common.enums.SyncMode; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.*; +import io.swagger.v3.oas.annotations.media.Schema; + +/** + * UpdateCollectionTaskRequest + */ +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class UpdateCollectionTaskRequest { + @Size(min = 1, max = 100) + @Schema(name = "name", description = "任务名称", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("name") + private String name; + + @Size(max = 500) + @Schema(name = "description", description = "任务描述", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("description") + 
private String description; + + @Valid + @Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("config") + private Map config = new HashMap<>(); + + @Valid + @Schema(name = "syncMode", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("syncMode") + private SyncMode syncMode; + + @Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("scheduleExpression") + private String scheduleExpression; +} + diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java index d9aa3535..b893a73c 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java @@ -1,38 +1,36 @@ package com.datamate.collection.interfaces.rest; -import com.datamate.collection.application.service.CollectionTaskService; -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.DataxTemplate; -import com.datamate.collection.interfaces.api.CollectionTaskApi; +import com.datamate.collection.application.CollectionTaskService; +import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.interfaces.converter.CollectionTaskConverter; import com.datamate.collection.interfaces.dto.*; +import com.datamate.common.interfaces.PagedResponse; +import jakarta.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; -import 
org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.*; -import java.util.stream.Collectors; @Slf4j @RestController +@RequestMapping("/data-collection/tasks") @RequiredArgsConstructor -@Validated -public class CollectionTaskController implements CollectionTaskApi { +public class CollectionTaskController{ private final CollectionTaskService taskService; - @Override - public ResponseEntity createTask(CreateCollectionTaskRequest request) { + @PostMapping + public ResponseEntity createTask(@Valid @RequestBody CreateCollectionTaskRequest request) { CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request); task.setId(UUID.randomUUID().toString()); task.addPath(); return ResponseEntity.ok().body(CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task))); } - @Override - public ResponseEntity updateTask(String id, UpdateCollectionTaskRequest request) { + @PutMapping("/{id}") + public ResponseEntity updateTask(@PathVariable("id") String id, @Valid @RequestBody UpdateCollectionTaskRequest request) { if (taskService.get(id) == null) { return ResponseEntity.notFound().build(); } @@ -41,43 +39,20 @@ public ResponseEntity updateTask(String id, UpdateCollec return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task))); } - @Override - public ResponseEntity deleteTask(String id) { + @DeleteMapping("/{id}") + public ResponseEntity deleteTask(@PathVariable("id") String id) { taskService.delete(id); return ResponseEntity.ok().build(); } - @Override - public ResponseEntity getTaskDetail(String id) { + @GetMapping("/{id}") + public ResponseEntity getTaskDetail(@PathVariable("id") String id) { CollectionTask task = taskService.get(id); return task == null ? 
ResponseEntity.notFound().build() : ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(task)); } - @Override - public ResponseEntity getTasks(Integer page, Integer size, TaskStatus status, String name) { - var list = taskService.list(page, size, status == null ? null : status.getValue(), name); - PagedCollectionTaskSummary response = new PagedCollectionTaskSummary(); - response.setContent(list.stream().map(CollectionTaskConverter.INSTANCE::toSummary).collect(Collectors.toList())); - response.setNumber(page); - response.setSize(size); - response.setTotalElements(list.size()); // 简化处理,实际项目中应该有单独的count查询 - response.setTotalPages(size == null || size == 0 ? 1 : (int) Math.ceil(list.size() * 1.0 / size)); - return ResponseEntity.ok(response); - } - - @Override - public ResponseEntity templatesGet(String sourceType, String targetType, - Integer page, Integer size) { - int pageNum = page != null ? page : 0; - int pageSize = size != null ? size : 20; - List templates = taskService.listTemplates(sourceType, targetType, pageNum, pageSize); - int totalElements = taskService.countTemplates(sourceType, targetType); - PagedDataxTemplates response = new PagedDataxTemplates(); - response.setContent(templates.stream().map(CollectionTaskConverter.INSTANCE::toTemplateSummary).collect(Collectors.toList())); - response.setNumber(pageNum); - response.setSize(pageSize); - response.setTotalElements(totalElements); - response.setTotalPages(pageSize > 0 ? 
(int) Math.ceil(totalElements * 1.0 / pageSize) : 1); - return ResponseEntity.ok(response); + @GetMapping + public ResponseEntity> getTasks(@Valid CollectionTaskPagingQuery query) { + return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(query))); } } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/TaskExecutionController.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/TaskExecutionController.java deleted file mode 100644 index 6d392bbf..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/TaskExecutionController.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.datamate.collection.interfaces.rest; - -import com.datamate.collection.application.service.CollectionTaskService; -import com.datamate.collection.application.service.TaskExecutionService; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.interfaces.api.TaskExecutionApi; -import com.datamate.collection.interfaces.dto.PagedTaskExecutions; -import com.datamate.collection.interfaces.dto.TaskExecutionDetail; -import com.datamate.collection.interfaces.dto.TaskExecutionResponse; -import com.datamate.collection.interfaces.dto.TaskStatus; // DTO enum -import lombok.RequiredArgsConstructor; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.RestController; - -import java.util.stream.Collectors; - -@RestController -@RequiredArgsConstructor -@Validated -public class TaskExecutionController implements TaskExecutionApi { - - private final TaskExecutionService executionService; - private final CollectionTaskService taskService; - - private TaskExecutionDetail toDetail(TaskExecution e) { - TaskExecutionDetail d = new 
TaskExecutionDetail(); - d.setId(e.getId()); - d.setTaskId(e.getTaskId()); - d.setTaskName(e.getTaskName()); - if (e.getStatus() != null) { d.setStatus(TaskStatus.fromValue(e.getStatus().name())); } - d.setProgress(e.getProgress()); - d.setRecordsTotal(e.getRecordsTotal() != null ? e.getRecordsTotal().intValue() : null); - d.setRecordsProcessed(e.getRecordsProcessed() != null ? e.getRecordsProcessed().intValue() : null); - d.setRecordsSuccess(e.getRecordsSuccess() != null ? e.getRecordsSuccess().intValue() : null); - d.setRecordsFailed(e.getRecordsFailed() != null ? e.getRecordsFailed().intValue() : null); - d.setThroughput(e.getThroughput()); - d.setDataSizeBytes(e.getDataSizeBytes() != null ? e.getDataSizeBytes().intValue() : null); - d.setStartedAt(e.getStartedAt()); - d.setCompletedAt(e.getCompletedAt()); - d.setDurationSeconds(e.getDurationSeconds()); - d.setErrorMessage(e.getErrorMessage()); - return d; - } - - // GET /executions/{id} - @Override - public ResponseEntity executionsIdGet(String id) { - var exec = executionService.get(id); - return exec == null ? 
ResponseEntity.notFound().build() : ResponseEntity.ok(toDetail(exec)); - } - - // DELETE /executions/{id} - @Override - public ResponseEntity executionsIdDelete(String id) { - executionService.stop(id); // 幂等处理,在service内部判断状态 - return ResponseEntity.noContent().build(); - } - - // POST /tasks/{id}/execute -> 201 - @Override - public ResponseEntity tasksIdExecutePost(String id) { - var task = taskService.get(id); - if (task == null) { return ResponseEntity.notFound().build(); } - var latestExec = executionService.getLatestByTaskId(id); - if (latestExec != null && latestExec.getStatus() == com.datamate.collection.domain.model.TaskStatus.RUNNING) { - TaskExecutionResponse r = new TaskExecutionResponse(); - r.setId(latestExec.getId()); - r.setTaskId(latestExec.getTaskId()); - r.setTaskName(latestExec.getTaskName()); - r.setStatus(TaskStatus.fromValue(latestExec.getStatus().name())); - r.setStartedAt(latestExec.getStartedAt()); - return ResponseEntity.status(HttpStatus.CREATED).body(r); // 返回已有运行实例 - } - var exec = taskService.startExecution(task); - TaskExecutionResponse r = new TaskExecutionResponse(); - r.setId(exec.getId()); - r.setTaskId(exec.getTaskId()); - r.setTaskName(exec.getTaskName()); - r.setStatus(TaskStatus.fromValue(exec.getStatus().name())); - r.setStartedAt(exec.getStartedAt()); - return ResponseEntity.status(HttpStatus.CREATED).body(r); - } - - // GET /tasks/{id}/executions -> 分页 - @Override - public ResponseEntity tasksIdExecutionsGet(String id, Integer page, Integer size) { - if (page == null || page < 0) { page = 0; } - if (size == null || size <= 0) { size = 20; } - var list = executionService.list(id, null, null, null, page, size); - long total = executionService.count(id, null, null, null); - PagedTaskExecutions p = new PagedTaskExecutions(); - p.setContent(list.stream().map(this::toDetail).collect(Collectors.toList())); - p.setNumber(page); - p.setSize(size); - p.setTotalElements((int) total); - p.setTotalPages(size == 0 ? 
1 : (int) Math.ceil(total * 1.0 / size)); - return ResponseEntity.ok(p); - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/scheduler/TaskSchedulerInitializer.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java similarity index 68% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/application/scheduler/TaskSchedulerInitializer.java rename to backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java index 0203cab3..d0d0f260 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/scheduler/TaskSchedulerInitializer.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java @@ -1,11 +1,10 @@ -package com.datamate.collection.application.scheduler; +package com.datamate.collection.interfaces.scheduler; -import com.datamate.collection.application.service.DataxExecutionService; -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.TaskStatus; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; -import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; +import com.datamate.collection.application.CollectionTaskService; +import com.datamate.collection.application.TaskExecutionService; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.datamate.collection.domain.model.entity.TaskExecution; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; @@ -21,14 +20,13 @@ 
@RequiredArgsConstructor public class TaskSchedulerInitializer { - private final CollectionTaskMapper taskMapper; - private final TaskExecutionMapper executionMapper; - private final DataxExecutionService dataxExecutionService; + private final CollectionTaskService collectionTaskService; + private final TaskExecutionService taskExecutionService; // 定期扫描激活的采集任务,根据 Cron 判断是否到期执行 @Scheduled(fixedDelayString = "${datamate.data-collection.scheduler.scan-interval-ms:10000}") public void scanAndTrigger() { - List tasks = taskMapper.selectActiveTasks(); + List tasks = collectionTaskService.selectActiveTasks(); if (tasks == null || tasks.isEmpty()) { return; } @@ -40,7 +38,7 @@ public void scanAndTrigger() { } try { // 如果最近一次执行仍在运行,则跳过 - TaskExecution latest = executionMapper.selectLatestByTaskId(task.getId()); + TaskExecution latest = taskExecutionService.selectLatestByTaskId(task.getId()); if (latest != null && latest.getStatus() == TaskStatus.RUNNING) { continue; } @@ -53,9 +51,9 @@ public void scanAndTrigger() { if (nextTime != null && !nextTime.isAfter(now)) { // 到期,触发一次执行 - TaskExecution exec = dataxExecutionService.createExecution(task); + TaskExecution exec = taskExecutionService.createExecution(task); int timeout = task.getTimeoutSeconds() == null ? 
3600 : task.getTimeoutSeconds(); - dataxExecutionService.runAsync(task, exec.getId(), timeout); + taskExecutionService.runAsync(task, exec.getId(), timeout); log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), now, exec.getId()); } } catch (Exception ex) { diff --git a/backend/services/data-collection-service/src/main/resources/mappers/CollectionTaskMapper.xml b/backend/services/data-collection-service/src/main/resources/mappers/CollectionTaskMapper.xml index 3a195a28..6d3033bb 100644 --- a/backend/services/data-collection-service/src/main/resources/mappers/CollectionTaskMapper.xml +++ b/backend/services/data-collection-service/src/main/resources/mappers/CollectionTaskMapper.xml @@ -1,11 +1,10 @@ - - + @@ -24,21 +23,6 @@ - - - - - - - - - - - - - - - id, @@ -47,96 +31,6 @@ last_execution_id, created_at, updated_at, created_by, updated_by - - - id, name, source_type, target_type, template_content, description, version, is_system, created_at, updated_at, created_by - - - - - INSERT INTO t_dc_collection_tasks (id, name, description, config, status, sync_mode, - schedule_expression, retry_count, timeout_seconds, max_records, sort_field, - last_execution_id, created_at, updated_at, created_by, updated_by) - VALUES (#{id}, #{name}, #{description}, #{config}, #{status}, #{syncMode}, - #{scheduleExpression}, #{retryCount}, #{timeoutSeconds}, #{maxRecords}, #{sortField}, - #{lastExecutionId}, #{createdAt}, #{updatedAt}, #{createdBy}, #{updatedBy}) - - - - - UPDATE t_dc_collection_tasks - SET name = #{name}, - description = #{description}, - config = #{config}, - status = #{status}, - sync_mode = #{syncMode}, - schedule_expression = #{scheduleExpression}, - retry_count = #{retryCount}, - timeout_seconds = #{timeoutSeconds}, - max_records = #{maxRecords}, - sort_field = #{sortField}, - last_execution_id = #{lastExecutionId}, - updated_at = #{updatedAt}, - updated_by = #{updatedBy} - WHERE id = #{id} - - - - - DELETE FROM t_dc_collection_tasks 
WHERE id = #{id} - - - - - - - - - - - - - - - - - UPDATE t_dc_collection_tasks SET status = #{status}, updated_at = NOW() WHERE id = #{id} @@ -154,35 +48,4 @@ AND schedule_expression IS NOT NULL ORDER BY created_at DESC - - - - - - - diff --git a/backend/services/data-collection-service/src/main/resources/mappers/TaskExecutionMapper.xml b/backend/services/data-collection-service/src/main/resources/mappers/TaskExecutionMapper.xml index 6b6d0a03..e9e9c866 100644 --- a/backend/services/data-collection-service/src/main/resources/mappers/TaskExecutionMapper.xml +++ b/backend/services/data-collection-service/src/main/resources/mappers/TaskExecutionMapper.xml @@ -1,191 +1,28 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - id, task_id, task_name, status, progress, records_total, records_processed, - records_success, records_failed, throughput, data_size_bytes, started_at, - completed_at, duration_seconds, error_message, datax_job_id, config, result, created_at - - - - - INSERT INTO t_dc_task_executions ( - id, task_id, task_name, status, progress, records_total, records_processed, - records_success, records_failed, throughput, data_size_bytes, started_at, - completed_at, duration_seconds, error_message, datax_job_id, config, result, created_at - ) VALUES ( - #{id}, #{taskId}, #{taskName}, #{status}, #{progress}, #{recordsTotal}, #{recordsProcessed}, - #{recordsSuccess}, #{recordsFailed}, #{throughput}, #{dataSizeBytes}, #{startedAt}, - #{completedAt}, #{durationSeconds}, #{errorMessage}, #{dataxJobId}, #{config}, #{result}, #{createdAt} - ) - - - - - UPDATE t_dc_task_executions - SET status = #{status}, - progress = #{progress}, - records_total = #{recordsTotal}, - records_processed = #{recordsProcessed}, - records_success = #{recordsSuccess}, - records_failed = #{recordsFailed}, - throughput = #{throughput}, - data_size_bytes = #{dataSizeBytes}, - completed_at = #{completedAt}, - duration_seconds = #{durationSeconds}, - error_message = #{errorMessage}, - result 
= #{result} - WHERE id = #{id} - - - - - DELETE FROM t_dc_task_executions WHERE id = #{id} - - - - - - - + SELECT * FROM t_dc_task_executions WHERE task_id = #{taskId} ORDER BY started_at DESC - - LIMIT #{limit} - - - - - - - - - - - - - - UPDATE t_dc_task_executions - SET status = #{status}, - progress = #{progress}, - records_processed = #{recordsProcessed}, - throughput = #{throughput} - WHERE id = #{id} - - UPDATE t_dc_task_executions SET status = #{status}, - progress = 100.00, completed_at = #{completedAt}, - duration_seconds = #{durationSeconds}, + records_processed = #{recordsProcessed}, + records_total = #{recordsTotal}, records_success = #{recordsSuccess}, records_failed = #{recordsFailed}, - data_size_bytes = #{dataSizeBytes}, error_message = #{errorMessage}, - result = #{result} - WHERE id = #{id} + updated_at = NOW() + WHERE id = #{executionId} - - - - - - - - - DELETE FROM t_dc_task_executions - WHERE started_at < #{beforeDate} - - diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java index fa6b07a0..b14db920 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java @@ -2,6 +2,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.datamate.common.domain.utils.ChunksSaver; import com.datamate.datamanagement.interfaces.dto.*; import com.datamate.common.infrastructure.exception.BusinessAssert; import com.datamate.common.interfaces.PagedResponse; @@ -100,8 +101,13 @@ public Dataset updateDataset(String datasetId, UpdateDatasetRequest updateDatase /** * 
删除数据集 */ + @Transactional public void deleteDataset(String datasetId) { + Dataset dataset = datasetRepository.getById(datasetId); datasetRepository.removeById(datasetId); + if (dataset != null) { + ChunksSaver.deleteFolder(dataset.getPath()); + } } /** diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/mapper/DatasetFileMapper.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/mapper/DatasetFileMapper.java index 6b0429c9..f67121c9 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/mapper/DatasetFileMapper.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/mapper/DatasetFileMapper.java @@ -24,7 +24,6 @@ List findByCriteria(@Param("datasetId") String datasetId, @Param("status") String status, RowBounds rowBounds); - int insert(DatasetFile file); int update(DatasetFile file); int deleteById(@Param("id") String id); } diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java index 8aa059e8..1c8250b1 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java @@ -60,7 +60,7 @@ public FileUploadResult chunkUpload(ChunkUploadRequest uploadFileRequest) { boolean isFinish = Objects.equals(preRequest.getUploadedFileNum(), preRequest.getTotalFileNum()); if (isFinish) { // 删除存分片的临时路径 - ChunksSaver.deleteFiles(new File(preRequest.getUploadPath(), + ChunksSaver.deleteFolder(new File(preRequest.getUploadPath(), String.format(ChunksSaver.TEMP_DIR_NAME_FORMAT, preRequest.getId())).getPath()); 
chunkUploadRequestMapper.deleteById(preRequest.getId()); } diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java index 539cbb64..079b2cdd 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java @@ -2,6 +2,8 @@ import com.datamate.common.domain.model.ChunkUploadPreRequest; import com.datamate.common.domain.model.ChunkUploadRequest; +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.SystemErrorCode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.springframework.web.multipart.MultipartFile; @@ -106,29 +108,17 @@ public static void appendToTargetFile(File targetFile, InputStream inputStream) * * @param uploadPath 文件路径 */ - public static void deleteFiles(String uploadPath) { - File dic = new File(uploadPath); - if (!dic.exists()) { - return; - } - File[] files = dic.listFiles(); - if (files == null || files.length == 0) { - dic.delete(); + public static void deleteFolder(String uploadPath) { + File folder = new File(uploadPath); + + if (!folder.exists()) { + log.info("folder {} does not exist", uploadPath); return; } try { - for (File file : files) { - if (file.isDirectory()) { - deleteFiles(file.getPath()); - } else { - file.delete(); - } - } - if (dic.exists()) { - dic.delete(); - } - } catch (SecurityException e) { - log.warn("Fail to delete file", e); + FileUtils.deleteDirectory(folder); + } catch (IOException e) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); } } } diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql index dbd923f9..62a611a5 100644 --- a/scripts/db/data-collection-init.sql +++ 
b/scripts/db/data-collection-init.sql @@ -31,7 +31,10 @@ CREATE TABLE t_dc_task_executions ( error_message TEXT COMMENT '错误信息', datax_job_id TEXT COMMENT 'datax任务ID', result TEXT COMMENT '执行结果', - created_at TIMESTAMP NULL COMMENT '创建时间', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + created_by VARCHAR(255) COMMENT '创建者', + updated_by VARCHAR(255) COMMENT '更新者', INDEX idx_task_id (task_id), INDEX idx_status (status), INDEX idx_started_at (started_at)