diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java
index 41858a087..f0b31453c 100644
--- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java
@@ -6,6 +6,7 @@
 import com.datamate.collection.domain.process.ProcessRunner;
 import com.datamate.collection.infrastructure.datax.config.MysqlConfig;
 import com.datamate.collection.infrastructure.datax.config.NasConfig;
+import com.datamate.collection.infrastructure.datax.config.ObsConfig;
 import com.datamate.common.infrastructure.exception.BusinessException;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -92,18 +93,21 @@ private String getJobConfig(CollectionTask task) {
         try {
             ObjectMapper objectMapper = new ObjectMapper();
             TemplateType templateType = task.getTaskType();
-            switch (templateType) {
-                case NAS:
+            return switch (templateType) {
+                case NAS -> {
                     // Special handling for NAS
                     NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
-                    return nasConfig.toJobConfig(objectMapper, task);
-                case OBS:
-                case MYSQL:
+                    yield nasConfig.toJobConfig(objectMapper, task);
+                }
+                case OBS -> {
+                    ObsConfig obsConfig = objectMapper.readValue(task.getConfig(), ObsConfig.class);
+                    yield obsConfig.toJobConfig(objectMapper, task);
+                }
+                case MYSQL -> {
                     MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class);
-                    return mysqlConfig.toJobConfig(objectMapper, task);
-                default:
-                    throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
-            }
+                    yield mysqlConfig.toJobConfig(objectMapper, task);
+                }
+            };
         } catch (Exception e) {
             log.error("Failed to parse task config", e);
             throw new RuntimeException("Failed to parse task config", e);
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/ObsConfig.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/ObsConfig.java
new file mode 100644
index 000000000..9afd9005f
--- /dev/null
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/ObsConfig.java
@@ -0,0 +1,61 @@
+package com.datamate.collection.infrastructure.datax.config;
+
+import com.datamate.collection.domain.model.entity.CollectionTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * OBS collection configuration.
+ *
+ * @since 2025/11/18
+ */
+@Getter
+@Setter
+public class ObsConfig implements BaseConfig {
+    private String endpoint;
+    private String bucket;
+    private String accessKey;
+    private String secretKey;
+    private String prefix;
+
+    /**
+     * Builds the DataX job JSON string from this OBS configuration.
+     */
+    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
+        Map<String, Object> parameter = new HashMap<>();
+        if (endpoint != null) parameter.put("endpoint", endpoint);
+        if (bucket != null) parameter.put("bucket", bucket);
+        if (accessKey != null) parameter.put("accessKey", accessKey);
+        if (secretKey != null) parameter.put("secretKey", secretKey);
+        if (prefix != null) parameter.put("prefix", prefix);
+        parameter.put("destPath", task.getTargetPath());
+
+        Map<String, Object> job = new HashMap<>();
+        Map<String, Object> content = new HashMap<>();
+        Map<String, Object> reader = new HashMap<>();
+        reader.put("name", "obsreader");
+        reader.put("parameter", parameter);
+        content.put("reader", reader);
+
+        Map<String, Object> writer = new HashMap<>();
+        writer.put("name", "obswriter");
+        writer.put("parameter", parameter);
+        content.put("writer", writer);
+
+        job.put("content", List.of(content));
+        Map<String, Object> setting = new HashMap<>();
+        Map<String, Object> channel = new HashMap<>();
+        channel.put("channel", 2);
+        setting.put("speed", channel);
+        job.put("setting", setting);
+
+        Map<String, Object> jobConfig = new HashMap<>();
+        jobConfig.put("job", job);
+        return objectMapper.writeValueAsString(jobConfig);
+    }
+}
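For reviewers, a minimal sketch of how `ObsConfig.toJobConfig` would be exercised and the JSON shape it emits. Illustrative only: it assumes `CollectionTask` exposes a `setTargetPath(String)` setter (not shown in this patch), and all credential values are placeholders.

```java
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.infrastructure.datax.config.ObsConfig;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ObsJobConfigDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();

        ObsConfig config = new ObsConfig();
        config.setEndpoint("https://obs.example.com"); // placeholder endpoint
        config.setBucket("my-bucket");
        config.setAccessKey("AK-placeholder");
        config.setSecretKey("SK-placeholder");
        config.setPrefix("raw/2025/");

        CollectionTask task = new CollectionTask();
        task.setTargetPath("/dataset/demo-task");      // hypothetical target path

        String jobJson = config.toJobConfig(objectMapper, task);
        System.out.println(jobJson);
        // Roughly:
        // {"job":{"setting":{"speed":{"channel":2}},
        //         "content":[{"reader":{"name":"obsreader","parameter":{...}},
        //                     "writer":{"name":"obswriter","parameter":{...}}}]}}
    }
}
```

One design point worth flagging: the reader and writer share the same `parameter` map instance, so the writer also carries the OBS credentials and the reader carries `destPath`. That is presumably harmless as long as the plugins ignore keys they do not use, but it is easy to trip over if the two parameter sets ever need to diverge.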
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
index f041fc0cb..c19a707ed 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
@@ -11,14 +11,12 @@
 import com.datamate.datamanagement.domain.model.dataset.Tag;
 import com.datamate.datamanagement.infrastructure.client.CollectionTaskClient;
 import com.datamate.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
-import com.datamate.datamanagement.infrastructure.client.dto.LocalCollectionConfig;
 import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode;
 import com.datamate.datamanagement.infrastructure.persistence.mapper.TagMapper;
 import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
 import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
 import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
 import com.datamate.datamanagement.interfaces.dto.*;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
@@ -28,11 +26,13 @@
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.StringUtils;
 
+import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
-import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Dataset application service (aligned with the DB schema; uses UUID string primary keys)
  */
@@ -46,8 +46,7 @@ public class DatasetApplicationService {
     private final TagMapper tagMapper;
     private final DatasetFileRepository datasetFileRepository;
     private final CollectionTaskClient collectionTaskClient;
-    private final FileMetadataService fileMetadataService;
-    private final ObjectMapper objectMapper;
+    private final DatasetFileApplicationService datasetFileApplicationService;
 
     @Value("${datamate.data-management.base-path:/dataset}")
     private String datasetBasePath;
@@ -223,68 +222,38 @@ public AllDatasetStatisticsResponse getAllDatasetStatistics() {
     @Async
     public void processDataSourceAsync(String datasetId, String dataSourceId) {
         try {
-            log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);
+            log.info("Initiating data source file scanning, dataset ID: {}, collection task ID: {}", datasetId, dataSourceId);
             List<String> filePaths = getFilePaths(dataSourceId);
             if (CollectionUtils.isEmpty(filePaths)) {
                 return;
             }
-            log.info("开始扫描文件,共 {} 个文件路径", filePaths.size());
-
-            List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);
-            // Load the files that already exist in the dataset
-            List<DatasetFile> existDatasetFileList = datasetFileRepository.findAllByDatasetId(datasetId);
-            Map<String, DatasetFile> existDatasetFilePathMap = existDatasetFileList.stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
-            Dataset dataset = datasetRepository.getById(datasetId);
-            dataset.setFiles(existDatasetFileList);
-
-            // Sync the dataset file table in batch
-            asyncDatasetFile(datasetFiles, existDatasetFilePathMap, dataset, existDatasetFileList, filePaths);
-            datasetRepository.updateById(dataset);
+            log.info("Starting file scan, total files: {}", filePaths.size());
+            datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths));
         } catch (Exception e) {
             log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
         }
     }
 
-    private void asyncDatasetFile(List<DatasetFile> datasetFiles, Map<String, DatasetFile> existDatasetFilePathMap, Dataset dataset, List<DatasetFile> existDatasetFileList, List<String> filePaths) {
-        if (CollectionUtils.isNotEmpty(datasetFiles)) {
-            for (DatasetFile datasetFile : datasetFiles) {
-                if (existDatasetFilePathMap.containsKey(datasetFile.getFilePath())) {
-                    DatasetFile existDatasetFile = existDatasetFilePathMap.get(datasetFile.getFilePath());
-                    dataset.removeFile(existDatasetFile);
-                    existDatasetFile.setFileSize(datasetFile.getFileSize());
-                    dataset.addFile(existDatasetFile);
-                    dataset.active();
-                    datasetFileRepository.updateById(existDatasetFile);
-                } else {
-                    dataset.addFile(datasetFile);
-                    dataset.active();
-                    datasetFileRepository.save(datasetFile);
-                }
-            }
-            log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size());
-        } else {
-            log.warn("未扫描到有效文件");
-        }
-        for (DatasetFile datasetFile : existDatasetFileList) {
-            String existFilePath = datasetFile.getFilePath();
-            for (String filePath : filePaths) {
-                if (existFilePath.equals(filePath) || existFilePath.startsWith(filePath)) {
-                    if (Files.notExists(Paths.get(existFilePath))) {
-                        dataset.removeFile(datasetFile);
-                        datasetFileRepository.removeById(datasetFile.getId());
-                    }
-                }
-            }
-        }
-    }
-
     private List<String> getFilePaths(String dataSourceId) {
         CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
         if (taskDetail == null) {
-            log.warn("获取归集任务详情失败,任务ID: {}", dataSourceId);
+            log.warn("Failed to get collection task detail, task ID: {}", dataSourceId);
+            return Collections.emptyList();
+        }
+        Path targetPath = Paths.get(taskDetail.getTargetPath());
+        if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) {
+            log.warn("Target path does not exist or is not a directory: {}", taskDetail.getTargetPath());
+            return Collections.emptyList();
+        }
+
+        try (Stream<Path> paths = Files.walk(targetPath, 1)) {
+            return paths
+                    .filter(Files::isRegularFile) // keep regular files only, skip directories
+                    .map(Path::toString)          // convert to string paths
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            log.error("Failed to scan directory: {}", targetPath, e);
             return Collections.emptyList();
         }
-        log.info("获取到归集任务详情: {}", taskDetail);
-        return Collections.singletonList(taskDetail.getTargetPath());
     }
 }
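A quick, self-contained illustration of the `Files.walk(targetPath, 1)` call used above, since the depth argument is easy to misread: with `maxDepth = 1` the stream yields the start directory plus its direct children only, so files in subdirectories are never visited, and the `isRegularFile` filter also drops the root directory entry itself. This is a standalone sketch (not part of the patch) that creates its own temp files:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WalkDepthDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("walk-demo");
        Files.createFile(dir.resolve("a.txt"));
        Path sub = Files.createDirectory(dir.resolve("sub"));
        Files.createFile(sub.resolve("nested.txt"));

        // maxDepth = 1: visits dir itself plus its immediate children.
        // "nested.txt" sits at depth 2 and is therefore never seen.
        try (Stream<Path> paths = Files.walk(dir, 1)) {
            List<String> files = paths
                    .filter(Files::isRegularFile) // drops dir and sub, keeps a.txt
                    .map(Path::toString)
                    .collect(Collectors.toList());
            System.out.println(files); // [.../a.txt]
        }
    }
}
```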
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java
index 5f467a269..390f71300 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java
@@ -7,10 +7,12 @@
 import com.datamate.common.infrastructure.exception.BusinessAssert;
 import com.datamate.common.infrastructure.exception.BusinessException;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
+import com.datamate.datamanagement.common.enums.DuplicateMethod;
 import com.datamate.datamanagement.domain.contants.DatasetConstant;
 import com.datamate.datamanagement.domain.model.dataset.Dataset;
 import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
 import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo;
+import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode;
 import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
 import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
 import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
@@ -45,6 +47,8 @@
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
@@ -63,6 +67,9 @@ public class DatasetFileApplicationService {
     @Value("${datamate.data-management.base-path:/dataset}")
     private String datasetBasePath;
 
+    @Value("${datamate.data-management.file.duplicate:COVER}")
+    private DuplicateMethod duplicateMethod;
+
     @Autowired
     public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository,
                                          DatasetRepository datasetRepository, FileService fileService) {
@@ -254,13 +261,36 @@ private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileReque
                 .filePath(savedFile.getPath())
                 .fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
                 .build();
-
-        datasetFileRepository.save(datasetFile);
+        dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
+        setDatasetFileId(datasetFile, dataset);
+        datasetFileRepository.saveOrUpdate(datasetFile);
         dataset.addFile(datasetFile);
         dataset.active();
         datasetRepository.updateById(dataset);
     }
 
+    /**
+     * Sets the file id on a dataset file, applying the configured duplicate
+     * strategy when a file with the same path already exists.
+     *
+     * @param datasetFile the file to assign an id to
+     * @param dataset     the dataset (with its file list loaded)
+     */
+    private void setDatasetFileId(DatasetFile datasetFile, Dataset dataset) {
+        Map<String, DatasetFile> existDatasetFileMap = dataset.getFiles().stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
+        DatasetFile existDatasetFile = existDatasetFileMap.get(datasetFile.getFilePath());
+        if (Objects.isNull(existDatasetFile)) {
+            return;
+        }
+        if (duplicateMethod == DuplicateMethod.ERROR) {
+            log.error("file {} already exists in dataset {}", datasetFile.getFileName(), datasetFile.getDatasetId());
+            throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS);
+        }
+        if (duplicateMethod == DuplicateMethod.COVER) {
+            dataset.removeFile(existDatasetFile);
+            datasetFile.setId(existDatasetFile.getId());
+        }
+    }
+
     /**
      * Copy files to the dataset directory
     *
@@ -273,6 +303,8 @@ public List copyFilesToDatasetDir(String datasetId, CopyFilesReques
         Dataset dataset = datasetRepository.getById(datasetId);
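The duplicate-handling contract above is easiest to see in isolation. Below is a self-contained sketch of the two `DuplicateMethod` strategies; `DatasetFile` here is a stand-in record, not the real entity, and the MyBatis-Plus detail it models is that `saveOrUpdate` issues an UPDATE when the id is already populated and an INSERT otherwise.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in model of setDatasetFileId: adopt the existing row id (COVER) so a
// later saveOrUpdate turns into an UPDATE, or reject the duplicate (ERROR).
public class DuplicateStrategyDemo {
    enum DuplicateMethod { ERROR, COVER }
    record DatasetFile(String id, String filePath, long fileSize) { }

    static DatasetFile resolve(Map<String, DatasetFile> existingByPath,
                               DatasetFile incoming, DuplicateMethod method) {
        DatasetFile existing = existingByPath.get(incoming.filePath());
        if (existing == null) {
            return incoming; // new path: keep the fresh id -> INSERT
        }
        if (method == DuplicateMethod.ERROR) {
            throw new IllegalStateException("file already exists: " + incoming.filePath());
        }
        // COVER: reuse the existing id so the persistence layer updates in place
        return new DatasetFile(existing.id(), incoming.filePath(), incoming.fileSize());
    }

    public static void main(String[] args) {
        Map<String, DatasetFile> table = new HashMap<>();
        table.put("/dataset/ds-1/a.txt", new DatasetFile("id-1", "/dataset/ds-1/a.txt", 10));

        DatasetFile incoming = new DatasetFile("id-2", "/dataset/ds-1/a.txt", 42);
        System.out.println(resolve(table, incoming, DuplicateMethod.COVER));
        // -> DatasetFile[id=id-1, filePath=/dataset/ds-1/a.txt, fileSize=42]
    }
}
```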
         BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
         List<DatasetFile> copiedFiles = new ArrayList<>();
+        List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
+        dataset.setFiles(existDatasetFiles);
         for (String sourceFilePath : req.sourcePaths()) {
             Path sourcePath = Paths.get(sourceFilePath);
             if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) {
@@ -292,10 +324,11 @@ public List copyFilesToDatasetDir(String datasetId, CopyFilesReques
                     .uploadTime(currentTime)
                     .lastAccessTime(currentTime)
                     .build();
+            setDatasetFileId(datasetFile, dataset);
             dataset.addFile(datasetFile);
             copiedFiles.add(datasetFile);
         }
-        datasetFileRepository.saveBatch(copiedFiles, 100);
+        datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
         dataset.active();
         datasetRepository.updateById(dataset);
         CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset));
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/FileMetadataService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/FileMetadataService.java
deleted file mode 100644
index 58dedbb0f..000000000
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/FileMetadataService.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package com.datamate.datamanagement.application;
-
-import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * File metadata scanning service
- */
-@Slf4j
-@Service
-public class FileMetadataService {
-
-    /**
-     * Scans a list of file paths and extracts file metadata.
-     *
-     * @param datasetId dataset ID
-     * @return list of dataset files
-     */
-    public List<DatasetFile> scanFiles(List<String> filePaths, String datasetId) {
-        List<DatasetFile> datasetFiles = new ArrayList<>();
-
-        if (filePaths == null || filePaths.isEmpty()) {
-            log.warn("文件路径列表为空,跳过扫描");
-            return datasetFiles;
-        }
-
-        for (String filePath : filePaths) {
-            try {
-                Path path = Paths.get(filePath);
-
-                if (!Files.exists(path)) {
-                    log.warn("路径不存在: {}", filePath);
-                    continue;
-                }
-
-                if (Files.isDirectory(path)) {
-                    scanDirectory(datasetId, filePath, path, datasetFiles);
-                } else {
-                    // Regular file: process it directly
-                    DatasetFile datasetFile = extractFileMetadata(filePath, datasetId);
-                    if (datasetFile != null) {
-                        datasetFiles.add(datasetFile);
-                    }
-                }
-            } catch (Exception e) {
-                log.error("扫描路径失败: {}, 错误: {}", filePath, e.getMessage(), e);
-            }
-        }
-
-        log.info("文件扫描完成,共扫描 {} 个文件", datasetFiles.size());
-        return datasetFiles;
-    }
-
-    private void scanDirectory(String datasetId, String filePath, Path path,
-                               List<DatasetFile> datasetFiles) throws IOException {
-        // Directory: scan all files directly under it (non-recursive)
-        List<Path> filesInDir = Files.list(path)
-                .filter(Files::isRegularFile)
-                .toList();
-
-        for (Path file : filesInDir) {
-            try {
-                DatasetFile datasetFile = extractFileMetadata(file.toString(), datasetId);
-                if (datasetFile != null) {
-                    datasetFiles.add(datasetFile);
-                }
-            } catch (Exception e) {
-                log.error("处理目录中的文件失败: {}, 错误: {}", file, e.getMessage(), e);
-            }
-        }
-        log.info("已扫描目录 {} 下的 {} 个文件", filePath, filesInDir.size());
-    }
-
-    /**
-     * @param filePath  file path
-     * @param datasetId dataset ID
-     * @return dataset file object
-     */
-    private DatasetFile extractFileMetadata(String filePath, String datasetId) throws IOException {
-        Path path = Paths.get(filePath);
-
-        if (!Files.exists(path)) {
-            log.warn("文件不存在: {}", filePath);
-            return null;
-        }
-
-        if (!Files.isRegularFile(path)) {
-            log.warn("路径不是文件: {}", filePath);
-            return null;
-        }
-
-        String fileName = path.getFileName().toString();
-        long fileSize = Files.size(path);
-        String fileType = getFileExtension(fileName);
-
-        return DatasetFile.builder()
-                .id(UUID.randomUUID().toString())
-                .datasetId(datasetId)
-                .fileName(fileName)
-                .filePath(filePath)
-                .fileSize(fileSize)
-                .fileType(fileType)
-                .uploadTime(LocalDateTime.now())
-                .lastAccessTime(LocalDateTime.now())
-                .status("ACTIVE")
-                .build();
-    }
-
-    /**
-     * Returns the file extension.
-     */
-    private String getFileExtension(String fileName) {
-        int lastDotIndex = fileName.lastIndexOf('.');
-        if (lastDotIndex > 0 && lastDotIndex < fileName.length() - 1) {
-            return fileName.substring(lastDotIndex + 1).toLowerCase();
-        }
-        return "unknown";
-    }
-}
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/common/enums/DuplicateMethod.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/common/enums/DuplicateMethod.java
new file mode 100644
index 000000000..56c886e44
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/common/enums/DuplicateMethod.java
@@ -0,0 +1,11 @@
+package com.datamate.datamanagement.common.enums;
+
+/**
+ * Strategy for handling duplicate file names.
+ *
+ * @since 2025/11/18
+ */
+public enum DuplicateMethod {
+    ERROR,
+    COVER
+}
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/exception/DataManagementErrorCode.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/exception/DataManagementErrorCode.java
index 3be421c11..683f12b7f 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/exception/DataManagementErrorCode.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/exception/DataManagementErrorCode.java
@@ -32,7 +32,11 @@
     /**
      * Dataset tag already exists
      */
-    DATASET_TAG_ALREADY_EXISTS("data_management.0005", "数据集标签已存在");
+    DATASET_TAG_ALREADY_EXISTS("data_management.0005", "数据集标签已存在"),
+    /**
+     * Dataset file already exists
+     */
+    DATASET_FILE_ALREADY_EXISTS("data_management.0006", "数据集文件已存在");
 
     private final String code;
     private final String message;
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java
index 6ca64bd4e..498f69cd1 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/CreateDatasetRequest.java
@@ -33,4 +33,6 @@ public class CreateDatasetRequest {
     private List<String> tags;
     /** Data source */
     private String dataSource;
+    /** Retention days */
+    private Integer retentionDays;
 }
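A note on the new `duplicateMethod` field wired into `DatasetFileApplicationService`: binding a property directly to an enum via `@Value` relies on Spring's built-in String-to-enum conversion, so the configured value must match a constant name exactly (`COVER` or `ERROR`). A minimal sketch of that wiring, with a hypothetical holder component standing in for the real service:

```java
import com.datamate.datamanagement.common.enums.DuplicateMethod;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Hypothetical component illustrating the property binding added in this patch.
// Equivalent application.yml entry:
//   datamate:
//     data-management:
//       file:
//         duplicate: ERROR   # defaults to COVER when the property is absent
@Component
public class DuplicatePolicyHolder {
    @Value("${datamate.data-management.file.duplicate:COVER}")
    private DuplicateMethod duplicateMethod;

    public boolean failOnDuplicate() {
        return duplicateMethod == DuplicateMethod.ERROR;
    }
}
```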
diff --git a/frontend/src/pages/DataCollection/Create/CreateTask.tsx b/frontend/src/pages/DataCollection/Create/CreateTask.tsx
index a14aa4f03..d13655eed 100644
--- a/frontend/src/pages/DataCollection/Create/CreateTask.tsx
+++ b/frontend/src/pages/DataCollection/Create/CreateTask.tsx
@@ -192,9 +192,6 @@ export default function CollectionTaskCreate() {
           />
         )}
-
-
-
       {/* Template configuration */}
diff --git a/frontend/src/pages/DataManagement/Detail/components/ImportConfiguration.tsx b/frontend/src/pages/DataManagement/Detail/components/ImportConfiguration.tsx
index 0d73f2fe5..376caedc5 100644
--- a/frontend/src/pages/DataManagement/Detail/components/ImportConfiguration.tsx
+++ b/frontend/src/pages/DataManagement/Detail/components/ImportConfiguration.tsx
@@ -84,7 +84,7 @@ export default function ImportConfiguration({
     form.resetFields();
     setFileList([]);
     form.setFieldsValue({ files: null });
-    setImportConfig({ source: DataSource.UPLOAD });
+    setImportConfig({ source: importConfig.source ? importConfig.source : DataSource.UPLOAD });
   };
 
   const handleImportData = async () => {
diff --git a/frontend/src/pages/DataManagement/dataset.const.tsx b/frontend/src/pages/DataManagement/dataset.const.tsx
index 59810c12f..a2bc1f0e4 100644
--- a/frontend/src/pages/DataManagement/dataset.const.tsx
+++ b/frontend/src/pages/DataManagement/dataset.const.tsx
@@ -187,7 +187,7 @@ export const datasetStatusMap = {
 
 export const dataSourceMap: Record<string, { label: string; value: DataSource }> = {
   [DataSource.UPLOAD]: { label: "本地上传", value: DataSource.UPLOAD },
-  // [DataSource.COLLECTION]: { label: "本地归集 ", value: DataSource.COLLECTION },
+  [DataSource.COLLECTION]: { label: "归集任务导入", value: DataSource.COLLECTION },
   // [DataSource.DATABASE]: { label: "数据库导入", value: DataSource.DATABASE },
   // [DataSource.NAS]: { label: "NAS导入", value: DataSource.NAS },
   // [DataSource.OBS]: { label: "OBS导入", value: DataSource.OBS },
diff --git a/runtime/datamate-python/app/module/synthesis/interface/ratio_task.py b/runtime/datamate-python/app/module/synthesis/interface/ratio_task.py
index 53821e030..acc262a64 100644
--- a/runtime/datamate-python/app/module/synthesis/interface/ratio_task.py
+++ b/runtime/datamate-python/app/module/synthesis/interface/ratio_task.py
@@ -1,4 +1,5 @@
 import asyncio
+import uuid
 from typing import Set
 from datetime import datetime
@@ -54,11 +55,13 @@ async def create_ratio_task(
     target_type = get_target_dataset_type(source_types)
 
     target_dataset = Dataset(
+        id=str(uuid.uuid4()),
         name=target_dataset_name,
         description=req.description or "",
         dataset_type=target_type,
         status="DRAFT",
     )
+    target_dataset.path = f"/dataset/{target_dataset.id}"
     db.add(target_dataset)
     await db.flush()  # get target_dataset.id
@@ -212,16 +215,18 @@ async def delete_ratio_tasks(
         raise HTTPException(status_code=500, detail=f"Fail to delete ratio task: {e}")
 
 
-async def valid_exists(db, req: CreateRatioTaskRequest):
-    # Validate that the ratio task name is not duplicated
-    exist_task_q = await db.execute(
-        select(RatioInstance).where(RatioInstance.name == req.name)
-    )
-    try:
-        exist_task_q.scalar_one_or_none()
-    except Exception as e:
-        logger.error(f"create ratio task failed: ratio task {req.name} already exists")
-        raise HTTPException(status_code=400, detail=f"ratio task {req.name} already exists")
+async def valid_exists(db: AsyncSession, req: CreateRatioTaskRequest) -> None:
+    """Validate that the ratio task name is unique (exact match, surrounding whitespace stripped)."""
+    name = (req.name or "").strip()
+    if not name:
+        raise HTTPException(status_code=400, detail="ratio task name is required")
+
+    # Check whether a task with the same name already exists
+    ratio_task = await db.execute(select(RatioInstance.id).where(RatioInstance.name == name))
+    exists = ratio_task.scalar_one_or_none()
+    if exists is not None:
+        logger.error(f"create ratio task failed: ratio task '{name}' already exists (id={exists})")
+        raise HTTPException(status_code=400, detail=f"ratio task '{name}' already exists")
 
 
 async def get_dataset_types(dm_service: DatasetManagementService, req: CreateRatioTaskRequest) -> Set[str]: