diff --git a/backend/api-gateway/src/main/java/com/dataengine/gateway/ApiGatewayApplication.java b/backend/api-gateway/src/main/java/com/dataengine/gateway/ApiGatewayApplication.java index 66589394..a30720d8 100644 --- a/backend/api-gateway/src/main/java/com/dataengine/gateway/ApiGatewayApplication.java +++ b/backend/api-gateway/src/main/java/com/dataengine/gateway/ApiGatewayApplication.java @@ -26,48 +26,52 @@ public static void main(String[] args) { @Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { return builder.routes() + // 数据归集服务路由 + .route("data-collection", r -> r.path("/api/data-collection/**") + .uri("lb://data-collection-service")) + // 数据管理服务路由 .route("data-management", r -> r.path("/api/data-management/**") .uri("lb://data-management-service")) - + // 算子市场服务路由 .route("operator-market", r -> r.path("/api/operators/**") .uri("lb://operator-market-service")) - + // 数据清洗服务路由 .route("data-cleaning", r -> r.path("/api/cleaning/**") .uri("lb://data-cleaning-service")) - + // 数据合成服务路由 .route("data-synthesis", r -> r.path("/api/synthesis/**") .uri("lb://data-synthesis-service")) - + // 数据标注服务路由 .route("data-annotation", r -> r.path("/api/annotation/**") .uri("lb://data-annotation-service")) - + // 数据评估服务路由 .route("data-evaluation", r -> r.path("/api/evaluation/**") .uri("lb://data-evaluation-service")) - + // 流程编排服务路由 .route("pipeline-orchestration", r -> r.path("/api/pipelines/**") .uri("lb://pipeline-orchestration-service")) - + // 执行引擎服务路由 .route("execution-engine", r -> r.path("/api/execution/**") .uri("lb://execution-engine-service")) - + // 认证服务路由 .route("auth-service", r -> r.path("/api/auth/**") .uri("lb://auth-service")) - + // RAG服务路由 .route("rag-indexer", r -> r.path("/api/rag/indexer/**") .uri("lb://rag-indexer-service")) .route("rag-query", r -> r.path("/api/rag/query/**") .uri("lb://rag-query-service")) - + .build(); } } diff --git a/backend/openapi/specs/data-collection.yaml b/backend/openapi/specs/data-collection.yaml 
index 3cb4f576..cc2731f4 100644 --- a/backend/openapi/specs/data-collection.yaml +++ b/backend/openapi/specs/data-collection.yaml @@ -23,8 +23,9 @@ tags: description: 任务执行管理 paths: - /tasks: + /data-collection/tasks: get: + operationId: getTasks tags: [CollectionTask] summary: 获取归集任务列表 parameters: @@ -56,6 +57,7 @@ paths: $ref: '#/components/schemas/PagedCollectionTaskSummary' post: + operationId: createTask tags: [CollectionTask] summary: 创建归集任务 description: 创建新的数据归集任务 @@ -73,8 +75,9 @@ paths: schema: $ref: '#/components/schemas/CollectionTaskResponse' - /tasks/{id}: + /data-collection/tasks/{id}: get: + operationId: getTaskDetail tags: [CollectionTask] summary: 获取归集任务详情 parameters: @@ -94,6 +97,7 @@ paths: description: 归集任务不存在 put: + operationId: updateTask tags: [CollectionTask] summary: 更新归集任务 parameters: @@ -117,6 +121,7 @@ paths: $ref: '#/components/schemas/CollectionTaskResponse' delete: + operationId: deleteTask tags: [CollectionTask] summary: 删除归集任务 parameters: diff --git a/backend/services/data-collection-service/src/main/java/com/dataengine/collection/interfaces/facade/CollectionTaskController.java b/backend/services/data-collection-service/src/main/java/com/dataengine/collection/interfaces/facade/CollectionTaskController.java index 1057ab30..299c8baf 100644 --- a/backend/services/data-collection-service/src/main/java/com/dataengine/collection/interfaces/facade/CollectionTaskController.java +++ b/backend/services/data-collection-service/src/main/java/com/dataengine/collection/interfaces/facade/CollectionTaskController.java @@ -82,7 +82,7 @@ private String mapToJsonString(Map map) { } @Override - public ResponseEntity tasksPost(CreateCollectionTaskRequest body) { + public ResponseEntity createTask(CreateCollectionTaskRequest body) { CollectionTask t = new CollectionTask(); t.setName(body.getName()); t.setDescription(body.getDescription()); @@ -94,7 +94,7 @@ public ResponseEntity tasksPost(CreateCollectionTaskRequ } @Override - public ResponseEntity 
tasksIdPut(String id, UpdateCollectionTaskRequest body) { + public ResponseEntity updateTask(String id, UpdateCollectionTaskRequest body) { CollectionTask t = taskService.get(id); if (t == null) { return ResponseEntity.notFound().build(); @@ -110,19 +110,19 @@ public ResponseEntity tasksIdPut(String id, UpdateCollec } @Override - public ResponseEntity tasksIdDelete(String id) { + public ResponseEntity deleteTask(String id) { taskService.delete(id); return ResponseEntity.noContent().build(); } @Override - public ResponseEntity tasksIdGet(String id) { + public ResponseEntity getTaskDetail(String id) { CollectionTask t = taskService.get(id); return t == null ? ResponseEntity.notFound().build() : ResponseEntity.ok(toResponse(t)); } @Override - public ResponseEntity tasksGet(Integer page, Integer size, TaskStatus status, String name) { + public ResponseEntity getTasks(Integer page, Integer size, TaskStatus status, String name) { var list = taskService.list(page, size, status == null ? null : status.getValue(), name); PagedCollectionTaskSummary p = new PagedCollectionTaskSummary(); p.setContent(list.stream().map(this::toSummary).collect(Collectors.toList())); diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/DataManagementServiceConfiguration.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/DataManagementServiceConfiguration.java index 43e735dd..4e05894e 100644 --- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/DataManagementServiceConfiguration.java +++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/DataManagementServiceConfiguration.java @@ -1,13 +1,17 @@ package com.dataengine.datamanagement; +import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; +import 
org.springframework.scheduling.annotation.EnableAsync; /** * Data Management Service Configuration * 数据管理服务配置类 - 多源接入、元数据、血缘治理 */ @Configuration +@EnableFeignClients(basePackages = "com.dataengine.datamanagement.infrastructure.client") +@EnableAsync @ComponentScan(basePackages = { "com.dataengine.datamanagement", "com.dataengine.shared" diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetApplicationService.java index f016545b..407f9680 100644 --- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetApplicationService.java @@ -1,7 +1,11 @@ package com.dataengine.datamanagement.application.service; import com.dataengine.datamanagement.domain.model.dataset.Dataset; +import com.dataengine.datamanagement.domain.model.dataset.DatasetFile; import com.dataengine.datamanagement.domain.model.dataset.Tag; +import com.dataengine.datamanagement.infrastructure.client.CollectionTaskClient; +import com.dataengine.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse; +import com.dataengine.datamanagement.infrastructure.client.dto.LocalCollectionConfig; import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper; import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetMapper; import com.dataengine.datamanagement.infrastructure.persistence.mapper.TagMapper; @@ -9,13 +13,17 @@ import com.dataengine.datamanagement.interfaces.dto.AllDatasetStatisticsResponse; import com.dataengine.datamanagement.interfaces.dto.CreateDatasetRequest; import com.dataengine.datamanagement.interfaces.dto.DatasetPagingQuery; +import 
com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.session.RowBounds; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -25,6 +33,7 @@ /** * 数据集应用服务(对齐 DB schema,使用 UUID 字符串主键) */ +@Slf4j @Service @Transactional public class DatasetApplicationService { @@ -32,15 +41,26 @@ public class DatasetApplicationService { private final DatasetMapper datasetMapper; private final TagMapper tagMapper; private final DatasetFileMapper datasetFileMapper; + private final CollectionTaskClient collectionTaskClient; + private final FileMetadataService fileMetadataService; + private final ObjectMapper objectMapper; @Value("${dataset.base.path:/dataset}") private String datasetBasePath; @Autowired - public DatasetApplicationService(DatasetMapper datasetMapper, TagMapper tagMapper, DatasetFileMapper datasetFileMapper) { + public DatasetApplicationService(DatasetMapper datasetMapper, + TagMapper tagMapper, + DatasetFileMapper datasetFileMapper, + CollectionTaskClient collectionTaskClient, + FileMetadataService fileMetadataService, + ObjectMapper objectMapper) { this.datasetMapper = datasetMapper; this.tagMapper = tagMapper; this.datasetFileMapper = datasetFileMapper; + this.collectionTaskClient = collectionTaskClient; + this.fileMetadataService = fileMetadataService; + this.objectMapper = objectMapper; } /** @@ -66,6 +86,11 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) { } } + if 
(StringUtils.isNotBlank(createDatasetRequest.getDataSource())) {
+            // Data source ID present: scan and persist its files. NOTE(review): this self-invocation bypasses the @Async proxy - confirm it runs asynchronously
+            processDataSourceAsync(dataset.getId(), createDatasetRequest.getDataSource());
+        }
+
         // 返回创建的数据集,包含标签信息
         Dataset createdDataset = datasetMapper.findById(dataset.getId());
         createdDataset.getTags().addAll(processedTags);
@@ -241,4 +266,92 @@ public Map getDatasetStatistics(String datasetId) {
     public AllDatasetStatisticsResponse getAllDatasetStatistics() {
         return datasetMapper.getAllDatasetStatistics();
     }
+
+    /**
+     * Scans the files referenced by a collection task and records them for a dataset.
+     *
+     * <p>Loads the task detail from the collection service, converts its config map to
+     * {@link LocalCollectionConfig} and, only for {@code LOCAL_COLLECTION} tasks with a
+     * non-empty path list, inserts one dataset-file row per scanned file. Any exception is
+     * logged and not rethrown.
+     *
+     * <p>NOTE(review): {@code createDataset} calls this method on {@code this}. Spring applies
+     * {@code @Async} through a proxy, so that call presumably runs synchronously on the
+     * caller's thread and inside its transaction - confirm, and consider moving this method
+     * to a separate bean.
+     *
+     * @param datasetId    ID of the dataset the scanned files belong to
+     * @param dataSourceId data source ID (the collection task ID)
+     */
+    @Async
+    public void processDataSourceAsync(String datasetId, String dataSourceId) {
+        try {
+            log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);
+
+            // 1. Fetch the task detail from the data collection service.
+            CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId);
+            if (taskDetail == null) {
+                log.error("获取归集任务详情失败,任务ID: {}", dataSourceId);
+                return;
+            }
+
+            log.info("获取到归集任务详情: {}", taskDetail.getName());
+
+            // 2. Parse the task configuration.
+            LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
+            if (config == null) {
+                log.error("解析任务配置失败,任务ID: {}", dataSourceId);
+                return;
+            }
+
+            // 3. Skip tasks whose type is not LOCAL_COLLECTION.
+            if (!"LOCAL_COLLECTION".equalsIgnoreCase(config.getType())) {
+                log.info("任务类型不是 LOCAL_COLLECTION,跳过文件扫描。任务类型: {}", config.getType());
+                return;
+            }
+
+            // 4. Read the configured file paths.
+            List<String> filePaths = config.getFilePaths();
+            if (CollectionUtils.isEmpty(filePaths)) {
+                log.warn("文件路径列表为空,任务ID: {}", dataSourceId);
+                return;
+            }
+
+            log.info("开始扫描文件,共 {} 个文件路径", filePaths.size());
+
+            // 5. Scan file metadata.
+            List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);
+
+            // 6. Insert the scanned files into the dataset file table, one row per file.
+            if (CollectionUtils.isNotEmpty(datasetFiles)) {
+                for (DatasetFile datasetFile : datasetFiles) {
+                    datasetFileMapper.insert(datasetFile);
+                }
+                log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size());
+            } else {
+                log.warn("未扫描到有效文件");
+            }
+
+        } catch (Exception e) {
+            log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
+        }
+    }
+
+    /**
+     * Converts the raw task config map into a {@link LocalCollectionConfig}.
+     *
+     * @param configMap config map taken from the task detail; may be {@code null}
+     * @return the converted config, or {@code null} when the map is null/empty or conversion fails
+     */
+    private LocalCollectionConfig parseTaskConfig(Map<?, ?> configMap) {
+        try {
+            if (configMap == null || configMap.isEmpty()) {
+                return null;
+            }
+            return objectMapper.convertValue(configMap, LocalCollectionConfig.class);
+        } catch (Exception e) {
+            log.error("解析任务配置失败", e);
+            return null;
+        }
+    }
 }
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java
index 7ecbd831..e8141e1a 100644
--- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java
@@ -92,8 +92,8 @@ public DatasetFile uploadFile(String datasetId, MultipartFile file, String descr
         datasetFile.setDatasetId(datasetId);
         datasetFile.setFileName(fileName);
         datasetFile.setFilePath(targetLocation.toString());
-        datasetFile.setFileType(file.getContentType());
-        datasetFile.setFileFormat(getFileExtension(originalFilename));
+        datasetFile.setFileType(getFileExtension(originalFilename));
+        datasetFile.setFileFormat(file.getContentType());
         datasetFile.setFileSize(file.getSize());
         datasetFile.setUploadTime(LocalDateTime.now());
         datasetFile.setStatus(StatusConstants.DatasetFileStatuses.COMPLETED);
@@ -241,7 +241,6 @@ private void saveFileInfoToDb(FileUploadResult
fileUploadResult, UploadFileReque
                 .fileSize(savedFile.length())
                 .uploadTime(currentTime)
                 .lastAccessTime(currentTime)
-                .lastAccessTime(currentTime)
                 .fileName(uploadFile.getFileName())
                 .filePath(savedFile.getPath())
                 .fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/FileMetadataService.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/FileMetadataService.java
new file mode 100644
index 00000000..1e9b4e6d
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/FileMetadataService.java
@@ -0,0 +1,154 @@
+package com.dataengine.datamanagement.application.service;
+
+import com.dataengine.datamanagement.domain.model.dataset.DatasetFile;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+/**
+ * Scans local file paths and builds {@link DatasetFile} metadata records for them.
+ */
+@Slf4j
+@Service
+public class FileMetadataService {
+
+    // Extension patterns, compiled once instead of on every String.matches call.
+    private static final Pattern IMAGE_FORMATS = Pattern.compile("jpg|jpeg|png|gif|bmp|webp|svg|tiff|ico");
+    private static final Pattern VIDEO_FORMATS = Pattern.compile("mp4|avi|mov|wmv|flv|mkv|webm|m4v");
+    private static final Pattern AUDIO_FORMATS = Pattern.compile("mp3|wav|flac|aac|ogg|wma|m4a");
+    private static final Pattern TEXT_FORMATS = Pattern.compile("txt|md|json|xml|csv|log|yaml|yml|properties|conf");
+    private static final Pattern DOCUMENT_FORMATS = Pattern.compile("pdf|doc|docx|xls|xlsx|ppt|pptx");
+
+    /**
+     * Scans the given file paths and extracts metadata for each regular file.
+     *
+     * <p>Paths that do not exist, are not regular files, or fail with an exception are logged
+     * and skipped, so one bad path does not abort the scan.
+     *
+     * @param filePaths file paths to scan; {@code null} or empty yields an empty list
+     * @param datasetId ID of the dataset the files belong to
+     * @return metadata of the files that were scanned successfully, never {@code null}
+     */
+    public List<DatasetFile> scanFiles(List<String> filePaths, String datasetId) {
+        List<DatasetFile> datasetFiles = new ArrayList<>();
+
+        if (filePaths == null || filePaths.isEmpty()) {
+            log.warn("文件路径列表为空,跳过扫描");
+            return datasetFiles;
+        }
+
+        for (String filePath : filePaths) {
+            try {
+                DatasetFile datasetFile = extractFileMetadata(filePath, datasetId);
+                if (datasetFile != null) {
+                    datasetFiles.add(datasetFile);
+                }
+            } catch (Exception e) {
+                // Best effort: keep scanning the remaining paths.
+                log.error("扫描文件失败: {}, 错误: {}", filePath, e.getMessage(), e);
+            }
+        }
+
+        log.info("文件扫描完成,共扫描 {} 个文件", datasetFiles.size());
+        return datasetFiles;
+    }
+
+    /**
+     * Extracts the metadata of a single file.
+     *
+     * <p>NOTE(review): here fileFormat holds the extension and fileType a coarse category,
+     * while uploadFile in DatasetFileApplicationService stores the extension in fileType -
+     * confirm which convention consumers of DatasetFile expect.
+     *
+     * @param filePath  path of the file
+     * @param datasetId ID of the dataset the file belongs to
+     * @return the file metadata, or {@code null} if the path does not exist or is not a regular file
+     * @throws IOException if the file size cannot be read
+     */
+    private DatasetFile extractFileMetadata(String filePath, String datasetId) throws IOException {
+        Path path = Paths.get(filePath);
+
+        if (!Files.exists(path)) {
+            log.warn("文件不存在: {}", filePath);
+            return null;
+        }
+
+        if (!Files.isRegularFile(path)) {
+            log.warn("路径不是文件: {}", filePath);
+            return null;
+        }
+
+        String fileName = path.getFileName().toString();
+        long fileSize = Files.size(path);
+        String fileFormat = getFileExtension(fileName);
+        String fileType = determineFileType(fileFormat);
+        // One timestamp so uploadTime and lastAccessTime are identical for a new record.
+        LocalDateTime now = LocalDateTime.now();
+
+        return DatasetFile.builder()
+                .id(UUID.randomUUID().toString())
+                .datasetId(datasetId)
+                .fileName(fileName)
+                .filePath(filePath)
+                .fileSize(fileSize)
+                .fileFormat(fileFormat)
+                .fileType(fileType)
+                .uploadTime(now)
+                .lastAccessTime(now)
+                .status("UPLOADED")
+                .build();
+    }
+
+    /**
+     * Returns the lower-cased extension of a file name, or {@code "unknown"} when there is none.
+     *
+     * <p>Names that start with their only dot (e.g. {@code .gitignore}) or end with a dot
+     * count as having no extension.
+     */
+    private String getFileExtension(String fileName) {
+        int lastDotIndex = fileName.lastIndexOf('.');
+        if (lastDotIndex > 0 && lastDotIndex < fileName.length() - 1) {
+            // Locale.ROOT keeps the result stable under locales with special casing rules (e.g. Turkish).
+            return fileName.substring(lastDotIndex + 1).toLowerCase(Locale.ROOT);
+        }
+        return "unknown";
+    }
+
+    /**
+     * Maps a lower-case file extension to a coarse file type.
+     *
+     * @param fileFormat file extension; may be {@code null}
+     * @return IMAGE, VIDEO, AUDIO, TEXT, DOCUMENT, OTHER, or UNKNOWN for {@code null}
+     */
+    private String determineFileType(String fileFormat) {
+        if (fileFormat == null) {
+            return "UNKNOWN";
+        }
+        if (IMAGE_FORMATS.matcher(fileFormat).matches()) {
+            return "IMAGE";
+        }
+        if (VIDEO_FORMATS.matcher(fileFormat).matches()) {
+            return "VIDEO";
+        }
+        if (AUDIO_FORMATS.matcher(fileFormat).matches()) {
+            return "AUDIO";
+        }
+        if (TEXT_FORMATS.matcher(fileFormat).matches()) {
+            return "TEXT";
+        }
+        if (DOCUMENT_FORMATS.matcher(fileFormat).matches()) {
+            return "DOCUMENT";
+        }
+        return "OTHER";
+    }
+}
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/CollectionTaskClient.java
b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/CollectionTaskClient.java
new file mode 100644
index 00000000..f717ac9a
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/CollectionTaskClient.java
@@ -0,0 +1,21 @@
+package com.dataengine.datamanagement.infrastructure.client;
+
+import com.dataengine.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+
+/**
+ * Feign client for the data collection service.
+ */
+@FeignClient(name = "collection-service", url = "${collection.service.url:http://localhost:8080}")
+public interface CollectionTaskClient {
+
+    /**
+     * Fetches the detail of a collection task.
+     * @param taskId task ID
+     * @return task detail
+     */
+    @GetMapping("/api/data-collection/tasks/{id}")
+    CollectionTaskDetailResponse getTaskDetail(@PathVariable("id") String taskId);
+}
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/dto/CollectionTaskDetailResponse.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/dto/CollectionTaskDetailResponse.java
new file mode 100644
index 00000000..8fc619b0
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/dto/CollectionTaskDetailResponse.java
@@ -0,0 +1,22 @@
+package com.dataengine.datamanagement.infrastructure.client.dto;
+
+import lombok.Data;
+import java.time.LocalDateTime;
+import java.util.Map;
+
+/**
+ * Collection task detail response.
+ */
+@Data
+public class CollectionTaskDetailResponse {
+    private String id;
+    private String name;
+    private String description;
+    private Map<String, Object> config;
+    private String status;
+    private String syncMode;
+    private String scheduleExpression;
+    private String lastExecutionId;
+    private LocalDateTime createdAt;
+    private LocalDateTime updatedAt;
+}
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/dto/LocalCollectionConfig.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/dto/LocalCollectionConfig.java
new file mode 100644
index 00000000..8a78549b
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/dto/LocalCollectionConfig.java
@@ -0,0 +1,26 @@
+package com.dataengine.datamanagement.infrastructure.client.dto;
+
+import lombok.Data;
+import java.util.List;
+
+/**
+ * Local collection task configuration.
+ */
+@Data
+public class LocalCollectionConfig {
+    /**
+     * Collection type.
+     */
+    private String type;
+
+    /**
+     * List of file paths.
+     */
+    private List<String> filePaths;
+
+    /**
+     * Other configuration items.
+     */
+    private String sourceType;
+    private String targetType;
+}