Merged
Commits
47 commits
12dc3a6
refactor: adjust the data collection implementation, remove unused code, and tidy up the code structure
hefanli Oct 23, 2025
8809997
Merge remote-tracking branch 'origin/main'
hefanli Oct 23, 2025
008a1e4
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 24, 2025
8d7bb8a
feature: scan all datasets at 00:00 every day, check whether each dataset has exceeded its configured retention days, and call the delete API for datasets past retention
hefanli Oct 24, 2025
738127e
fix: change the dataset file deletion logic; files uploaded to a dataset have both the database record and the file on disk deleted, while collected files only have the database record deleted
hefanli Oct 24, 2025
23a821a
Merge remote-tracking branch 'origin/main'
hefanli Oct 24, 2025
98ee5b3
fix: add parameter validation and interface definitions, and remove unused interfaces
hefanli Oct 24, 2025
825bd4b
fix: dataset statistics default to 0
hefanli Oct 24, 2025
3b1a043
feature: add dataset status transitions; a dataset starts in draft status and becomes active once files are uploaded or collected
hefanli Oct 24, 2025
dc3706d
refactor: rework the paginated query for collection tasks
hefanli Oct 24, 2025
732b4e4
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 25, 2025
96c2247
Merge remote-tracking branch 'origin/main'
hefanli Oct 25, 2025
4398e56
fix: re-execute after an update; add transaction control to collection task execution
hefanli Oct 25, 2025
bb22e89
feature: allow creating a dataset together with a collection task, and allow updating a collection task to target a specified dataset
hefanli Oct 25, 2025
7edfacd
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 25, 2025
b42fa8f
fix: creating a collection task without creating a dataset should not raise an error
hefanli Oct 27, 2025
a9623f8
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 28, 2025
a177724
Merge remote-tracking branch 'origin/main'
hefanli Oct 28, 2025
8ad8980
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 28, 2025
5fb153e
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 28, 2025
d18b62d
fix: dataset statistics were not updated when files were deleted
hefanli Oct 29, 2025
98b81a4
feature: return the file tag distribution when querying dataset details
hefanli Oct 29, 2025
109aba9
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 30, 2025
5756ecd
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 30, 2025
e445455
fix: skip analysis when tags is empty
hefanli Oct 30, 2025
ee0e93c
fix: change status to ACTIVE
hefanli Oct 30, 2025
b77b0dc
fix: revise the tag parsing method
hefanli Oct 31, 2025
9b986b5
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 31, 2025
834f213
Merge remote-tracking branch 'origin/main'
hefanli Oct 31, 2025
3d7cd24
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 31, 2025
37e30ed
feature: implement creating, paginated querying, and deleting ratio tasks
hefanli Oct 31, 2025
6276552
feature: implement the front-end interactions for creating, paginated querying, and deleting ratio tasks
hefanli Oct 31, 2025
3f4e26e
fix: page error caused by an exception in progress calculation
hefanli Oct 31, 2025
6c88a30
Merge remote-tracking branch 'origin/main'
hefanli Oct 31, 2025
30e420f
fix: initialize the path for datasets generated by ratio tasks
hefanli Oct 31, 2025
33ac4f8
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 31, 2025
32c4709
Merge remote-tracking branch 'origin/main'
hefanli Nov 3, 2025
a9caacb
feature: add a retention-days setting when creating a dataset
hefanli Nov 3, 2025
412ac07
fix: remove the unneeded maximum execution count limit
hefanli Nov 3, 2025
bb80b18
Merge branch 'ModelEngine-Group:main' into main
hefanli Nov 5, 2025
2ac1457
Merge branch 'ModelEngine-Group:main' into main
hefanli Nov 18, 2025
b37ab06
Merge remote-tracking branch 'origin/main'
hefanli Nov 18, 2025
e38ad19
feature: implement OBS collection
hefanli Nov 18, 2025
c478072
feature: add handling for duplicate file names in a dataset
hefanli Nov 18, 2025
fcd9a33
feature: allow selecting a collection task as the import source when importing data into a dataset on the front end
hefanli Nov 19, 2025
c4b354e
Merge branch 'ModelEngine-Group:main' into main
hefanli Nov 19, 2025
ffd1ece
Merge remote-tracking branch 'origin/main'
hefanli Nov 19, 2025
@@ -6,6 +6,7 @@
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.collection.infrastructure.datax.config.MysqlConfig;
import com.datamate.collection.infrastructure.datax.config.NasConfig;
import com.datamate.collection.infrastructure.datax.config.ObsConfig;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -92,18 +93,21 @@ private String getJobConfig(CollectionTask task) {
try {
ObjectMapper objectMapper = new ObjectMapper();
TemplateType templateType = task.getTaskType();
switch (templateType) {
case NAS:
return switch (templateType) {
case NAS -> {
// special handling for NAS
NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
return nasConfig.toJobConfig(objectMapper, task);
case OBS:
case MYSQL:
yield nasConfig.toJobConfig(objectMapper, task);
}
case OBS -> {
ObsConfig obsConfig = objectMapper.readValue(task.getConfig(), ObsConfig.class);
yield obsConfig.toJobConfig(objectMapper, task);
}
case MYSQL -> {
MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class);
return mysqlConfig.toJobConfig(objectMapper, task);
default:
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
}
yield mysqlConfig.toJobConfig(objectMapper, task);
}
};
} catch (Exception e) {
log.error("Failed to parse task config", e);
throw new RuntimeException("Failed to parse task config", e);
@@ -0,0 +1,61 @@
package com.datamate.collection.infrastructure.datax.config;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * OBS collection configuration class
*
* @since 2025/11/18
*/
@Getter
@Setter
public class ObsConfig implements BaseConfig{
private String endpoint;
private String bucket;
private String accessKey;
private String secretKey;
private String prefix;

/**
 * Builds the DataX job JSON string from the current OBS configuration.
*/
public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
Map<String, Object> parameter = new HashMap<>();
if (endpoint != null) parameter.put("endpoint", endpoint);
if (bucket != null) parameter.put("bucket", bucket);
if (accessKey != null) parameter.put("accessKey", accessKey);
if (secretKey != null) parameter.put("secretKey", secretKey);
if (prefix != null) parameter.put("prefix", prefix);
parameter.put("destPath", task.getTargetPath());

Map<String, Object> job = new HashMap<>();
Map<String, Object> content = new HashMap<>();
Map<String, Object> reader = new HashMap<>();
reader.put("name", "obsreader");
reader.put("parameter", parameter);
content.put("reader", reader);

Map<String, Object> writer = new HashMap<>();
writer.put("name", "obswriter");
writer.put("parameter", parameter);
content.put("writer", writer);

job.put("content", List.of(content));
Map<String, Object> setting = new HashMap<>();
Map<String, Object> channel = new HashMap<>();
channel.put("channel", 2);
setting.put("speed", channel);
job.put("setting", setting);

Map<String, Object> jobConfig = new HashMap<>();
jobConfig.put("job", job);
return objectMapper.writeValueAsString(jobConfig);
}
}
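
For reference, a minimal sketch (not part of this PR) of how the new ObsConfig could be exercised. All field values are hypothetical placeholders, and a setter for CollectionTask.targetPath is assumed to exist alongside the getter used above:

// Minimal sketch, not part of the PR: exercising ObsConfig.toJobConfig.
// Values are hypothetical; CollectionTask is assumed to expose a targetPath setter.
static String buildSampleObsJob() throws Exception {
    ObjectMapper objectMapper = new ObjectMapper();

    ObsConfig config = new ObsConfig();
    config.setEndpoint("https://obs.example.com"); // hypothetical OBS endpoint
    config.setBucket("demo-bucket");
    config.setAccessKey("<access-key>");
    config.setSecretKey("<secret-key>");
    config.setPrefix("raw/");

    CollectionTask task = new CollectionTask();
    task.setTargetPath("/dataset/demo");           // mapped to "destPath" in the job parameters

    // Produces roughly:
    // {"job":{"content":[{"reader":{"name":"obsreader","parameter":{...}},
    //                     "writer":{"name":"obswriter","parameter":{...,"destPath":"/dataset/demo"}}}],
    //         "setting":{"speed":{"channel":2}}}}
    return config.toJobConfig(objectMapper, task);
}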
@@ -11,14 +11,12 @@
import com.datamate.datamanagement.domain.model.dataset.Tag;
import com.datamate.datamanagement.infrastructure.client.CollectionTaskClient;
import com.datamate.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
import com.datamate.datamanagement.infrastructure.client.dto.LocalCollectionConfig;
import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode;
import com.datamate.datamanagement.infrastructure.persistence.mapper.TagMapper;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
import com.datamate.datamanagement.interfaces.dto.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@@ -28,11 +26,13 @@
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Dataset application service (aligned with the DB schema; uses UUID string primary keys)
Expand All @@ -46,8 +46,7 @@ public class DatasetApplicationService {
private final TagMapper tagMapper;
private final DatasetFileRepository datasetFileRepository;
private final CollectionTaskClient collectionTaskClient;
private final FileMetadataService fileMetadataService;
private final ObjectMapper objectMapper;
private final DatasetFileApplicationService datasetFileApplicationService;

@Value("${datamate.data-management.base-path:/dataset}")
private String datasetBasePath;
@@ -223,68 +222,38 @@ public AllDatasetStatisticsResponse getAllDatasetStatistics() {
@Async
public void processDataSourceAsync(String datasetId, String dataSourceId) {
try {
log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);
log.info("Initiating data source file scanning, dataset ID: {}, collection task ID: {}", datasetId, dataSourceId);
List<String> filePaths = getFilePaths(dataSourceId);
if (CollectionUtils.isEmpty(filePaths)) {
return;
}
log.info("开始扫描文件,共 {} 个文件路径", filePaths.size());

List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);
// query files that already exist in the dataset
List<DatasetFile> existDatasetFileList = datasetFileRepository.findAllByDatasetId(datasetId);
Map<String, DatasetFile> existDatasetFilePathMap = existDatasetFileList.stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
Dataset dataset = datasetRepository.getById(datasetId);
dataset.setFiles(existDatasetFileList);

// batch-sync the dataset file table
asyncDatasetFile(datasetFiles, existDatasetFilePathMap, dataset, existDatasetFileList, filePaths);
datasetRepository.updateById(dataset);
log.info("Starting file scan, total files: {}", filePaths.size());
datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths));
} catch (Exception e) {
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
}
}

private void asyncDatasetFile(List<DatasetFile> datasetFiles, Map<String, DatasetFile> existDatasetFilePathMap, Dataset dataset, List<DatasetFile> existDatasetFileList, List<String> filePaths) {
if (CollectionUtils.isNotEmpty(datasetFiles)) {
for (DatasetFile datasetFile : datasetFiles) {
if (existDatasetFilePathMap.containsKey(datasetFile.getFilePath())) {
DatasetFile existDatasetFile = existDatasetFilePathMap.get(datasetFile.getFilePath());
dataset.removeFile(existDatasetFile);
existDatasetFile.setFileSize(datasetFile.getFileSize());
dataset.addFile(existDatasetFile);
dataset.active();
datasetFileRepository.updateById(existDatasetFile);
} else {
dataset.addFile(datasetFile);
dataset.active();
datasetFileRepository.save(datasetFile);
}
}
log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size());
} else {
log.warn("未扫描到有效文件");
}
for (DatasetFile datasetFile : existDatasetFileList) {
String existFilePath = datasetFile.getFilePath();
for (String filePath : filePaths) {
if (existFilePath.equals(filePath) || existFilePath.startsWith(filePath)) {
if (Files.notExists(Paths.get(existFilePath))) {
dataset.removeFile(datasetFile);
datasetFileRepository.removeById(datasetFile.getId());
}
}
}
}
}

private List<String> getFilePaths(String dataSourceId) {
CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
if (taskDetail == null) {
log.warn("获取归集任务详情失败,任务ID: {}", dataSourceId);
log.warn("Fail to get collection task detail, task ID: {}", dataSourceId);
return Collections.emptyList();
}
Path targetPath = Paths.get(taskDetail.getTargetPath());
if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) {
log.warn("Target path not exists or is not a directory: {}", taskDetail.getTargetPath());
return Collections.emptyList();
}

try (Stream<Path> paths = Files.walk(targetPath, 1)) {
return paths
.filter(Files::isRegularFile) // keep regular files only, exclude directories
.map(Path::toString) // convert paths to strings
.collect(Collectors.toList());
} catch (IOException e) {
log.error("Fail to scan directory: {}", targetPath, e);
return Collections.emptyList();
}
log.info("获取到归集任务详情: {}", taskDetail);
return Collections.singletonList(taskDetail.getTargetPath());
}
}
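
A note on the depth-1 scan introduced in getFilePaths above: Files.walk(targetPath, 1) yields the target directory itself plus its direct children only, so the isRegularFile filter keeps top-level files and drops the root directory and any subdirectories. A small, self-contained illustration with hypothetical paths:

// Standalone illustration of the depth-1 scan used by getFilePaths (paths are hypothetical).
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WalkDepthOneDemo {
    public static void main(String[] args) throws IOException {
        Path target = Paths.get("/dataset/demo"); // collection task target path (hypothetical)
        try (Stream<Path> paths = Files.walk(target, 1)) {
            List<String> files = paths
                    .filter(Files::isRegularFile) // drops the root directory and any subdirectories
                    .map(Path::toString)
                    .collect(Collectors.toList());
            // e.g. [/dataset/demo/a.csv, /dataset/demo/b.json]; files inside subfolders are not listed
            System.out.println(files);
        }
    }
}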
@@ -7,10 +7,12 @@
import com.datamate.common.infrastructure.exception.BusinessAssert;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.datamanagement.common.enums.DuplicateMethod;
import com.datamate.datamanagement.domain.contants.DatasetConstant;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo;
import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
@@ -45,6 +47,8 @@
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

@@ -63,6 +67,9 @@ public class DatasetFileApplicationService {
@Value("${datamate.data-management.base-path:/dataset}")
private String datasetBasePath;

@Value("${datamate.data-management.file.duplicate:COVER}")
private DuplicateMethod duplicateMethod;

@Autowired
public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository,
DatasetRepository datasetRepository, FileService fileService) {
@@ -254,13 +261,36 @@ private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileReque
.filePath(savedFile.getPath())
.fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
.build();

datasetFileRepository.save(datasetFile);
dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
setDatasetFileId(datasetFile, dataset);
datasetFileRepository.saveOrUpdate(datasetFile);
dataset.addFile(datasetFile);
dataset.active();
datasetRepository.updateById(dataset);
}

/**
 * Sets the file id for a dataset file
 *
 * @param datasetFile the file whose id should be set
 * @param dataset the dataset (with its file list loaded)
*/
private void setDatasetFileId(DatasetFile datasetFile, Dataset dataset) {
Map<String, DatasetFile> existDatasetFilMap = dataset.getFiles().stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
DatasetFile existDatasetFile = existDatasetFilMap.get(datasetFile.getFilePath());
if (Objects.isNull(existDatasetFile)) {
return;
}
if (duplicateMethod == DuplicateMethod.ERROR) {
log.error("file {} already exists in dataset {}", datasetFile.getFileName(), datasetFile.getDatasetId());
throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS);
}
if (duplicateMethod == DuplicateMethod.COVER) {
dataset.removeFile(existDatasetFile);
datasetFile.setId(existDatasetFile.getId());
}
}

/**
 * Copies files into the dataset directory
*
@@ -273,6 +303,8 @@ public List<DatasetFile> copyFilesToDatasetDir(String datasetId, CopyFilesReques
Dataset dataset = datasetRepository.getById(datasetId);
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
List<DatasetFile> copiedFiles = new ArrayList<>();
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
dataset.setFiles(existDatasetFiles);
for (String sourceFilePath : req.sourcePaths()) {
Path sourcePath = Paths.get(sourceFilePath);
if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) {
@@ -292,10 +324,11 @@ public List<DatasetFile> copyFilesToDatasetDir(String datasetId, CopyFilesReques
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.build();
setDatasetFileId(datasetFile, dataset);
dataset.addFile(datasetFile);
copiedFiles.add(datasetFile);
}
datasetFileRepository.saveBatch(copiedFiles, 100);
datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
dataset.active();
datasetRepository.updateById(dataset);
CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset));
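
Finally, a note on the new duplicate handling: the DuplicateMethod enum itself is not included in this diff, but from its usage above (the datamate.data-management.file.duplicate property defaulting to COVER, and the ERROR branch throwing DATASET_FILE_ALREADY_EXISTS) it presumably looks roughly like the sketch below.

// Hypothetical reconstruction, not shown in this diff: the enum behind
// the datamate.data-management.file.duplicate property.
public enum DuplicateMethod {
    // Overwrite (default): setDatasetFileId reuses the existing record's id,
    // so saveOrUpdate/saveOrUpdateBatch replaces the row instead of inserting a duplicate.
    COVER,
    // Reject: setDatasetFileId throws DATASET_FILE_ALREADY_EXISTS when a file
    // with the same path already exists in the dataset.
    ERROR
}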