Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,52 @@ public static void main(String[] args) {
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
    // Route table: {route id, request path pattern, target service name}.
    // Every target is resolved through the load balancer ("lb://" scheme).
    // Order is preserved from the declaration order below.
    String[][] routeTable = {
        {"data-collection",        "/api/data-collection/**", "data-collection-service"},
        {"data-management",        "/api/data-management/**", "data-management-service"},
        {"operator-market",        "/api/operators/**",       "operator-market-service"},
        {"data-cleaning",          "/api/cleaning/**",        "data-cleaning-service"},
        {"data-synthesis",         "/api/synthesis/**",       "data-synthesis-service"},
        {"data-annotation",        "/api/annotation/**",      "data-annotation-service"},
        {"data-evaluation",        "/api/evaluation/**",      "data-evaluation-service"},
        {"pipeline-orchestration", "/api/pipelines/**",       "pipeline-orchestration-service"},
        {"execution-engine",       "/api/execution/**",       "execution-engine-service"},
        {"auth-service",           "/api/auth/**",            "auth-service"},
        {"rag-indexer",            "/api/rag/indexer/**",     "rag-indexer-service"},
        {"rag-query",              "/api/rag/query/**",       "rag-query-service"},
    };

    RouteLocatorBuilder.Builder routes = builder.routes();
    for (String[] entry : routeTable) {
        String id = entry[0];
        String path = entry[1];
        String uri = "lb://" + entry[2];
        routes = routes.route(id, r -> r.path(path).uri(uri));
    }
    return routes.build();
}
}
9 changes: 7 additions & 2 deletions backend/openapi/specs/data-collection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ tags:
description: 任务执行管理

paths:
/tasks:
/data-collection/tasks:
get:
operationId: getTasks
tags: [CollectionTask]
summary: 获取归集任务列表
parameters:
Expand Down Expand Up @@ -56,6 +57,7 @@ paths:
$ref: '#/components/schemas/PagedCollectionTaskSummary'

post:
operationId: createTask
tags: [CollectionTask]
summary: 创建归集任务
description: 创建新的数据归集任务
Expand All @@ -73,8 +75,9 @@ paths:
schema:
$ref: '#/components/schemas/CollectionTaskResponse'

/tasks/{id}:
/data-collection/tasks/{id}:
get:
operationId: getTaskDetail
tags: [CollectionTask]
summary: 获取归集任务详情
parameters:
Expand All @@ -94,6 +97,7 @@ paths:
description: 归集任务不存在

put:
operationId: updateTask
tags: [CollectionTask]
summary: 更新归集任务
parameters:
Expand All @@ -117,6 +121,7 @@ paths:
$ref: '#/components/schemas/CollectionTaskResponse'

delete:
operationId: deleteTask
tags: [CollectionTask]
summary: 删除归集任务
parameters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private String mapToJsonString(Map<String, Object> map) {
}

@Override
public ResponseEntity<CollectionTaskResponse> tasksPost(CreateCollectionTaskRequest body) {
public ResponseEntity<CollectionTaskResponse> createTask(CreateCollectionTaskRequest body) {
CollectionTask t = new CollectionTask();
t.setName(body.getName());
t.setDescription(body.getDescription());
Expand All @@ -94,7 +94,7 @@ public ResponseEntity<CollectionTaskResponse> tasksPost(CreateCollectionTaskRequ
}

@Override
public ResponseEntity<CollectionTaskResponse> tasksIdPut(String id, UpdateCollectionTaskRequest body) {
public ResponseEntity<CollectionTaskResponse> updateTask(String id, UpdateCollectionTaskRequest body) {
CollectionTask t = taskService.get(id);
if (t == null) {
return ResponseEntity.notFound().build();
Expand All @@ -110,19 +110,19 @@ public ResponseEntity<CollectionTaskResponse> tasksIdPut(String id, UpdateCollec
}

@Override
public ResponseEntity<Void> tasksIdDelete(String id) {
public ResponseEntity<Void> deleteTask(String id) {
taskService.delete(id);
return ResponseEntity.noContent().build();
}

@Override
public ResponseEntity<CollectionTaskResponse> tasksIdGet(String id) {
public ResponseEntity<CollectionTaskResponse> getTaskDetail(String id) {
CollectionTask t = taskService.get(id);
return t == null ? ResponseEntity.notFound().build() : ResponseEntity.ok(toResponse(t));
}

@Override
public ResponseEntity<PagedCollectionTaskSummary> tasksGet(Integer page, Integer size, TaskStatus status, String name) {
public ResponseEntity<PagedCollectionTaskSummary> getTasks(Integer page, Integer size, TaskStatus status, String name) {
var list = taskService.list(page, size, status == null ? null : status.getValue(), name);
PagedCollectionTaskSummary p = new PagedCollectionTaskSummary();
p.setContent(list.stream().map(this::toSummary).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package com.dataengine.datamanagement;

import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

/**
* Data Management Service Configuration
* 数据管理服务配置类 - 多源接入、元数据、血缘治理
*/
@Configuration
@EnableFeignClients(basePackages = "com.dataengine.datamanagement.infrastructure.client")
@EnableAsync
@ComponentScan(basePackages = {
"com.dataengine.datamanagement",
"com.dataengine.shared"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package com.dataengine.datamanagement.application.service;

import com.dataengine.datamanagement.domain.model.dataset.Dataset;
import com.dataengine.datamanagement.domain.model.dataset.DatasetFile;
import com.dataengine.datamanagement.domain.model.dataset.Tag;
import com.dataengine.datamanagement.infrastructure.client.CollectionTaskClient;
import com.dataengine.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
import com.dataengine.datamanagement.infrastructure.client.dto.LocalCollectionConfig;
import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper;
import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetMapper;
import com.dataengine.datamanagement.infrastructure.persistence.mapper.TagMapper;
import com.dataengine.datamanagement.interfaces.converter.DatasetConverter;
import com.dataengine.datamanagement.interfaces.dto.AllDatasetStatisticsResponse;
import com.dataengine.datamanagement.interfaces.dto.CreateDatasetRequest;
import com.dataengine.datamanagement.interfaces.dto.DatasetPagingQuery;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.RowBounds;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -25,22 +33,34 @@
/**
* 数据集应用服务(对齐 DB schema,使用 UUID 字符串主键)
*/
@Slf4j
@Service
@Transactional
public class DatasetApplicationService {

private final DatasetMapper datasetMapper;
private final TagMapper tagMapper;
private final DatasetFileMapper datasetFileMapper;
private final CollectionTaskClient collectionTaskClient;
private final FileMetadataService fileMetadataService;
private final ObjectMapper objectMapper;

@Value("${dataset.base.path:/dataset}")
private String datasetBasePath;

/**
 * Wires the persistence mappers plus the collaborators required for
 * asynchronous data-source file scanning: the collection-task Feign
 * client, the file metadata scanner, and the Jackson mapper used to
 * convert raw task-config maps into typed config objects.
 */
@Autowired
public DatasetApplicationService(DatasetMapper datasetMapper,
                                 TagMapper tagMapper,
                                 DatasetFileMapper datasetFileMapper,
                                 CollectionTaskClient collectionTaskClient,
                                 FileMetadataService fileMetadataService,
                                 ObjectMapper objectMapper) {
    this.datasetMapper = datasetMapper;
    this.tagMapper = tagMapper;
    this.datasetFileMapper = datasetFileMapper;
    this.collectionTaskClient = collectionTaskClient;
    this.fileMetadataService = fileMetadataService;
    this.objectMapper = objectMapper;
}

/**
Expand All @@ -66,6 +86,11 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
}
}

if (StringUtils.isNotBlank(createDatasetRequest.getDataSource())) {
// 数据源id不为空,使用异步线程进行文件扫盘落库
processDataSourceAsync(dataset.getId(), createDatasetRequest.getDataSource());
}

// 返回创建的数据集,包含标签信息
Dataset createdDataset = datasetMapper.findById(dataset.getId());
createdDataset.getTags().addAll(processedTags);
Expand Down Expand Up @@ -241,4 +266,78 @@ public Map<String, Object> getDatasetStatistics(String datasetId) {
/**
 * Returns aggregated statistics across all datasets.
 *
 * <p>Pure delegate: the aggregation itself is performed by the
 * persistence layer ({@code DatasetMapper}).
 *
 * @return the statistics response produced by the mapper
 */
public AllDatasetStatisticsResponse getAllDatasetStatistics() {
    return datasetMapper.getAllDatasetStatistics();
}

/**
 * Asynchronously scans the files referenced by a data source (a collection
 * task) and records their metadata against the given dataset.
 *
 * <p>All failures are logged and swallowed — this is deliberately
 * best-effort so a scan failure never propagates to the caller.
 *
 * <p>NOTE(review): {@code @Async} only takes effect when the call goes
 * through the Spring proxy; a self-invocation from within this class
 * (as in {@code createDataset}) runs synchronously — confirm the call
 * site, or inject a self-reference / move this to a separate bean.
 *
 * @param datasetId    dataset ID the scanned files are attached to
 * @param dataSourceId data source ID (the collection task ID)
 */
@Async
public void processDataSourceAsync(String datasetId, String dataSourceId) {
    try {
        log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);

        // 1. Fetch the task detail from the data-collection service.
        CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId);
        if (taskDetail == null) {
            log.error("获取归集任务详情失败,任务ID: {}", dataSourceId);
            return;
        }

        log.info("获取到归集任务详情: {}", taskDetail.getName());

        // 2. Convert the raw config map into a typed config; bail out on failure.
        LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
        if (config == null) {
            log.error("解析任务配置失败,任务ID: {}", dataSourceId);
            return;
        }

        // 3. Only LOCAL_COLLECTION tasks carry scannable local file paths.
        if (!"LOCAL_COLLECTION".equalsIgnoreCase(config.getType())) {
            log.info("任务类型不是 LOCAL_COLLECTION,跳过文件扫描。任务类型: {}", config.getType());
            return;
        }

        // 4. Nothing to scan when the config lists no paths.
        List<String> filePaths = config.getFilePaths();
        if (CollectionUtils.isEmpty(filePaths)) {
            log.warn("文件路径列表为空,任务ID: {}", dataSourceId);
            return;
        }

        log.info("开始扫描文件,共 {} 个文件路径", filePaths.size());

        // 5. Collect file metadata for every path, tagged with the dataset ID.
        List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);

        // 6. Persist each scanned file record (one insert per file).
        if (CollectionUtils.isNotEmpty(datasetFiles)) {
            for (DatasetFile datasetFile : datasetFiles) {
                datasetFileMapper.insert(datasetFile);
            }
            log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size());
        } else {
            log.warn("未扫描到有效文件");
        }

    } catch (Exception e) {
        // Best-effort: log and swallow so the async job never throws.
        log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
    }
}

/**
 * Converts a raw task-config map into a {@link LocalCollectionConfig}.
 *
 * @param configMap raw configuration as returned by the collection service
 * @return the typed config, or {@code null} when the map is null/empty or
 *         the conversion fails (failure is logged)
 */
private LocalCollectionConfig parseTaskConfig(Map<String, Object> configMap) {
    // An absent or empty map can never produce a usable config.
    if (configMap == null || configMap.isEmpty()) {
        return null;
    }
    try {
        return objectMapper.convertValue(configMap, LocalCollectionConfig.class);
    } catch (Exception e) {
        log.error("解析任务配置失败", e);
        return null;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public DatasetFile uploadFile(String datasetId, MultipartFile file, String descr
datasetFile.setDatasetId(datasetId);
datasetFile.setFileName(fileName);
datasetFile.setFilePath(targetLocation.toString());
datasetFile.setFileType(file.getContentType());
datasetFile.setFileFormat(getFileExtension(originalFilename));
datasetFile.setFileType(getFileExtension(originalFilename));
datasetFile.setFileFormat(file.getContentType());
datasetFile.setFileSize(file.getSize());
datasetFile.setUploadTime(LocalDateTime.now());
datasetFile.setStatus(StatusConstants.DatasetFileStatuses.COMPLETED);
Expand Down Expand Up @@ -241,7 +241,6 @@ private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileReque
.fileSize(savedFile.length())
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.lastAccessTime(currentTime)
.fileName(uploadFile.getFileName())
.filePath(savedFile.getPath())
.fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
Expand Down
Loading