11package com .dataengine .datamanagement .application ;
22
3- import com .dataengine .datamanagement .common .enums .DatasetStatusType ;
3+ import com .baomidou .mybatisplus .core .metadata .IPage ;
4+ import com .baomidou .mybatisplus .extension .plugins .pagination .Page ;
5+ import com .dataengine .common .infrastructure .exception .BusinessAssert ;
6+ import com .dataengine .common .interfaces .PagedResponse ;
47import com .dataengine .datamanagement .domain .model .dataset .Dataset ;
58import com .dataengine .datamanagement .domain .model .dataset .DatasetFile ;
69import com .dataengine .datamanagement .domain .model .dataset .Tag ;
710import com .dataengine .datamanagement .infrastructure .client .CollectionTaskClient ;
811import com .dataengine .datamanagement .infrastructure .client .dto .CollectionTaskDetailResponse ;
912import com .dataengine .datamanagement .infrastructure .client .dto .LocalCollectionConfig ;
13+ import com .dataengine .datamanagement .infrastructure .exception .DataManagementErrorCode ;
1014import com .dataengine .datamanagement .infrastructure .persistence .mapper .TagMapper ;
1115import com .dataengine .datamanagement .infrastructure .persistence .repository .DatasetFileRepository ;
1216import com .dataengine .datamanagement .infrastructure .persistence .repository .DatasetRepository ;
1317import com .dataengine .datamanagement .interfaces .converter .DatasetConverter ;
14- import com .dataengine .datamanagement .interfaces .dto .AllDatasetStatisticsResponse ;
15- import com .dataengine .datamanagement .interfaces .dto .CreateDatasetRequest ;
16- import com .dataengine .datamanagement .interfaces .dto .DatasetPagingQuery ;
18+ import com .dataengine .datamanagement .interfaces .dto .*;
1719import com .fasterxml .jackson .databind .ObjectMapper ;
20+ import lombok .RequiredArgsConstructor ;
1821import lombok .extern .slf4j .Slf4j ;
1922import org .apache .commons .collections4 .CollectionUtils ;
20- import org .apache .commons .lang3 .StringUtils ;
21- import org .apache .ibatis .session .RowBounds ;
22- import org .springframework .beans .factory .annotation .Autowired ;
2323import org .springframework .beans .factory .annotation .Value ;
24- import org .springframework .data .domain .Page ;
25- import org .springframework .data .domain .PageImpl ;
26- import org .springframework .data .domain .PageRequest ;
2724import org .springframework .scheduling .annotation .Async ;
2825import org .springframework .stereotype .Service ;
2926import org .springframework .transaction .annotation .Transactional ;
27+ import org .springframework .util .StringUtils ;
3028
31- import java .time .LocalDateTime ;
3229import java .util .*;
3330import java .util .function .Function ;
3431import java .util .stream .Collectors ;
3936@ Slf4j
4037@ Service
4138@ Transactional
39+ @ RequiredArgsConstructor
4240public class DatasetApplicationService {
43-
4441 private final DatasetRepository datasetRepository ;
4542 private final TagMapper tagMapper ;
4643 private final DatasetFileRepository datasetFileRepository ;
@@ -51,104 +48,54 @@ public class DatasetApplicationService {
5148 @ Value ("${dataset.base.path:/dataset}" )
5249 private String datasetBasePath ;
5350
54- @ Autowired
55- public DatasetApplicationService (DatasetRepository datasetRepository ,
56- TagMapper tagMapper ,
57- DatasetFileRepository datasetFileRepository ,
58- CollectionTaskClient collectionTaskClient ,
59- FileMetadataService fileMetadataService ,
60- ObjectMapper objectMapper ) {
61- this .datasetRepository = datasetRepository ;
62- this .tagMapper = tagMapper ;
63- this .datasetFileRepository = datasetFileRepository ;
64- this .collectionTaskClient = collectionTaskClient ;
65- this .fileMetadataService = fileMetadataService ;
66- this .objectMapper = objectMapper ;
67- }
68-
6951 /**
7052 * 创建数据集
7153 */
7254 @ Transactional
7355 public Dataset createDataset (CreateDatasetRequest createDatasetRequest ) {
74- if (datasetRepository .findByName (createDatasetRequest .getName ()) != null ) {
75- throw new IllegalArgumentException ("Dataset with name '" + createDatasetRequest .getName () + "' already exists" );
76- }
77-
56+ BusinessAssert .isTrue (datasetRepository .findByName (createDatasetRequest .getName ()) == null , DataManagementErrorCode .DATASET_ALREADY_EXISTS );
7857 // 创建数据集对象
7958 Dataset dataset = DatasetConverter .INSTANCE .convertToDataset (createDatasetRequest );
8059 dataset .initCreateParam (datasetBasePath );
81- datasetRepository .save (dataset );
82-
8360 // 处理标签
84- Set <Tag > processedTags = new HashSet <>();
85- if (CollectionUtils .isNotEmpty (createDatasetRequest .getTags ())) {
86- processedTags = processTagNames (createDatasetRequest .getTags ());
87- for (Tag t : processedTags ) {
88- tagMapper .insertDatasetTag (dataset .getId (), t .getId ());
89- }
90- }
61+ Set <Tag > processedTags = Optional .ofNullable (createDatasetRequest .getTags ())
62+ .filter (CollectionUtils ::isNotEmpty )
63+ .map (this ::processTagNames )
64+ .orElseGet (HashSet ::new );
65+ dataset .setTags (processedTags );
66+ datasetRepository .save (dataset );
9167
92- if (StringUtils .isNotBlank (createDatasetRequest .getDataSource ())) {
68+ //todo 需要解耦这块逻辑
69+ if (StringUtils .hasText (createDatasetRequest .getDataSource ())) {
9370 // 数据源id不为空,使用异步线程进行文件扫盘落库
9471 processDataSourceAsync (dataset .getId (), createDatasetRequest .getDataSource ());
9572 }
96-
97- // 返回创建的数据集,包含标签信息
98- Dataset createdDataset = datasetRepository .getById (dataset .getId ());
99- createdDataset .getTags ().addAll (processedTags );
100- return createdDataset ;
73+ return dataset ;
10174 }
10275
103- /**
104- * 更新数据集
105- */
106- public Dataset updateDataset (String datasetId , String name , String description ,
107- List <String > tagNames , String status ) {
76+ public Dataset updateDataset (String datasetId , UpdateDatasetRequest updateDatasetRequest ) {
10877 Dataset dataset = datasetRepository .getById (datasetId );
109- if (dataset == null ) {
110- throw new IllegalArgumentException ("Dataset not found: " + datasetId );
78+ BusinessAssert .notNull (dataset , DataManagementErrorCode .DATASET_NOT_FOUND );
79+ if (StringUtils .hasText (updateDatasetRequest .getName ())) {
80+ dataset .setName (updateDatasetRequest .getName ());
11181 }
112-
113- if (name != null && !name .isEmpty ()) dataset .setName (name );
114- if (description != null ) dataset .setDescription (description );
115- if (status != null && !status .isEmpty ()) dataset .setStatus (DatasetStatusType .valueOf (status ));
116- dataset .setUpdatedAt (LocalDateTime .now ());
117-
118- Set <Tag > processedTags = new HashSet <>();
119- if (tagNames != null ) {
120- tagMapper .deleteDatasetTagsByDatasetId (datasetId );
121- if (!tagNames .isEmpty ()) {
122- processedTags = processTagNames (tagNames );
123- for (Tag t : processedTags ) {
124- tagMapper .insertDatasetTag (datasetId , t .getId ());
125- }
126- }
127- } else {
128- // 如果没有传入标签参数,保持原有标签
129- List <Tag > existingTags = tagMapper .findByDatasetId (datasetId );
130- if (existingTags != null ) {
131- processedTags .addAll (existingTags );
132- }
82+ if (StringUtils .hasText (updateDatasetRequest .getDescription ())) {
83+ dataset .setDescription (updateDatasetRequest .getDescription ());
84+ }
85+ if (CollectionUtils .isNotEmpty (updateDatasetRequest .getTags ())) {
86+ dataset .setTags (processTagNames (updateDatasetRequest .getTags ()));
87+ }
88+ if (Objects .nonNull (updateDatasetRequest .getStatus ())) {
89+ dataset .setStatus (updateDatasetRequest .getStatus ());
13390 }
134-
13591 datasetRepository .updateById (dataset );
136-
137- // 返回更新后的数据集,包含标签信息
138- Dataset updatedDataset = datasetRepository .getById (datasetId );
139- updatedDataset .getTags ().addAll (processedTags );
140- return updatedDataset ;
92+ return dataset ;
14193 }
14294
14395 /**
14496 * 删除数据集
14597 */
14698 public void deleteDataset (String datasetId ) {
147- Dataset dataset = datasetRepository .getById (datasetId );
148- if (dataset == null ) {
149- throw new IllegalArgumentException ("Dataset not found: " + datasetId );
150- }
151- tagMapper .deleteDatasetTagsByDatasetId (datasetId );
15299 datasetRepository .removeById (datasetId );
153100 }
154101
@@ -158,36 +105,18 @@ public void deleteDataset(String datasetId) {
158105 @ Transactional (readOnly = true )
159106 public Dataset getDataset (String datasetId ) {
160107 Dataset dataset = datasetRepository .getById (datasetId );
161- if (dataset == null ) {
162- throw new IllegalArgumentException ("Dataset not found: " + datasetId );
163- }
164- // 载入标签
165- List <Tag > tags = tagMapper .findByDatasetId (datasetId );
166- if (tags != null ) {
167- dataset .getTags ().addAll (tags );
168- }
108+ BusinessAssert .notNull (dataset , DataManagementErrorCode .DATASET_NOT_FOUND );
169109 return dataset ;
170110 }
171111
172112 /**
173113 * 分页查询数据集
174114 */
175115 @ Transactional (readOnly = true )
176- public Page <Dataset > getDatasets (DatasetPagingQuery query ) {
177- RowBounds bounds = new RowBounds (query .getPage () * query .getSize (), query .getSize ());
178- List <Dataset > content = datasetRepository .findByCriteria (query .getType (), query .getStatus (), query .getKeyword (), query .getTagList (), bounds );
179- long total = datasetRepository .countByCriteria (query .getType (), query .getStatus (), query .getKeyword (), query .getTagList ());
180-
181- // 为每个数据集填充标签信息
182- if (CollectionUtils .isNotEmpty (content )) {
183- for (Dataset dataset : content ) {
184- List <Tag > tags = tagMapper .findByDatasetId (dataset .getId ());
185- if (tags != null ) {
186- dataset .getTags ().addAll (tags );
187- }
188- }
189- }
190- return new PageImpl <>(content , PageRequest .of (query .getPage (), query .getSize ()), total );
116+ public PagedResponse <DatasetResponse > getDatasets (DatasetPagingQuery query ) {
117+ IPage <Dataset > page = new Page <>(query .getPage (), query .getSize ());
118+ page = datasetRepository .findByCriteria (page , query );
119+ return PagedResponse .of (DatasetConverter .INSTANCE .convertToResponse (page .getRecords ()), page .getCurrent (), page .getTotal (), page .getPages ());
191120 }
192121
193122 /**
@@ -272,7 +201,8 @@ public AllDatasetStatisticsResponse getAllDatasetStatistics() {
272201
273202 /**
274203 * 异步处理数据源文件扫描
275- * @param datasetId 数据集ID
204+ *
205+ * @param datasetId 数据集ID
276206 * @param dataSourceId 数据源ID(归集任务ID)
277207 */
278208 @ Async
@@ -309,8 +239,7 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) {
309239 List <DatasetFile > datasetFiles = fileMetadataService .scanFiles (filePaths , datasetId );
310240 // 查询数据集中已存在的文件
311241 List <DatasetFile > existDatasetFileList = datasetFileRepository .findAllByDatasetId (datasetId );
312- Map <String , DatasetFile > existDatasetFilePathMap = existDatasetFileList .
313- stream ().collect (Collectors .toMap (DatasetFile ::getFilePath , Function .identity ()));
242+ Map <String , DatasetFile > existDatasetFilePathMap = existDatasetFileList .stream ().collect (Collectors .toMap (DatasetFile ::getFilePath , Function .identity ()));
314243 Dataset dataset = datasetRepository .getById (datasetId );
315244
316245 // 6. 批量插入数据集文件表
0 commit comments