Commit 7226105

feature: treat duplicate files in local collection as updates (duplicates identified by file path)
1 parent d43f5c2 commit 7226105

File tree: 10 files changed (+81 / -23 lines)

backend/services/data-collection-service/src/main/java/com/dataengine/collection/application/service/CollectionTaskService.java

Lines changed: 10 additions & 1 deletion
```diff
@@ -6,26 +6,35 @@
 import com.dataengine.collection.domain.model.DataxTemplate;
 import com.dataengine.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
 import com.dataengine.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
+import com.dataengine.collection.interfaces.dto.SyncMode;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.time.LocalDateTime;
 import java.util.*;
 
+@Slf4j
 @Service
 @RequiredArgsConstructor
 public class CollectionTaskService {
     private final CollectionTaskMapper taskMapper;
     private final TaskExecutionMapper executionMapper;
+    private final DataxExecutionService dataxExecutionService;
 
     @Transactional
     public CollectionTask create(CollectionTask task) {
-        task.setId(UUID.randomUUID().toString());
         task.setStatus(TaskStatus.READY);
         task.setCreatedAt(LocalDateTime.now());
         task.setUpdatedAt(LocalDateTime.now());
         taskMapper.insert(task);
+        if (Objects.equals(task.getSyncMode(), SyncMode.ONCE.getValue())) {
+            TaskExecution exec = dataxExecutionService.createExecution(task);
+            int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
+            dataxExecutionService.runAsync(task, exec.getId(), timeout);
+            log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
+        }
         return task;
     }
```
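
The new ONCE branch compares task.getSyncMode() with SyncMode.ONCE.getValue(). The SyncMode enum itself is not part of this commit, so the sketch below only illustrates the shape that comparison assumes (any constant other than ONCE is a guess):

```java
// Hypothetical sketch of com.dataengine.collection.interfaces.dto.SyncMode.
// Only ONCE is referenced in this commit; SCHEDULED is an assumed second mode.
public enum SyncMode {
    ONCE("ONCE"),
    SCHEDULED("SCHEDULED");

    private final String value;

    SyncMode(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }
}
```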

backend/services/data-collection-service/src/main/java/com/dataengine/collection/infrastructure/runtime/datax/DataxJobBuilder.java

Lines changed: 44 additions & 1 deletion
```diff
@@ -1,19 +1,26 @@
 package com.dataengine.collection.infrastructure.runtime.datax;
 
 import com.dataengine.collection.domain.model.CollectionTask;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 
-import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * 根据任务配置拼装 DataX 作业 JSON 文件
  */
+@Slf4j
 @Component
 @RequiredArgsConstructor
 public class DataxJobBuilder {
@@ -28,11 +35,47 @@ public Path buildJobFile(CollectionTask task) throws IOException {
         try (FileWriter fw = new FileWriter(path.toFile())) {
             String json = task.getConfig() == null || task.getConfig().isEmpty() ?
                     defaultJobJson() : task.getConfig();
+            if (StringUtils.isNotBlank(task.getConfig())) {
+                json = getJobConfig(task);
+            }
+            log.info("Job config: {}", json);
             fw.write(json);
         }
         return path;
     }
 
+    private String getJobConfig(CollectionTask task) {
+        try {
+            ObjectMapper objectMapper = new ObjectMapper();
+            Map<String, Object> parameter = objectMapper.readValue(
+                    task.getConfig(),
+                    new TypeReference<>() {}
+            );
+            Map<String, Object> job = new HashMap<>();
+            Map<String, Object> content = new HashMap<>();
+            Map<String, Object> reader = new HashMap<>();
+            reader.put("name", "nfsreader");
+            reader.put("parameter", parameter);
+            content.put("reader", reader);
+            Map<String, Object> writer = new HashMap<>();
+            writer.put("name", "nfswriter");
+            writer.put("parameter", parameter);
+            content.put("writer", writer);
+            job.put("content", List.of(content));
+            Map<String, Object> setting = new HashMap<>();
+            Map<String, Object> channel = new HashMap<>();
+            channel.put("channel", 2);
+            setting.put("speed", channel);
+            job.put("setting", setting);
+            Map<String, Object> jobConfig = new HashMap<>();
+            jobConfig.put("job", job);
+            return objectMapper.writeValueAsString(jobConfig);
+        } catch (Exception e) {
+            log.error("Failed to parse task config", e);
+            throw new RuntimeException("Failed to parse task config", e);
+        }
+    }
+
     private String defaultJobJson() {
         // 提供一个最小可运行的空 job,实际会被具体任务覆盖
         return "{\n \"job\": {\n \"setting\": {\n \"speed\": {\n \"channel\": 1\n }\n },\n \"content\": []\n }\n}";
```

backend/services/data-collection-service/src/main/java/com/dataengine/collection/interfaces/facade/CollectionTaskController.java

Lines changed: 16 additions & 5 deletions
```diff
@@ -11,8 +11,7 @@
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 @RestController
@@ -73,8 +72,19 @@ private Map<String, Object> parseJsonToMap(String json) {
         }
     }
 
-    private String mapToJsonString(Map<String, Object> map) {
+    private String mapToJsonString(Map<String, Object> map, String taskId) {
         try {
+            if (Objects.nonNull(map) && "LOCAL_COLLECTION".equals(map.get("type"))) {
+                // 本地归集相关校验和处理
+                return objectMapper.writeValueAsString(map);
+            }
+            if (Objects.nonNull(map) && "DATAX".equals(map.get("type"))) {
+                // NFS相关校验和处理
+                map.put("destPath", "/dataset/local/" + taskId);
+                map.put("filePaths", Arrays.asList(map.get("destPath")));
+                return objectMapper.writeValueAsString(map);
+            }
+
             return objectMapper.writeValueAsString(map != null ? map : Map.of());
         } catch (Exception e) {
             return "{}";
@@ -84,9 +94,10 @@ private String mapToJsonString(Map<String, Object> map) {
     @Override
     public ResponseEntity<CollectionTaskResponse> createTask(CreateCollectionTaskRequest body) {
         CollectionTask t = new CollectionTask();
+        t.setId(UUID.randomUUID().toString());
         t.setName(body.getName());
         t.setDescription(body.getDescription());
-        t.setConfig(mapToJsonString(body.getConfig()));
+        t.setConfig(mapToJsonString(body.getConfig(), t.getId()));
         if (body.getSyncMode() != null) { t.setSyncMode(body.getSyncMode().getValue()); }
         t.setScheduleExpression(body.getScheduleExpression());
         t = taskService.create(t);
@@ -102,7 +113,7 @@ public ResponseEntity<CollectionTaskResponse> updateTask(String id, UpdateCollec
         t.setId(id);
         t.setName(body.getName());
         t.setDescription(body.getDescription());
-        t.setConfig(mapToJsonString(body.getConfig()));
+        t.setConfig(mapToJsonString(body.getConfig(), t.getId()));
         if (body.getSyncMode() != null) { t.setSyncMode(body.getSyncMode().getValue()); }
         t.setScheduleExpression(body.getScheduleExpression());
         t = taskService.update(t);
```
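
In effect, the destination path for a DATAX task is no longer caller-supplied: createTask now generates the task ID before persisting, so mapToJsonString can derive destPath (and filePaths) from it. A standalone sketch of that branch using plain Jackson; the srcPath key is hypothetical:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class DataxConfigRewriteSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        String taskId = UUID.randomUUID().toString(); // generated in createTask before create()

        Map<String, Object> map = new HashMap<>();
        map.put("type", "DATAX");
        map.put("srcPath", "nfs://fileserver/export/raw"); // hypothetical caller-supplied key

        // Mirrors the DATAX branch added to mapToJsonString(map, taskId):
        map.put("destPath", "/dataset/local/" + taskId);
        map.put("filePaths", Arrays.asList(map.get("destPath")));

        // e.g. {"type":"DATAX","srcPath":...,"destPath":"/dataset/local/<id>","filePaths":["/dataset/local/<id>"]}
        System.out.println(objectMapper.writeValueAsString(map));
    }
}
```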

backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/DatasetApplicationService.java

Lines changed: 2 additions & 8 deletions
```diff
@@ -281,13 +281,13 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) {
         log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);
 
         // 1. 调用数据归集服务获取任务详情
-        CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId);
+        CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
         if (taskDetail == null) {
             log.error("获取归集任务详情失败,任务ID: {}", dataSourceId);
             return;
         }
 
-        log.info("获取到归集任务详情: {}", taskDetail.getName());
+        log.info("获取到归集任务详情: {}", taskDetail);
 
         // 2. 解析任务配置
         LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
@@ -296,12 +296,6 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) {
             return;
         }
 
-        // 3. 检查任务类型是否为 LOCAL_COLLECTION
-        if (!"LOCAL_COLLECTION".equalsIgnoreCase(config.getType())) {
-            log.info("任务类型不是 LOCAL_COLLECTION,跳过文件扫描。任务类型: {}", config.getType());
-            return;
-        }
-
         // 4. 获取文件路径列表
         List<String> filePaths = config.getFilePaths();
         if (CollectionUtils.isEmpty(filePaths)) {
```

backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/CollectionTaskClient.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -1,5 +1,6 @@
 package com.dataengine.datamanagement.infrastructure.client;
 
+import com.dataengine.common.infrastructure.common.Response;
 import com.dataengine.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
 import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -17,5 +18,5 @@ public interface CollectionTaskClient {
      * @return 任务详情
      */
     @GetMapping("/api/data-collection/tasks/{id}")
-    CollectionTaskDetailResponse getTaskDetail(@PathVariable("id") String taskId);
+    Response<CollectionTaskDetailResponse> getTaskDetail(@PathVariable("id") String taskId);
 }
```

backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/infrastructure/client/dto/LocalCollectionConfig.java

Lines changed: 0 additions & 6 deletions
```diff
@@ -18,10 +18,4 @@ public class LocalCollectionConfig {
      * 文件路径列表
      */
     private List<String> filePaths;
-
-    /**
-     * 其他配置项
-     */
-    private String sourceType;
-    private String targetType;
 }
```

backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetController.java

Lines changed: 3 additions & 0 deletions
```diff
@@ -7,6 +7,7 @@
 import com.dataengine.datamanagement.domain.model.dataset.Dataset;
 import com.dataengine.datamanagement.interfaces.converter.DatasetConverter;
 import com.dataengine.datamanagement.interfaces.dto.*;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
 import org.springframework.http.HttpStatus;
@@ -18,6 +19,7 @@
 /**
  * 数据集 REST 控制器(UUID 模式)
  */
+@Slf4j
 @RestController
 @RequestMapping("/data-management/datasets")
 public class DatasetController {
@@ -47,6 +49,7 @@ public ResponseEntity<Response<DatasetResponse>> createDataset(@RequestBody Crea
             Dataset dataset = datasetApplicationService.createDataset(createDatasetRequest);
             return ResponseEntity.status(HttpStatus.CREATED).body(Response.ok(DatasetConverter.INSTANCE.convertToResponse(dataset)));
         } catch (IllegalArgumentException e) {
+            log.error("Failed to create dataset", e);
             return ResponseEntity.badRequest().body(Response.error(SystemErrorCode.UNKNOWN_ERROR, null));
         }
     }
```

backend/services/main-application/src/main/resources/config/application-datacollection.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -2,7 +2,7 @@ dataengine:
   data-collection:
     # DataX配置
     datax:
-      home-path: ${DATAX_HOME:D:/datax}
+      home-path: ${DATAX_HOME:/opt/datax}
       python-path: ${DATAX_PYTHON_PATH:python3}
       job-config-path: ${DATAX_JOB_PATH:./data/temp/datax/jobs}
       log-path: ${DATAX_LOG_PATH:./logs/datax}
```
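
These keys are presumably bound to a properties bean on the data-collection side; the actual class is not in this commit, so the following is only a hedged sketch of such a binding (class name and structure assumed):

```java
// Hypothetical binding for the dataengine.data-collection.datax.* keys above.
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "dataengine.data-collection.datax")
public class DataxProperties {
    /** DataX installation directory, e.g. /opt/datax (DATAX_HOME). */
    private String homePath;
    /** Interpreter used to launch datax.py, e.g. python3 (DATAX_PYTHON_PATH). */
    private String pythonPath;
    /** Directory where generated job JSON files are written (DATAX_JOB_PATH). */
    private String jobConfigPath;
    /** Directory for DataX run logs (DATAX_LOG_PATH). */
    private String logPath;
}
```

The default home-path switch from D:/datax to /opt/datax only moves the fallback from a Windows development path to the Linux path used inside the container; DATAX_HOME still overrides it.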

backend/shared/domain-common/src/main/java/com/dataengine/common/infrastructure/common/Response.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -3,6 +3,7 @@
 import com.dataengine.common.infrastructure.exception.ErrorCode;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 
 import java.io.Serial;
@@ -13,6 +14,7 @@
  */
 @Getter
 @Setter
+@NoArgsConstructor
 @AllArgsConstructor
 public class Response<T> implements Serializable {
     @Serial
```
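
The added @NoArgsConstructor is what makes the Feign change above work: Jackson needs a default constructor (or explicit creators) to materialize the generic Response<CollectionTaskDetailResponse> envelope on the data-management side. A minimal sketch of that deserialization, with an illustrative payload and assumed field names:

```java
// With only @AllArgsConstructor, readValue below would typically fail with an
// InvalidDefinitionException ("no Creators, like default constructor").
import com.dataengine.common.infrastructure.common.Response;
import com.dataengine.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ResponseDecodingSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Illustrative payload; the exact envelope fields depend on Response's definition.
        String payload = "{\"data\":{\"name\":\"demo-task\",\"config\":\"{}\"}}";

        Response<CollectionTaskDetailResponse> response =
                mapper.readValue(payload, new TypeReference<>() {});
        System.out.println(response.getData().getName());
    }
}
```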

deployment/docker/data-platform/docker-compose.yml

Lines changed: 1 addition & 0 deletions
```diff
@@ -4,6 +4,7 @@ services:
     container_name: backend
     image: backend
     restart: on-failure
+    privileged: true
     ports:
       - "8080:8080"
     volumes:
```
