package com.dataengine.datamanagement.application.service;

import com.dataengine.datamanagement.domain.model.dataset.Dataset;
import com.dataengine.datamanagement.domain.model.dataset.DatasetFile;
import com.dataengine.datamanagement.domain.model.dataset.Tag;
import com.dataengine.datamanagement.infrastructure.client.CollectionTaskClient;
import com.dataengine.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
import com.dataengine.datamanagement.infrastructure.client.dto.LocalCollectionConfig;
import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper;
import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetMapper;
import com.dataengine.datamanagement.infrastructure.persistence.mapper.TagMapper;
import com.dataengine.datamanagement.interfaces.converter.DatasetConverter;
import com.dataengine.datamanagement.interfaces.dto.AllDatasetStatisticsResponse;
import com.dataengine.datamanagement.interfaces.dto.CreateDatasetRequest;
import com.dataengine.datamanagement.interfaces.dto.DatasetPagingQuery;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.RowBounds;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Objects;
/**
 * 数据集应用服务(对齐 DB schema,使用 UUID 字符串主键)
 */
36+ @Slf4j
2837@Service
2938@Transactional
3039public class DatasetApplicationService {
3140
    private final DatasetMapper datasetMapper;
    private final TagMapper tagMapper;
    private final DatasetFileMapper datasetFileMapper;
    // Client of the data-collection service; looks up collection-task details by data-source id.
    private final CollectionTaskClient collectionTaskClient;
    // Scans local file paths and produces DatasetFile metadata records (used by processDataSourceAsync).
    private final FileMetadataService fileMetadataService;
    // Jackson mapper; converts the raw task-config map into a typed LocalCollectionConfig.
    private final ObjectMapper objectMapper;

    // Base directory for dataset storage; falls back to /dataset when the property is unset.
    @Value("${dataset.base.path:/dataset}")
    private String datasetBasePath;
3850
3951 @Autowired
40- public DatasetApplicationService(DatasetMapper datasetMapper, TagMapper tagMapper, DatasetFileMapper datasetFileMapper) {
52+ public DatasetApplicationService(DatasetMapper datasetMapper,
53+ TagMapper tagMapper,
54+ DatasetFileMapper datasetFileMapper,
55+ CollectionTaskClient collectionTaskClient,
56+ FileMetadataService fileMetadataService,
57+ ObjectMapper objectMapper) {
4158 this.datasetMapper = datasetMapper;
4259 this.tagMapper = tagMapper;
4360 this.datasetFileMapper = datasetFileMapper;
61+ this.collectionTaskClient = collectionTaskClient;
62+ this.fileMetadataService = fileMetadataService;
63+ this.objectMapper = objectMapper;
4464 }
4565
4666 /**
@@ -66,6 +86,11 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
6686 }
6787 }
6888
89+ if (StringUtils.isNotBlank(createDatasetRequest.getDataSource())) {
90+ // 数据源id不为空,使用异步线程进行文件扫盘落库
91+ processDataSourceAsync(dataset.getId(), createDatasetRequest.getDataSource());
92+ }
93+
6994 // 返回创建的数据集,包含标签信息
7095 Dataset createdDataset = datasetMapper.findById(dataset.getId());
7196 createdDataset.getTags().addAll(processedTags);
@@ -241,4 +266,78 @@ public Map<String, Object> getDatasetStatistics(String datasetId) {
241266 public AllDatasetStatisticsResponse getAllDatasetStatistics() {
242267 return datasetMapper.getAllDatasetStatistics();
243268 }
269+
270+ /**
271+ * 异步处理数据源文件扫描
272+ * @param datasetId 数据集ID
273+ * @param dataSourceId 数据源ID(归集任务ID)
274+ */
275+ @Async
276+ public void processDataSourceAsync(String datasetId, String dataSourceId) {
277+ try {
278+ log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);
279+
280+ // 1. 调用数据归集服务获取任务详情
281+ CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId);
282+ if (taskDetail == null) {
283+ log.error("获取归集任务详情失败,任务ID: {}", dataSourceId);
284+ return;
285+ }
286+
287+ log.info("获取到归集任务详情: {}", taskDetail.getName());
288+
289+ // 2. 解析任务配置
290+ LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
291+ if (config == null) {
292+ log.error("解析任务配置失败,任务ID: {}", dataSourceId);
293+ return;
294+ }
295+
296+ // 3. 检查任务类型是否为 LOCAL_COLLECTION
297+ if (!"LOCAL_COLLECTION".equalsIgnoreCase(config.getType())) {
298+ log.info("任务类型不是 LOCAL_COLLECTION,跳过文件扫描。任务类型: {}", config.getType());
299+ return;
300+ }
301+
302+ // 4. 获取文件路径列表
303+ List<String> filePaths = config.getFilePaths();
304+ if (CollectionUtils.isEmpty(filePaths)) {
305+ log.warn("文件路径列表为空,任务ID: {}", dataSourceId);
306+ return;
307+ }
308+
309+ log.info("开始扫描文件,共 {} 个文件路径", filePaths.size());
310+
311+ // 5. 扫描文件元数据
312+ List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);
313+
314+ // 6. 批量插入数据集文件表
315+ if (CollectionUtils.isNotEmpty(datasetFiles)) {
316+ for (DatasetFile datasetFile : datasetFiles) {
317+ datasetFileMapper.insert(datasetFile);
318+ }
319+ log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size());
320+ } else {
321+ log.warn("未扫描到有效文件");
322+ }
323+
324+ } catch (Exception e) {
325+ log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
326+ }
327+ }
328+
329+ /**
330+ * 解析任务配置
331+ */
332+ private LocalCollectionConfig parseTaskConfig(Map<String, Object> configMap) {
333+ try {
334+ if (configMap == null || configMap.isEmpty()) {
335+ return null;
336+ }
337+ return objectMapper.convertValue(configMap, LocalCollectionConfig.class);
338+ } catch (Exception e) {
339+ log.error("解析任务配置失败", e);
340+ return null;
341+ }
342+ }
244343}