Skip to content

Commit 69b6d08

Browse files
committed
Merge branch 'develop_930' of github.com:ModelEngine-Group/data-platform into develop_930
2 parents 88304cb + c84f8bd commit 69b6d08

File tree

14 files changed

+156
-88
lines changed

14 files changed

+156
-88
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ mysql-k8s-install:
127127
.PHONY: mysql-k8s-uninstall
128128
mysql-k8s-uninstall:
129129
kubectl delete configmap init-sql
130+
kubectl delete -f deployment/kubernetes/mysql/configmap.yaml
130131
kubectl delete -f deployment/kubernetes/mysql/deploy.yaml
131132

132133
.PHONY: backend-k8s-install

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/application/httpclient/DatasetClient.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
import com.dataengine.cleaning.domain.model.CreateDatasetRequest;
44
import com.dataengine.cleaning.domain.model.DatasetResponse;
55
import com.dataengine.cleaning.domain.model.PagedDatasetFileResponse;
6+
import com.dataengine.common.infrastructure.exception.BusinessException;
7+
import com.dataengine.common.infrastructure.exception.ErrorCodeImpl;
8+
import com.dataengine.common.infrastructure.exception.SystemErrorCode;
69
import com.fasterxml.jackson.databind.JsonNode;
710
import com.fasterxml.jackson.databind.ObjectMapper;
811
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
912
import lombok.extern.slf4j.Slf4j;
10-
import org.springframework.data.domain.Pageable;
11-
13+
import org.springframework.data.domain.PageRequest;
1214

1315
import java.io.IOException;
1416
import java.net.URI;
@@ -17,6 +19,8 @@
1719
import java.net.http.HttpResponse;
1820
import java.text.MessageFormat;
1921
import java.time.Duration;
22+
import java.util.Map;
23+
import java.util.stream.Collectors;
2024

2125
@Slf4j
2226
public class DatasetClient {
@@ -39,12 +43,12 @@ public static DatasetResponse createDataset(String name, String type) {
3943
createDatasetRequest.setName(name);
4044
createDatasetRequest.setDatasetType(type);
4145

42-
4346
String jsonPayload;
4447
try {
4548
jsonPayload = OBJECT_MAPPER.writeValueAsString(createDatasetRequest);
4649
} catch (IOException e) {
47-
throw new RuntimeException("Error serializing object to JSON: " + e.getMessage());
50+
log.error("Error occurred while converting the object.", e);
51+
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
4852
}
4953

5054
HttpRequest request = HttpRequest.newBuilder()
@@ -54,45 +58,50 @@ public static DatasetResponse createDataset(String name, String type) {
5458
.POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
5559
.build();
5660

57-
try {
58-
HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
59-
int statusCode = response.statusCode();
60-
String responseBody = response.body();
61-
62-
if (statusCode < 200 || statusCode >= 300) {
63-
log.error("Request failed with status code: {}", statusCode);
64-
throw new RuntimeException();
65-
}
66-
JsonNode jsonNode = OBJECT_MAPPER.readTree(responseBody);
67-
return OBJECT_MAPPER.treeToValue(jsonNode.get("data"), DatasetResponse.class);
68-
} catch (IOException | InterruptedException e) {
69-
log.error("Error occurred while making the request: {}", e.getMessage());
70-
throw new RuntimeException();
71-
}
61+
return sendAndReturn(request, DatasetResponse.class);
7262
}
7363

74-
public static PagedDatasetFileResponse getDatasetFile(String datasetId, Pageable page) {
64+
public static PagedDatasetFileResponse getDatasetFile(String datasetId, PageRequest page) {
65+
String url = buildQueryParams(MessageFormat.format(GET_DATASET_FILE_URL, datasetId),
66+
Map.of("page", page.getPageNumber(), "size", page.getPageSize()));
7567
HttpRequest request = HttpRequest.newBuilder()
76-
.uri(URI.create(MessageFormat.format(GET_DATASET_FILE_URL, datasetId)))
68+
.uri(URI.create(url))
7769
.timeout(Duration.ofSeconds(30))
7870
.header("Content-Type", "application/json")
7971
.GET()
8072
.build();
8173

74+
return sendAndReturn(request, PagedDatasetFileResponse.class);
75+
}
76+
77+
private static <T> T sendAndReturn(HttpRequest request, Class<T> clazz) {
8278
try {
8379
HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
8480
int statusCode = response.statusCode();
8581
String responseBody = response.body();
82+
JsonNode jsonNode = OBJECT_MAPPER.readTree(responseBody);
8683

8784
if (statusCode < 200 || statusCode >= 300) {
88-
log.error("Request failed with status code: {}", statusCode);
89-
throw new RuntimeException();
85+
String code = jsonNode.get("code").asText();
86+
String message = jsonNode.get("message").asText();
87+
throw BusinessException.of(ErrorCodeImpl.of(code, message));
9088
}
91-
JsonNode jsonNode = OBJECT_MAPPER.readTree(responseBody);
92-
return OBJECT_MAPPER.treeToValue(jsonNode.get("data"), PagedDatasetFileResponse.class);
89+
return OBJECT_MAPPER.treeToValue(jsonNode.get("data"), clazz);
9390
} catch (IOException | InterruptedException e) {
94-
log.error("Error occurred while making the request: {}", e.getMessage());
95-
throw new RuntimeException();
91+
log.error("Error occurred while making the request.", e);
92+
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
9693
}
9794
}
95+
96+
private static String buildQueryParams(String baseUrl, Map<String, Object> params) {
97+
if (params == null || params.isEmpty()) {
98+
return baseUrl;
99+
}
100+
101+
String queryString = params.entrySet().stream()
102+
.map(entry -> entry.getKey() + entry.getValue().toString())
103+
.collect(Collectors.joining("&"));
104+
105+
return baseUrl + (baseUrl.contains("?") ? "&" : "?") + queryString;
106+
}
98107
}
Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.dataengine.cleaning.application.httpclient;
22

3+
import com.dataengine.common.infrastructure.exception.BusinessException;
4+
import com.dataengine.common.infrastructure.exception.SystemErrorCode;
35
import lombok.extern.slf4j.Slf4j;
46

57
import java.io.IOException;
@@ -21,30 +23,16 @@ public class RuntimeClient {
2123
private static final HttpClient CLIENT = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build();
2224

2325
public static void submitTask(String taskId) {
24-
HttpRequest request = HttpRequest.newBuilder()
25-
.uri(URI.create(MessageFormat.format(CREATE_TASK_URL, taskId)))
26-
.timeout(Duration.ofSeconds(30))
27-
.header("Content-Type", "application/json")
28-
.POST(HttpRequest.BodyPublishers.noBody())
29-
.build();
30-
31-
try {
32-
HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
33-
int statusCode = response.statusCode();
34-
35-
if (statusCode < 200 || statusCode >= 300) {
36-
log.error("Request failed with status code: {}", statusCode);
37-
throw new RuntimeException();
38-
}
39-
} catch (IOException | InterruptedException e) {
40-
log.error("Error occurred while making the request: {}", e.getMessage());
41-
throw new RuntimeException();
42-
}
26+
send(MessageFormat.format(CREATE_TASK_URL, taskId));
4327
}
4428

4529
public static void stopTask(String taskId) {
30+
send(MessageFormat.format(STOP_TASK_URL, taskId));
31+
}
32+
33+
private static void send(String url) {
4634
HttpRequest request = HttpRequest.newBuilder()
47-
.uri(URI.create(MessageFormat.format(STOP_TASK_URL, taskId)))
35+
.uri(URI.create(url))
4836
.timeout(Duration.ofSeconds(30))
4937
.header("Content-Type", "application/json")
5038
.POST(HttpRequest.BodyPublishers.noBody())
@@ -56,11 +44,11 @@ public static void stopTask(String taskId) {
5644

5745
if (statusCode < 200 || statusCode >= 300) {
5846
log.error("Request failed with status code: {}", statusCode);
59-
throw new RuntimeException();
47+
throw BusinessException.of(SystemErrorCode.SYSTEM_BUSY);
6048
}
6149
} catch (IOException | InterruptedException e) {
62-
log.error("Error occurred while making the request: {}", e.getMessage());
63-
throw new RuntimeException();
50+
log.error("Error occurred while making the request.", e);
51+
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
6452
}
6553
}
6654
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.dataengine.cleaning.application.scheduler;
2+
3+
import com.dataengine.cleaning.application.httpclient.RuntimeClient;
4+
import com.dataengine.cleaning.infrastructure.persistence.mapper.CleaningTaskMapper;
5+
import com.dataengine.cleaning.interfaces.dto.CleaningTask;
6+
import lombok.RequiredArgsConstructor;
7+
import org.springframework.stereotype.Service;
8+
9+
import java.time.LocalDateTime;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
13+
@Service
14+
@RequiredArgsConstructor
15+
public class CleaningTaskScheduler {
16+
private final CleaningTaskMapper cleaningTaskMapper;
17+
18+
private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
19+
20+
public void executeTask(String taskId) {
21+
taskExecutor.submit(() -> submitTask(taskId));
22+
}
23+
24+
private void submitTask(String taskId) {
25+
CleaningTask task = new CleaningTask();
26+
task.setId(taskId);
27+
task.setStatus(CleaningTask.StatusEnum.RUNNING);
28+
task.setStartedAt(LocalDateTime.now());
29+
cleaningTaskMapper.updateTask(task);
30+
RuntimeClient.submitTask(taskId);
31+
}
32+
33+
public void stopTask(String taskId) {
34+
RuntimeClient.stopTask(taskId);
35+
CleaningTask task = new CleaningTask();
36+
task.setId(taskId);
37+
task.setStatus(CleaningTask.StatusEnum.STOPPED);
38+
cleaningTaskMapper.updateTask(task);
39+
}
40+
}

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/application/scheduler/TaskScheduler.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/application/service/CleaningTaskService.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33

44
import com.dataengine.cleaning.application.httpclient.DatasetClient;
5-
import com.dataengine.cleaning.application.httpclient.RuntimeClient;
5+
import com.dataengine.cleaning.application.scheduler.CleaningTaskScheduler;
66
import com.dataengine.cleaning.domain.converter.OperatorInstanceConverter;
77
import com.dataengine.cleaning.domain.model.DatasetResponse;
88
import com.dataengine.cleaning.domain.model.ExecutorType;
@@ -14,11 +14,14 @@
1414
import com.dataengine.cleaning.interfaces.dto.CleaningTask;
1515
import com.dataengine.cleaning.interfaces.dto.CreateCleaningTaskRequest;
1616
import com.dataengine.cleaning.interfaces.dto.OperatorInstance;
17+
import com.dataengine.common.infrastructure.exception.BusinessException;
18+
import com.dataengine.common.infrastructure.exception.SystemErrorCode;
1719
import com.fasterxml.jackson.databind.JsonNode;
1820
import com.fasterxml.jackson.databind.ObjectMapper;
1921
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
2022
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
2123
import lombok.RequiredArgsConstructor;
24+
import lombok.extern.slf4j.Slf4j;
2225
import org.springframework.data.domain.PageRequest;
2326
import org.springframework.stereotype.Service;
2427
import org.springframework.transaction.annotation.Transactional;
@@ -29,21 +32,19 @@
2932
import java.io.File;
3033
import java.io.FileWriter;
3134
import java.io.IOException;
32-
import java.time.LocalDateTime;
3335
import java.util.List;
3436
import java.util.Map;
3537
import java.util.UUID;
36-
import java.util.concurrent.ExecutorService;
37-
import java.util.concurrent.Executors;
3838

39+
@Slf4j
3940
@Service
4041
@RequiredArgsConstructor
4142
public class CleaningTaskService {
4243
private final CleaningTaskMapper cleaningTaskMapper;
4344

4445
private final OperatorInstanceMapper operatorInstanceMapper;
4546

46-
private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
47+
private final CleaningTaskScheduler taskScheduler;
4748

4849
private final String DATASET_PATH = "/dataset";
4950

@@ -96,7 +97,7 @@ public void deleteTask(String taskId) {
9697
}
9798

9899
public void executeTask(String taskId) {
99-
taskExecutor.submit(() -> submitTask(taskId));
100+
taskScheduler.executeTask(taskId);
100101
}
101102

102103
private void prepareTask(CleaningTask task, List<OperatorInstance> instances) {
@@ -125,7 +126,8 @@ private void prepareTask(CleaningTask task, List<OperatorInstance> instances) {
125126
try (FileWriter writer = new FileWriter(file)) {
126127
yaml.dump(jsonMapper.treeToValue(jsonNode, Map.class), writer);
127128
} catch (IOException e) {
128-
throw new RuntimeException(e);
129+
log.error("Failed to prepare process.yaml.", e);
130+
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
129131
}
130132
}
131133

@@ -151,15 +153,6 @@ private void scanDataset(String taskId, String srcDatasetId) {
151153
} while (pageNumber < datasetFile.getTotalPages());
152154
}
153155

154-
private void submitTask(String taskId) {
155-
CleaningTask task = new CleaningTask();
156-
task.setId(taskId);
157-
task.setStatus(CleaningTask.StatusEnum.RUNNING);
158-
task.setStartedAt(LocalDateTime.now());
159-
cleaningTaskMapper.updateTask(task);
160-
RuntimeClient.submitTask(taskId);
161-
}
162-
163156
private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String fileName) {
164157
ObjectMapper objectMapper = new ObjectMapper();
165158

@@ -175,15 +168,12 @@ private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String f
175168
}
176169
}
177170
} catch (IOException e) {
178-
throw new RuntimeException("Error serializing map to JSON: " + e.getMessage());
171+
log.error("Failed to prepare dataset.jsonl.", e);
172+
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
179173
}
180174
}
181175

182176
public void stopTask(String taskId) {
183-
RuntimeClient.stopTask(taskId);
184-
CleaningTask task = new CleaningTask();
185-
task.setId(taskId);
186-
task.setStatus(CleaningTask.StatusEnum.STOPPED);
187-
cleaningTaskMapper.updateTask(task);
177+
taskScheduler.stopTask(taskId);
188178
}
189179
}

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/domain/converter/OperatorInstanceConverter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33

44
import com.dataengine.cleaning.domain.model.OperatorInstancePo;
55
import com.dataengine.cleaning.interfaces.dto.OperatorInstance;
6+
import com.dataengine.common.infrastructure.exception.BusinessException;
7+
import com.dataengine.common.infrastructure.exception.SystemErrorCode;
68
import com.fasterxml.jackson.core.JsonProcessingException;
79
import com.fasterxml.jackson.databind.ObjectMapper;
810
import org.mapstruct.Mapper;
911
import org.mapstruct.Mapping;
1012
import org.mapstruct.Named;
1113
import org.mapstruct.factory.Mappers;
1214

13-
import java.util.List;
1415
import java.util.Map;
1516

1617
@Mapper
@@ -26,8 +27,7 @@ static String mapToJson(Map<String, Object> objects) {
2627
try {
2728
return objectMapper.writeValueAsString(objects);
2829
} catch (JsonProcessingException e) {
29-
throw new RuntimeException(e);
30+
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
3031
}
31-
3232
}
3333
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.dataengine.cleaning.infrastructure.exception;
2+
3+
import com.dataengine.common.infrastructure.exception.ErrorCode;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Getter;
6+
7+
@Getter
8+
@AllArgsConstructor
9+
public enum CleanErrorCode implements ErrorCode {
10+
/**
11+
* 清洗任务名称重复
12+
*/
13+
DUPLICATE_TASK_NAME("clean.0001", "清洗任务名称重复"),
14+
15+
CREATE_DATASET_FAILED("clean.0002", "创建数据集失败");
16+
17+
private final String code;
18+
private final String message;
19+
}

backend/services/data-collection-service/src/main/java/com/dataengine/collection/interfaces/facade/CollectionTaskController.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ private String mapToJsonString(Map<String, Object> map, String taskId) {
8181
if (Objects.nonNull(map) && "DATAX".equals(map.get("type"))) {
8282
// NFS相关校验和处理
8383
map.put("destPath", "/dataset/local/" + taskId);
84-
map.put("filePaths", Arrays.asList(map.get("destPath")));
84+
if (map.containsKey("filePaths") && map.get("filePaths") != null && map.get("filePaths") instanceof List) {
85+
((List) map.get("filePaths")).add(map.get("destPath"));
86+
} else {
87+
map.put("filePaths", Collections.singletonList(map.get("destPath")));
88+
}
8589
return objectMapper.writeValueAsString(map);
8690
}
8791

0 commit comments

Comments
 (0)