diff --git a/backend/openapi/specs/data-management.yaml b/backend/openapi/specs/data-management.yaml
index d77083b4..1aac250b 100644
--- a/backend/openapi/specs/data-management.yaml
+++ b/backend/openapi/specs/data-management.yaml
@@ -39,7 +39,7 @@ paths:
           schema:
             type: integer
-            default: 0
-          description: Page number, starting from 0
+            default: 1
+          description: Page number, starting from 1
         - name: size
           in: query
           schema:
@@ -65,7 +65,7 @@ paths:
           in: query
           schema:
             type: string
-            enum: [ACTIVE, INACTIVE, PROCESSING]
+            enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED]
           description: Dataset status filter
       responses:
         '200':
@@ -231,40 +231,6 @@ paths:
               schema:
                 $ref: '#/components/schemas/PagedDatasetFileResponse'
 
-    post:
-      tags: [DatasetFile]
-      summary: Upload a file to a dataset
-      operationId: uploadDatasetFile
-      description: Uploads a file to the specified dataset
-      parameters:
-        - name: datasetId
-          in: path
-          required: true
-          schema:
-            type: string
-          description: Dataset ID
-      requestBody:
-        required: true
-        content:
-          multipart/form-data:
-            schema:
-              type: object
-              properties:
-                file:
-                  type: string
-                  format: binary
-                  description: File to upload
-                description:
-                  type: string
-                  description: File description
-      responses:
-        '201':
-          description: Upload succeeded
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/DatasetFileResponse'
-
   /data-management/datasets/{datasetId}/files/{fileId}:
     get:
       tags: [DatasetFile]
@@ -342,6 +308,78 @@ paths:
                 type: string
                 format: binary
 
+  /data-management/datasets/{datasetId}/files/download:
+    get:
+      tags: [ DatasetFile ]
+      operationId: downloadDatasetFileAsZip
+      summary: Download files
+      description: Downloads all files in the dataset
+      parameters:
+        - name: datasetId
+          in: path
+          required: true
+          schema:
+            type: string
+          description: Dataset ID
+      responses:
+        '200':
+          description: File content
+          content:
+            application/octet-stream:
+              schema:
+                type: string
+                format: binary
+
+  /data-management/datasets/{datasetId}/files/upload/pre-upload:
+    post:
+      tags: [ DatasetFile ]
+      operationId: preUpload
+      summary: Chunked upload, pre-upload step
+      description: Pre-upload endpoint; returns the request ID required by the subsequent chunk uploads
+      parameters:
+        - name: datasetId
+          in: path
+          required: true
+          schema:
+            type: string
+          description: Dataset ID
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/UploadFilesPreRequest'
+      responses:
+        '200':
+          description: Pre-upload succeeded; returns the request ID
+          content:
+            application/json:
+              schema:
+                type: string
+
+  /data-management/datasets/{datasetId}/files/upload/chunk:
+    post:
+      tags: [ DatasetFile ]
+      operationId: chunkUpload
+      summary: Chunked upload
+      description: Uploads one chunk, using the request ID returned by the pre-upload step
+      parameters:
+        - name: datasetId
+          in: path
+          required: true
+          schema:
+            type: string
+          description: Dataset ID
+      requestBody:
+        required: true
+        content:
+          multipart/form-data:
+            schema:
+              $ref: '#/components/schemas/UploadFileRequest'
+      responses:
+        '200':
+          description: Upload succeeded
+
   /data-management/dataset-types:
     get:
       operationId: getDatasetTypes
@@ -548,9 +586,59 @@ components:
           description: Tag list
         status:
           type: string
-          enum: [ACTIVE, INACTIVE]
+          enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED]
           description: Dataset status
 
+    UploadFilesPreRequest:
+      type: object
+      description: Chunked-upload pre-upload request
+      properties:
+        hasArchive:
+          type: boolean
+          description: Whether the upload is an archive
+          default: false
+        totalFileNum:
+          type: integer
+          format: int32
+          minimum: 1
+          description: Total number of files
+        totalSize:
+          type: integer
+          format: int64
+          description: Total size of all files in bytes
+      required: [ totalFileNum ]
+
+    UploadFileRequest:
+      type: object
+      description: Chunk upload request
+      properties:
+        reqId:
+          type: string
+          description: Request ID returned by the pre-upload step
+        fileNo:
+          type: integer
+          format: int32
+          description: File number within the batch
+        fileName:
+          type: string
+          description: File name
+        totalChunkNum:
+          type: integer
+          format: int32
+          description: Total number of chunks for this file
+        chunkNo:
+          type: integer
+          format: int32
+          description: Current chunk number (starting from 1)
+        file:
+          type: string
+          format: binary
+          description: Chunk binary content
+        checkSumHex:
+          type: string
+          description: Chunk checksum (hex)
+      required: [ reqId, fileNo, fileName, totalChunkNum, chunkNo, file ]
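To make the two-step flow concrete, here is a minimal client-side sketch. The 5 MB chunk size and the use of SHA-256 for checkSumHex are assumptions (the spec pins down neither), and postJson/postMultipart are hypothetical HTTP helpers standing in for whatever client the caller uses.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.HexFormat;

    /** Sketch of the pre-upload + chunk-upload flow; HTTP calls are stubbed out. */
    public class ChunkUploadSketch {
        private static final int CHUNK_SIZE = 5 * 1024 * 1024; // assumed chunk size

        public static void upload(String datasetId, Path file) throws IOException, NoSuchAlgorithmException {
            byte[] bytes = Files.readAllBytes(file);
            int totalChunkNum = Math.max(1, (bytes.length + CHUNK_SIZE - 1) / CHUNK_SIZE);

            // Step 1: pre-upload announces the batch and yields a request ID.
            String reqId = postJson("/data-management/datasets/" + datasetId + "/files/upload/pre-upload",
                    "{\"hasArchive\":false,\"totalFileNum\":1,\"totalSize\":" + bytes.length + "}");

            // Step 2: send each chunk (chunkNo is 1-based per the spec).
            for (int chunkNo = 1; chunkNo <= totalChunkNum; chunkNo++) {
                int from = (chunkNo - 1) * CHUNK_SIZE;
                int to = Math.min(bytes.length, from + CHUNK_SIZE);
                byte[] chunk = java.util.Arrays.copyOfRange(bytes, from, to);
                String checkSumHex = HexFormat.of().formatHex(
                        MessageDigest.getInstance("SHA-256").digest(chunk)); // assumed algorithm
                postMultipart("/data-management/datasets/" + datasetId + "/files/upload/chunk",
                        reqId, 1, file.getFileName().toString(), totalChunkNum, chunkNo, chunk, checkSumHex);
            }
        }

        private static String postJson(String url, String body) { /* hypothetical HTTP helper */ return "req-id"; }

        private static void postMultipart(String url, String reqId, int fileNo, String fileName,
                                          int totalChunkNum, int chunkNo, byte[] chunk, String checkSumHex) {
            // hypothetical HTTP helper: builds multipart/form-data matching UploadFileRequest
        }
    }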
 
     DatasetTypeResponse:
       type: object
       properties:
diff --git a/backend/services/data-collection-service/pom.xml b/backend/services/data-collection-service/pom.xml
index 6600bf11..9261ad74 100644
--- a/backend/services/data-collection-service/pom.xml
+++ b/backend/services/data-collection-service/pom.xml
@@ -87,7 +87,12 @@
         <dependency>
             <groupId>com.datamate</groupId>
             <artifactId>domain-common</artifactId>
-            <version>1.0.0-SNAPSHOT</version>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datamate</groupId>
+            <artifactId>data-management-service</artifactId>
+            <version>${project.version}</version>
         </dependency>
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java
index f22c5b98..8230ab2a 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java
@@ -17,6 +17,7 @@
 @EnableTransactionManagement
 @ComponentScan(basePackages = {
     "com.datamate.collection",
+    "com.datamate.datamanagement",
     "com.datamate.shared"
 })
 public class DataCollectionServiceConfiguration {
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java
index 80bb8e75..6ee20b8b 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java
@@ -7,12 +7,12 @@
 import com.datamate.collection.domain.model.entity.TaskExecution;
 import com.datamate.collection.common.enums.TaskStatus;
 import com.datamate.collection.domain.repository.CollectionTaskRepository;
-import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery;
 import com.datamate.collection.common.enums.SyncMode;
 import com.datamate.common.domain.utils.ChunksSaver;
+import com.datamate.datamanagement.application.DatasetApplicationService;
+import com.datamate.datamanagement.domain.model.dataset.Dataset;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -25,31 +25,32 @@
 @RequiredArgsConstructor
 public class CollectionTaskService {
     private final TaskExecutionService taskExecutionService;
+    private final DatasetApplicationService datasetApplicationService;
     private final CollectionTaskRepository collectionTaskRepository;
 
     @Transactional
-    public CollectionTask create(CollectionTask task) {
-        task.setStatus(TaskStatus.READY);
-        task.setCreatedAt(LocalDateTime.now());
-        task.setUpdatedAt(LocalDateTime.now());
+    public CollectionTask create(CollectionTask task, String datasetId) {
+        task.initCreateParam();
         collectionTaskRepository.save(task);
-        executeTaskNow(task);
+        executeTaskNow(task, datasetId);
         return task;
     }
 
-    private void executeTaskNow(CollectionTask task) {
+    private void executeTaskNow(CollectionTask task, String datasetId) {
         if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) {
             TaskExecution exec = taskExecutionService.createExecution(task);
             int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
-            taskExecutionService.runAsync(task, exec.getId(), timeout);
+            taskExecutionService.runAsync(task, exec.getId(), timeout, datasetId);
             log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
         }
     }
 
     @Transactional
-    public CollectionTask update(CollectionTask task) {
+    public CollectionTask update(CollectionTask task, String datasetId) {
         task.setUpdatedAt(LocalDateTime.now());
+        task.addPath();
         collectionTaskRepository.updateById(task);
+        executeTaskNow(task, datasetId);
         return task;
     }
 
@@ -66,11 +67,8 @@
     public CollectionTask get(String id) {
         return collectionTaskRepository.getById(id);
     }
 
-    public IPage<CollectionTask> getTasks(CollectionTaskPagingQuery query) {
-        LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
-            .eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
-            .like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
-        return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper);
+    public IPage<CollectionTask> getTasks(Page<CollectionTask> page, LambdaQueryWrapper<CollectionTask> wrapper) {
+        return collectionTaskRepository.page(page, wrapper);
     }
 
     public List<CollectionTask> selectActiveTasks() {
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java
index 58455154..2d4c874f 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java
@@ -6,6 +6,7 @@
 import com.datamate.collection.domain.process.ProcessRunner;
 import com.datamate.collection.domain.repository.CollectionTaskRepository;
 import com.datamate.collection.domain.repository.TaskExecutionRepository;
+import com.datamate.datamanagement.application.DatasetApplicationService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Async;
@@ -21,6 +22,7 @@
 public class TaskExecutionService {
     private final ProcessRunner processRunner;
     private final TaskExecutionRepository executionRepository;
     private final CollectionTaskRepository collectionTaskRepository;
+    private final DatasetApplicationService datasetApplicationService;
 
     @Transactional
@@ -39,7 +41,8 @@ public TaskExecution selectLatestByTaskId(String taskId) {
     }
 
     @Async
-    public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
+    @Transactional
+    public void runAsync(CollectionTask task, String executionId, int timeoutSeconds, String datasetId) {
         try {
             int code = processRunner.runJob(task, executionId, timeoutSeconds);
             log.info("DataX finished with code {} for execution {}", code, executionId);
@@ -47,6 +50,7 @@ public void runAsync(CollectionTask task, String executionId, int timeoutSeconds
             executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
                 0, 0L, 0L, 0L, null);
             collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
+            datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
         } catch (Exception e) {
             log.error("DataX execution failed", e);
             executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java
index 5bbdb2a2..773731b0 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java
@@ -10,8 +10,10 @@
 import lombok.Getter;
 import lombok.Setter;
 
+import java.time.LocalDateTime;
 import java.util.Collections;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * Data collection task entity (aligned with the t_dc_collection_tasks table)
  */
@@ -46,4 +48,12 @@ public void addPath() {
             throw new RuntimeException(e);
         }
     }
+
+    public void initCreateParam() {
+        this.id = UUID.randomUUID().toString();
+        this.addPath();
+        this.status = TaskStatus.READY;
+        this.createdAt = LocalDateTime.now();
+        this.updatedAt = LocalDateTime.now();
+    }
 }
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java
index 08f61034..2b0801ec 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java
@@ -6,6 +6,7 @@
 import com.datamate.collection.common.enums.TaskStatus;
 import com.datamate.collection.common.enums.SyncMode;
+import com.datamate.datamanagement.interfaces.dto.DatasetResponse;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -44,5 +45,7 @@
     @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
     private LocalDateTime updatedAt;
+
+    private DatasetResponse dataset;
 }
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java
index f2f029f0..74a2f2cb 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java
@@ -1,6 +1,7 @@
 package com.datamate.collection.interfaces.dto;
 
 import com.datamate.collection.common.enums.SyncMode;
+import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.HashMap;
@@ -49,5 +50,9 @@
     @Schema(name = "scheduleExpression", description = "Cron schedule expression (required when syncMode=SCHEDULED)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
     @JsonProperty("scheduleExpression")
     private String scheduleExpression;
+
+    /** Parameters for creating the dataset */
+    @Valid
+    private CreateDatasetRequest dataset;
 }
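For reference, a create-task call that also provisions a dataset could be assembled like this. This is a sketch only: it assumes Lombok-generated setters on both DTOs, and the field values are illustrative, not taken from the PR.

    import com.datamate.collection.common.enums.SyncMode;
    import com.datamate.collection.interfaces.dto.CreateCollectionTaskRequest;
    import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;

    public class CreateTaskRequestSketch {
        public static CreateCollectionTaskRequest sample() {
            // Dataset to be created alongside the task (nested in the request).
            CreateDatasetRequest dataset = new CreateDatasetRequest();
            dataset.setName("nightly-logs");                    // 1-100 chars, @NotBlank
            dataset.setDescription("Logs collected by DataX");  // up to 500 chars
            // dataset.setType(...): required (@NotNull); the DatasetType constants are not shown in this diff

            CreateCollectionTaskRequest request = new CreateCollectionTaskRequest();
            request.setSyncMode(SyncMode.ONCE); // ONCE triggers an execution immediately after create
            request.setDataset(dataset);        // the controller creates the dataset before the task
            return request;
        }
    }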
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java
index 9321be80..4a4db4ce 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java
@@ -46,5 +46,8 @@
     @Schema(name = "scheduleExpression", description = "Cron schedule expression (required when syncMode=SCHEDULED)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
     @JsonProperty("scheduleExpression")
     private String scheduleExpression;
+
+    /** Dataset ID */
+    private String datasetId;
 }
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java
index b893a73c..38deb639 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/rest/CollectionTaskController.java
@@ -1,14 +1,21 @@
 package com.datamate.collection.interfaces.rest;
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.datamate.collection.application.CollectionTaskService;
 import com.datamate.collection.domain.model.entity.CollectionTask;
 import com.datamate.collection.interfaces.converter.CollectionTaskConverter;
 import com.datamate.collection.interfaces.dto.*;
 import com.datamate.common.interfaces.PagedResponse;
+import com.datamate.datamanagement.application.DatasetApplicationService;
+import com.datamate.datamanagement.domain.model.dataset.Dataset;
+import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
 import jakarta.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.http.ResponseEntity;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.*;
@@ -21,12 +28,21 @@
 public class CollectionTaskController {
     private final CollectionTaskService taskService;
+    private final DatasetApplicationService datasetService;
+
     @PostMapping
+    @Transactional
     public ResponseEntity<CollectionTaskResponse> createTask(@Valid @RequestBody CreateCollectionTaskRequest request) {
         CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request);
-        task.setId(UUID.randomUUID().toString());
-        task.addPath();
-        return ResponseEntity.ok().body(CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task)));
+        String datasetId = null;
+        if (Objects.nonNull(request.getDataset())) {
+            datasetId = datasetService.createDataset(request.getDataset()).getId();
+        }
+        CollectionTaskResponse response = CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task, datasetId));
+        if (datasetId != null) {
+            response.setDataset(DatasetConverter.INSTANCE.convertToResponse(datasetService.getDataset(datasetId)));
+        }
+        return ResponseEntity.ok().body(response);
     }
 
     @PutMapping("/{id}")
@@ -36,7 +50,7 @@ public ResponseEntity<CollectionTaskResponse> updateTask(@PathVariable("id") String id,
         }
         CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request);
         task.setId(id);
-        return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task)));
+        return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task, request.getDatasetId())));
     }
 
     @DeleteMapping("/{id}")
@@ -53,6 +67,10 @@ public ResponseEntity<CollectionTaskDetailResponse> getTaskDetail(@PathVariable("id") String id) {
 
     @GetMapping
     public ResponseEntity<PagedResponse<CollectionTaskResponse>> getTasks(@Valid CollectionTaskPagingQuery query) {
-        return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(query)));
+        Page<CollectionTask> page = new Page<>(query.getPage(), query.getSize());
+        LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
+                .eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
+                .like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
+        return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(page, wrapper)));
     }
 }
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java
index d0d0f260..6fd8bb76 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/scheduler/TaskSchedulerInitializer.java
@@ -53,7 +53,7 @@ public void scanAndTrigger() {
                 // Due: trigger one execution
                 TaskExecution exec = taskExecutionService.createExecution(task);
                 int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
-                taskExecutionService.runAsync(task, exec.getId(), timeout);
+                taskExecutionService.runAsync(task, exec.getId(), timeout, null);
                 log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), now, exec.getId());
             }
         } catch (Exception ex) {
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
index b14db920..e39443e6 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
@@ -28,6 +28,8 @@
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.StringUtils;
 
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -220,63 +222,75 @@
     public void processDataSourceAsync(String datasetId, String dataSourceId) {
         try {
             log.info("Starting data source file scan, dataset ID: {}, data source ID: {}", datasetId, dataSourceId);
-
-            // 1. Fetch task details from the data collection service
-            CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
-            if (taskDetail == null) {
-                log.error("Failed to fetch collection task detail, task ID: {}", dataSourceId);
-                return;
-            }
-
-            log.info("Fetched collection task detail: {}", taskDetail);
-
-            // 2. Parse the task configuration
-            LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
-            if (config == null) {
-                log.error("Failed to parse task configuration, task ID: {}", dataSourceId);
-                return;
-            }
-
-            // 4. Get the file path list
-            List<String> filePaths = config.getFilePaths();
+            List<String> filePaths = getFilePaths(dataSourceId);
             if (CollectionUtils.isEmpty(filePaths)) {
-                log.warn("File path list is empty, task ID: {}", dataSourceId);
                 return;
             }
-            log.info("Starting file scan, {} file paths in total", filePaths.size());
 
-            // 5. Scan file metadata
             List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);
             // Look up the files that already exist in the dataset
             List<DatasetFile> existDatasetFileList = datasetFileRepository.findAllByDatasetId(datasetId);
             Map<String, DatasetFile> existDatasetFilePathMap = existDatasetFileList.stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
             Dataset dataset = datasetRepository.getById(datasetId);
+            dataset.setFiles(existDatasetFileList);
 
-            // 6. Batch-insert into the dataset file table
-            if (CollectionUtils.isNotEmpty(datasetFiles)) {
-                for (DatasetFile datasetFile : datasetFiles) {
-                    if (existDatasetFilePathMap.containsKey(datasetFile.getFilePath())) {
-                        DatasetFile existDatasetFile = existDatasetFilePathMap.get(datasetFile.getFilePath());
-                        dataset.removeFile(existDatasetFile);
-                        existDatasetFile.setFileSize(datasetFile.getFileSize());
-                        dataset.addFile(existDatasetFile);
-                        datasetFileRepository.updateById(existDatasetFile);
-                    } else {
-                        dataset.addFile(datasetFile);
-                        datasetFileRepository.save(datasetFile);
-                    }
-                }
-                log.info("File metadata written, {} records in total", datasetFiles.size());
-            } else {
-                log.warn("No valid files found by the scan");
-            }
+            // Batch-sync the dataset file table
+            asyncDatasetFile(datasetFiles, existDatasetFilePathMap, dataset, existDatasetFileList, filePaths);
             datasetRepository.updateById(dataset);
         } catch (Exception e) {
             log.error("Data source file scan failed, dataset ID: {}, data source ID: {}", datasetId, dataSourceId, e);
         }
     }
 
+    private void asyncDatasetFile(List<DatasetFile> datasetFiles, Map<String, DatasetFile> existDatasetFilePathMap, Dataset dataset, List<DatasetFile> existDatasetFileList, List<String> filePaths) {
+        if (CollectionUtils.isNotEmpty(datasetFiles)) {
+            for (DatasetFile datasetFile : datasetFiles) {
+                if (existDatasetFilePathMap.containsKey(datasetFile.getFilePath())) {
+                    DatasetFile existDatasetFile = existDatasetFilePathMap.get(datasetFile.getFilePath());
+                    dataset.removeFile(existDatasetFile);
+                    existDatasetFile.setFileSize(datasetFile.getFileSize());
+                    dataset.addFile(existDatasetFile);
+                    dataset.active();
+                    datasetFileRepository.updateById(existDatasetFile);
+                } else {
+                    dataset.addFile(datasetFile);
+                    dataset.active();
+                    datasetFileRepository.save(datasetFile);
+                }
+            }
+            log.info("File metadata written, {} records in total", datasetFiles.size());
+        } else {
+            log.warn("No valid files found by the scan");
+        }
+        for (DatasetFile datasetFile : existDatasetFileList) {
+            String existFilePath = datasetFile.getFilePath();
+            for (String filePath : filePaths) {
+                if (existFilePath.equals(filePath) || existFilePath.startsWith(filePath)) {
+                    if (Files.notExists(Paths.get(existFilePath))) {
+                        dataset.removeFile(datasetFile);
+                        datasetFileRepository.removeById(datasetFile.getId());
+                    }
+                }
+            }
+        }
+    }
+
+    private List<String> getFilePaths(String dataSourceId) {
+        CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
+        if (taskDetail == null) {
+            log.warn("Failed to fetch collection task detail, task ID: {}", dataSourceId);
+            return Collections.emptyList();
+        }
+        log.info("Fetched collection task detail: {}", taskDetail);
+        LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
+        if (config == null) {
+            log.warn("Failed to parse task configuration, task ID: {}", dataSourceId);
+            return Collections.emptyList();
+        }
+        return config.getFilePaths();
+    }
+
     /**
      * Parse the task configuration
      */
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java
index eadecc0c..0fe61942 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java
@@ -10,7 +10,6 @@
 import com.datamate.datamanagement.domain.model.dataset.Dataset;
 import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
 import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo;
-import com.datamate.datamanagement.domain.model.dataset.StatusConstants;
 import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
 import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
 import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
@@ -31,7 +30,6 @@
 import org.springframework.http.HttpHeaders;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
 
 import java.io.BufferedInputStream;
 import java.io.File;
@@ -41,12 +39,9 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
+import java.util.*;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
@@ -60,7 +55,6 @@
 public class DatasetFileApplicationService {
     private final DatasetFileRepository datasetFileRepository;
     private final DatasetRepository datasetRepository;
-    private final Path fileStorageLocation;
     private final FileService fileService;
 
     @Value("${dataset.base.path:/dataset}")
@@ -68,61 +62,10 @@
     @Autowired
     public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository,
-                                         DatasetRepository datasetRepository, FileService fileService,
-                                         @Value("${app.file.upload-dir:./dataset}") String uploadDir) {
+                                         DatasetRepository datasetRepository, FileService fileService) {
         this.datasetFileRepository = datasetFileRepository;
         this.datasetRepository = datasetRepository;
-        this.fileStorageLocation = Paths.get(uploadDir).toAbsolutePath().normalize();
         this.fileService = fileService;
-        try {
-            Files.createDirectories(this.fileStorageLocation);
-        } catch (Exception ex) {
-            throw new RuntimeException("Could not create the directory where the uploaded files will be stored.", ex);
-        }
-    }
-
-    /**
-     * Upload a file to a dataset
-     */
-    public DatasetFile uploadFile(String datasetId, MultipartFile file) {
-        Dataset dataset = datasetRepository.getById(datasetId);
-        if (dataset == null) {
-            throw new IllegalArgumentException("Dataset not found: " + datasetId);
-        }
-
-        String originalFilename = file.getOriginalFilename();
-        String fileName = originalFilename != null ? originalFilename : "file";
originalFilename : "file"; - try { - // 保存文件到磁盘 - Path targetLocation = this.fileStorageLocation.resolve(datasetId + File.separator + fileName); - // 确保目标目录存在 - Files.createDirectories(targetLocation); - Files.copy(file.getInputStream(), targetLocation, StandardCopyOption.REPLACE_EXISTING); - - // 创建文件实体(UUID 主键) - DatasetFile datasetFile = new DatasetFile(); - datasetFile.setId(UUID.randomUUID().toString()); - datasetFile.setDatasetId(datasetId); - datasetFile.setFileName(fileName); - datasetFile.setFilePath(targetLocation.toString()); - datasetFile.setFileType(getFileExtension(originalFilename)); - datasetFile.setFileSize(file.getSize()); - datasetFile.setUploadTime(LocalDateTime.now()); - datasetFile.setStatus(StatusConstants.DatasetFileStatuses.COMPLETED); - - // 保存到数据库 - datasetFileRepository.save(datasetFile); - - // 更新数据集统计 - dataset.addFile(datasetFile); - datasetRepository.updateById(dataset); - - return datasetFileRepository.findByDatasetIdAndFileName(datasetId, fileName); - - } catch (IOException ex) { - log.error("Could not store file {}", fileName, ex); - throw new RuntimeException("Could not store file " + fileName, ex); - } } /** @@ -155,20 +98,21 @@ public DatasetFile getDatasetFile(String datasetId, String fileId) { /** * 删除文件 */ + @Transactional public void deleteDatasetFile(String datasetId, String fileId) { DatasetFile file = getDatasetFile(datasetId, fileId); - try { - Path filePath = Paths.get(file.getFilePath()); - Files.deleteIfExists(filePath); - } catch (IOException ex) { - // ignore + Dataset dataset = datasetRepository.getById(datasetId); + // 删除文件时,上传到数据集中的文件会同时删除数据库中的记录和文件系统中的文件,归集过来的文件仅删除数据库中的记录 + if (file.getFilePath().startsWith(dataset.getPath())) { + try { + Path filePath = Paths.get(file.getFilePath()); + Files.deleteIfExists(filePath); + } catch (IOException ex) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } } datasetFileRepository.removeById(fileId); - - Dataset dataset = datasetRepository.getById(datasetId); - // 简单刷新统计(精确处理可从DB统计) - dataset.setFileCount(Math.max(0, dataset.getFileCount() - 1)); - dataset.setSizeBytes(Math.max(0, dataset.getSizeBytes() - (file.getFileSize() != null ? 
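To make the collision handling concrete, here is a standalone port of the same de-duplication loop over plain strings (a sketch for illustration, not code from this change; it also covers the no-extension case guarded above):

    import java.util.*;

    public class RenameSketch {
        public static List<String> dedupe(List<String> names) {
            Set<String> seen = new HashSet<>();
            List<String> out = new ArrayList<>();
            for (String name : names) {
                String candidate = name;
                int counter = 1;
                while (!seen.add(candidate)) {
                    int dot = name.lastIndexOf('.');
                    String stem = dot == -1 ? name : name.substring(0, dot);
                    String ext = dot == -1 ? "" : name.substring(dot);
                    candidate = stem + "-(" + counter++ + ")" + ext;
                }
                out.add(candidate);
            }
            return out;
        }

        public static void main(String[] args) {
            // Prints: [report.csv, report-(1).csv, report-(2).csv]
            System.out.println(dedupe(List.of("report.csv", "report.csv", "report.csv")));
        }
    }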
     private void addToZipFile(DatasetFile file, ZipOutputStream zos) throws IOException {
         if (file.getFilePath() == null || !Files.exists(Paths.get(file.getFilePath()))) {
             log.warn("The file hasn't been found on filesystem, id: {}", file.getId());
@@ -229,17 +195,6 @@ private void addToZipFile(DatasetFile file, ZipOutputStream zos) throws IOException {
         }
     }
 
-    private String getFileExtension(String fileName) {
-        if (fileName == null || fileName.isEmpty()) {
-            return null;
-        }
-        int lastDotIndex = fileName.lastIndexOf(".");
-        if (lastDotIndex == -1) {
-            return null;
-        }
-        return fileName.substring(lastDotIndex + 1);
-    }
-
     /**
      * Pre-upload
      *
@@ -275,9 +230,6 @@ public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) {
     public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) {
         FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest));
         saveFileInfoToDb(uploadResult, uploadFileRequest, datasetId);
-        if (uploadResult.isAllFilesUploaded()) {
-            // Parse the uploaded files; metadata validation and parsing of semi-structured files may be added later as requirements dictate
-        }
     }
 
     private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileRequest uploadFile, String datasetId) {
@@ -301,6 +253,7 @@
         datasetFileRepository.save(datasetFile);
 
         dataset.addFile(datasetFile);
+        dataset.active();
         datasetRepository.updateById(dataset);
     }
 }
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java
index a3d55df0..3f1522c1 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java
@@ -143,4 +143,10 @@ public void removeFile(DatasetFile file) {
             this.updatedAt = LocalDateTime.now();
         }
     }
+
+    public void active() {
+        if (this.status == DatasetStatusType.DRAFT) {
+            this.status = DatasetStatusType.ACTIVE;
+        }
+    }
 }
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/converter/DatasetConverter.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/converter/DatasetConverter.java
index 0247ffef..251033e5 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/converter/DatasetConverter.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/converter/DatasetConverter.java
@@ -7,7 +7,6 @@
 import com.datamate.common.domain.model.ChunkUploadRequest;
 import com.datamate.datamanagement.domain.model.dataset.Dataset;
 import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
-import com.datamate.datamanagement.interfaces.dto.*;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 import org.mapstruct.factory.Mappers;
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/AllDatasetStatisticsResponse.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/AllDatasetStatisticsResponse.java
index 7da863fd..08f14b71 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/AllDatasetStatisticsResponse.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/AllDatasetStatisticsResponse.java
@@ -10,11 +10,11 @@
 @Setter
 public class AllDatasetStatisticsResponse {
     /** Total number of datasets */
-    private Integer totalDatasets;
-    /** Total number of files */
-    private Long totalSize;
-    /** Total size in bytes */
-    private Long totalFiles;
+    private Integer totalDatasets = 0;
+    /** Total size in bytes */
+    private Long totalSize = 0L;
+    /** Total number of files */
+    private Long totalFiles = 0L;
 }
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java
index 7e556570..6ca64bd4 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java
@@ -3,6 +3,7 @@
 import com.datamate.datamanagement.common.enums.DatasetType;
 import jakarta.validation.constraints.NotBlank;
 import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.Size;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -19,9 +20,11 @@
 @AllArgsConstructor
 public class CreateDatasetRequest {
     /** Dataset name */
+    @Size(min = 1, max = 100)
     @NotBlank(message = "Dataset name must not be blank")
     private String name;
     /** Dataset description */
+    @Size(max = 500)
     private String description;
     /** Dataset type */
     @NotNull(message = "Dataset type must not be null")
@@ -30,6 +33,4 @@
     private List<String> tags;
     /** Data source */
     private String dataSource;
-    /** Target location */
-    private String targetLocation;
 }
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetResponse.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetResponse.java
index e7b1779e..b09f9856 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetResponse.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetResponse.java
@@ -24,6 +24,8 @@
     private String status;
     /** Tag list */
     private List<String> tags;
+    /** Dataset retention period in days */
+    private Integer retentionDays;
     /** Data source */
     private String dataSource;
     /** Target location */
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/UpdateDatasetRequest.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/UpdateDatasetRequest.java
index 3aea04f7..91070f39 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/UpdateDatasetRequest.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/UpdateDatasetRequest.java
@@ -1,6 +1,8 @@
 package com.datamate.datamanagement.interfaces.dto;
 
 import com.datamate.datamanagement.common.enums.DatasetStatusType;
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.Size;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -13,8 +15,11 @@
 @Setter
 public class UpdateDatasetRequest {
     /** Dataset name */
+    @Size(min = 1, max = 100)
+    @NotBlank(message = "Dataset name must not be blank")
     private String name;
     /** Dataset description */
+    @Size(max = 500)
     private String description;
     /** Collection task ID */
     private String dataSource;
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java
index 24fe4c3c..e5cb6998 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java
@@ -68,22 +68,6 @@
         return ResponseEntity.ok(Response.ok(response));
     }
 
-    @PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
-    public ResponseEntity<Response<DatasetFileResponse>> uploadDatasetFile(
-            @PathVariable("datasetId") String datasetId,
-            @RequestPart(value = "file", required = false) MultipartFile file) {
-        try {
-            DatasetFile datasetFile = datasetFileApplicationService.uploadFile(datasetId, file);
-
-            return ResponseEntity.status(HttpStatus.CREATED).body(Response.ok(DatasetConverter.INSTANCE.convertToResponse(datasetFile)));
-        } catch (IllegalArgumentException e) {
-            return ResponseEntity.badRequest().body(Response.error(SystemErrorCode.UNKNOWN_ERROR, null));
-        } catch (Exception e) {
-            log.error("upload fail", e);
-            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Response.error(SystemErrorCode.UNKNOWN_ERROR, null));
-        }
-    }
-
     @GetMapping("/{fileId}")
     public ResponseEntity<Response<DatasetFileResponse>> getDatasetFileById(
             @PathVariable("datasetId") String datasetId,
@@ -109,10 +93,9 @@
     }
 
     @IgnoreResponseWrap
-    @GetMapping(value = "/{fileId}/download", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
-    public ResponseEntity<Resource> downloadDatasetFileById(
-            @PathVariable("datasetId") String datasetId,
-            @PathVariable("fileId") String fileId) {
+    @GetMapping(value = "/{fileId}/download", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE + ";charset=UTF-8")
+    public ResponseEntity<Resource> downloadDatasetFileById(@PathVariable("datasetId") String datasetId,
+                                                            @PathVariable("fileId") String fileId) {
         try {
             DatasetFile datasetFile = datasetFileApplicationService.getDatasetFile(datasetId, fileId);
             Resource resource = datasetFileApplicationService.downloadFile(datasetId, fileId);
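Appending ;charset=UTF-8 to the produces value only affects the Content-Type header; if non-ASCII file names are the concern, the Content-Disposition header also needs RFC 6266-style encoding. A minimal sketch using Spring's ContentDisposition builder (how the header is actually set is not shown in this diff, so this is an assumption about intent):

    import java.nio.charset.StandardCharsets;
    import org.springframework.http.ContentDisposition;
    import org.springframework.http.HttpHeaders;

    final class DownloadHeaders {
        /** Attachment header that encodes non-ASCII names as filename*=UTF-8''... */
        static HttpHeaders forFile(String fileName) {
            HttpHeaders headers = new HttpHeaders();
            headers.setContentDisposition(ContentDisposition.attachment()
                    .filename(fileName, StandardCharsets.UTF_8)
                    .build());
            return headers;
        }
    }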
@@ -142,8 +125,8 @@ public void downloadDatasetFileAsZip(@PathVariable("datasetId") String datasetId,
      * @return Batch upload request ID
      */
     @PostMapping("/upload/pre-upload")
-    public ResponseEntity<Response<String>> preUpload(@PathVariable("datasetId") String datasetId, @RequestBody @Valid UploadFilesPreRequest request) {
-
+    public ResponseEntity<Response<String>> preUpload(@PathVariable("datasetId") String datasetId,
+                                                      @RequestBody @Valid UploadFilesPreRequest request) {
         return ResponseEntity.ok(Response.ok(datasetFileApplicationService.preUpload(request, datasetId)));
     }
 
@@ -153,7 +136,7 @@
      * @param uploadFileRequest Upload file request
      */
     @PostMapping("/upload/chunk")
-    public ResponseEntity<Void> chunkUpload(@PathVariable("datasetId") String datasetId, UploadFileRequest uploadFileRequest) {
+    public ResponseEntity<Void> chunkUpload(@PathVariable("datasetId") String datasetId, @Valid UploadFileRequest uploadFileRequest) {
         log.info("file upload reqId:{}, fileNo:{}, total chunk num:{}, current chunkNo:{}", uploadFileRequest.getReqId(),
             uploadFileRequest.getFileNo(), uploadFileRequest.getTotalChunkNum(), uploadFileRequest.getChunkNo());
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/scheduler/DatasetScheduler.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/scheduler/DatasetScheduler.java
new file mode 100644
index 00000000..a9b4bb46
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/scheduler/DatasetScheduler.java
@@ -0,0 +1,65 @@
+package com.datamate.datamanagement.interfaces.scheduler;
+
+import com.datamate.common.interfaces.PagedResponse;
+import com.datamate.datamanagement.application.DatasetApplicationService;
+import com.datamate.datamanagement.interfaces.dto.DatasetPagingQuery;
+import com.datamate.datamanagement.interfaces.dto.DatasetResponse;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+/**
+ * Scheduled jobs for datasets
+ *
+ * @since 2025/10/24
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class DatasetScheduler {
+    private final DatasetApplicationService datasetApplicationService;
+
+    /**
+     * Scan daily at 00:00 and delete datasets past their retention period
+     */
+    @Scheduled(cron = "0 0 0 * * ?")
+    public void cleanupExpiredDatasets() {
+        int pageNo = 1;
+        int pageSize = 500;
+
+        while (true) {
+            DatasetPagingQuery datasetPagingQuery = new DatasetPagingQuery();
+            datasetPagingQuery.setPage(pageNo);
+            datasetPagingQuery.setSize(pageSize);
+            PagedResponse<DatasetResponse> datasets = datasetApplicationService.getDatasets(datasetPagingQuery);
+            if (CollectionUtils.isEmpty(datasets.getContent())) {
+                break;
+            }
+
+            datasets.getContent().forEach(dataset -> {
+                Integer retentionDays = dataset.getRetentionDays();
+                LocalDateTime createdAt = dataset.getCreatedAt();
+                if (retentionDays != null && retentionDays > 0 && createdAt != null) {
+                    LocalDateTime expireAt = createdAt.plusDays(retentionDays);
+                    if (expireAt.isBefore(LocalDateTime.now())) {
+                        try {
+                            log.info("Deleting dataset {}, expired at {} (retentionDays={})", dataset.getId(), expireAt, retentionDays);
+                            datasetApplicationService.deleteDataset(dataset.getId());
+                        } catch (Exception e) {
+                            log.warn("Failed to delete expired dataset {}: {}", dataset.getId(), e.getMessage());
+                        }
+                    }
+                }
+            });
+
+            if (datasets.getPage() >= datasets.getTotalPages()) {
+                break;
+            }
+            pageNo++;
+        }
+    }
+}
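The expiry predicate the scheduler applies is easy to verify in isolation; a small standalone sketch (not code from this change):

    import java.time.LocalDateTime;

    final class RetentionCheck {
        /** True when createdAt + retentionDays is already in the past; null or non-positive retention never expires. */
        static boolean isExpired(LocalDateTime createdAt, Integer retentionDays, LocalDateTime now) {
            if (createdAt == null || retentionDays == null || retentionDays <= 0) {
                return false;
            }
            return createdAt.plusDays(retentionDays).isBefore(now);
        }

        public static void main(String[] args) {
            LocalDateTime now = LocalDateTime.of(2025, 10, 24, 0, 0);
            System.out.println(isExpired(now.minusDays(31), 30, now)); // true
            System.out.println(isExpired(now.minusDays(29), 30, now)); // false
        }
    }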