1- package com .datamate .cleaning .application . service ;
1+ package com .datamate .cleaning .application ;
22
33
4- import com .datamate .cleaning .application .httpclient .DatasetClient ;
54import com .datamate .cleaning .application .scheduler .CleaningTaskScheduler ;
6- import com .datamate .cleaning .domain .converter .OperatorInstanceConverter ;
7- import com .datamate .cleaning .domain .model .DatasetResponse ;
8- import com .datamate .cleaning .domain .model .ExecutorType ;
9- import com .datamate .cleaning .domain .model .OperatorInstancePo ;
10- import com .datamate .cleaning .domain .model .PagedDatasetFileResponse ;
5+ import com .datamate .cleaning .common .enums .CleaningTaskStatusEnum ;
6+ import com .datamate .cleaning .common .enums .ExecutorType ;
7+
118import com .datamate .cleaning .domain .model .TaskProcess ;
12- import com .datamate .cleaning .infrastructure .persistence .mapper .CleaningResultMapper ;
13- import com .datamate .cleaning .infrastructure .persistence .mapper .CleaningTaskMapper ;
14- import com .datamate .cleaning .infrastructure .persistence .mapper .OperatorInstanceMapper ;
9+ import com .datamate .cleaning .domain .repository .CleaningResultRepository ;
10+ import com .datamate .cleaning .domain .repository .CleaningTaskRepository ;
11+ import com .datamate .cleaning .domain .repository .OperatorInstanceRepository ;
12+
1513import com .datamate .cleaning .interfaces .dto .CleaningProcess ;
16- import com .datamate .cleaning .interfaces .dto .CleaningTask ;
14+ import com .datamate .cleaning .interfaces .dto .CleaningTaskDto ;
1715import com .datamate .cleaning .interfaces .dto .CreateCleaningTaskRequest ;
18- import com .datamate .cleaning .interfaces .dto .OperatorInstance ;
16+ import com .datamate .cleaning .interfaces .dto .OperatorInstanceDto ;
1917import com .datamate .common .infrastructure .exception .BusinessException ;
2018import com .datamate .common .infrastructure .exception .SystemErrorCode ;
19+ import com .datamate .datamanagement .application .DatasetApplicationService ;
20+ import com .datamate .datamanagement .application .DatasetFileApplicationService ;
21+ import com .datamate .datamanagement .common .enums .DatasetType ;
22+ import com .datamate .datamanagement .domain .model .dataset .Dataset ;
23+ import com .datamate .datamanagement .domain .model .dataset .DatasetFile ;
24+ import com .datamate .datamanagement .interfaces .dto .CreateDatasetRequest ;
2125import com .fasterxml .jackson .databind .JsonNode ;
2226import com .fasterxml .jackson .databind .ObjectMapper ;
2327import com .fasterxml .jackson .databind .PropertyNamingStrategies ;
2428import com .fasterxml .jackson .dataformat .yaml .YAMLFactory ;
2529import lombok .RequiredArgsConstructor ;
2630import lombok .extern .slf4j .Slf4j ;
31+ import org .springframework .data .domain .Page ;
2732import org .springframework .data .domain .PageRequest ;
2833import org .springframework .stereotype .Service ;
2934import org .springframework .transaction .annotation .Transactional ;
4247@ Service
4348@ RequiredArgsConstructor
4449public class CleaningTaskService {
45- private final CleaningTaskMapper cleaningTaskMapper ;
50+ private final CleaningTaskRepository CleaningTaskRepo ;
4651
47- private final OperatorInstanceMapper operatorInstanceMapper ;
52+ private final OperatorInstanceRepository operatorInstanceRepo ;
4853
49- private final CleaningResultMapper cleaningResultMapper ;
54+ private final CleaningResultRepository cleaningResultRepo ;
5055
5156 private final CleaningTaskScheduler taskScheduler ;
5257
58+ private final DatasetApplicationService datasetService ;
59+
60+ private final DatasetFileApplicationService datasetFileService ;
61+
5362 private final String DATASET_PATH = "/dataset" ;
5463
5564 private final String FLOW_PATH = "/flow" ;
5665
57- public List <CleaningTask > getTasks (String status , String keywords , Integer page , Integer size ) {
58- Integer offset = page * size ;
59- List <CleaningTask > tasks = cleaningTaskMapper .findTasks (status , keywords , size , offset );
66+ public List <CleaningTaskDto > getTasks (String status , String keywords , Integer page , Integer size ) {
67+ List <CleaningTaskDto > tasks = CleaningTaskRepo .findTasks (status , keywords , page , size );
6068 tasks .forEach (this ::setProcess );
6169 return tasks ;
6270 }
6371
64- private void setProcess (CleaningTask task ) {
65- int count = cleaningResultMapper .countByInstanceId (task .getId ());
72+ private void setProcess (CleaningTaskDto task ) {
73+ int count = cleaningResultRepo .countByInstanceId (task .getId ());
6674 task .setProgress (CleaningProcess .of (task .getFileCount (), count ));
6775 }
6876
6977 public int countTasks (String status , String keywords ) {
70- return cleaningTaskMapper .findTasks (status , keywords , null , null ).size ();
78+ return CleaningTaskRepo .findTasks (status , keywords , null , null ).size ();
7179 }
7280
7381 @ Transactional
74- public CleaningTask createTask (CreateCleaningTaskRequest request ) {
75- DatasetResponse destDataset = DatasetClient .createDataset (request .getDestDatasetName (),
76- request .getDestDatasetType ());
82+ public CleaningTaskDto createTask (CreateCleaningTaskRequest request ) {
83+ CreateDatasetRequest createDatasetRequest = new CreateDatasetRequest ();
84+ createDatasetRequest .setName (request .getDestDatasetName ());
85+ createDatasetRequest .setDatasetType (DatasetType .valueOf (request .getDestDatasetType ()));
86+ Dataset destDataset = datasetService .createDataset (createDatasetRequest );
7787
78- DatasetResponse srcDataset = DatasetClient .getDataset (request .getSrcDatasetId ());
88+ Dataset srcDataset = datasetService .getDataset (request .getSrcDatasetId ());
7989
80- CleaningTask task = new CleaningTask ();
90+ CleaningTaskDto task = new CleaningTaskDto ();
8191 task .setName (request .getName ());
8292 task .setDescription (request .getDescription ());
83- task .setStatus (CleaningTask . StatusEnum .PENDING );
93+ task .setStatus (CleaningTaskStatusEnum .PENDING );
8494 String taskId = UUID .randomUUID ().toString ();
8595 task .setId (taskId );
8696 task .setSrcDatasetId (request .getSrcDatasetId ());
8797 task .setSrcDatasetName (request .getSrcDatasetName ());
8898 task .setDestDatasetId (destDataset .getId ());
8999 task .setDestDatasetName (destDataset .getName ());
90- task .setBeforeSize (srcDataset .getTotalSize ());
91- task .setFileCount (srcDataset .getFileCount ());
92- cleaningTaskMapper .insertTask (task );
100+ task .setBeforeSize (srcDataset .getSizeBytes ());
101+ task .setFileCount (srcDataset .getFileCount (). intValue () );
102+ CleaningTaskRepo .insertTask (task );
93103
94- List <OperatorInstancePo > instancePos = request .getInstance ().stream ()
95- .map (OperatorInstanceConverter .INSTANCE ::operatorToDo ).toList ();
96- operatorInstanceMapper .insertInstance (taskId , instancePos );
104+ operatorInstanceRepo .insertInstance (taskId , request .getInstance ());
97105
98106 prepareTask (task , request .getInstance ());
99107 scanDataset (taskId , request .getSrcDatasetId ());
100108 executeTask (taskId );
101109 return task ;
102110 }
103111
104- public CleaningTask getTask (String taskId ) {
105- CleaningTask task = cleaningTaskMapper .findTaskById (taskId );
112+ public CleaningTaskDto getTask (String taskId ) {
113+ CleaningTaskDto task = CleaningTaskRepo .findTaskById (taskId );
106114 setProcess (task );
107115 return task ;
108116 }
109117
110118 @ Transactional
111119 public void deleteTask (String taskId ) {
112- cleaningTaskMapper . deleteTask (taskId );
113- operatorInstanceMapper .deleteByInstanceId (taskId );
114- cleaningResultMapper .deleteByInstanceId (taskId );
120+ CleaningTaskRepo . deleteTaskById (taskId );
121+ operatorInstanceRepo .deleteByInstanceId (taskId );
122+ cleaningResultRepo .deleteByInstanceId (taskId );
115123 }
116124
117125 public void executeTask (String taskId ) {
118126 taskScheduler .executeTask (taskId );
119127 }
120128
121- private void prepareTask (CleaningTask task , List <OperatorInstance > instances ) {
129+ private void prepareTask (CleaningTaskDto task , List <OperatorInstanceDto > instances ) {
122130 TaskProcess process = new TaskProcess ();
123131 process .setInstanceId (task .getId ());
124132 process .setDatasetId (task .getDestDatasetId ());
@@ -153,13 +161,13 @@ private void scanDataset(String taskId, String srcDatasetId) {
153161 int pageNumber = 0 ;
154162 int pageSize = 500 ;
155163 PageRequest pageRequest = PageRequest .of (pageNumber , pageSize );
156- PagedDatasetFileResponse datasetFile ;
164+ Page < DatasetFile > datasetFiles ;
157165 do {
158- datasetFile = DatasetClient . getDatasetFile (srcDatasetId , pageRequest );
159- if (datasetFile . getContent () != null && datasetFile .getContent ().isEmpty ()) {
166+ datasetFiles = datasetFileService . getDatasetFiles (srcDatasetId , null , null , pageRequest );
167+ if (datasetFiles .getContent ().isEmpty ()) {
160168 break ;
161169 }
162- List <Map <String , Object >> files = datasetFile .getContent ().stream ()
170+ List <Map <String , Object >> files = datasetFiles .getContent ().stream ()
163171 .map (content -> Map .of ("fileName" , (Object ) content .getFileName (),
164172 "fileSize" , content .getFileSize (),
165173 "filePath" , content .getFilePath (),
@@ -168,7 +176,7 @@ private void scanDataset(String taskId, String srcDatasetId) {
168176 .toList ();
169177 writeListMapToJsonlFile (files , FLOW_PATH + "/" + taskId + "/dataset.jsonl" );
170178 pageNumber += 1 ;
171- } while (pageNumber < datasetFile .getTotalPages ());
179+ } while (pageNumber < datasetFiles .getTotalPages ());
172180 }
173181
174182 private void writeListMapToJsonlFile (List <Map <String , Object >> mapList , String fileName ) {
0 commit comments