Skip to content

Commit a07fba2

Browse files
authored
feature:数据集导入数据集支持选择归集任务导入 (#92)
* feature: 实现obs归集 * feature: 增加数据集中出现同名文件时的处理方式 * feature: 前端数据集导入数据时增加可以选择归集任务导入
1 parent 4506fa8 commit a07fba2

File tree

12 files changed

+168
-209
lines changed

12 files changed

+168
-209
lines changed

backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.datamate.collection.domain.process.ProcessRunner;
77
import com.datamate.collection.infrastructure.datax.config.MysqlConfig;
88
import com.datamate.collection.infrastructure.datax.config.NasConfig;
9+
import com.datamate.collection.infrastructure.datax.config.ObsConfig;
910
import com.datamate.common.infrastructure.exception.BusinessException;
1011
import com.datamate.common.infrastructure.exception.SystemErrorCode;
1112
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -92,18 +93,21 @@ private String getJobConfig(CollectionTask task) {
9293
try {
9394
ObjectMapper objectMapper = new ObjectMapper();
9495
TemplateType templateType = task.getTaskType();
95-
switch (templateType) {
96-
case NAS:
96+
return switch (templateType) {
97+
case NAS -> {
9798
// NAS 特殊处理
9899
NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
99-
return nasConfig.toJobConfig(objectMapper, task);
100-
case OBS:
101-
case MYSQL:
100+
yield nasConfig.toJobConfig(objectMapper, task);
101+
}
102+
case OBS -> {
103+
ObsConfig obsConfig = objectMapper.readValue(task.getConfig(), ObsConfig.class);
104+
yield obsConfig.toJobConfig(objectMapper, task);
105+
}
106+
case MYSQL -> {
102107
MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class);
103-
return mysqlConfig.toJobConfig(objectMapper, task);
104-
default:
105-
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
106-
}
108+
yield mysqlConfig.toJobConfig(objectMapper, task);
109+
}
110+
};
107111
} catch (Exception e) {
108112
log.error("Failed to parse task config", e);
109113
throw new RuntimeException("Failed to parse task config", e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.datamate.collection.infrastructure.datax.config;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * OBS collection configuration.
 *
 * <p>Holds the connection settings for a Huawei OBS source and builds the
 * DataX job JSON used to copy objects from OBS into a collection task's
 * target path.
 *
 * @since 2025/11/18
 */
@Getter
@Setter
public class ObsConfig implements BaseConfig {
    private String endpoint;
    private String bucket;
    private String accessKey;
    private String secretKey;
    private String prefix;

    /**
     * Builds the DataX job JSON string for this OBS configuration.
     *
     * @param objectMapper mapper used to serialize the job structure
     * @param task         collection task supplying the destination path
     * @return the DataX job configuration as a JSON string
     * @throws Exception if JSON serialization fails
     */
    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
        Map<String, Object> parameter = new HashMap<>();
        putIfNotNull(parameter, "endpoint", endpoint);
        putIfNotNull(parameter, "bucket", bucket);
        putIfNotNull(parameter, "accessKey", accessKey);
        putIfNotNull(parameter, "secretKey", secretKey);
        putIfNotNull(parameter, "prefix", prefix);
        parameter.put("destPath", task.getTargetPath());

        // The reader and writer previously shared ONE map instance; give each
        // its own copy so a later mutation of one side cannot silently leak
        // into the other. The serialized JSON is unchanged.
        Map<String, Object> reader = new HashMap<>();
        reader.put("name", "obsreader");
        reader.put("parameter", new HashMap<>(parameter));

        Map<String, Object> writer = new HashMap<>();
        writer.put("name", "obswriter");
        writer.put("parameter", new HashMap<>(parameter));

        Map<String, Object> content = new HashMap<>();
        content.put("reader", reader);
        content.put("writer", writer);

        // speed.channel controls DataX transfer parallelism.
        Map<String, Object> channel = new HashMap<>();
        channel.put("channel", 2);
        Map<String, Object> setting = new HashMap<>();
        setting.put("speed", channel);

        Map<String, Object> job = new HashMap<>();
        job.put("content", List.of(content));
        job.put("setting", setting);

        Map<String, Object> jobConfig = new HashMap<>();
        jobConfig.put("job", job);
        return objectMapper.writeValueAsString(jobConfig);
    }

    /** Adds the entry only when the value is non-null; every OBS field is optional. */
    private static void putIfNotNull(Map<String, Object> map, String key, Object value) {
        if (value != null) {
            map.put(key, value);
        }
    }
}

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java

Lines changed: 23 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,12 @@
1111
import com.datamate.datamanagement.domain.model.dataset.Tag;
1212
import com.datamate.datamanagement.infrastructure.client.CollectionTaskClient;
1313
import com.datamate.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
14-
import com.datamate.datamanagement.infrastructure.client.dto.LocalCollectionConfig;
1514
import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode;
1615
import com.datamate.datamanagement.infrastructure.persistence.mapper.TagMapper;
1716
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
1817
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
1918
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
2019
import com.datamate.datamanagement.interfaces.dto.*;
21-
import com.fasterxml.jackson.databind.ObjectMapper;
2220
import lombok.RequiredArgsConstructor;
2321
import lombok.extern.slf4j.Slf4j;
2422
import org.apache.commons.collections4.CollectionUtils;
@@ -28,11 +26,13 @@
2826
import org.springframework.transaction.annotation.Transactional;
2927
import org.springframework.util.StringUtils;
3028

29+
import java.io.IOException;
3130
import java.nio.file.Files;
31+
import java.nio.file.Path;
3232
import java.nio.file.Paths;
3333
import java.util.*;
34-
import java.util.function.Function;
3534
import java.util.stream.Collectors;
35+
import java.util.stream.Stream;
3636

3737
/**
3838
* 数据集应用服务(对齐 DB schema,使用 UUID 字符串主键)
@@ -46,8 +46,7 @@ public class DatasetApplicationService {
4646
private final TagMapper tagMapper;
4747
private final DatasetFileRepository datasetFileRepository;
4848
private final CollectionTaskClient collectionTaskClient;
49-
private final FileMetadataService fileMetadataService;
50-
private final ObjectMapper objectMapper;
49+
private final DatasetFileApplicationService datasetFileApplicationService;
5150

5251
@Value("${datamate.data-management.base-path:/dataset}")
5352
private String datasetBasePath;
@@ -223,68 +222,38 @@ public AllDatasetStatisticsResponse getAllDatasetStatistics() {
223222
@Async
224223
public void processDataSourceAsync(String datasetId, String dataSourceId) {
225224
try {
226-
log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);
225+
log.info("Initiating data source file scanning, dataset ID: {}, collection task ID: {}", datasetId, dataSourceId);
227226
List<String> filePaths = getFilePaths(dataSourceId);
228227
if (CollectionUtils.isEmpty(filePaths)) {
229228
return;
230229
}
231-
log.info("开始扫描文件,共 {} 个文件路径", filePaths.size());
232-
233-
List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);
234-
// 查询数据集中已存在的文件
235-
List<DatasetFile> existDatasetFileList = datasetFileRepository.findAllByDatasetId(datasetId);
236-
Map<String, DatasetFile> existDatasetFilePathMap = existDatasetFileList.stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
237-
Dataset dataset = datasetRepository.getById(datasetId);
238-
dataset.setFiles(existDatasetFileList);
239-
240-
// 批量同步数据集文件表
241-
asyncDatasetFile(datasetFiles, existDatasetFilePathMap, dataset, existDatasetFileList, filePaths);
242-
datasetRepository.updateById(dataset);
230+
log.info("Starting file scan, total files: {}", filePaths.size());
231+
datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths));
243232
} catch (Exception e) {
244233
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
245234
}
246235
}
247236

248-
private void asyncDatasetFile(List<DatasetFile> datasetFiles, Map<String, DatasetFile> existDatasetFilePathMap, Dataset dataset, List<DatasetFile> existDatasetFileList, List<String> filePaths) {
249-
if (CollectionUtils.isNotEmpty(datasetFiles)) {
250-
for (DatasetFile datasetFile : datasetFiles) {
251-
if (existDatasetFilePathMap.containsKey(datasetFile.getFilePath())) {
252-
DatasetFile existDatasetFile = existDatasetFilePathMap.get(datasetFile.getFilePath());
253-
dataset.removeFile(existDatasetFile);
254-
existDatasetFile.setFileSize(datasetFile.getFileSize());
255-
dataset.addFile(existDatasetFile);
256-
dataset.active();
257-
datasetFileRepository.updateById(existDatasetFile);
258-
} else {
259-
dataset.addFile(datasetFile);
260-
dataset.active();
261-
datasetFileRepository.save(datasetFile);
262-
}
263-
}
264-
log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size());
265-
} else {
266-
log.warn("未扫描到有效文件");
267-
}
268-
for (DatasetFile datasetFile : existDatasetFileList) {
269-
String existFilePath = datasetFile.getFilePath();
270-
for (String filePath : filePaths) {
271-
if (existFilePath.equals(filePath) || existFilePath.startsWith(filePath)) {
272-
if (Files.notExists(Paths.get(existFilePath))) {
273-
dataset.removeFile(datasetFile);
274-
datasetFileRepository.removeById(datasetFile.getId());
275-
}
276-
}
277-
}
278-
}
279-
}
280-
281237
private List<String> getFilePaths(String dataSourceId) {
282238
CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
283239
if (taskDetail == null) {
284-
log.warn("获取归集任务详情失败,任务ID: {}", dataSourceId);
240+
log.warn("Fail to get collection task detail, task ID: {}", dataSourceId);
241+
return Collections.emptyList();
242+
}
243+
Path targetPath = Paths.get(taskDetail.getTargetPath());
244+
if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) {
245+
log.warn("Target path not exists or is not a directory: {}", taskDetail.getTargetPath());
246+
return Collections.emptyList();
247+
}
248+
249+
try (Stream<Path> paths = Files.walk(targetPath, 1)) {
250+
return paths
251+
.filter(Files::isRegularFile) // 只保留文件,排除目录
252+
.map(Path::toString) // 转换为字符串路径
253+
.collect(Collectors.toList());
254+
} catch (IOException e) {
255+
log.error("Fail to scan directory: {}", targetPath, e);
285256
return Collections.emptyList();
286257
}
287-
log.info("获取到归集任务详情: {}", taskDetail);
288-
return Collections.singletonList(taskDetail.getTargetPath());
289258
}
290259
}

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
import com.datamate.common.infrastructure.exception.BusinessAssert;
88
import com.datamate.common.infrastructure.exception.BusinessException;
99
import com.datamate.common.infrastructure.exception.SystemErrorCode;
10+
import com.datamate.datamanagement.common.enums.DuplicateMethod;
1011
import com.datamate.datamanagement.domain.contants.DatasetConstant;
1112
import com.datamate.datamanagement.domain.model.dataset.Dataset;
1213
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
1314
import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo;
15+
import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode;
1416
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
1517
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
1618
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
@@ -45,6 +47,8 @@
4547
import java.time.format.DateTimeFormatter;
4648
import java.util.*;
4749
import java.util.concurrent.CompletableFuture;
50+
import java.util.function.Function;
51+
import java.util.stream.Collectors;
4852
import java.util.zip.ZipEntry;
4953
import java.util.zip.ZipOutputStream;
5054

@@ -63,6 +67,9 @@ public class DatasetFileApplicationService {
6367
@Value("${datamate.data-management.base-path:/dataset}")
6468
private String datasetBasePath;
6569

70+
@Value("${datamate.data-management.file.duplicate:COVER}")
71+
private DuplicateMethod duplicateMethod;
72+
6673
@Autowired
6774
public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository,
6875
DatasetRepository datasetRepository, FileService fileService) {
@@ -254,13 +261,36 @@ private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileReque
254261
.filePath(savedFile.getPath())
255262
.fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
256263
.build();
257-
258-
datasetFileRepository.save(datasetFile);
264+
dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
265+
setDatasetFileId(datasetFile, dataset);
266+
datasetFileRepository.saveOrUpdate(datasetFile);
259267
dataset.addFile(datasetFile);
260268
dataset.active();
261269
datasetRepository.updateById(dataset);
262270
}
263271

272+
/**
273+
* 为数据集文件设置文件id
274+
*
275+
* @param datasetFile 要设置id的文件
276+
* @param dataset 数据集(包含文件列表)
277+
*/
278+
private void setDatasetFileId(DatasetFile datasetFile, Dataset dataset) {
279+
Map<String, DatasetFile> existDatasetFilMap = dataset.getFiles().stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
280+
DatasetFile existDatasetFile = existDatasetFilMap.get(datasetFile.getFilePath());
281+
if (Objects.isNull(existDatasetFile)) {
282+
return;
283+
}
284+
if (duplicateMethod == DuplicateMethod.ERROR) {
285+
log.error("file {} already exists in dataset {}", datasetFile.getFileName(), datasetFile.getDatasetId());
286+
throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS);
287+
}
288+
if (duplicateMethod == DuplicateMethod.COVER) {
289+
dataset.removeFile(existDatasetFile);
290+
datasetFile.setId(existDatasetFile.getId());
291+
}
292+
}
293+
264294
/**
265295
* 复制文件到数据集目录
266296
*
@@ -273,6 +303,8 @@ public List<DatasetFile> copyFilesToDatasetDir(String datasetId, CopyFilesReques
273303
Dataset dataset = datasetRepository.getById(datasetId);
274304
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
275305
List<DatasetFile> copiedFiles = new ArrayList<>();
306+
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
307+
dataset.setFiles(existDatasetFiles);
276308
for (String sourceFilePath : req.sourcePaths()) {
277309
Path sourcePath = Paths.get(sourceFilePath);
278310
if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) {
@@ -292,10 +324,11 @@ public List<DatasetFile> copyFilesToDatasetDir(String datasetId, CopyFilesReques
292324
.uploadTime(currentTime)
293325
.lastAccessTime(currentTime)
294326
.build();
327+
setDatasetFileId(datasetFile, dataset);
295328
dataset.addFile(datasetFile);
296329
copiedFiles.add(datasetFile);
297330
}
298-
datasetFileRepository.saveBatch(copiedFiles, 100);
331+
datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
299332
dataset.active();
300333
datasetRepository.updateById(dataset);
301334
CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset));

0 commit comments

Comments
 (0)