Skip to content

Commit d43f5c2

Browse files
committed
Merge branch 'develop_930' of github.com:ModelEngine-Group/data-platform into develop_930
2 parents c837117 + 015e66f commit d43f5c2

File tree

48 files changed

+847
-292
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+847
-292
lines changed

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/application/service/CleaningTaskService.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.File;
3030
import java.io.FileWriter;
3131
import java.io.IOException;
32+
import java.time.LocalDateTime;
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.UUID;
@@ -50,7 +51,11 @@ public class CleaningTaskService {
5051

5152
public List<CleaningTask> getTasks(String status, String keywords, Integer page, Integer size) {
5253
Integer offset = page * size;
53-
return cleaningTaskMapper.findTasksByStatus(status, keywords, size, offset);
54+
return cleaningTaskMapper.findTasks(status, keywords, size, offset);
55+
}
56+
57+
public int countTasks(String status, String keywords) {
58+
return cleaningTaskMapper.findTasks(status, keywords, null, null).size();
5459
}
5560

5661
@Transactional
@@ -68,16 +73,16 @@ public CleaningTask createTask(CreateCleaningTaskRequest request) {
6873
task.setSrcDatasetName(request.getSrcDatasetName());
6974
task.setDestDatasetId(datasetResponse.getId());
7075
task.setDestDatasetName(datasetResponse.getName());
76+
task.setBeforeSize(datasetResponse.getTotalSize());
7177
cleaningTaskMapper.insertTask(task);
7278

7379
List<OperatorInstancePo> instancePos = request.getInstance().stream()
7480
.map(OperatorInstanceConverter.INSTANCE::operatorToDo).toList();
7581
operatorInstanceMapper.insertInstance(taskId, instancePos);
7682

7783
prepareTask(task, request.getInstance());
78-
scanDataset(task.getId(), request.getSrcDatasetId());
79-
80-
taskExecutor.submit(() -> executeTask(task));
84+
scanDataset(taskId, request.getSrcDatasetId());
85+
executeTask(taskId);
8186
return task;
8287
}
8388

@@ -90,15 +95,14 @@ public void deleteTask(String taskId) {
9095
cleaningTaskMapper.deleteTask(taskId);
9196
}
9297

93-
public void executeTask(CleaningTask task) {
94-
task.setStatus(CleaningTask.StatusEnum.RUNNING);
95-
cleaningTaskMapper.updateTaskStatus(task);
96-
submitTask(task.getId());
98+
public void executeTask(String taskId) {
99+
taskExecutor.submit(() -> submitTask(taskId));
97100
}
98101

99102
private void prepareTask(CleaningTask task, List<OperatorInstance> instances) {
100103
TaskProcess process = new TaskProcess();
101104
process.setInstanceId(task.getId());
105+
process.setDatasetId(task.getDestDatasetId());
102106
process.setDatasetPath(FLOW_PATH + "/" + task.getId() + "/dataset.jsonl");
103107
process.setExportPath(DATASET_PATH + "/" + task.getDestDatasetId());
104108
process.setExecutorType(ExecutorType.DATA_PLATFORM.getValue());
@@ -137,7 +141,7 @@ private void scanDataset(String taskId, String srcDatasetId) {
137141
}
138142
List<Map<String, Object>> files = datasetFile.getContent().stream()
139143
.map(content -> Map.of("fileName", (Object) content.getFileName(),
140-
"fileSize", content.getSize() + "B",
144+
"fileSize", content.getSize(),
141145
"filePath", content.getFilePath(),
142146
"fileType", content.getFileType(),
143147
"fileId", content.getId()))
@@ -148,6 +152,11 @@ private void scanDataset(String taskId, String srcDatasetId) {
148152
}
149153

150154
private void submitTask(String taskId) {
155+
CleaningTask task = new CleaningTask();
156+
task.setId(taskId);
157+
task.setStatus(CleaningTask.StatusEnum.RUNNING);
158+
task.setStartedAt(LocalDateTime.now());
159+
cleaningTaskMapper.updateTask(task);
151160
RuntimeClient.submitTask(taskId);
152161
}
153162

@@ -175,6 +184,6 @@ public void stopTask(String taskId) {
175184
CleaningTask task = new CleaningTask();
176185
task.setId(taskId);
177186
task.setStatus(CleaningTask.StatusEnum.STOPPED);
178-
cleaningTaskMapper.updateTaskStatus(task);
187+
cleaningTaskMapper.updateTask(task);
179188
}
180189
}

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/application/service/CleaningTemplateService.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,45 @@
11
package com.dataengine.cleaning.application.service;
22

33

4+
import com.dataengine.cleaning.domain.converter.OperatorInstanceConverter;
5+
import com.dataengine.cleaning.domain.model.OperatorInstancePo;
46
import com.dataengine.cleaning.domain.model.TemplateWithInstance;
57
import com.dataengine.cleaning.infrastructure.persistence.mapper.CleaningTemplateMapper;
8+
import com.dataengine.cleaning.infrastructure.persistence.mapper.OperatorInstanceMapper;
69
import com.dataengine.cleaning.interfaces.dto.CleaningTemplate;
710
import com.dataengine.cleaning.interfaces.dto.CreateCleaningTemplateRequest;
811
import com.dataengine.cleaning.interfaces.dto.OperatorResponse;
912
import com.dataengine.cleaning.interfaces.dto.UpdateCleaningTemplateRequest;
13+
import lombok.RequiredArgsConstructor;
1014
import org.apache.commons.lang3.StringUtils;
11-
import org.springframework.beans.factory.annotation.Autowired;
15+
import org.apache.ibatis.annotations.Param;
1216
import org.springframework.stereotype.Service;
1317
import org.springframework.transaction.annotation.Transactional;
1418

1519
import java.util.Comparator;
1620
import java.util.List;
1721
import java.util.Map;
22+
import java.util.UUID;
1823
import java.util.function.Function;
1924
import java.util.stream.Collectors;
2025

2126
@Service
27+
@RequiredArgsConstructor
2228
public class CleaningTemplateService {
29+
private final CleaningTemplateMapper cleaningTemplateMapper;
2330

24-
@Autowired
25-
private CleaningTemplateMapper cleaningTemplateMapper;
31+
private final OperatorInstanceMapper operatorInstanceMapper;
2632

27-
public List<CleaningTemplate> getTemplates() {
33+
public List<CleaningTemplate> getTemplates(String keywords, Integer page, Integer size) {
2834
List<OperatorResponse> allOperators = cleaningTemplateMapper.findAllOperators();
2935
Map<String, OperatorResponse> operatorsMap = allOperators.stream()
3036
.collect(Collectors.toMap(OperatorResponse::getId, Function.identity()));
3137

32-
List<TemplateWithInstance> allTemplates = cleaningTemplateMapper.findAllTemplates();
38+
Integer offset = null;
39+
if (page != null && size != null) {
40+
offset = page * size;
41+
}
42+
List<TemplateWithInstance> allTemplates = cleaningTemplateMapper.findAllTemplates(keywords, size, offset);
3343
Map<String, List<TemplateWithInstance>> templatesMap = allTemplates.stream()
3444
.collect(Collectors.groupingBy(TemplateWithInstance::getId));
3545
return templatesMap.entrySet().stream().map(twi -> {
@@ -53,12 +63,22 @@ public List<CleaningTemplate> getTemplates() {
5363
}).toList();
5464
}
5565

66+
public int countTemplates(String keywords) {
67+
return cleaningTemplateMapper.findAllTemplates(keywords, null, null).size();
68+
}
69+
5670
@Transactional
5771
public CleaningTemplate createTemplate(CreateCleaningTemplateRequest request) {
5872
CleaningTemplate template = new CleaningTemplate();
73+
String templateId = UUID.randomUUID().toString();
74+
template.setId(templateId);
5975
template.setName(request.getName());
6076
template.setDescription(request.getDescription());
6177
cleaningTemplateMapper.insertTemplate(template);
78+
79+
List<OperatorInstancePo> instancePos = request.getInstance().stream()
80+
.map(OperatorInstanceConverter.INSTANCE::operatorToDo).toList();
81+
operatorInstanceMapper.insertInstance(templateId, instancePos);
6282
return template;
6383
}
6484

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/domain/model/TaskProcess.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
public class TaskProcess {
1313
private String instanceId;
1414

15+
private String datasetId;
16+
1517
private String datasetPath;
1618

1719
private String exportPath;

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/domain/model/TemplateWithInstance.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import lombok.Setter;
55
import org.springframework.format.annotation.DateTimeFormat;
66

7-
import java.time.OffsetDateTime;
7+
import java.time.LocalDateTime;
88

99

1010
@Getter
@@ -17,10 +17,10 @@ public class TemplateWithInstance {
1717
private String description;
1818

1919
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
20-
private OffsetDateTime createdAt;
20+
private LocalDateTime createdAt;
2121

2222
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
23-
private OffsetDateTime updatedAt;
23+
private LocalDateTime updatedAt;
2424

2525
private String operatorId;
2626

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/infrastructure/persistence/mapper/CleaningTaskMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88

99
@Mapper
1010
public interface CleaningTaskMapper {
11-
List<CleaningTask> findTasksByStatus(@Param("status") String status, @Param("keywords") String keywords,
11+
List<CleaningTask> findTasks(@Param("status") String status, @Param("keywords") String keywords,
1212
@Param("size") Integer size, @Param("offset") Integer offset);
1313

1414
CleaningTask findTaskById(@Param("taskId") String taskId);
1515

1616
void insertTask(CleaningTask task);
1717

18-
void updateTaskStatus(CleaningTask task);
18+
void updateTask(CleaningTask task);
1919

2020
void deleteTask(@Param("taskId") String taskId);
2121
}

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/infrastructure/persistence/mapper/CleaningTemplateMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
@Mapper
1212
public interface CleaningTemplateMapper {
1313

14-
List<TemplateWithInstance> findAllTemplates();
14+
List<TemplateWithInstance> findAllTemplates(@Param("keywords") String keywords,
15+
@Param("size") Integer size, @Param("offset") Integer offset);
1516

1617
List<OperatorResponse> findAllOperators();
1718

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/interfaces/api/CleaningTaskController.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
11
package com.dataengine.cleaning.interfaces.api;
22

3+
import com.dataengine.cleaning.application.service.CleaningTaskService;
34
import com.dataengine.cleaning.interfaces.dto.CleaningTask;
45
import com.dataengine.cleaning.interfaces.dto.CreateCleaningTaskRequest;
5-
import com.dataengine.cleaning.application.service.CleaningTaskService;
6-
6+
import com.dataengine.common.infrastructure.common.Response;
77
import com.dataengine.common.interfaces.PagedResponse;
8-
import com.dataengine.common.interfaces.Response;
98
import lombok.RequiredArgsConstructor;
109
import org.springframework.http.ResponseEntity;
11-
import org.springframework.web.bind.annotation.DeleteMapping;
12-
import org.springframework.web.bind.annotation.GetMapping;
13-
import org.springframework.web.bind.annotation.PathVariable;
14-
import org.springframework.web.bind.annotation.PostMapping;
15-
import org.springframework.web.bind.annotation.RequestBody;
16-
import org.springframework.web.bind.annotation.RequestMapping;
17-
import org.springframework.web.bind.annotation.RequestParam;
18-
import org.springframework.web.bind.annotation.RestController;
10+
import org.springframework.web.bind.annotation.*;
11+
12+
import java.util.List;
1913

2014

2115
@RestController
@@ -29,8 +23,10 @@ public ResponseEntity<Response<PagedResponse<CleaningTask>>> cleaningTasksGet(
2923
@RequestParam("page") Integer page,
3024
@RequestParam("size") Integer size, @RequestParam(value = "status", required = false) String status,
3125
@RequestParam(value = "keywords", required = false) String keywords) {
32-
return ResponseEntity.ok(Response.ok(PagedResponse.of(cleaningTaskService.getTasks(status, keywords, page,
33-
size))));
26+
List<CleaningTask> tasks = cleaningTaskService.getTasks(status, keywords, page, size);
27+
int count = cleaningTaskService.countTasks(status, keywords);
28+
int totalPages = (count + size + 1) / size;
29+
return ResponseEntity.ok(Response.ok(PagedResponse.of(tasks, page, count, totalPages)));
3430
}
3531

3632
@PostMapping
@@ -44,11 +40,9 @@ public ResponseEntity<Response<Object>> cleaningTasksStop(@PathVariable("taskId"
4440
return ResponseEntity.ok(Response.ok(null));
4541
}
4642

47-
@PostMapping("/{taskId}/start")
43+
@PostMapping("/{taskId}/execute")
4844
public ResponseEntity<Response<Object>> cleaningTasksStart(@PathVariable("taskId") String taskId) {
49-
CleaningTask task = new CleaningTask();
50-
task.setId(taskId);
51-
cleaningTaskService.executeTask(task);
45+
cleaningTaskService.executeTask(taskId);
5246
return ResponseEntity.ok(Response.ok(null));
5347
}
5448

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/interfaces/api/CleaningTemplateController.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package com.dataengine.cleaning.interfaces.api;
22

33
import com.dataengine.cleaning.application.service.CleaningTemplateService;
4-
54
import com.dataengine.cleaning.interfaces.dto.CleaningTemplate;
65
import com.dataengine.cleaning.interfaces.dto.CreateCleaningTemplateRequest;
76
import com.dataengine.cleaning.interfaces.dto.UpdateCleaningTemplateRequest;
7+
import com.dataengine.common.infrastructure.common.Response;
88
import com.dataengine.common.interfaces.PagedResponse;
9-
import com.dataengine.common.interfaces.Response;
109
import lombok.RequiredArgsConstructor;
1110
import org.springframework.http.ResponseEntity;
1211
import org.springframework.web.bind.annotation.DeleteMapping;
@@ -16,8 +15,11 @@
1615
import org.springframework.web.bind.annotation.PutMapping;
1716
import org.springframework.web.bind.annotation.RequestBody;
1817
import org.springframework.web.bind.annotation.RequestMapping;
18+
import org.springframework.web.bind.annotation.RequestParam;
1919
import org.springframework.web.bind.annotation.RestController;
2020

21+
import java.util.List;
22+
2123

2224
@RestController
2325
@RequestMapping("/cleaning/templates")
@@ -26,8 +28,18 @@ public class CleaningTemplateController {
2628
private final CleaningTemplateService cleaningTemplateService;
2729

2830
@GetMapping
29-
public ResponseEntity<Response<PagedResponse<CleaningTemplate>>> cleaningTemplatesGet() {
30-
return ResponseEntity.ok(Response.ok(PagedResponse.of(cleaningTemplateService.getTemplates())));
31+
public ResponseEntity<Response<PagedResponse<CleaningTemplate>>> cleaningTemplatesGet(
32+
@RequestParam(value = "page", required = false) Integer page,
33+
@RequestParam(value = "size", required = false) Integer size,
34+
@RequestParam(value = "keywords", required = false) String keyword) {
35+
List<CleaningTemplate> templates = cleaningTemplateService.getTemplates(keyword, page, size);
36+
if (page != null && size != null) {
37+
int count = cleaningTemplateService.countTemplates(keyword);
38+
int totalPages = (count + size + 1) / size;
39+
return ResponseEntity.ok(Response.ok(PagedResponse.of(templates, page, count, totalPages)));
40+
} else {
41+
return ResponseEntity.ok(Response.ok(PagedResponse.of(templates)));
42+
}
3143
}
3244

3345
@PostMapping

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/interfaces/dto/CleaningTask.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import com.fasterxml.jackson.annotation.JsonCreator;
44
import com.fasterxml.jackson.annotation.JsonValue;
55

6-
import java.time.OffsetDateTime;
6+
import java.time.LocalDateTime;
77
import java.util.List;
88

99
import lombok.Getter;
@@ -32,19 +32,23 @@ public class CleaningTask {
3232

3333
private String destDatasetName;
3434

35+
private long beforeSize;
36+
37+
private long afterSize;
38+
3539
/**
3640
* 任务当前状态
3741
*/
3842
public enum StatusEnum {
39-
PENDING("pending"),
43+
PENDING("PENDING"),
4044

41-
RUNNING("running"),
45+
RUNNING("RUNNING"),
4246

43-
COMPLETED("completed"),
47+
COMPLETED("COMPLETED"),
4448

45-
STOPPED("stopped"),
49+
STOPPED("STOPPED"),
4650

47-
FAILED("failed");
51+
FAILED("FAILED");
4852

4953
private final String value;
5054

@@ -77,12 +81,12 @@ public static StatusEnum fromValue(String value) {
7781
private CleaningProcess progress;
7882

7983
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
80-
private OffsetDateTime createdAt;
84+
private LocalDateTime createdAt;
8185

8286
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
83-
private OffsetDateTime startedAt;
87+
private LocalDateTime startedAt;
8488

8589
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
86-
private OffsetDateTime finishedAt;
90+
private LocalDateTime finishedAt;
8791
}
8892

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/interfaces/dto/CleaningTemplate.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.dataengine.cleaning.interfaces.dto;
22

3-
import java.time.OffsetDateTime;
3+
import java.time.LocalDateTime;
44
import java.util.ArrayList;
55
import java.util.List;
66

@@ -25,9 +25,9 @@ public class CleaningTemplate {
2525
private List<OperatorResponse> instance = new ArrayList<>();
2626

2727
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
28-
private OffsetDateTime createdAt;
28+
private LocalDateTime createdAt;
2929

3030
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
31-
private OffsetDateTime updatedAt;
31+
private LocalDateTime updatedAt;
3232
}
3333

0 commit comments

Comments
 (0)