11package com .dataengine .datamanagement .application .service ;
22
33import com .dataengine .datamanagement .domain .model .dataset .Dataset ;
4+ import com .dataengine .datamanagement .domain .model .dataset .DatasetFile ;
45import com .dataengine .datamanagement .domain .model .dataset .Tag ;
6+ import com .dataengine .datamanagement .infrastructure .client .CollectionTaskClient ;
7+ import com .dataengine .datamanagement .infrastructure .client .dto .CollectionTaskDetailResponse ;
8+ import com .dataengine .datamanagement .infrastructure .client .dto .LocalCollectionConfig ;
59import com .dataengine .datamanagement .infrastructure .persistence .mapper .DatasetFileMapper ;
610import com .dataengine .datamanagement .infrastructure .persistence .mapper .DatasetMapper ;
711import com .dataengine .datamanagement .infrastructure .persistence .mapper .TagMapper ;
812import com .dataengine .datamanagement .interfaces .converter .DatasetConverter ;
913import com .dataengine .datamanagement .interfaces .dto .AllDatasetStatisticsResponse ;
1014import com .dataengine .datamanagement .interfaces .dto .CreateDatasetRequest ;
1115import com .dataengine .datamanagement .interfaces .dto .DatasetPagingQuery ;
16+ import com .fasterxml .jackson .databind .ObjectMapper ;
17+ import lombok .extern .slf4j .Slf4j ;
1218import org .apache .commons .collections4 .CollectionUtils ;
19+ import org .apache .commons .lang3 .StringUtils ;
1320import org .apache .ibatis .session .RowBounds ;
1421import org .springframework .beans .factory .annotation .Autowired ;
1522import org .springframework .beans .factory .annotation .Value ;
1623import org .springframework .data .domain .Page ;
1724import org .springframework .data .domain .PageImpl ;
1825import org .springframework .data .domain .PageRequest ;
26+ import org .springframework .scheduling .annotation .Async ;
1927import org .springframework .stereotype .Service ;
2028import org .springframework .transaction .annotation .Transactional ;
2129
2533/**
2634 * 数据集应用服务(对齐 DB schema,使用 UUID 字符串主键)
2735 */
36+ @ Slf4j
2837@ Service
2938@ Transactional
3039public class DatasetApplicationService {
3140
3241 private final DatasetMapper datasetMapper ;
3342 private final TagMapper tagMapper ;
3443 private final DatasetFileMapper datasetFileMapper ;
44+ private final CollectionTaskClient collectionTaskClient ;
45+ private final FileMetadataService fileMetadataService ;
46+ private final ObjectMapper objectMapper ;
3547
3648 @ Value ("${dataset.base.path:/dataset}" )
3749 private String datasetBasePath ;
3850
/**
 * Creates the service and stores each injected collaborator in its final field.
 *
 * @param datasetMapper        mapper used for dataset lookups and statistics
 * @param tagMapper            mapper kept in {@code tagMapper}
 * @param datasetFileMapper    mapper used to insert scanned dataset file records
 * @param collectionTaskClient client used to fetch collection task details
 * @param fileMetadataService  service used to scan file paths into dataset file records
 * @param objectMapper         Jackson mapper used to convert a task config map into a typed config
 */
@Autowired
public DatasetApplicationService(
        DatasetMapper datasetMapper,
        TagMapper tagMapper,
        DatasetFileMapper datasetFileMapper,
        CollectionTaskClient collectionTaskClient,
        FileMetadataService fileMetadataService,
        ObjectMapper objectMapper) {
    // Collaborators used by the data-source scan.
    this.objectMapper = objectMapper;
    this.fileMetadataService = fileMetadataService;
    this.collectionTaskClient = collectionTaskClient;

    // Persistence mappers.
    this.datasetFileMapper = datasetFileMapper;
    this.tagMapper = tagMapper;
    this.datasetMapper = datasetMapper;
}
4565
4666 /**
@@ -66,6 +86,11 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
6686 }
6787 }
6888
89+ if (StringUtils .isNotBlank (createDatasetRequest .getDataSource ())) {
90+ // 数据源id不为空,使用异步线程进行文件扫盘落库
91+ processDataSourceAsync (dataset .getId (), createDatasetRequest .getDataSource ());
92+ }
93+
6994 // 返回创建的数据集,包含标签信息
7095 Dataset createdDataset = datasetMapper .findById (dataset .getId ());
7196 createdDataset .getTags ().addAll (processedTags );
@@ -241,4 +266,78 @@ public Map<String, Object> getDatasetStatistics(String datasetId) {
241266 public AllDatasetStatisticsResponse getAllDatasetStatistics () {
242267 return datasetMapper .getAllDatasetStatistics ();
243268 }
269+
270+ /**
271+ * 异步处理数据源文件扫描
272+ * @param datasetId 数据集ID
273+ * @param dataSourceId 数据源ID(归集任务ID)
274+ */
275+ @ Async
276+ public void processDataSourceAsync (String datasetId , String dataSourceId ) {
277+ try {
278+ log .info ("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}" , datasetId , dataSourceId );
279+
280+ // 1. 调用数据归集服务获取任务详情
281+ CollectionTaskDetailResponse taskDetail = collectionTaskClient .getTaskDetail (dataSourceId );
282+ if (taskDetail == null ) {
283+ log .error ("获取归集任务详情失败,任务ID: {}" , dataSourceId );
284+ return ;
285+ }
286+
287+ log .info ("获取到归集任务详情: {}" , taskDetail .getName ());
288+
289+ // 2. 解析任务配置
290+ LocalCollectionConfig config = parseTaskConfig (taskDetail .getConfig ());
291+ if (config == null ) {
292+ log .error ("解析任务配置失败,任务ID: {}" , dataSourceId );
293+ return ;
294+ }
295+
296+ // 3. 检查任务类型是否为 LOCAL_COLLECTION
297+ if (!"LOCAL_COLLECTION" .equalsIgnoreCase (config .getType ())) {
298+ log .info ("任务类型不是 LOCAL_COLLECTION,跳过文件扫描。任务类型: {}" , config .getType ());
299+ return ;
300+ }
301+
302+ // 4. 获取文件路径列表
303+ List <String > filePaths = config .getFilePaths ();
304+ if (CollectionUtils .isEmpty (filePaths )) {
305+ log .warn ("文件路径列表为空,任务ID: {}" , dataSourceId );
306+ return ;
307+ }
308+
309+ log .info ("开始扫描文件,共 {} 个文件路径" , filePaths .size ());
310+
311+ // 5. 扫描文件元数据
312+ List <DatasetFile > datasetFiles = fileMetadataService .scanFiles (filePaths , datasetId );
313+
314+ // 6. 批量插入数据集文件表
315+ if (CollectionUtils .isNotEmpty (datasetFiles )) {
316+ for (DatasetFile datasetFile : datasetFiles ) {
317+ datasetFileMapper .insert (datasetFile );
318+ }
319+ log .info ("文件元数据写入完成,共写入 {} 条记录" , datasetFiles .size ());
320+ } else {
321+ log .warn ("未扫描到有效文件" );
322+ }
323+
324+ } catch (Exception e ) {
325+ log .error ("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}" , datasetId , dataSourceId , e );
326+ }
327+ }
328+
329+ /**
330+ * 解析任务配置
331+ */
332+ private LocalCollectionConfig parseTaskConfig (Map <String , Object > configMap ) {
333+ try {
334+ if (configMap == null || configMap .isEmpty ()) {
335+ return null ;
336+ }
337+ return objectMapper .convertValue (configMap , LocalCollectionConfig .class );
338+ } catch (Exception e ) {
339+ log .error ("解析任务配置失败" , e );
340+ return null ;
341+ }
342+ }
244343}
0 commit comments