From 12dc3a693a460f01e25c308dd70264594910cc12 Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Thu, 23 Oct 2025 20:39:57 +0800 Subject: [PATCH 1/9] =?UTF-8?q?refactor:=20=E4=BF=AE=E6=94=B9=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E6=95=B0=E6=8D=AE=E5=BD=92=E9=9B=86=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=EF=BC=8C=E5=88=A0=E9=99=A4=E6=97=A0=E7=94=A8=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=EF=BC=8C=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../services/data-collection-service/pom.xml | 29 --- .../application/CollectionTaskService.java | 79 ++++++++ .../application/TaskExecutionService.java | 57 ++++++ .../service/CollectionTaskService.java | 85 --------- .../service/DataxExecutionService.java | 60 ------ .../service/TaskExecutionService.java | 83 -------- .../collection/common/enums/SyncMode.java | 12 ++ .../model => common/enums}/TaskStatus.java | 3 +- .../model/{ => entity}/CollectionTask.java | 26 +-- .../model/{ => entity}/DataxTemplate.java | 2 +- .../model/{ => entity}/TaskExecution.java | 15 +- .../domain/process/ProcessRunner.java | 21 ++ .../repository/CollectionTaskRepository.java | 19 ++ .../repository/TaskExecutionRepository.java | 19 ++ .../datax/DataxProcessRunner.java | 124 ++++++++++++ .../{runtime => }/datax/DataxProperties.java | 2 +- .../mapper/CollectionTaskMapper.java | 38 +--- .../mapper/TaskExecutionMapper.java | 40 ++-- .../CollectionTaskRepositoryImpl.java | 36 ++++ .../TaskExecutionRepositoryImpl.java | 37 ++++ .../typehandler/TaskStatusTypeHandler.java | 0 .../runtime/datax/DataxJobBuilder.java | 83 -------- .../runtime/datax/DataxProcessRunner.java | 46 ----- .../converter/CollectionTaskConverter.java | 20 +- .../dto/CollectionTaskPagingQuery.java | 25 +++ .../dto/CollectionTaskResponse.java | 48 +++++ .../dto/CreateCollectionTaskRequest.java | 53 ++++++ .../dto/UpdateCollectionTaskRequest.java | 50 +++++ .../rest/CollectionTaskController.java | 
61 ++---- .../rest/TaskExecutionController.java | 101 ---------- .../scheduler/TaskSchedulerInitializer.java | 26 ++- .../mappers/CollectionTaskMapper.xml | 139 +------------- .../resources/mappers/TaskExecutionMapper.xml | 179 +----------------- .../DatasetApplicationService.java | 6 + .../persistence/mapper/DatasetFileMapper.java | 1 - .../common/domain/service/FileService.java | 2 +- .../common/domain/utils/ChunksSaver.java | 30 +-- scripts/db/data-collection-init.sql | 5 +- 38 files changed, 698 insertions(+), 964 deletions(-) create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java delete mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/CollectionTaskService.java delete mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/DataxExecutionService.java delete mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/TaskExecutionService.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java rename backend/services/data-collection-service/src/main/java/com/datamate/collection/{domain/model => common/enums}/TaskStatus.java (59%) rename backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/{ => entity}/CollectionTask.java (66%) rename backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/{ => entity}/DataxTemplate.java (95%) rename backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/{ => entity}/TaskExecution.java (70%) create mode 100644 
backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/process/ProcessRunner.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/CollectionTaskRepository.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/TaskExecutionRepository.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java rename backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/{runtime => }/datax/DataxProperties.java (90%) create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/repository/CollectionTaskRepositoryImpl.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/repository/TaskExecutionRepositoryImpl.java delete mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/persistence/typehandler/TaskStatusTypeHandler.java delete mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxJobBuilder.java delete mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProcessRunner.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskPagingQuery.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java create mode 100644 
backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java delete mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/TaskExecutionController.java rename backend/services/data-collection-service/src/main/java/com/datamate/collection/{application => interfaces}/scheduler/TaskSchedulerInitializer.java (68%) diff --git a/backend/services/data-collection-service/pom.xml b/backend/services/data-collection-service/pom.xml index 1ea9d3c4..6600bf11 100644 --- a/backend/services/data-collection-service/pom.xml +++ b/backend/services/data-collection-service/pom.xml @@ -127,35 +127,6 @@ - - - org.openapitools - openapi-generator-maven-plugin - 6.6.0 - - - - generate - - - ${project.basedir}/../../openapi/specs/data-collection.yaml - spring - ${project.build.directory}/generated-sources/openapi - com.datamate.collection.interfaces.api - com.datamate.collection.interfaces.dto - - true - true - true - springdoc - java8-localdatetime - true - - - - - - org.springframework.boot spring-boot-maven-plugin diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java new file mode 100644 index 00000000..80bb8e75 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java @@ -0,0 +1,79 @@ +package com.datamate.collection.application; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.datamate.collection.domain.model.entity.TaskExecution; +import 
com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.domain.repository.CollectionTaskRepository; +import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery; +import com.datamate.collection.common.enums.SyncMode; +import com.datamate.common.domain.utils.ChunksSaver; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Objects; + +@Slf4j +@Service +@RequiredArgsConstructor +public class CollectionTaskService { + private final TaskExecutionService taskExecutionService; + private final CollectionTaskRepository collectionTaskRepository; + + @Transactional + public CollectionTask create(CollectionTask task) { + task.setStatus(TaskStatus.READY); + task.setCreatedAt(LocalDateTime.now()); + task.setUpdatedAt(LocalDateTime.now()); + collectionTaskRepository.save(task); + executeTaskNow(task); + return task; + } + + private void executeTaskNow(CollectionTask task) { + if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) { + TaskExecution exec = taskExecutionService.createExecution(task); + int timeout = task.getTimeoutSeconds() == null ? 
3600 : task.getTimeoutSeconds(); + taskExecutionService.runAsync(task, exec.getId(), timeout); + log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId()); + } + } + + @Transactional + public CollectionTask update(CollectionTask task) { + task.setUpdatedAt(LocalDateTime.now()); + collectionTaskRepository.updateById(task); + return task; + } + + @Transactional + public void delete(String id) { + CollectionTask task = collectionTaskRepository.getById(id); + if (task != null) { + ChunksSaver.deleteFolder("/dataset/local/" + task.getId()); + } + collectionTaskRepository.removeById(id); + } + + public CollectionTask get(String id) { + return collectionTaskRepository.getById(id); + } + + public IPage getTasks(CollectionTaskPagingQuery query) { + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus()) + .like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName()); + return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper); + } + + public List selectActiveTasks() { + return collectionTaskRepository.selectActiveTasks(); + } +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java new file mode 100644 index 00000000..58455154 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java @@ -0,0 +1,57 @@ +package com.datamate.collection.application; + +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.datamate.collection.domain.model.entity.TaskExecution; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.domain.process.ProcessRunner; +import 
com.datamate.collection.domain.repository.CollectionTaskRepository; +import com.datamate.collection.domain.repository.TaskExecutionRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; + +@Slf4j +@Service +@RequiredArgsConstructor +public class TaskExecutionService { + private final ProcessRunner processRunner; + private final TaskExecutionRepository executionRepository; + private final CollectionTaskRepository collectionTaskRepository; + + + @Transactional + public TaskExecution createExecution(CollectionTask task) { + TaskExecution exec = TaskExecution.initTaskExecution(); + exec.setTaskId(task.getId()); + exec.setTaskName(task.getName()); + executionRepository.save(exec); + collectionTaskRepository.updateLastExecution(task.getId(), exec.getId()); + collectionTaskRepository.updateStatus(task.getId(), TaskStatus.RUNNING.name()); + return exec; + } + + public TaskExecution selectLatestByTaskId(String taskId) { + return executionRepository.selectLatestByTaskId(taskId); + } + + @Async + public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) { + try { + int code = processRunner.runJob(task, executionId, timeoutSeconds); + log.info("DataX finished with code {} for execution {}", code, executionId); + // 简化:成功即完成 + executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(), + 0, 0L, 0L, 0L, null); + collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name()); + } catch (Exception e) { + log.error("DataX execution failed", e); + executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(), + 0, 0L, 0L, 0L, e.getMessage()); + collectionTaskRepository.updateStatus(task.getId(), TaskStatus.FAILED.name()); + } + } +} diff 
--git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/CollectionTaskService.java deleted file mode 100644 index d2b6e21a..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/CollectionTaskService.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.datamate.collection.application.service; - -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.domain.model.TaskStatus; -import com.datamate.collection.domain.model.DataxTemplate; -import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; -import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; -import com.datamate.collection.interfaces.dto.SyncMode; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -@Slf4j -@Service -@RequiredArgsConstructor -public class CollectionTaskService { - private final CollectionTaskMapper taskMapper; - private final TaskExecutionMapper executionMapper; - private final DataxExecutionService dataxExecutionService; - - @Transactional - public CollectionTask create(CollectionTask task) { - task.setStatus(TaskStatus.READY); - task.setCreatedAt(LocalDateTime.now()); - task.setUpdatedAt(LocalDateTime.now()); - taskMapper.insert(task); - executeTaskNow(task); - return task; - } - - private void executeTaskNow(CollectionTask task) { - if (Objects.equals(task.getSyncMode(), SyncMode.ONCE.getValue())) { - TaskExecution exec = 
dataxExecutionService.createExecution(task); - int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds(); - dataxExecutionService.runAsync(task, exec.getId(), timeout); - log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId()); - } - } - - @Transactional - public CollectionTask update(CollectionTask task) { - task.setUpdatedAt(LocalDateTime.now()); - taskMapper.update(task); - return task; - } - - @Transactional - public void delete(String id) { taskMapper.deleteById(id); } - - public CollectionTask get(String id) { return taskMapper.selectById(id); } - - public List list(Integer page, Integer size, String status, String name) { - Map p = new HashMap<>(); - p.put("status", status); - p.put("name", name); - if (page != null && size != null) { - p.put("offset", page * size); - p.put("limit", size); - } - return taskMapper.selectAll(p); - } - - @Transactional - public TaskExecution startExecution(CollectionTask task) { - return dataxExecutionService.createExecution(task); - } - - // ---- Template related merged methods ---- - public List listTemplates(String sourceType, String targetType, int page, int size) { - int offset = page * size; - return taskMapper.selectList(sourceType, targetType, offset, size); - } - - public int countTemplates(String sourceType, String targetType) { - return taskMapper.countTemplates(sourceType, targetType); - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/DataxExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/DataxExecutionService.java deleted file mode 100644 index fc2eb9e4..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/DataxExecutionService.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.datamate.collection.application.service; - -import 
com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.domain.model.TaskStatus; -import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; -import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; -import com.datamate.collection.infrastructure.runtime.datax.DataxJobBuilder; -import com.datamate.collection.infrastructure.runtime.datax.DataxProcessRunner; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.nio.file.Path; -import java.time.Duration; -import java.time.LocalDateTime; - -@Slf4j -@Service -@RequiredArgsConstructor -public class DataxExecutionService { - - private final DataxJobBuilder jobBuilder; - private final DataxProcessRunner processRunner; - private final TaskExecutionMapper executionMapper; - private final CollectionTaskMapper taskMapper; - - - @Transactional - public TaskExecution createExecution(CollectionTask task) { - TaskExecution exec = TaskExecution.initTaskExecution(); - exec.setTaskId(task.getId()); - exec.setTaskName(task.getName()); - executionMapper.insert(exec); - taskMapper.updateLastExecution(task.getId(), exec.getId()); - taskMapper.updateStatus(task.getId(), TaskStatus.RUNNING.name()); - return exec; - } - - @Async - public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) { - try { - Path job = jobBuilder.buildJobFile(task); - - int code = processRunner.runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds)); - log.info("DataX finished with code {} for execution {}", code, executionId); - // 简化:成功即完成 - executionMapper.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(), - 0, 0L, 0L, 0L, null, null); - 
taskMapper.updateStatus(task.getId(), TaskStatus.SUCCESS.name()); - } catch (Exception e) { - log.error("DataX execution failed", e); - executionMapper.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(), - 0, 0L, 0L, 0L, e.getMessage(), null); - taskMapper.updateStatus(task.getId(), TaskStatus.FAILED.name()); - } - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/TaskExecutionService.java deleted file mode 100644 index 68f9e3b6..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/service/TaskExecutionService.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.datamate.collection.application.service; - -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.domain.model.TaskStatus; -import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; -import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Service -@RequiredArgsConstructor -public class TaskExecutionService { - private final TaskExecutionMapper executionMapper; - private final CollectionTaskMapper taskMapper; - - public List list(String taskId, String status, LocalDateTime startDate, - LocalDateTime endDate, Integer page, Integer size) { - Map p = new HashMap<>(); - p.put("taskId", taskId); - p.put("status", status); - p.put("startDate", startDate); - p.put("endDate", endDate); - if (page != null && 
size != null) { - p.put("offset", page * size); - p.put("limit", size); - } - return executionMapper.selectAll(p); - } - - public long count(String taskId, String status, LocalDateTime startDate, LocalDateTime endDate) { - Map p = new HashMap<>(); - p.put("taskId", taskId); - p.put("status", status); - p.put("startDate", startDate); - p.put("endDate", endDate); - return executionMapper.count(p); - } - - // --- Added convenience methods --- - public TaskExecution get(String id) { return executionMapper.selectById(id); } - public TaskExecution getLatestByTaskId(String taskId) { return executionMapper.selectLatestByTaskId(taskId); } - - @Transactional - public void complete(String executionId, boolean success, long successCount, long failedCount, - long dataSizeBytes, String errorMessage, String resultJson) { - LocalDateTime now = LocalDateTime.now(); - TaskExecution exec = executionMapper.selectById(executionId); - if (exec == null) { return; } - int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds(); - executionMapper.completeExecution(executionId, success ? TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name(), - now, duration, successCount, failedCount, dataSizeBytes, errorMessage, resultJson); - CollectionTask task = taskMapper.selectById(exec.getTaskId()); - if (task != null) { - taskMapper.updateStatus(task.getId(), success ? 
TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name()); - } - } - - @Transactional - public void stop(String executionId) { - TaskExecution exec = executionMapper.selectById(executionId); - if (exec == null || exec.getStatus() != TaskStatus.RUNNING) { return; } - LocalDateTime now = LocalDateTime.now(); - int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds(); - // Reuse completeExecution to persist STOPPED status and timing info - executionMapper.completeExecution(exec.getId(), TaskStatus.STOPPED.name(), now, duration, - exec.getRecordsSuccess(), exec.getRecordsFailed(), exec.getDataSizeBytes(), null, exec.getResult()); - taskMapper.updateStatus(exec.getTaskId(), TaskStatus.STOPPED.name()); - } - - @Transactional - public void stopLatestByTaskId(String taskId) { - TaskExecution latest = executionMapper.selectLatestByTaskId(taskId); - if (latest != null) { stop(latest.getId()); } - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java new file mode 100644 index 00000000..28399a27 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java @@ -0,0 +1,12 @@ +package com.datamate.collection.common.enums; + +/** + * 同步方式:一次性(ONCE) 或 定时(SCHEDULED) + */ +public enum SyncMode { + /** 一次性(ONCE) */ + ONCE, + /// 定时(SCHEDULED) + SCHEDULED; +} + diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskStatus.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TaskStatus.java similarity index 59% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskStatus.java rename to 
backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TaskStatus.java index c5273d28..ed9585ab 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskStatus.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TaskStatus.java @@ -1,7 +1,8 @@ -package com.datamate.collection.domain.model; +package com.datamate.collection.common.enums; /** * 统一的任务和执行状态枚举 + * 任务和执行状态枚举: - DRAFT: 草稿状态 - READY: 就绪状态 - RUNNING: 运行中 - SUCCESS: 执行成功 (对应原来的COMPLETED/SUCCESS) - FAILED: 执行失败 - STOPPED: 已停止 * * @author Data Mate Platform Team */ diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/CollectionTask.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java similarity index 66% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/CollectionTask.java rename to backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java index f40afa7e..5bbdb2a2 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/CollectionTask.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java @@ -1,32 +1,36 @@ -package com.datamate.collection.domain.model; +package com.datamate.collection.domain.model.entity; +import com.baomidou.mybatisplus.annotation.TableName; +import com.datamate.collection.common.enums.SyncMode; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.common.domain.model.base.BaseEntity; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.Data; +import 
lombok.Getter; +import lombok.Setter; -import java.time.LocalDateTime; import java.util.Collections; import java.util.Map; -@Data -public class CollectionTask { - private String id; +/** + * 数据采集任务实体(与数据库表 t_dc_collection_tasks 对齐) + */ +@Getter +@Setter +@TableName(value = "t_dc_collection_tasks", autoResultMap = true) +public class CollectionTask extends BaseEntity { private String name; private String description; private String config; // DataX JSON 配置,包含源端和目标端配置信息 private TaskStatus status; - private String syncMode; // ONCE / SCHEDULED + private SyncMode syncMode; // ONCE / SCHEDULED private String scheduleExpression; private Integer retryCount; private Integer timeoutSeconds; private Long maxRecords; private String sortField; private String lastExecutionId; - private LocalDateTime createdAt; - private LocalDateTime updatedAt; - private String createdBy; - private String updatedBy; public void addPath() { try { diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/DataxTemplate.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/DataxTemplate.java similarity index 95% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/DataxTemplate.java rename to backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/DataxTemplate.java index c537a67f..54792fdc 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/DataxTemplate.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/DataxTemplate.java @@ -1,4 +1,4 @@ -package com.datamate.collection.domain.model; +package com.datamate.collection.domain.model.entity; import lombok.Data; import lombok.EqualsAndHashCode; diff --git 
a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskExecution.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/TaskExecution.java similarity index 70% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskExecution.java rename to backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/TaskExecution.java index 05f1fe89..6009957e 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/TaskExecution.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/TaskExecution.java @@ -1,13 +1,19 @@ -package com.datamate.collection.domain.model; +package com.datamate.collection.domain.model.entity; +import com.baomidou.mybatisplus.annotation.TableName; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.common.domain.model.base.BaseEntity; import lombok.Data; +import lombok.Getter; +import lombok.Setter; import java.time.LocalDateTime; import java.util.UUID; -@Data -public class TaskExecution { - private String id; +@Getter +@Setter +@TableName(value = "t_dc_task_executions", autoResultMap = true) +public class TaskExecution extends BaseEntity { private String taskId; private String taskName; private TaskStatus status; @@ -25,7 +31,6 @@ public class TaskExecution { private String dataxJobId; private String config; private String result; - private LocalDateTime createdAt; public static TaskExecution initTaskExecution() { TaskExecution exec = new TaskExecution(); diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/process/ProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/process/ProcessRunner.java new file mode 100644 index 00000000..73d904e2 --- 
/dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/process/ProcessRunner.java @@ -0,0 +1,21 @@ +package com.datamate.collection.domain.process; + +import com.datamate.collection.domain.model.entity.CollectionTask; + +/** + * 归集执行器接口 + * + * @since 2025/10/23 + */ +public interface ProcessRunner { + /** + * 执行归集任务 + * + * @param task 任务 + * @param executionId 执行ID + * @param timeoutSeconds 超时时间(秒) + * @return 执行结果 + * @throws Exception 执行异常 + */ + int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception; +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/CollectionTaskRepository.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/CollectionTaskRepository.java new file mode 100644 index 00000000..21b0d169 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/CollectionTaskRepository.java @@ -0,0 +1,19 @@ +package com.datamate.collection.domain.repository; + +import com.baomidou.mybatisplus.extension.repository.IRepository; +import com.datamate.collection.domain.model.entity.CollectionTask; + +import java.util.List; + +/** + * 归集任务仓储层 + * + * @since 2025/10/23 + */ +public interface CollectionTaskRepository extends IRepository { + List selectActiveTasks(); + + void updateStatus(String id, String status); + + void updateLastExecution(String id, String lastExecutionId); +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/TaskExecutionRepository.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/repository/TaskExecutionRepository.java new file mode 100644 index 00000000..9b0c059d --- /dev/null +++ 
package com.datamate.collection.domain.repository;

import com.baomidou.mybatisplus.extension.service.IService;
import com.datamate.collection.domain.model.entity.TaskExecution;

import java.time.LocalDateTime;

/**
 * 任务执行记录仓储层 (task-execution repository).
 *
 * <p>NOTE(review): the sibling {@code CollectionTaskRepository} extends
 * {@code IRepository} while this one extends {@code IService}; consider using the same
 * base abstraction for both repositories.
 *
 * @since 2025/10/23
 */
public interface TaskExecutionRepository extends IService<TaskExecution> {

    /**
     * Looks up the most recent execution of the given task.
     *
     * @param taskId task id
     * @return the latest execution — presumably {@code null} when the task has never run;
     *         confirm against TaskExecutionMapper.xml
     */
    TaskExecution selectLatestByTaskId(String taskId);

    /**
     * Marks an execution as finished and records its final counters.
     *
     * <p>NOTE(review): eight positional parameters is error-prone (four adjacent numeric
     * counters); a small result object would be safer, but that would change the interface.
     *
     * @param executionId      execution id
     * @param status           terminal status value
     * @param completedAt      completion timestamp
     * @param recordsProcessed number of records processed
     * @param recordsTotal     total number of records
     * @param recordsSuccess   number of successful records
     * @param recordsFailed    number of failed records
     * @param errorMessage     error message, or {@code null} on success
     */
    void completeExecution(String executionId, String status, LocalDateTime completedAt,
                           Integer recordsProcessed, Long recordsTotal,
                           Long recordsSuccess, Long recordsFailed, String errorMessage);
}
package com.datamate.collection.infrastructure.datax;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.io.output.TeeOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * DataX-based {@link ProcessRunner}: renders the task's {@code config} into a DataX job
 * JSON file and launches it through the {@code datax.py} launcher, teeing process output
 * to a per-execution log file and the service's own stdout/stderr.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class DataxProcessRunner implements ProcessRunner {

    /** Jackson mappers are thread-safe once configured; reuse one instead of allocating per call. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final DataxProperties props;

    /**
     * Builds the job file for {@code task} and runs DataX on it.
     *
     * @return the DataX process exit value (0 on success)
     * @throws Exception on I/O failure, non-zero exit, or watchdog timeout
     */
    @Override
    public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception {
        Path job = buildJobFile(task);
        return execute(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
    }

    /** Launches {@code python datax.py <jobFile>} with output teed into a per-execution log. */
    private int execute(File jobFile, String executionId, Duration timeout) throws Exception {
        File logFile = new File(props.getLogPath(), String.format("datax-%s.log", executionId));
        File parent = logFile.getParentFile();
        // Check the mkdirs() result (the original ignored it); re-test exists() to tolerate
        // a concurrent creation racing with us.
        if (parent != null && !parent.exists() && !parent.mkdirs() && !parent.exists()) {
            throw new IOException("Unable to create DataX log directory: " + parent);
        }

        String python = props.getPythonPath();
        String dataxPy = props.getHomePath() + File.separator + "bin" + File.separator + "datax.py";
        String cmd = String.format("%s %s %s", python, dataxPy, jobFile.getAbsolutePath());
        log.info("Execute DataX: {}", cmd);

        CommandLine commandLine = CommandLine.parse(cmd);
        // commons-exec does not close the streams handed to PumpStreamHandler, so manage
        // their lifetime here to avoid leaking file descriptors on every run.
        try (FileOutputStream out = new FileOutputStream(logFile, true);
             FileOutputStream err = new FileOutputStream(logFile, true)) {
            DefaultExecutor executor = new DefaultExecutor();
            executor.setStreamHandler(new PumpStreamHandler(
                    new TeeOutputStream(out, System.out),
                    new TeeOutputStream(err, System.err)));
            executor.setWatchdog(new ExecuteWatchdog(timeout.toMillis()));
            return executor.execute(commandLine);
        }
    }

    /**
     * Writes the DataX job JSON for {@code task} under the configured job-config directory.
     * Fails before touching the filesystem when the task has no config, so no empty job
     * file is left behind (the original created the file first, then threw).
     */
    private Path buildJobFile(CollectionTask task) throws IOException {
        if (StringUtils.isBlank(task.getConfig())) {
            // NOTE(review): UNKNOWN_ERROR is a vague code for "task has no config";
            // a dedicated INVALID_PARAMETER-style code would be clearer — confirm codes available.
            throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
        }
        Files.createDirectories(Paths.get(props.getJobConfigPath()));
        Path path = Paths.get(props.getJobConfigPath(), String.format("datax-job-%s.json", task.getId()));
        String json = getJobConfig(task);
        log.info("Job config: {}", json);
        // Explicit UTF-8 instead of FileWriter's platform default charset.
        Files.writeString(path, json, StandardCharsets.UTF_8);
        return path;
    }

    /**
     * Wraps the task's flat config map into the DataX job envelope:
     * one content entry with an nfsreader and an nfswriter that share the same parameter
     * map, and a fixed speed setting of 2 channels.
     */
    private String getJobConfig(CollectionTask task) {
        try {
            Map<String, Object> parameter = OBJECT_MAPPER.readValue(
                    task.getConfig(),
                    new TypeReference<>() {
                    }
            );

            Map<String, Object> reader = new HashMap<>();
            reader.put("name", "nfsreader");
            reader.put("parameter", parameter);

            Map<String, Object> writer = new HashMap<>();
            writer.put("name", "nfswriter");
            writer.put("parameter", parameter);

            Map<String, Object> content = new HashMap<>();
            content.put("reader", reader);
            content.put("writer", writer);

            Map<String, Object> channel = new HashMap<>();
            channel.put("channel", 2);
            Map<String, Object> setting = new HashMap<>();
            setting.put("speed", channel);

            Map<String, Object> job = new HashMap<>();
            job.put("content", List.of(content));
            job.put("setting", setting);

            Map<String, Object> jobConfig = new HashMap<>();
            jobConfig.put("job", job);
            return OBJECT_MAPPER.writeValueAsString(jobConfig);
        } catch (Exception e) {
            log.error("Failed to parse task config", e);
            // Same exception type as before so callers' catch blocks keep working.
            throw new RuntimeException("Failed to parse task config", e);
        }
    }
}
package com.datamate.collection.infrastructure.persistence.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.collection.domain.model.entity.CollectionTask;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * MyBatis mapper for collection tasks. CRUD comes from {@link BaseMapper}; the custom
 * statements below are defined in {@code mappers/CollectionTaskMapper.xml}.
 */
@Mapper
public interface CollectionTaskMapper extends BaseMapper<CollectionTask> {

    /**
     * Updates only the status column of the given task.
     *
     * @return number of rows updated (0 when the id does not exist)
     */
    int updateStatus(@Param("id") String id, @Param("status") String status);

    /**
     * Records the id of the latest execution of the given task.
     *
     * @return number of rows updated (0 when the id does not exist)
     */
    int updateLastExecution(@Param("id") String id, @Param("lastExecutionId") String lastExecutionId);

    /** Returns all tasks considered active (predicate defined in the mapper XML). */
    List<CollectionTask> selectActiveTasks();
}
package com.datamate.collection.infrastructure.persistence.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.collection.domain.model.entity.TaskExecution;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.time.LocalDateTime;

/**
 * MyBatis mapper for task executions. CRUD comes from {@link BaseMapper}; the custom
 * statements below are defined in {@code mappers/TaskExecutionMapper.xml}.
 */
@Mapper
public interface TaskExecutionMapper extends BaseMapper<TaskExecution> {

    /**
     * Returns the most recent execution of the given task — presumably {@code null}
     * when the task has never run; confirm against the mapper XML.
     */
    TaskExecution selectLatestByTaskId(@Param("taskId") String taskId);

    /**
     * Marks an execution as finished and records its final counters.
     *
     * <p>NOTE(review): this returns {@code void} while the sibling CollectionTaskMapper's
     * update statements return {@code int}; consider returning the affected row count so
     * callers can detect a missing execution id.
     *
     * @param errorMessage error message, or {@code null} on success
     */
    void completeExecution(@Param("executionId") String executionId,
                           @Param("status") String status,
                           @Param("completedAt") LocalDateTime completedAt,
                           @Param("recordsProcessed") Integer recordsProcessed,
                           @Param("recordsTotal") Long recordsTotal,
                           @Param("recordsSuccess") Long recordsSuccess,
                           @Param("recordsFailed") Long recordsFailed,
                           @Param("errorMessage") String errorMessage);
}
package com.datamate.collection.infrastructure.persistence.repository;

import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * MyBatis-Plus backed implementation of {@link CollectionTaskRepository}.
 * Pure delegation: each custom query is forwarded to {@link CollectionTaskMapper}.
 *
 * @since 2025/10/23
 */
@Repository
@RequiredArgsConstructor
public class CollectionTaskRepositoryImpl
        extends CrudRepository<CollectionTaskMapper, CollectionTask>
        implements CollectionTaskRepository {

    private final CollectionTaskMapper collectionTaskMapper;

    /** Delegates to the mapper's selectActiveTasks statement. */
    @Override
    public List<CollectionTask> selectActiveTasks() {
        return this.collectionTaskMapper.selectActiveTasks();
    }

    /** Delegates to the mapper's updateStatus statement. */
    @Override
    public void updateStatus(String id, String status) {
        this.collectionTaskMapper.updateStatus(id, status);
    }

    /** Delegates to the mapper's updateLastExecution statement. */
    @Override
    public void updateLastExecution(String id, String lastExecutionId) {
        this.collectionTaskMapper.updateLastExecution(id, lastExecutionId);
    }
}
package com.datamate.collection.infrastructure.persistence.repository;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.domain.repository.TaskExecutionRepository;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;

/**
 * MyBatis-Plus backed implementation of {@link TaskExecutionRepository}.
 *
 * <p>NOTE(review): the constructor-injected {@code taskExecutionMapper} duplicates the
 * {@code baseMapper} field already provided by {@link ServiceImpl}; it is kept here to
 * preserve the bean's constructor shape, but one of the two could be dropped.
 *
 * @since 2025/10/23
 */
@Repository
@RequiredArgsConstructor
public class TaskExecutionRepositoryImpl
        extends ServiceImpl<TaskExecutionMapper, TaskExecution>
        implements TaskExecutionRepository {

    private final TaskExecutionMapper taskExecutionMapper;

    /** Delegates to the mapper's selectLatestByTaskId statement. */
    @Override
    public TaskExecution selectLatestByTaskId(String taskId) {
        return this.taskExecutionMapper.selectLatestByTaskId(taskId);
    }

    /** Delegates to the mapper's completeExecution statement, forwarding all counters. */
    @Override
    public void completeExecution(String executionId, String status, LocalDateTime completedAt,
                                  Integer recordsProcessed, Long recordsTotal,
                                  Long recordsSuccess, Long recordsFailed, String errorMessage) {
        this.taskExecutionMapper.completeExecution(
                executionId, status, completedAt,
                recordsProcessed, recordsTotal,
                recordsSuccess, recordsFailed, errorMessage);
    }
}
a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxJobBuilder.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.datamate.collection.infrastructure.runtime.datax; - -import com.datamate.collection.domain.model.CollectionTask; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Component; - -import java.io.FileWriter; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * 根据任务配置拼装 DataX 作业 JSON 文件 - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class DataxJobBuilder { - - private final DataxProperties props; - - public Path buildJobFile(CollectionTask task) throws IOException { - Files.createDirectories(Paths.get(props.getJobConfigPath())); - String fileName = String.format("datax-job-%s.json", task.getId()); - Path path = Paths.get(props.getJobConfigPath(), fileName); - // 简化:直接将任务中的 config 字段作为 DataX 作业 JSON - try (FileWriter fw = new FileWriter(path.toFile())) { - String json = task.getConfig() == null || task.getConfig().isEmpty() ? 
- defaultJobJson() : task.getConfig(); - if (StringUtils.isNotBlank(task.getConfig())) { - json = getJobConfig(task); - } - log.info("Job config: {}", json); - fw.write(json); - } - return path; - } - - private String getJobConfig(CollectionTask task) { - try { - ObjectMapper objectMapper = new ObjectMapper(); - Map parameter = objectMapper.readValue( - task.getConfig(), - new TypeReference<>() {} - ); - Map job = new HashMap<>(); - Map content = new HashMap<>(); - Map reader = new HashMap<>(); - reader.put("name", "nfsreader"); - reader.put("parameter", parameter); - content.put("reader", reader); - Map writer = new HashMap<>(); - writer.put("name", "nfswriter"); - writer.put("parameter", parameter); - content.put("writer", writer); - job.put("content", List.of(content)); - Map setting = new HashMap<>(); - Map channel = new HashMap<>(); - channel.put("channel", 2); - setting.put("speed", channel); - job.put("setting", setting); - Map jobConfig = new HashMap<>(); - jobConfig.put("job", job); - return objectMapper.writeValueAsString(jobConfig); - } catch (Exception e) { - log.error("Failed to parse task config", e); - throw new RuntimeException("Failed to parse task config", e); - } - } - - private String defaultJobJson() { - // 提供一个最小可运行的空 job,实际会被具体任务覆盖 - return "{\n \"job\": {\n \"setting\": {\n \"speed\": {\n \"channel\": 1\n }\n },\n \"content\": []\n }\n}"; - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProcessRunner.java deleted file mode 100644 index fda00a83..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/runtime/datax/DataxProcessRunner.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.datamate.collection.infrastructure.runtime.datax; - -import lombok.RequiredArgsConstructor; 
-import lombok.extern.slf4j.Slf4j; -import org.apache.commons.exec.*; -import org.springframework.stereotype.Component; - -import java.io.File; -import java.time.Duration; - -@Slf4j -@Component -@RequiredArgsConstructor -public class DataxProcessRunner { - - private final DataxProperties props; - - public int runJob(File jobFile, String executionId, Duration timeout) throws Exception { - File logFile = new File(props.getLogPath(), String.format("datax-%s.log", executionId)); - String python = props.getPythonPath(); - String dataxPy = props.getHomePath() + File.separator + "bin" + File.separator + "datax.py"; - String cmd = String.format("%s %s %s", python, dataxPy, jobFile.getAbsolutePath()); - - log.info("Execute DataX: {}", cmd); - - CommandLine cl = CommandLine.parse(cmd); - DefaultExecutor executor = new DefaultExecutor(); - - // 将日志追加输出到文件 - File parent = logFile.getParentFile(); - if (!parent.exists()) parent.mkdirs(); - - ExecuteStreamHandler streamHandler = new PumpStreamHandler( - new org.apache.commons.io.output.TeeOutputStream( - new java.io.FileOutputStream(logFile, true), System.out), - new org.apache.commons.io.output.TeeOutputStream( - new java.io.FileOutputStream(logFile, true), System.err) - ); - executor.setStreamHandler(streamHandler); - - ExecuteWatchdog watchdog = new ExecuteWatchdog(timeout.toMillis()); - executor.setWatchdog(watchdog); - - return executor.execute(cl); - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java index 1cbdde3c..2b453839 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java @@ 
-1,16 +1,18 @@ package com.datamate.collection.interfaces.converter; -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.DataxTemplate; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.interfaces.dto.*; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.common.interfaces.PagedResponse; import com.fasterxml.jackson.databind.ObjectMapper; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Named; import org.mapstruct.factory.Mappers; +import java.util.List; import java.util.Map; @Mapper @@ -20,9 +22,7 @@ public interface CollectionTaskConverter { @Mapping(source = "config", target = "config", qualifiedByName = "parseJsonToMap") CollectionTaskResponse toResponse(CollectionTask task); - CollectionTaskSummary toSummary(CollectionTask task); - - DataxTemplateSummary toTemplateSummary(DataxTemplate template); + List toResponse(List tasks); @Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString") CollectionTask toCollectionTask(CreateCollectionTaskRequest request); @@ -30,11 +30,19 @@ public interface CollectionTaskConverter { @Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString") CollectionTask toCollectionTask(UpdateCollectionTaskRequest request); + @Mapping(source = "current", target = "page") + @Mapping(source = "size", target = "size") + @Mapping(source = "total", target = "totalElements") + @Mapping(source = "pages", target = "totalPages") + @Mapping(source = "records", target = "content") + PagedResponse toResponse(IPage tasks); + @Named("parseJsonToMap") default Map parseJsonToMap(String json) { try { ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(json, Map.class); + return + 
objectMapper.readValue(json, Map.class); } catch (Exception e) { throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER); } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskPagingQuery.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskPagingQuery.java new file mode 100644 index 00000000..024d96d8 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskPagingQuery.java @@ -0,0 +1,25 @@ +package com.datamate.collection.interfaces.dto; + +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.common.interfaces.PagingQuery; +import lombok.Getter; +import lombok.Setter; + +/** + * 归集任务分页查询参数 + * + * @since 2025/10/23 + */ +@Getter +@Setter +public class CollectionTaskPagingQuery extends PagingQuery { + /** + * 任务状态 + */ + private TaskStatus status; + + /** + * 任务名称 + */ + private String name; +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java new file mode 100644 index 00000000..08f61034 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java @@ -0,0 +1,48 @@ +package com.datamate.collection.interfaces.dto; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.common.enums.SyncMode; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.springframework.format.annotation.DateTimeFormat; +import jakarta.validation.Valid; + +/** + * CollectionTaskResponse + 
package com.datamate.collection.interfaces.dto;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.common.enums.SyncMode;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;

/**
 * API response describing a collection task.
 *
 * <p>The {@code @Valid} annotation that was on {@code config} has been removed: bean
 * validation is only applied to inbound request objects, so it was dead weight on a
 * response DTO.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CollectionTaskResponse {

    /** Task id (UUID string). */
    private String id;

    /** Task name (任务名称). */
    private String name;

    /** Task description (任务描述). */
    private String description;

    /** Collection config rendered as a JSON object (source/target settings). */
    private Map<String, Object> config = new HashMap<>();

    /** Current task status. */
    private TaskStatus status;

    /** Sync mode (e.g. one-shot vs scheduled). */
    private SyncMode syncMode;

    /** Cron expression; presumably only set when syncMode is SCHEDULED — confirm with service layer. */
    private String scheduleExpression;

    /** Id of the most recent execution, if any. */
    private String lastExecutionId;

    // NOTE(review): @DateTimeFormat drives *binding*, not serialization; it is kept for
    // compatibility but likely has no effect on an outbound DTO — confirm.
    @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
    private LocalDateTime createdAt;

    @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
    private LocalDateTime updatedAt;
}
package com.datamate.collection.interfaces.dto;

import com.datamate.collection.common.enums.SyncMode;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import io.swagger.v3.oas.annotations.media.Schema;

/**
 * Request body for creating a collection task.
 * Required fields: name, config, syncMode; scheduleExpression is documented as required
 * only when syncMode is SCHEDULED (not enforced here — presumably checked in the service).
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CreateCollectionTaskRequest {

    /** Task name, 1–100 characters. */
    @NotNull
    @Size(min = 1, max = 100)
    @Schema(name = "name", description = "任务名称", requiredMode = Schema.RequiredMode.REQUIRED)
    @JsonProperty("name")
    private String name;

    /** Optional task description, up to 500 characters. */
    @Size(max = 500)
    @Schema(name = "description", description = "任务描述", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("description")
    private String description;

    /** Collection config: source and target settings as a JSON object. */
    @Valid
    @NotNull
    @Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.REQUIRED)
    @JsonProperty("config")
    private Map<String, Object> config = new HashMap<>();

    /** How the task is synchronized (e.g. one-shot vs scheduled). */
    @NotNull
    @Valid
    @Schema(name = "syncMode", requiredMode = Schema.RequiredMode.REQUIRED)
    @JsonProperty("syncMode")
    private SyncMode syncMode;

    /** Cron schedule; required by contract when syncMode=SCHEDULED. */
    @Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("scheduleExpression")
    private String scheduleExpression;
}
private String description; + + @Valid + @Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("config") + private Map config = new HashMap<>(); + + @Valid + @Schema(name = "syncMode", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("syncMode") + private SyncMode syncMode; + + @Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED) + @JsonProperty("scheduleExpression") + private String scheduleExpression; +} + diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java index d9aa3535..b893a73c 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java @@ -1,38 +1,36 @@ package com.datamate.collection.interfaces.rest; -import com.datamate.collection.application.service.CollectionTaskService; -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.DataxTemplate; -import com.datamate.collection.interfaces.api.CollectionTaskApi; +import com.datamate.collection.application.CollectionTaskService; +import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.interfaces.converter.CollectionTaskConverter; import com.datamate.collection.interfaces.dto.*; +import com.datamate.common.interfaces.PagedResponse; +import jakarta.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; -import 
org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.*; -import java.util.stream.Collectors; @Slf4j @RestController +@RequestMapping("/data-collection/tasks") @RequiredArgsConstructor -@Validated -public class CollectionTaskController implements CollectionTaskApi { +public class CollectionTaskController{ private final CollectionTaskService taskService; - @Override - public ResponseEntity createTask(CreateCollectionTaskRequest request) { + @PostMapping + public ResponseEntity createTask(@Valid @RequestBody CreateCollectionTaskRequest request) { CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request); task.setId(UUID.randomUUID().toString()); task.addPath(); return ResponseEntity.ok().body(CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task))); } - @Override - public ResponseEntity updateTask(String id, UpdateCollectionTaskRequest request) { + @PutMapping("/{id}") + public ResponseEntity updateTask(@PathVariable("id") String id, @Valid @RequestBody UpdateCollectionTaskRequest request) { if (taskService.get(id) == null) { return ResponseEntity.notFound().build(); } @@ -41,43 +39,20 @@ public ResponseEntity updateTask(String id, UpdateCollec return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task))); } - @Override - public ResponseEntity deleteTask(String id) { + @DeleteMapping("/{id}") + public ResponseEntity deleteTask(@PathVariable("id") String id) { taskService.delete(id); return ResponseEntity.ok().build(); } - @Override - public ResponseEntity getTaskDetail(String id) { + @GetMapping("/{id}") + public ResponseEntity getTaskDetail(@PathVariable("id") String id) { CollectionTask task = taskService.get(id); return task == null ? 
ResponseEntity.notFound().build() : ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(task)); } - @Override - public ResponseEntity getTasks(Integer page, Integer size, TaskStatus status, String name) { - var list = taskService.list(page, size, status == null ? null : status.getValue(), name); - PagedCollectionTaskSummary response = new PagedCollectionTaskSummary(); - response.setContent(list.stream().map(CollectionTaskConverter.INSTANCE::toSummary).collect(Collectors.toList())); - response.setNumber(page); - response.setSize(size); - response.setTotalElements(list.size()); // 简化处理,实际项目中应该有单独的count查询 - response.setTotalPages(size == null || size == 0 ? 1 : (int) Math.ceil(list.size() * 1.0 / size)); - return ResponseEntity.ok(response); - } - - @Override - public ResponseEntity templatesGet(String sourceType, String targetType, - Integer page, Integer size) { - int pageNum = page != null ? page : 0; - int pageSize = size != null ? size : 20; - List templates = taskService.listTemplates(sourceType, targetType, pageNum, pageSize); - int totalElements = taskService.countTemplates(sourceType, targetType); - PagedDataxTemplates response = new PagedDataxTemplates(); - response.setContent(templates.stream().map(CollectionTaskConverter.INSTANCE::toTemplateSummary).collect(Collectors.toList())); - response.setNumber(pageNum); - response.setSize(pageSize); - response.setTotalElements(totalElements); - response.setTotalPages(pageSize > 0 ? 
(int) Math.ceil(totalElements * 1.0 / pageSize) : 1); - return ResponseEntity.ok(response); + @GetMapping + public ResponseEntity> getTasks(@Valid CollectionTaskPagingQuery query) { + return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(query))); } } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/TaskExecutionController.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/TaskExecutionController.java deleted file mode 100644 index 6d392bbf..00000000 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/TaskExecutionController.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.datamate.collection.interfaces.rest; - -import com.datamate.collection.application.service.CollectionTaskService; -import com.datamate.collection.application.service.TaskExecutionService; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.interfaces.api.TaskExecutionApi; -import com.datamate.collection.interfaces.dto.PagedTaskExecutions; -import com.datamate.collection.interfaces.dto.TaskExecutionDetail; -import com.datamate.collection.interfaces.dto.TaskExecutionResponse; -import com.datamate.collection.interfaces.dto.TaskStatus; // DTO enum -import lombok.RequiredArgsConstructor; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.RestController; - -import java.util.stream.Collectors; - -@RestController -@RequiredArgsConstructor -@Validated -public class TaskExecutionController implements TaskExecutionApi { - - private final TaskExecutionService executionService; - private final CollectionTaskService taskService; - - private TaskExecutionDetail toDetail(TaskExecution e) { - TaskExecutionDetail d = new 
TaskExecutionDetail(); - d.setId(e.getId()); - d.setTaskId(e.getTaskId()); - d.setTaskName(e.getTaskName()); - if (e.getStatus() != null) { d.setStatus(TaskStatus.fromValue(e.getStatus().name())); } - d.setProgress(e.getProgress()); - d.setRecordsTotal(e.getRecordsTotal() != null ? e.getRecordsTotal().intValue() : null); - d.setRecordsProcessed(e.getRecordsProcessed() != null ? e.getRecordsProcessed().intValue() : null); - d.setRecordsSuccess(e.getRecordsSuccess() != null ? e.getRecordsSuccess().intValue() : null); - d.setRecordsFailed(e.getRecordsFailed() != null ? e.getRecordsFailed().intValue() : null); - d.setThroughput(e.getThroughput()); - d.setDataSizeBytes(e.getDataSizeBytes() != null ? e.getDataSizeBytes().intValue() : null); - d.setStartedAt(e.getStartedAt()); - d.setCompletedAt(e.getCompletedAt()); - d.setDurationSeconds(e.getDurationSeconds()); - d.setErrorMessage(e.getErrorMessage()); - return d; - } - - // GET /executions/{id} - @Override - public ResponseEntity executionsIdGet(String id) { - var exec = executionService.get(id); - return exec == null ? 
ResponseEntity.notFound().build() : ResponseEntity.ok(toDetail(exec)); - } - - // DELETE /executions/{id} - @Override - public ResponseEntity executionsIdDelete(String id) { - executionService.stop(id); // 幂等处理,在service内部判断状态 - return ResponseEntity.noContent().build(); - } - - // POST /tasks/{id}/execute -> 201 - @Override - public ResponseEntity tasksIdExecutePost(String id) { - var task = taskService.get(id); - if (task == null) { return ResponseEntity.notFound().build(); } - var latestExec = executionService.getLatestByTaskId(id); - if (latestExec != null && latestExec.getStatus() == com.datamate.collection.domain.model.TaskStatus.RUNNING) { - TaskExecutionResponse r = new TaskExecutionResponse(); - r.setId(latestExec.getId()); - r.setTaskId(latestExec.getTaskId()); - r.setTaskName(latestExec.getTaskName()); - r.setStatus(TaskStatus.fromValue(latestExec.getStatus().name())); - r.setStartedAt(latestExec.getStartedAt()); - return ResponseEntity.status(HttpStatus.CREATED).body(r); // 返回已有运行实例 - } - var exec = taskService.startExecution(task); - TaskExecutionResponse r = new TaskExecutionResponse(); - r.setId(exec.getId()); - r.setTaskId(exec.getTaskId()); - r.setTaskName(exec.getTaskName()); - r.setStatus(TaskStatus.fromValue(exec.getStatus().name())); - r.setStartedAt(exec.getStartedAt()); - return ResponseEntity.status(HttpStatus.CREATED).body(r); - } - - // GET /tasks/{id}/executions -> 分页 - @Override - public ResponseEntity tasksIdExecutionsGet(String id, Integer page, Integer size) { - if (page == null || page < 0) { page = 0; } - if (size == null || size <= 0) { size = 20; } - var list = executionService.list(id, null, null, null, page, size); - long total = executionService.count(id, null, null, null); - PagedTaskExecutions p = new PagedTaskExecutions(); - p.setContent(list.stream().map(this::toDetail).collect(Collectors.toList())); - p.setNumber(page); - p.setSize(size); - p.setTotalElements((int) total); - p.setTotalPages(size == 0 ? 
1 : (int) Math.ceil(total * 1.0 / size)); - return ResponseEntity.ok(p); - } -} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/scheduler/TaskSchedulerInitializer.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java similarity index 68% rename from backend/services/data-collection-service/src/main/java/com/datamate/collection/application/scheduler/TaskSchedulerInitializer.java rename to backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java index 0203cab3..d0d0f260 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/scheduler/TaskSchedulerInitializer.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java @@ -1,11 +1,10 @@ -package com.datamate.collection.application.scheduler; +package com.datamate.collection.interfaces.scheduler; -import com.datamate.collection.application.service.DataxExecutionService; -import com.datamate.collection.domain.model.CollectionTask; -import com.datamate.collection.domain.model.TaskStatus; -import com.datamate.collection.domain.model.TaskExecution; -import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper; -import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper; +import com.datamate.collection.application.CollectionTaskService; +import com.datamate.collection.application.TaskExecutionService; +import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.datamate.collection.domain.model.entity.TaskExecution; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; @@ -21,14 +20,13 @@ 
@RequiredArgsConstructor public class TaskSchedulerInitializer { - private final CollectionTaskMapper taskMapper; - private final TaskExecutionMapper executionMapper; - private final DataxExecutionService dataxExecutionService; + private final CollectionTaskService collectionTaskService; + private final TaskExecutionService taskExecutionService; // 定期扫描激活的采集任务,根据 Cron 判断是否到期执行 @Scheduled(fixedDelayString = "${datamate.data-collection.scheduler.scan-interval-ms:10000}") public void scanAndTrigger() { - List tasks = taskMapper.selectActiveTasks(); + List tasks = collectionTaskService.selectActiveTasks(); if (tasks == null || tasks.isEmpty()) { return; } @@ -40,7 +38,7 @@ public void scanAndTrigger() { } try { // 如果最近一次执行仍在运行,则跳过 - TaskExecution latest = executionMapper.selectLatestByTaskId(task.getId()); + TaskExecution latest = taskExecutionService.selectLatestByTaskId(task.getId()); if (latest != null && latest.getStatus() == TaskStatus.RUNNING) { continue; } @@ -53,9 +51,9 @@ public void scanAndTrigger() { if (nextTime != null && !nextTime.isAfter(now)) { // 到期,触发一次执行 - TaskExecution exec = dataxExecutionService.createExecution(task); + TaskExecution exec = taskExecutionService.createExecution(task); int timeout = task.getTimeoutSeconds() == null ? 
3600 : task.getTimeoutSeconds(); - dataxExecutionService.runAsync(task, exec.getId(), timeout); + taskExecutionService.runAsync(task, exec.getId(), timeout); log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), now, exec.getId()); } } catch (Exception ex) { diff --git a/backend/services/data-collection-service/src/main/resources/mappers/CollectionTaskMapper.xml b/backend/services/data-collection-service/src/main/resources/mappers/CollectionTaskMapper.xml index 3a195a28..6d3033bb 100644 --- a/backend/services/data-collection-service/src/main/resources/mappers/CollectionTaskMapper.xml +++ b/backend/services/data-collection-service/src/main/resources/mappers/CollectionTaskMapper.xml @@ -1,11 +1,10 @@ - - + @@ -24,21 +23,6 @@ - - - - - - - - - - - - - - - id, @@ -47,96 +31,6 @@ last_execution_id, created_at, updated_at, created_by, updated_by - - - id, name, source_type, target_type, template_content, description, version, is_system, created_at, updated_at, created_by - - - - - INSERT INTO t_dc_collection_tasks (id, name, description, config, status, sync_mode, - schedule_expression, retry_count, timeout_seconds, max_records, sort_field, - last_execution_id, created_at, updated_at, created_by, updated_by) - VALUES (#{id}, #{name}, #{description}, #{config}, #{status}, #{syncMode}, - #{scheduleExpression}, #{retryCount}, #{timeoutSeconds}, #{maxRecords}, #{sortField}, - #{lastExecutionId}, #{createdAt}, #{updatedAt}, #{createdBy}, #{updatedBy}) - - - - - UPDATE t_dc_collection_tasks - SET name = #{name}, - description = #{description}, - config = #{config}, - status = #{status}, - sync_mode = #{syncMode}, - schedule_expression = #{scheduleExpression}, - retry_count = #{retryCount}, - timeout_seconds = #{timeoutSeconds}, - max_records = #{maxRecords}, - sort_field = #{sortField}, - last_execution_id = #{lastExecutionId}, - updated_at = #{updatedAt}, - updated_by = #{updatedBy} - WHERE id = #{id} - - - - - DELETE FROM t_dc_collection_tasks 
WHERE id = #{id} - - - - - - - - - - - - - - - - - UPDATE t_dc_collection_tasks SET status = #{status}, updated_at = NOW() WHERE id = #{id} @@ -154,35 +48,4 @@ AND schedule_expression IS NOT NULL ORDER BY created_at DESC - - - - - - - diff --git a/backend/services/data-collection-service/src/main/resources/mappers/TaskExecutionMapper.xml b/backend/services/data-collection-service/src/main/resources/mappers/TaskExecutionMapper.xml index 6b6d0a03..e9e9c866 100644 --- a/backend/services/data-collection-service/src/main/resources/mappers/TaskExecutionMapper.xml +++ b/backend/services/data-collection-service/src/main/resources/mappers/TaskExecutionMapper.xml @@ -1,191 +1,28 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - id, task_id, task_name, status, progress, records_total, records_processed, - records_success, records_failed, throughput, data_size_bytes, started_at, - completed_at, duration_seconds, error_message, datax_job_id, config, result, created_at - - - - - INSERT INTO t_dc_task_executions ( - id, task_id, task_name, status, progress, records_total, records_processed, - records_success, records_failed, throughput, data_size_bytes, started_at, - completed_at, duration_seconds, error_message, datax_job_id, config, result, created_at - ) VALUES ( - #{id}, #{taskId}, #{taskName}, #{status}, #{progress}, #{recordsTotal}, #{recordsProcessed}, - #{recordsSuccess}, #{recordsFailed}, #{throughput}, #{dataSizeBytes}, #{startedAt}, - #{completedAt}, #{durationSeconds}, #{errorMessage}, #{dataxJobId}, #{config}, #{result}, #{createdAt} - ) - - - - - UPDATE t_dc_task_executions - SET status = #{status}, - progress = #{progress}, - records_total = #{recordsTotal}, - records_processed = #{recordsProcessed}, - records_success = #{recordsSuccess}, - records_failed = #{recordsFailed}, - throughput = #{throughput}, - data_size_bytes = #{dataSizeBytes}, - completed_at = #{completedAt}, - duration_seconds = #{durationSeconds}, - error_message = #{errorMessage}, - result 
= #{result} - WHERE id = #{id} - - - - - DELETE FROM t_dc_task_executions WHERE id = #{id} - - - - - - - + SELECT * FROM t_dc_task_executions WHERE task_id = #{taskId} ORDER BY started_at DESC - - LIMIT #{limit} - - - - - - - - - - - - - - UPDATE t_dc_task_executions - SET status = #{status}, - progress = #{progress}, - records_processed = #{recordsProcessed}, - throughput = #{throughput} - WHERE id = #{id} - - UPDATE t_dc_task_executions SET status = #{status}, - progress = 100.00, completed_at = #{completedAt}, - duration_seconds = #{durationSeconds}, + records_processed = #{recordsProcessed}, + records_total = #{recordsTotal}, records_success = #{recordsSuccess}, records_failed = #{recordsFailed}, - data_size_bytes = #{dataSizeBytes}, error_message = #{errorMessage}, - result = #{result} - WHERE id = #{id} + updated_at = NOW() + WHERE id = #{executionId} - - - - - - - - - DELETE FROM t_dc_task_executions - WHERE started_at < #{beforeDate} - - diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java index fa6b07a0..b14db920 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java @@ -2,6 +2,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.datamate.common.domain.utils.ChunksSaver; import com.datamate.datamanagement.interfaces.dto.*; import com.datamate.common.infrastructure.exception.BusinessAssert; import com.datamate.common.interfaces.PagedResponse; @@ -100,8 +101,13 @@ public Dataset updateDataset(String datasetId, UpdateDatasetRequest updateDatase /** * 
删除数据集 */ + @Transactional public void deleteDataset(String datasetId) { + Dataset dataset = datasetRepository.getById(datasetId); datasetRepository.removeById(datasetId); + if (dataset != null) { + ChunksSaver.deleteFolder(dataset.getPath()); + } } /** diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/mapper/DatasetFileMapper.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/mapper/DatasetFileMapper.java index 6b0429c9..f67121c9 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/mapper/DatasetFileMapper.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/mapper/DatasetFileMapper.java @@ -24,7 +24,6 @@ List findByCriteria(@Param("datasetId") String datasetId, @Param("status") String status, RowBounds rowBounds); - int insert(DatasetFile file); int update(DatasetFile file); int deleteById(@Param("id") String id); } diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java index 8aa059e8..1c8250b1 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java @@ -60,7 +60,7 @@ public FileUploadResult chunkUpload(ChunkUploadRequest uploadFileRequest) { boolean isFinish = Objects.equals(preRequest.getUploadedFileNum(), preRequest.getTotalFileNum()); if (isFinish) { // 删除存分片的临时路径 - ChunksSaver.deleteFiles(new File(preRequest.getUploadPath(), + ChunksSaver.deleteFolder(new File(preRequest.getUploadPath(), String.format(ChunksSaver.TEMP_DIR_NAME_FORMAT, preRequest.getId())).getPath()); 
chunkUploadRequestMapper.deleteById(preRequest.getId()); } diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java index 539cbb64..079b2cdd 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java @@ -2,6 +2,8 @@ import com.datamate.common.domain.model.ChunkUploadPreRequest; import com.datamate.common.domain.model.ChunkUploadRequest; +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.SystemErrorCode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.springframework.web.multipart.MultipartFile; @@ -106,29 +108,17 @@ public static void appendToTargetFile(File targetFile, InputStream inputStream) * * @param uploadPath 文件路径 */ - public static void deleteFiles(String uploadPath) { - File dic = new File(uploadPath); - if (!dic.exists()) { - return; - } - File[] files = dic.listFiles(); - if (files == null || files.length == 0) { - dic.delete(); + public static void deleteFolder(String uploadPath) { + File folder = new File(uploadPath); + + if (!folder.exists()) { + log.info("folder {} does not exist", uploadPath); return; } try { - for (File file : files) { - if (file.isDirectory()) { - deleteFiles(file.getPath()); - } else { - file.delete(); - } - } - if (dic.exists()) { - dic.delete(); - } - } catch (SecurityException e) { - log.warn("Fail to delete file", e); + FileUtils.deleteDirectory(folder); + } catch (IOException e) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); } } } diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql index dbd923f9..62a611a5 100644 --- a/scripts/db/data-collection-init.sql +++ 
b/scripts/db/data-collection-init.sql @@ -31,7 +31,10 @@ CREATE TABLE t_dc_task_executions ( error_message TEXT COMMENT '错误信息', datax_job_id TEXT COMMENT 'datax任务ID', result TEXT COMMENT '执行结果', - created_at TIMESTAMP NULL COMMENT '创建时间', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + created_by VARCHAR(255) COMMENT '创建者', + updated_by VARCHAR(255) COMMENT '更新者', INDEX idx_task_id (task_id), INDEX idx_status (status), INDEX idx_started_at (started_at) From 8d7bb8aecfc6a0f7b5a9cbb4de6eb2359598a3ef Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Fri, 24 Oct 2025 10:42:21 +0800 Subject: [PATCH 2/9] =?UTF-8?q?feature:=20=E6=AF=8F=E5=A4=A9=E5=87=8C?= =?UTF-8?q?=E6=99=A800=EF=BC=9A00=E6=89=AB=E6=8F=8F=E6=89=80=E6=9C=89?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E9=9B=86=EF=BC=8C=E6=A3=80=E6=9F=A5=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E9=9B=86=E6=98=AF=E5=90=A6=E8=B6=85=E8=BF=87=E4=BA=86?= =?UTF-8?q?=E9=A2=84=E8=AE=BE=E7=9A=84=E4=BF=9D=E7=95=99=E5=A4=A9=E6=95=B0?= =?UTF-8?q?=EF=BC=8C=E8=B6=85=E5=87=BA=E4=BF=9D=E7=95=99=E5=A4=A9=E6=95=B0?= =?UTF-8?q?=E7=9A=84=E6=95=B0=E6=8D=AE=E9=9B=86=E8=B0=83=E7=94=A8=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E6=8E=A5=E5=8F=A3=E8=BF=9B=E8=A1=8C=E5=88=A0=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../converter/DatasetConverter.java | 1 - .../interfaces/dto/DatasetResponse.java | 2 + .../scheduler/DatasetScheduler.java | 65 +++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/scheduler/DatasetScheduler.java diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/converter/DatasetConverter.java 
b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/converter/DatasetConverter.java index 0247ffef..251033e5 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/converter/DatasetConverter.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/converter/DatasetConverter.java @@ -7,7 +7,6 @@ import com.datamate.common.domain.model.ChunkUploadRequest; import com.datamate.datamanagement.domain.model.dataset.Dataset; import com.datamate.datamanagement.domain.model.dataset.DatasetFile; -import com.datamate.datamanagement.interfaces.dto.*; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.factory.Mappers; diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetResponse.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetResponse.java index e7b1779e..b09f9856 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetResponse.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetResponse.java @@ -24,6 +24,8 @@ public class DatasetResponse { private String status; /** 标签列表 */ private List tags; + /** 数据集保留天数 */ + private Integer retentionDays; /** 数据源 */ private String dataSource; /** 目标位置 */ diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/scheduler/DatasetScheduler.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/scheduler/DatasetScheduler.java new file mode 100644 index 00000000..a9b4bb46 --- /dev/null +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/scheduler/DatasetScheduler.java @@ -0,0 +1,65 @@ 
+package com.datamate.datamanagement.interfaces.scheduler; + +import com.datamate.common.interfaces.PagedResponse; +import com.datamate.datamanagement.application.DatasetApplicationService; +import com.datamate.datamanagement.interfaces.dto.DatasetPagingQuery; +import com.datamate.datamanagement.interfaces.dto.DatasetResponse; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 数据集定时任务触发 + * + * @since 2025/10/24 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DatasetScheduler { + private final DatasetApplicationService datasetApplicationService; + + /** + * 每天凌晨 00:00 扫描并删除超出保留期的数据集 + */ + @Scheduled(cron = "0 0 0 * * ?") + public void cleanupExpiredDatasets() { + int pageNo = 1; + int pageSize = 500; + + while (true) { + DatasetPagingQuery datasetPagingQuery = new DatasetPagingQuery(); + datasetPagingQuery.setPage(pageNo); + datasetPagingQuery.setSize(pageSize); + PagedResponse datasets = datasetApplicationService.getDatasets(datasetPagingQuery); + if (CollectionUtils.isEmpty(datasets.getContent())) { + break; + } + + datasets.getContent().forEach(dataset -> { + Integer retentionDays = dataset.getRetentionDays(); + LocalDateTime createdAt = dataset.getCreatedAt(); + if (retentionDays != null && retentionDays > 0 && createdAt != null) { + LocalDateTime expireAt = createdAt.plusDays(retentionDays); + if (expireAt.isBefore(LocalDateTime.now())) { + try { + log.info("Deleting dataset {}, expired at {} (retentionDays={})", dataset.getId(), expireAt, retentionDays); + datasetApplicationService.deleteDataset(dataset.getId()); + } catch (Exception e) { + log.warn("Failed to delete expired dataset {}: {}", dataset.getId(), e.getMessage()); + } + } + } + }); + + if (datasets.getPage() >= datasets.getTotalPages()) { + break; 
+ } + pageNo++; + } + } +} From 738127ea2d03b789d59ed890f7d970f4ff925355 Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Fri, 24 Oct 2025 11:04:25 +0800 Subject: [PATCH 3/9] =?UTF-8?q?fix:=20=E4=BF=AE=E6=94=B9=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E9=9B=86=E6=96=87=E4=BB=B6=E7=9A=84=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E4=B8=8A=E4=BC=A0=E5=88=B0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=9B=86=E4=B8=AD=E7=9A=84=E6=96=87=E4=BB=B6=E4=BC=9A=E5=90=8C?= =?UTF-8?q?=E6=97=B6=E5=88=A0=E9=99=A4=E6=95=B0=E6=8D=AE=E5=BA=93=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E8=AE=B0=E5=BD=95=E5=92=8C=E6=96=87=E4=BB=B6=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E4=B8=AD=E7=9A=84=E6=96=87=E4=BB=B6=EF=BC=8C=E5=BD=92?= =?UTF-8?q?=E9=9B=86=E8=BF=87=E6=9D=A5=E7=9A=84=E6=96=87=E4=BB=B6=E4=BB=85?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=95=B0=E6=8D=AE=E5=BA=93=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DatasetFileApplicationService.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java index eadecc0c..da8e1980 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -155,20 +155,21 @@ public DatasetFile getDatasetFile(String datasetId, String fileId) { /** * 删除文件 */ + @Transactional public void deleteDatasetFile(String datasetId, String fileId) { DatasetFile file = getDatasetFile(datasetId, fileId); - try { - Path filePath = Paths.get(file.getFilePath()); - 
Files.deleteIfExists(filePath); - } catch (IOException ex) { - // ignore + Dataset dataset = datasetRepository.getById(datasetId); + // 删除文件时,上传到数据集中的文件会同时删除数据库中的记录和文件系统中的文件,归集过来的文件仅删除数据库中的记录 + if (file.getFilePath().startsWith(dataset.getPath())) { + try { + Path filePath = Paths.get(file.getFilePath()); + Files.deleteIfExists(filePath); + } catch (IOException ex) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } } datasetFileRepository.removeById(fileId); - - Dataset dataset = datasetRepository.getById(datasetId); - // 简单刷新统计(精确处理可从DB统计) - dataset.setFileCount(Math.max(0, dataset.getFileCount() - 1)); - dataset.setSizeBytes(Math.max(0, dataset.getSizeBytes() - (file.getFileSize() != null ? file.getFileSize() : 0))); + dataset.removeFile(file); datasetRepository.updateById(dataset); } From 98ee5b3f05a1f27064ce81d3ab6510a711da5f1c Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Fri, 24 Oct 2025 14:27:03 +0800 Subject: [PATCH 4/9] =?UTF-8?q?fix:=20=E5=A2=9E=E5=8A=A0=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C=E5=92=8C=E6=8E=A5=E5=8F=A3=E5=AE=9A=E4=B9=89?= =?UTF-8?q?=EF=BC=8C=E5=88=A0=E9=99=A4=E4=B8=8D=E4=BD=BF=E7=94=A8=E7=9A=84?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/openapi/specs/data-management.yaml | 162 ++++++++++++++---- .../DatasetFileApplicationService.java | 71 +------- .../interfaces/dto/CreateDatasetRequest.java | 5 +- .../interfaces/dto/UpdateDatasetRequest.java | 5 + .../rest/DatasetFileController.java | 27 +-- 5 files changed, 139 insertions(+), 131 deletions(-) diff --git a/backend/openapi/specs/data-management.yaml b/backend/openapi/specs/data-management.yaml index d77083b4..1aac250b 100644 --- a/backend/openapi/specs/data-management.yaml +++ b/backend/openapi/specs/data-management.yaml @@ -39,7 +39,7 @@ paths: schema: type: integer default: 0 - description: 页码,从0开始 + description: 页码,从1开始 - name: size in: query 
schema: @@ -65,7 +65,7 @@ paths: in: query schema: type: string - enum: [ACTIVE, INACTIVE, PROCESSING] + enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED] description: 数据集状态过滤 responses: '200': @@ -231,40 +231,6 @@ paths: schema: $ref: '#/components/schemas/PagedDatasetFileResponse' - post: - tags: [DatasetFile] - summary: 上传文件到数据集 - operationId: uploadDatasetFile - description: 向指定数据集上传文件 - parameters: - - name: datasetId - in: path - required: true - schema: - type: string - description: 数据集ID - requestBody: - required: true - content: - multipart/form-data: - schema: - type: object - properties: - file: - type: string - format: binary - description: 要上传的文件 - description: - type: string - description: 文件描述 - responses: - '201': - description: 上传成功 - content: - application/json: - schema: - $ref: '#/components/schemas/DatasetFileResponse' - /data-management/datasets/{datasetId}/files/{fileId}: get: tags: [DatasetFile] @@ -342,6 +308,78 @@ paths: type: string format: binary + /data-management/datasets/{datasetId}/files/download: + get: + tags: [ DatasetFile ] + operationId: downloadDatasetFileAsZip + summary: 下载文件 + description: 下载数据集中全部文件 + parameters: + - name: datasetId + in: path + required: true + schema: + type: string + description: 数据集ID + responses: + '200': + description: 文件内容 + content: + application/octet-stream: + schema: + type: string + format: binary + + /data-management/datasets/{datasetId}/files/upload/pre-upload: + post: + tags: [ DatasetFile ] + operationId: preUpload + summary: 切片上传预上传 + description: 预上传接口,返回后续分片上传所需的请求ID + parameters: + - name: datasetId + in: path + required: true + schema: + type: string + description: 数据集ID + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/UploadFilesPreRequest' + responses: + '200': + description: 预上传成功,返回请求ID + content: + application/json: + schema: + type: string + + /data-management/datasets/{datasetId}/files/upload/chunk: + post: + 
tags: [ DatasetFile ] + operationId: chunkUpload + summary: 切片上传 + description: 使用预上传返回的请求ID进行分片上传 + parameters: + - name: datasetId + in: path + required: true + schema: + type: string + description: 数据集ID + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/UploadFileRequest' + responses: + '200': + description: 上传成功 + /data-management/dataset-types: get: operationId: getDatasetTypes @@ -548,9 +586,59 @@ components: description: 标签列表 status: type: string - enum: [ACTIVE, INACTIVE] + enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED] description: 数据集状态 + UploadFilesPreRequest: + type: object + description: 切片上传预上传请求 + properties: + hasArchive: + type: boolean + description: 是否为压缩包上传 + default: false + totalFileNum: + type: integer + format: int32 + minimum: 1 + description: 总文件数量 + totalSize: + type: integer + format: int64 + description: 总文件大小(字节) + required: [ totalFileNum ] + + UploadFileRequest: + type: object + description: 分片上传请求 + properties: + reqId: + type: string + description: 预上传返回的请求ID + fileNo: + type: integer + format: int32 + description: 文件编号(批量中的第几个) + fileName: + type: string + description: 文件名称 + totalChunkNum: + type: integer + format: int32 + description: 文件总分片数量 + chunkNo: + type: integer + format: int32 + description: 当前分片编号(从1开始) + file: + type: string + format: binary + description: 分片二进制内容 + checkSumHex: + type: string + description: 分片校验和(十六进制) + required: [ reqId, fileNo, fileName, totalChunkNum, chunkNo, file ] + DatasetTypeResponse: type: object properties: diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java index da8e1980..399a07e4 100644 --- 
a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -10,7 +10,6 @@ import com.datamate.datamanagement.domain.model.dataset.Dataset; import com.datamate.datamanagement.domain.model.dataset.DatasetFile; import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo; -import com.datamate.datamanagement.domain.model.dataset.StatusConstants; import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository; import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository; import com.datamate.datamanagement.interfaces.converter.DatasetConverter; @@ -31,7 +30,6 @@ import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.multipart.MultipartFile; import java.io.BufferedInputStream; import java.io.File; @@ -41,7 +39,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; @@ -60,7 +57,6 @@ public class DatasetFileApplicationService { private final DatasetFileRepository datasetFileRepository; private final DatasetRepository datasetRepository; - private final Path fileStorageLocation; private final FileService fileService; @Value("${dataset.base.path:/dataset}") @@ -68,61 +64,10 @@ public class DatasetFileApplicationService { @Autowired public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository, - DatasetRepository datasetRepository, FileService fileService, - @Value("${app.file.upload-dir:./dataset}") String uploadDir) { + DatasetRepository datasetRepository, 
FileService fileService) { this.datasetFileRepository = datasetFileRepository; this.datasetRepository = datasetRepository; - this.fileStorageLocation = Paths.get(uploadDir).toAbsolutePath().normalize(); this.fileService = fileService; - try { - Files.createDirectories(this.fileStorageLocation); - } catch (Exception ex) { - throw new RuntimeException("Could not create the directory where the uploaded files will be stored.", ex); - } - } - - /** - * 上传文件到数据集 - */ - public DatasetFile uploadFile(String datasetId, MultipartFile file) { - Dataset dataset = datasetRepository.getById(datasetId); - if (dataset == null) { - throw new IllegalArgumentException("Dataset not found: " + datasetId); - } - - String originalFilename = file.getOriginalFilename(); - String fileName = originalFilename != null ? originalFilename : "file"; - try { - // 保存文件到磁盘 - Path targetLocation = this.fileStorageLocation.resolve(datasetId + File.separator + fileName); - // 确保目标目录存在 - Files.createDirectories(targetLocation); - Files.copy(file.getInputStream(), targetLocation, StandardCopyOption.REPLACE_EXISTING); - - // 创建文件实体(UUID 主键) - DatasetFile datasetFile = new DatasetFile(); - datasetFile.setId(UUID.randomUUID().toString()); - datasetFile.setDatasetId(datasetId); - datasetFile.setFileName(fileName); - datasetFile.setFilePath(targetLocation.toString()); - datasetFile.setFileType(getFileExtension(originalFilename)); - datasetFile.setFileSize(file.getSize()); - datasetFile.setUploadTime(LocalDateTime.now()); - datasetFile.setStatus(StatusConstants.DatasetFileStatuses.COMPLETED); - - // 保存到数据库 - datasetFileRepository.save(datasetFile); - - // 更新数据集统计 - dataset.addFile(datasetFile); - datasetRepository.updateById(dataset); - - return datasetFileRepository.findByDatasetIdAndFileName(datasetId, fileName); - - } catch (IOException ex) { - log.error("Could not store file {}", fileName, ex); - throw new RuntimeException("Could not store file " + fileName, ex); - } } /** @@ -230,17 +175,6 @@ private void 
addToZipFile(DatasetFile file, ZipOutputStream zos) throws IOExcept } } - private String getFileExtension(String fileName) { - if (fileName == null || fileName.isEmpty()) { - return null; - } - int lastDotIndex = fileName.lastIndexOf("."); - if (lastDotIndex == -1) { - return null; - } - return fileName.substring(lastDotIndex + 1); - } - /** * 预上传 * @@ -276,9 +210,6 @@ public String preUpload(UploadFilesPreRequest chunkUploadRequest, String dataset public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) { FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest)); saveFileInfoToDb(uploadResult, uploadFileRequest, datasetId); - if (uploadResult.isAllFilesUploaded()) { - // 解析文件,后续依据需求看是否添加校验文件元数据和解析半结构化文件的逻辑, - } } private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileRequest uploadFile, String datasetId) { diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java index 7e556570..6ca64bd4 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java @@ -3,6 +3,7 @@ import com.datamate.datamanagement.common.enums.DatasetType; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -19,9 +20,11 @@ @AllArgsConstructor public class CreateDatasetRequest { /** 数据集名称 */ + @Size(min = 1, max = 100) @NotBlank(message = "数据集名称不能为空") private String name; /** 数据集描述 */ + @Size(max = 500) private 
String description; /** 数据集类型 */ @NotNull(message = "数据集类型不能为空") @@ -30,6 +33,4 @@ public class CreateDatasetRequest { private List tags; /** 数据源 */ private String dataSource; - /** 目标位置 */ - private String targetLocation; } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/UpdateDatasetRequest.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/UpdateDatasetRequest.java index 3aea04f7..91070f39 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/UpdateDatasetRequest.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/UpdateDatasetRequest.java @@ -1,6 +1,8 @@ package com.datamate.datamanagement.interfaces.dto; import com.datamate.datamanagement.common.enums.DatasetStatusType; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.Size; import lombok.Getter; import lombok.Setter; @@ -13,8 +15,11 @@ @Setter public class UpdateDatasetRequest { /** 数据集名称 */ + @Size(min = 1, max = 100) + @NotBlank(message = "数据集名称不能为空") private String name; /** 数据集描述 */ + @Size(max = 500) private String description; /** 归集任务id */ private String dataSource; diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java index 24fe4c3c..d7b2a614 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java @@ -68,22 +68,6 @@ public ResponseEntity> getDatasetFiles( return ResponseEntity.ok(Response.ok(response)); } - 
@PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE) - public ResponseEntity> uploadDatasetFile( - @PathVariable("datasetId") String datasetId, - @RequestPart(value = "file", required = false) MultipartFile file) { - try { - DatasetFile datasetFile = datasetFileApplicationService.uploadFile(datasetId, file); - - return ResponseEntity.status(HttpStatus.CREATED).body(Response.ok(DatasetConverter.INSTANCE.convertToResponse(datasetFile))); - } catch (IllegalArgumentException e) { - return ResponseEntity.badRequest().body(Response.error(SystemErrorCode.UNKNOWN_ERROR, null)); - } catch (Exception e) { - log.error("upload fail", e); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Response.error(SystemErrorCode.UNKNOWN_ERROR, null)); - } - } - @GetMapping("/{fileId}") public ResponseEntity> getDatasetFileById( @PathVariable("datasetId") String datasetId, @@ -110,9 +94,8 @@ public ResponseEntity> deleteDatasetFile( @IgnoreResponseWrap @GetMapping(value = "/{fileId}/download", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) - public ResponseEntity downloadDatasetFileById( - @PathVariable("datasetId") String datasetId, - @PathVariable("fileId") String fileId) { + public ResponseEntity downloadDatasetFileById(@PathVariable("datasetId") String datasetId, + @PathVariable("fileId") String fileId) { try { DatasetFile datasetFile = datasetFileApplicationService.getDatasetFile(datasetId, fileId); Resource resource = datasetFileApplicationService.downloadFile(datasetId, fileId); @@ -142,8 +125,8 @@ public void downloadDatasetFileAsZip(@PathVariable("datasetId") String datasetId * @return 批量上传请求id */ @PostMapping("/upload/pre-upload") - public ResponseEntity> preUpload(@PathVariable("datasetId") String datasetId, @RequestBody @Valid UploadFilesPreRequest request) { - + public ResponseEntity> preUpload(@PathVariable("datasetId") String datasetId, + @RequestBody @Valid UploadFilesPreRequest request) { return 
ResponseEntity.ok(Response.ok(datasetFileApplicationService.preUpload(request, datasetId))); } @@ -153,7 +136,7 @@ public ResponseEntity> preUpload(@PathVariable("datasetId") Str * @param uploadFileRequest 上传文件请求 */ @PostMapping("/upload/chunk") - public ResponseEntity chunkUpload(@PathVariable("datasetId") String datasetId, UploadFileRequest uploadFileRequest) { + public ResponseEntity chunkUpload(@PathVariable("datasetId") String datasetId, @Valid UploadFileRequest uploadFileRequest) { log.info("file upload reqId:{}, fileNo:{}, total chunk num:{}, current chunkNo:{}", uploadFileRequest.getReqId(), uploadFileRequest.getFileNo(), uploadFileRequest.getTotalChunkNum(), uploadFileRequest.getChunkNo()); From 825bd4b6fa1836ca0b548da0ec1494ab1b1bad02 Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Fri, 24 Oct 2025 15:19:46 +0800 Subject: [PATCH 5/9] =?UTF-8?q?fix:=20=E6=95=B0=E6=8D=AE=E9=9B=86=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E6=95=B0=E6=8D=AE=E9=BB=98=E8=AE=A4=E4=B8=BA0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../interfaces/dto/AllDatasetStatisticsResponse.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/AllDatasetStatisticsResponse.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/AllDatasetStatisticsResponse.java index 7da863fd..08f14b71 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/AllDatasetStatisticsResponse.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/AllDatasetStatisticsResponse.java @@ -10,11 +10,11 @@ @Setter public class AllDatasetStatisticsResponse { /** 总数据集数 */ - private Integer totalDatasets; + private Integer totalDatasets = 0; /** 总文件数 */ - private Long totalSize; + private Long 
totalSize = 0L; /** 总大小(字节) */ - private Long totalFiles; + private Long totalFiles = 0L; } From 3b1a043a2694eabbb80a425e39a502bca8a853c9 Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Fri, 24 Oct 2025 15:20:46 +0800 Subject: [PATCH 6/9] =?UTF-8?q?feature:=20=E6=95=B0=E6=8D=AE=E9=9B=86?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=A2=9E=E5=8A=A0=E6=B5=81=E8=BD=AC=EF=BC=8C?= =?UTF-8?q?=E5=88=9B=E5=BB=BA=E6=97=B6=E4=B8=BA=E8=8D=89=E7=A8=BF=E7=8A=B6?= =?UTF-8?q?=E6=80=81=EF=BC=8C=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E6=88=96?= =?UTF-8?q?=E8=80=85=E5=BD=92=E9=9B=86=E6=96=87=E4=BB=B6=E5=90=8E=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=E6=B4=BB=E5=8A=A8=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DatasetApplicationService.java | 42 +++++++++---------- .../DatasetFileApplicationService.java | 1 + .../domain/model/dataset/Dataset.java | 6 +++ 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java index b14db920..2246e028 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java @@ -220,30 +220,10 @@ public AllDatasetStatisticsResponse getAllDatasetStatistics() { public void processDataSourceAsync(String datasetId, String dataSourceId) { try { log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId); - - // 1. 
调用数据归集服务获取任务详情 - CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData(); - if (taskDetail == null) { - log.error("获取归集任务详情失败,任务ID: {}", dataSourceId); - return; - } - - log.info("获取到归集任务详情: {}", taskDetail); - - // 2. 解析任务配置 - LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig()); - if (config == null) { - log.error("解析任务配置失败,任务ID: {}", dataSourceId); - return; - } - - // 4. 获取文件路径列表 - List filePaths = config.getFilePaths(); + List filePaths = getFilePaths(dataSourceId); if (CollectionUtils.isEmpty(filePaths)) { - log.warn("文件路径列表为空,任务ID: {}", dataSourceId); return; } - log.info("开始扫描文件,共 {} 个文件路径", filePaths.size()); // 5. 扫描文件元数据 @@ -261,9 +241,11 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) { dataset.removeFile(existDatasetFile); existDatasetFile.setFileSize(datasetFile.getFileSize()); dataset.addFile(existDatasetFile); + dataset.active(); datasetFileRepository.updateById(existDatasetFile); } else { dataset.addFile(datasetFile); + dataset.active(); datasetFileRepository.save(datasetFile); } } @@ -277,6 +259,24 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) { } } + private List getFilePaths(String dataSourceId) { + // 1. 调用数据归集服务获取任务详情 + CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData(); + if (taskDetail == null) { + log.warn("获取归集任务详情失败,任务ID: {}", dataSourceId); + return Collections.emptyList(); + } + log.info("获取到归集任务详情: {}", taskDetail); + // 2. 解析任务配置 + LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig()); + if (config == null) { + log.warn("解析任务配置失败,任务ID: {}", dataSourceId); + return Collections.emptyList(); + } + // 4. 
获取文件路径列表 + return config.getFilePaths(); + } + /** * 解析任务配置 */ diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java index 399a07e4..d55bd208 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -233,6 +233,7 @@ private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileReque datasetFileRepository.save(datasetFile); dataset.addFile(datasetFile); + dataset.active(); datasetRepository.updateById(dataset); } } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java index a3d55df0..3f1522c1 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java @@ -143,4 +143,10 @@ public void removeFile(DatasetFile file) { this.updatedAt = LocalDateTime.now(); } } + + public void active() { + if (this.status == DatasetStatusType.DRAFT) { + this.status = DatasetStatusType.ACTIVE; + } + } } From dc3706d66c98709433e3ffc39121d0f1a88dcd76 Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Fri, 24 Oct 2025 16:10:00 +0800 Subject: [PATCH 7/9] =?UTF-8?q?refactor:=20=E4=BF=AE=E6=94=B9=E5=88=86?= =?UTF-8?q?=E9=A1=B5=E6=9F=A5=E8=AF=A2=E5=BD=92=E9=9B=86=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../collection/application/CollectionTaskService.java | 9 ++------- .../interfaces/rest/CollectionTaskController.java | 9 ++++++++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java index 80bb8e75..fbee2816 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java @@ -7,12 +7,10 @@ import com.datamate.collection.domain.model.entity.TaskExecution; import com.datamate.collection.common.enums.TaskStatus; import com.datamate.collection.domain.repository.CollectionTaskRepository; -import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery; import com.datamate.collection.common.enums.SyncMode; import com.datamate.common.domain.utils.ChunksSaver; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -66,11 +64,8 @@ public CollectionTask get(String id) { return collectionTaskRepository.getById(id); } - public IPage getTasks(CollectionTaskPagingQuery query) { - LambdaQueryWrapper wrapper = new LambdaQueryWrapper() - .eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus()) - .like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName()); - return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper); + public IPage getTasks(Page page, LambdaQueryWrapper wrapper) { + return collectionTaskRepository.page(page, 
wrapper); } public List selectActiveTasks() { diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java index b893a73c..3192df09 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java @@ -1,5 +1,7 @@ package com.datamate.collection.interfaces.rest; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.datamate.collection.application.CollectionTaskService; import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.interfaces.converter.CollectionTaskConverter; @@ -8,6 +10,7 @@ import jakarta.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -53,6 +56,10 @@ public ResponseEntity getTaskDetail(@PathVariable("id") @GetMapping public ResponseEntity> getTasks(@Valid CollectionTaskPagingQuery query) { - return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(query))); + Page page = new Page<>(query.getPage(), query.getSize()); + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus()) + .like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName()); + return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(page, wrapper))); } } From 4398e56cd8a1f7d3c14662520a15ee44b85bbac7 
Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Sat, 25 Oct 2025 10:10:07 +0800 Subject: [PATCH 8/9] =?UTF-8?q?fix:=20=E6=9B=B4=E6=96=B0=E5=90=8E=E9=87=8D?= =?UTF-8?q?=E6=96=B0=E6=89=A7=E8=A1=8C=EF=BC=9B=E5=BD=92=E9=9B=86=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=89=A7=E8=A1=8C=E5=A2=9E=E5=8A=A0=E4=BA=8B=E5=8A=A1?= =?UTF-8?q?=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datamate/collection/application/CollectionTaskService.java | 1 + .../datamate/collection/application/TaskExecutionService.java | 1 + 2 files changed, 2 insertions(+) diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java index fbee2816..ba73a6e6 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java @@ -48,6 +48,7 @@ private void executeTaskNow(CollectionTask task) { public CollectionTask update(CollectionTask task) { task.setUpdatedAt(LocalDateTime.now()); collectionTaskRepository.updateById(task); + executeTaskNow(task); return task; } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java index 58455154..f0d78627 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java @@ -39,6 +39,7 @@ public TaskExecution selectLatestByTaskId(String taskId) { } 
@Async + @Transactional public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) { try { int code = processRunner.runJob(task, executionId, timeoutSeconds); From bb22e897c61f0cad112c03a337b2aab03c952201 Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Sat, 25 Oct 2025 15:00:24 +0800 Subject: [PATCH 9/9] =?UTF-8?q?feature:=20=E5=88=9B=E5=BB=BA=E5=BD=92?= =?UTF-8?q?=E9=9B=86=E4=BB=BB=E5=8A=A1=E6=97=B6=E8=83=BD=E5=A4=9F=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=88=9B=E5=BB=BA=E6=95=B0=E6=8D=AE=E9=9B=86=EF=BC=8C?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E5=BD=92=E9=9B=86=E4=BB=BB=E5=8A=A1=E6=97=B6?= =?UTF-8?q?=E8=83=BD=E6=9B=B4=E6=96=B0=E5=88=B0=E6=8C=87=E5=AE=9A=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E9=9B=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../services/data-collection-service/pom.xml | 7 ++- .../DataCollectionServiceConfiguration.java | 1 + .../application/CollectionTaskService.java | 20 +++--- .../application/TaskExecutionService.java | 5 +- .../domain/model/entity/CollectionTask.java | 10 +++ .../dto/CollectionTaskResponse.java | 3 + .../dto/CreateCollectionTaskRequest.java | 5 ++ .../dto/UpdateCollectionTaskRequest.java | 3 + .../rest/CollectionTaskController.java | 19 ++++-- .../scheduler/TaskSchedulerInitializer.java | 2 +- .../DatasetApplicationService.java | 62 ++++++++++++------- .../DatasetFileApplicationService.java | 26 +++++++- .../rest/DatasetFileController.java | 2 +- 13 files changed, 121 insertions(+), 44 deletions(-) diff --git a/backend/services/data-collection-service/pom.xml b/backend/services/data-collection-service/pom.xml index 6600bf11..9261ad74 100644 --- a/backend/services/data-collection-service/pom.xml +++ b/backend/services/data-collection-service/pom.xml @@ -87,7 +87,12 @@ com.datamate domain-common - 1.0.0-SNAPSHOT + ${project.version} + + + com.datamate + data-management-service + ${project.version} diff --git 
a/backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java index f22c5b98..8230ab2a 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java @@ -17,6 +17,7 @@ @EnableTransactionManagement @ComponentScan(basePackages = { "com.datamate.collection", + "com.datamate.datamanagement", "com.datamate.shared" }) public class DataCollectionServiceConfiguration { diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java index ba73a6e6..6ee20b8b 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java @@ -9,6 +9,8 @@ import com.datamate.collection.domain.repository.CollectionTaskRepository; import com.datamate.collection.common.enums.SyncMode; import com.datamate.common.domain.utils.ChunksSaver; +import com.datamate.datamanagement.application.DatasetApplicationService; +import com.datamate.datamanagement.domain.model.dataset.Dataset; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -23,32 +25,32 @@ @RequiredArgsConstructor public class CollectionTaskService { private final TaskExecutionService taskExecutionService; + private final DatasetApplicationService datasetApplicationService; private final CollectionTaskRepository 
collectionTaskRepository; @Transactional - public CollectionTask create(CollectionTask task) { - task.setStatus(TaskStatus.READY); - task.setCreatedAt(LocalDateTime.now()); - task.setUpdatedAt(LocalDateTime.now()); + public CollectionTask create(CollectionTask task, String datasetId) { + task.initCreateParam(); collectionTaskRepository.save(task); - executeTaskNow(task); + executeTaskNow(task, datasetId); return task; } - private void executeTaskNow(CollectionTask task) { + private void executeTaskNow(CollectionTask task, String datasetId) { if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) { TaskExecution exec = taskExecutionService.createExecution(task); int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds(); - taskExecutionService.runAsync(task, exec.getId(), timeout); + taskExecutionService.runAsync(task, exec.getId(), timeout, datasetId); log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId()); } } @Transactional - public CollectionTask update(CollectionTask task) { + public CollectionTask update(CollectionTask task, String datasetId) { task.setUpdatedAt(LocalDateTime.now()); + task.addPath(); collectionTaskRepository.updateById(task); - executeTaskNow(task); + executeTaskNow(task, datasetId); return task; } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java index f0d78627..2d4c874f 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java @@ -6,6 +6,7 @@ import com.datamate.collection.domain.process.ProcessRunner; import 
com.datamate.collection.domain.repository.CollectionTaskRepository; import com.datamate.collection.domain.repository.TaskExecutionRepository; +import com.datamate.datamanagement.application.DatasetApplicationService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; @@ -21,6 +22,7 @@ public class TaskExecutionService { private final ProcessRunner processRunner; private final TaskExecutionRepository executionRepository; private final CollectionTaskRepository collectionTaskRepository; + private final DatasetApplicationService datasetApplicationService; @Transactional @@ -40,7 +42,7 @@ public TaskExecution selectLatestByTaskId(String taskId) { @Async @Transactional - public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) { + public void runAsync(CollectionTask task, String executionId, int timeoutSeconds, String datasetId) { try { int code = processRunner.runJob(task, executionId, timeoutSeconds); log.info("DataX finished with code {} for execution {}", code, executionId); @@ -48,6 +50,7 @@ public void runAsync(CollectionTask task, String executionId, int timeoutSeconds executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(), 0, 0L, 0L, 0L, null); collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name()); + datasetApplicationService.processDataSourceAsync(datasetId, task.getId()); } catch (Exception e) { log.error("DataX execution failed", e); executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(), diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java index 5bbdb2a2..773731b0 100644 --- 
a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java @@ -10,8 +10,10 @@ import lombok.Getter; import lombok.Setter; +import java.time.LocalDateTime; import java.util.Collections; import java.util.Map; +import java.util.UUID; /** * 数据采集任务实体(与数据库表 t_dc_collection_tasks 对齐) @@ -46,4 +48,12 @@ public void addPath() { throw new RuntimeException(e); } } + + public void initCreateParam() { + this.id = UUID.randomUUID().toString(); + this.addPath(); + this.status = TaskStatus.READY; + this.createdAt = LocalDateTime.now(); + this.updatedAt = LocalDateTime.now(); + } } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java index 08f61034..2b0801ec 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java @@ -6,6 +6,7 @@ import com.datamate.collection.common.enums.TaskStatus; import com.datamate.collection.common.enums.SyncMode; +import com.datamate.datamanagement.interfaces.dto.DatasetResponse; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -44,5 +45,7 @@ public class CollectionTaskResponse { @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) private LocalDateTime updatedAt; + + private DatasetResponse dataset; } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java 
b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java index f2f029f0..74a2f2cb 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java @@ -1,6 +1,7 @@ package com.datamate.collection.interfaces.dto; import com.datamate.collection.common.enums.SyncMode; +import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.HashMap; @@ -49,5 +50,9 @@ public class CreateCollectionTaskRequest { @Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED) @JsonProperty("scheduleExpression") private String scheduleExpression; + + /** 创建数据集参数 */ + @Valid + private CreateDatasetRequest dataset; } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java index 9321be80..4a4db4ce 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java @@ -46,5 +46,8 @@ public class UpdateCollectionTaskRequest { @Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED) @JsonProperty("scheduleExpression") private String scheduleExpression; + + /** 数据集id */ + private String datasetId; } diff --git 
a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java index 3192df09..38deb639 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java @@ -7,11 +7,15 @@ import com.datamate.collection.interfaces.converter.CollectionTaskConverter; import com.datamate.collection.interfaces.dto.*; import com.datamate.common.interfaces.PagedResponse; +import com.datamate.datamanagement.application.DatasetApplicationService; +import com.datamate.datamanagement.domain.model.dataset.Dataset; +import com.datamate.datamanagement.interfaces.converter.DatasetConverter; import jakarta.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.http.ResponseEntity; +import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; import java.util.*; @@ -24,12 +28,19 @@ public class CollectionTaskController{ private final CollectionTaskService taskService; + private final DatasetApplicationService datasetService; + @PostMapping + @Transactional public ResponseEntity createTask(@Valid @RequestBody CreateCollectionTaskRequest request) { CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request); - task.setId(UUID.randomUUID().toString()); - task.addPath(); - return ResponseEntity.ok().body(CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task))); + String datasetId = null; + if (Objects.nonNull(request.getDataset())) { + datasetId = datasetService.createDataset(request.getDataset()).getId(); + } + 
CollectionTaskResponse response = CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task, datasetId)); + response.setDataset(DatasetConverter.INSTANCE.convertToResponse(datasetService.getDataset(datasetId))); + return ResponseEntity.ok().body(response); } @PutMapping("/{id}") @@ -39,7 +50,7 @@ public ResponseEntity updateTask(@PathVariable("id") Str } CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request); task.setId(id); - return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task))); + return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task, request.getDatasetId()))); } @DeleteMapping("/{id}") diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java index d0d0f260..6fd8bb76 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java @@ -53,7 +53,7 @@ public void scanAndTrigger() { // 到期,触发一次执行 TaskExecution exec = taskExecutionService.createExecution(task); int timeout = task.getTimeoutSeconds() == null ? 
3600 : task.getTimeoutSeconds(); - taskExecutionService.runAsync(task, exec.getId(), timeout); + taskExecutionService.runAsync(task, exec.getId(), timeout, null); log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), now, exec.getId()); } } catch (Exception ex) { diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java index 2246e028..e39443e6 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java @@ -28,6 +28,8 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -226,54 +228,66 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) { } log.info("开始扫描文件,共 {} 个文件路径", filePaths.size()); - // 5. 扫描文件元数据 List datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId); // 查询数据集中已存在的文件 List existDatasetFileList = datasetFileRepository.findAllByDatasetId(datasetId); Map existDatasetFilePathMap = existDatasetFileList.stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity())); Dataset dataset = datasetRepository.getById(datasetId); + dataset.setFiles(existDatasetFileList); - // 6. 
批量插入数据集文件表 - if (CollectionUtils.isNotEmpty(datasetFiles)) { - for (DatasetFile datasetFile : datasetFiles) { - if (existDatasetFilePathMap.containsKey(datasetFile.getFilePath())) { - DatasetFile existDatasetFile = existDatasetFilePathMap.get(datasetFile.getFilePath()); - dataset.removeFile(existDatasetFile); - existDatasetFile.setFileSize(datasetFile.getFileSize()); - dataset.addFile(existDatasetFile); - dataset.active(); - datasetFileRepository.updateById(existDatasetFile); - } else { - dataset.addFile(datasetFile); - dataset.active(); - datasetFileRepository.save(datasetFile); - } - } - log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size()); - } else { - log.warn("未扫描到有效文件"); - } + // 批量同步数据集文件表 + asyncDatasetFile(datasetFiles, existDatasetFilePathMap, dataset, existDatasetFileList, filePaths); datasetRepository.updateById(dataset); } catch (Exception e) { log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e); } } + private void asyncDatasetFile(List datasetFiles, Map existDatasetFilePathMap, Dataset dataset, List existDatasetFileList, List filePaths) { + if (CollectionUtils.isNotEmpty(datasetFiles)) { + for (DatasetFile datasetFile : datasetFiles) { + if (existDatasetFilePathMap.containsKey(datasetFile.getFilePath())) { + DatasetFile existDatasetFile = existDatasetFilePathMap.get(datasetFile.getFilePath()); + dataset.removeFile(existDatasetFile); + existDatasetFile.setFileSize(datasetFile.getFileSize()); + dataset.addFile(existDatasetFile); + dataset.active(); + datasetFileRepository.updateById(existDatasetFile); + } else { + dataset.addFile(datasetFile); + dataset.active(); + datasetFileRepository.save(datasetFile); + } + } + log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size()); + } else { + log.warn("未扫描到有效文件"); + } + for (DatasetFile datasetFile : existDatasetFileList) { + String existFilePath = datasetFile.getFilePath(); + for (String filePath : filePaths) { + if (existFilePath.equals(filePath) || existFilePath.startsWith(filePath)) { + 
if (Files.notExists(Paths.get(existFilePath))) { + dataset.removeFile(datasetFile); + datasetFileRepository.removeById(datasetFile.getId()); + } + } + } + } + } + private List getFilePaths(String dataSourceId) { - // 1. 调用数据归集服务获取任务详情 CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData(); if (taskDetail == null) { log.warn("获取归集任务详情失败,任务ID: {}", dataSourceId); return Collections.emptyList(); } log.info("获取到归集任务详情: {}", taskDetail); - // 2. 解析任务配置 LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig()); if (config == null) { log.warn("解析任务配置失败,任务ID: {}", dataSourceId); return Collections.emptyList(); } - // 4. 获取文件路径列表 return config.getFilePaths(); } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java index d55bd208..0fe61942 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -41,9 +41,7 @@ import java.nio.file.Paths; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.List; -import java.util.Objects; -import java.util.UUID; +import java.util.*; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -143,6 +141,7 @@ public Resource downloadFile(String datasetId, String fileId) { @Transactional(readOnly = true) public void downloadDatasetFileAsZip(String datasetId, HttpServletResponse response) { List allByDatasetId = datasetFileRepository.findAllByDatasetId(datasetId); + fileRename(allByDatasetId); response.setContentType("application/zip"); String zipName = String.format("dataset_%s.zip", 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); @@ -157,6 +156,27 @@ public void downloadDatasetFileAsZip(String datasetId, HttpServletResponse respo } } + private void fileRename(List files) { + Set uniqueFilenames = new HashSet<>(); + for (DatasetFile file : files) { + String originalFilename = file.getFileName(); + if (!uniqueFilenames.add(originalFilename)) { + String newFilename; + int counter = 1; + do { + newFilename = generateNewFilename(originalFilename, counter); + counter++; + } while (!uniqueFilenames.add(newFilename)); + file.setFileName(newFilename); + } + } + } + + private String generateNewFilename(String oldFilename, int counter) { + int dotIndex = oldFilename.lastIndexOf("."); + return oldFilename.substring(0, dotIndex) + "-(" + counter + ")" + oldFilename.substring(dotIndex); + } + private void addToZipFile(DatasetFile file, ZipOutputStream zos) throws IOException { if (file.getFilePath() == null || !Files.exists(Paths.get(file.getFilePath()))) { log.warn("The file hasn't been found on filesystem, id: {}", file.getId()); diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java index d7b2a614..e5cb6998 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java @@ -93,7 +93,7 @@ public ResponseEntity> deleteDatasetFile( } @IgnoreResponseWrap - @GetMapping(value = "/{fileId}/download", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) + @GetMapping(value = "/{fileId}/download", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE + ";charset=UTF-8") public ResponseEntity 
downloadDatasetFileById(@PathVariable("datasetId") String datasetId, @PathVariable("fileId") String fileId) { try {