 import com.datamate.datamanagement.domain.model.dataset.Dataset;
 import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
 import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
+import com.datamate.operator.domain.repository.OperatorRepository;
+import com.datamate.operator.infrastructure.exception.OperatorErrorCode;
+import com.datamate.operator.interfaces.dto.OperatorDto;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.PropertyNamingStrategies;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.yaml.snakeyaml.DumperOptions;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -52,6 +61,8 @@ public class CleaningTaskService {
 
     private final OperatorInstanceRepository operatorInstanceRepo;
 
+    private final OperatorRepository operatorRepo;
+
     private final CleaningResultRepository cleaningResultRepo;
 
     private final CleaningTaskScheduler taskScheduler;
@@ -66,11 +77,16 @@ public class CleaningTaskService {
 
     private final String FLOW_PATH = "/flow";
 
-    private final Pattern LEVEL_PATTERN = Pattern.compile(
-            "\\b(TRACE|DEBUG|INFO|WARN|WARNING|ERROR|FATAL)\\b",
-            Pattern.CASE_INSENSITIVE
+    private static final Pattern STANDARD_LEVEL_PATTERN = Pattern.compile(
+            "\\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\\b"
+    );
+
+    private static final Pattern EXCEPTION_SUFFIX_PATTERN = Pattern.compile(
+            "\\b\\w+(Warning|Error|Exception)\\b"
     );
 
+    private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public List<CleaningTaskDto> getTasks(String status, String keywords, Integer page, Integer size) {
         List<CleaningTaskDto> tasks = cleaningTaskRepo.findTasks(status, keywords, page, size);
         tasks.forEach(this::setProcess);
@@ -133,6 +149,7 @@ public List<CleaningResultDto> getTaskResults(String taskId) {
     }
 
     public List<CleaningTaskLog> getTaskLog(String taskId) {
+        cleanTaskValidator.checkTaskId(taskId);
         String logPath = FLOW_PATH + "/" + taskId + "/output.log";
         try (Stream<String> lines = Files.lines(Paths.get(logPath))) {
             List<CleaningTaskLog> logs = new ArrayList<>();
@@ -156,18 +173,31 @@ private String getLogLevel(String logLine, String defaultLevel) {
             return defaultLevel;
         }
 
-        Matcher matcher = LEVEL_PATTERN.matcher(logLine);
-        if (matcher.find()) {
-            return matcher.group(1).toUpperCase();
+        Matcher stdMatcher = STANDARD_LEVEL_PATTERN.matcher(logLine);
+        if (stdMatcher.find()) {
+            return stdMatcher.group(1).toUpperCase();
+        }
+
+        Matcher exMatcher = EXCEPTION_SUFFIX_PATTERN.matcher(logLine);
+        if (exMatcher.find()) {
+            String match = exMatcher.group(1).toUpperCase();
+            if ("WARNING".equals(match)) return "WARN";
+            if ("ERROR".equals(match) || "EXCEPTION".equals(match)) return "ERROR";
         }
         return defaultLevel;
     }
 
     @Transactional
     public void deleteTask(String taskId) {
+        cleanTaskValidator.checkTaskId(taskId);
         cleaningTaskRepo.deleteTaskById(taskId);
         operatorInstanceRepo.deleteByInstanceId(taskId);
         cleaningResultRepo.deleteByInstanceId(taskId);
+        try {
+            FileUtils.deleteDirectory(new File(FLOW_PATH + "/" + taskId));
+        } catch (IOException e) {
+            log.warn("Can't delete flow path with task id: {}.", taskId, e);
+        }
     }
 
     public void executeTask(String taskId) {
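For context on the two patterns introduced above, here is a minimal, self-contained sketch of how they classify log lines. The `LogLevelProbe` class name and the sample strings are illustrative only; the matching order mirrors `getLogLevel`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: reproduces the matching order of getLogLevel() so the
// two patterns can be exercised against sample lines.
class LogLevelProbe {
    private static final Pattern STANDARD_LEVEL_PATTERN = Pattern.compile(
            "\\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\\b");
    private static final Pattern EXCEPTION_SUFFIX_PATTERN = Pattern.compile(
            "\\b\\w+(Warning|Error|Exception)\\b");

    static String levelOf(String logLine, String defaultLevel) {
        Matcher stdMatcher = STANDARD_LEVEL_PATTERN.matcher(logLine);
        if (stdMatcher.find()) {
            return stdMatcher.group(1).toUpperCase();   // explicit level token wins
        }
        Matcher exMatcher = EXCEPTION_SUFFIX_PATTERN.matcher(logLine);
        if (exMatcher.find()) {
            String match = exMatcher.group(1).toUpperCase();
            if ("WARNING".equals(match)) return "WARN";
            if ("ERROR".equals(match) || "EXCEPTION".equals(match)) return "ERROR";
        }
        return defaultLevel;
    }

    public static void main(String[] args) {
        System.out.println(levelOf("2024-05-01 12:00:00 Info loading operators", "INFO")); // INFO
        System.out.println(levelOf("DeprecationWarning: field removed", "INFO"));          // WARN
        System.out.println(levelOf("java.lang.NullPointerException: null", "INFO"));       // ERROR
        System.out.println(levelOf("progress 3/10", "INFO"));                              // INFO (default)
    }
}
```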
@@ -180,14 +210,26 @@ public void executeTask(String taskId) {
     }
 
     private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances) {
+        List<OperatorDto> allOperators = operatorRepo.findAllOperators();
+        Map<String, OperatorDto> defaultSettings = allOperators.stream()
+                .filter(operatorDto -> StringUtils.isNotBlank(operatorDto.getSettings()))
+                .collect(Collectors.toMap(OperatorDto::getId, Function.identity()));
+
         TaskProcess process = new TaskProcess();
         process.setInstanceId(task.getId());
         process.setDatasetId(task.getDestDatasetId());
         process.setDatasetPath(FLOW_PATH + "/" + task.getId() + "/dataset.jsonl");
         process.setExportPath(DATASET_PATH + "/" + task.getDestDatasetId());
         process.setExecutorType(ExecutorType.DATAMATE.getValue());
         process.setProcess(instances.stream()
-                .map(instance -> Map.of(instance.getId(), instance.getOverrides()))
+                .map(instance -> {
+                    OperatorDto operatorDto = defaultSettings.get(instance.getId());
+                    Map<String, Object> stringObjectMap = getDefaultValue(operatorDto);
+                    stringObjectMap.putAll(instance.getOverrides());
+                    Map<String, Object> runtime = getRuntime(operatorDto);
+                    stringObjectMap.putAll(runtime);
+                    return Map.of(instance.getId(), stringObjectMap);
+                })
                 .toList());
 
         ObjectMapper jsonMapper = new ObjectMapper(new YAMLFactory());
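The per-operator configuration is now assembled in three layers: defaults parsed from the operator's settings, then the instance's `getOverrides()`, then `getRuntime()` values, with each later `putAll` winning on duplicate keys. A minimal sketch of that merge order, using made-up keys and values:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrates the merge order used in prepareTask(); keys and values are made up.
class MergeOrderSketch {
    public static void main(String[] args) {
        Map<String, Object> merged = new HashMap<>(Map.of("threshold", 0.5, "lang", "en")); // getDefaultValue(operatorDto)
        merged.putAll(Map.of("threshold", 0.8));  // instance.getOverrides() beats defaults
        merged.putAll(Map.of("use_gpu", true));   // getRuntime(operatorDto) is applied last
        System.out.println(merged);               // {lang=en, threshold=0.8, use_gpu=true} (order may vary)
    }
}
```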
@@ -210,67 +252,113 @@ private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instanc
         }
     }
 
-    private void scanDataset(String taskId, String srcDatasetId) {
-        int pageNumber = 0;
-        int pageSize = 500;
-        PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
-        PagedResponse<DatasetFile> datasetFiles;
-        do {
-            datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, null, pageRequest);
-            if (datasetFiles.getContent().isEmpty()) {
-                break;
+    private Map<String, Object> getDefaultValue(OperatorDto operatorDto) {
+        if (StringUtils.isBlank(operatorDto.getSettings())) {
+            return new HashMap<>();
+        }
+
+        Map<String, Object> defaultSettings = new HashMap<>();
+        try {
+            Map<String, Map<String, Object>> settings = OBJECT_MAPPER.readValue(operatorDto.getSettings(), Map.class);
+            for (Map.Entry<String, Map<String, Object>> entry : settings.entrySet()) {
+                String key = entry.getKey();
+                Map<String, Object> setting = entry.getValue();
+                String type = setting.get("type").toString();
+                switch (type) {
+                    case "slider":
+                    case "switch":
+                    case "select":
+                    case "input":
+                    case "radio":
+                    case "checkbox":
+                        if (setting.containsKey("defaultVal")) {
+                            defaultSettings.put(key, setting.get("defaultVal"));
+                        }
+                        break;
+                    case "range":
+                        List<Object> rangeDefault = getRangeDefault(setting);
+                        if (CollectionUtils.isNotEmpty(rangeDefault)) {
+                            defaultSettings.put(key, rangeDefault);
+                        }
+                        break;
+                    default:
+                }
             }
-            List<Map<String, Object>> files = datasetFiles.getContent().stream()
-                    .map(content -> Map.of("fileName", (Object) content.getFileName(),
-                            "fileSize", content.getFileSize(),
-                            "filePath", content.getFilePath(),
-                            "fileType", content.getFileType(),
-                            "fileId", content.getId()))
-                    .toList();
-            writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
-            pageNumber += 1;
-        } while (pageNumber < datasetFiles.getTotalPages());
+            return defaultSettings;
+        } catch (JsonProcessingException e) {
+            throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage());
+        }
+    }
+
+    private List<Object> getRangeDefault(Map<String, Object> setting) {
+        List<Object> defaultValue = new ArrayList<>();
+        Object properties = setting.get("properties");
+        if (properties instanceof List<?> list) {
+            for (Object o : list) {
+                Map<String, Object> map = OBJECT_MAPPER.convertValue(o, Map.class);
+                if (map.containsKey("defaultVal")) {
+                    defaultValue.add(map.get("defaultVal"));
+                }
+            }
+        }
+        return defaultValue;
+    }
+
+    private Map<String, Object> getRuntime(OperatorDto operatorDto) {
+        if (StringUtils.isBlank(operatorDto.getRuntime())) {
+            return new HashMap<>();
+        }
+        try {
+            return OBJECT_MAPPER.readValue(operatorDto.getRuntime(), Map.class);
+        } catch (JsonProcessingException e) {
+            throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage());
+        }
+    }
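`getDefaultValue` expects the operator's settings to be a JSON object keyed by parameter name, where each entry describes a control with a `type` and either a `defaultVal` or, for `range` controls, a `properties` list of objects carrying their own `defaultVal`. The sketch below feeds a made-up payload through a simplified version of that extraction (the real method switches on the control type; this collapses the cases):

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative payload for getDefaultValue()/getRangeDefault(); only the
// structure (type / defaultVal / properties) is taken from the code above.
class SettingsDefaultsSketch {
    public static void main(String[] args) throws JsonProcessingException {
        String settingsJson = """
                {
                  "threshold": {"type": "slider", "defaultVal": 0.7},
                  "mode":      {"type": "select", "defaultVal": "strict"},
                  "length":    {"type": "range",  "properties": [{"defaultVal": 10}, {"defaultVal": 2048}]}
                }
                """;
        Map<String, Map<String, Object>> settings = new ObjectMapper().readValue(settingsJson, Map.class);
        Map<String, Object> defaults = new HashMap<>();
        settings.forEach((key, setting) -> {
            if (setting.containsKey("defaultVal")) {                          // slider/select/... branch
                defaults.put(key, setting.get("defaultVal"));
            } else if (setting.get("properties") instanceof List<?> props) {  // "range" branch
                List<Object> range = new ArrayList<>();
                for (Object p : props) {
                    range.add(((Map<?, ?>) p).get("defaultVal"));
                }
                defaults.put(key, range);
            }
        });
        System.out.println(defaults); // e.g. {threshold=0.7, mode=strict, length=[10, 2048]}
    }
}
```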
+
+    private void scanDataset(String taskId, String srcDatasetId) {
+        doScan(taskId, srcDatasetId, file -> true);
     }
 
     private void scanDataset(String taskId, String srcDatasetId, Set<String> succeedFiles) {
+        doScan(taskId, srcDatasetId, file -> !succeedFiles.contains(file.getId()));
+    }
+
+    private void doScan(String taskId, String srcDatasetId, Predicate<DatasetFile> filterCondition) {
+        cleanTaskValidator.checkTaskId(taskId);
+        String targetFilePath = FLOW_PATH + "/" + taskId + "/dataset.jsonl";
+        File targetFile = new File(targetFilePath);
+        if (targetFile.getParentFile() != null && !targetFile.getParentFile().exists()) {
+            targetFile.getParentFile().mkdirs();
+        }
+
         int pageNumber = 0;
         int pageSize = 500;
-        PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
-        PagedResponse<DatasetFile> datasetFiles;
-        do {
-            datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, null, pageRequest);
-            if (datasetFiles.getContent().isEmpty()) {
-                break;
-            }
-            List<Map<String, Object>> files = datasetFiles.getContent().stream()
-                    .filter(content -> !succeedFiles.contains(content.getId()))
-                    .map(content -> Map.of("fileName", (Object) content.getFileName(),
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(targetFile))) {
+            PagedResponse<DatasetFile> datasetFiles;
+            do {
+                PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
+                datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, null, pageRequest);
+                if (datasetFiles.getContent().isEmpty()) {
+                    break;
+                }
+                for (DatasetFile content : datasetFiles.getContent()) {
+                    if (!filterCondition.test(content)) {
+                        continue;
+                    }
+                    Map<String, Object> fileMap = Map.of(
+                            "fileName", content.getFileName(),
                             "fileSize", content.getFileSize(),
                             "filePath", content.getFilePath(),
                             "fileType", content.getFileType(),
-                            "fileId", content.getId()))
-                    .toList();
-            writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
-            pageNumber += 1;
-        } while (pageNumber < datasetFiles.getTotalPages());
-    }
-
-    private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String fileName) {
-        ObjectMapper objectMapper = new ObjectMapper();
-
-        try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) {
-            if (!mapList.isEmpty()) { // check that the list is not empty to avoid an exception
-                String jsonString = objectMapper.writeValueAsString(mapList.getFirst());
-                writer.write(jsonString);
-
-                for (int i = 1; i < mapList.size(); i++) {
+                            "fileId", content.getId()
+                    );
+                    writer.write(OBJECT_MAPPER.writeValueAsString(fileMap));
                     writer.newLine();
-                    jsonString = objectMapper.writeValueAsString(mapList.get(i));
-                    writer.write(jsonString);
                 }
-            }
+                pageNumber++;
+            } while (pageNumber < datasetFiles.getTotalPages());
         } catch (IOException e) {
-            log.error("Failed to prepare dataset.jsonl.", e);
+            log.error("Failed to write dataset.jsonl for taskId: {}", taskId, e);
             throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
         }
     }
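`doScan` streams each page straight to `dataset.jsonl`, one JSON object per file and per line. A sketch of what a single record looks like; the field values are placeholders, only the field names come from the `Map.of(...)` call above:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

// Shape of one dataset.jsonl record written by doScan(); the values are
// placeholders, the field names match the Map.of(...) call above.
class DatasetJsonlSketch {
    public static void main(String[] args) throws JsonProcessingException {
        Map<String, Object> fileMap = Map.of(
                "fileName", "sample.txt",
                "fileSize", 1024L,
                "filePath", "/dataset/src-dataset-id/sample.txt",
                "fileType", "txt",
                "fileId", "file-0001");
        System.out.println(new ObjectMapper().writeValueAsString(fileMap));
        // e.g. {"fileId":"file-0001","fileName":"sample.txt","fileSize":1024,...} (key order may vary)
    }
}
```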