Skip to content

Commit b416a12

Browse files
committed
Merge remote-tracking branch 'origin/develop_930' into develop_930
2 parents f19ee7e + 2ce2f77 commit b416a12

File tree

11 files changed

+214
-199
lines changed

11 files changed

+214
-199
lines changed

backend/services/data-collection-service/pom.xml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,37 @@
164164
<classifier>exec</classifier>
165165
</configuration>
166166
</plugin>
167+
<plugin>
168+
<groupId>org.apache.maven.plugins</groupId>
169+
<artifactId>maven-compiler-plugin</artifactId>
170+
<version>3.11.0</version>
171+
<configuration>
172+
<source>${maven.compiler.source}</source>
173+
<target>${maven.compiler.target}</target>
174+
<annotationProcessorPaths>
175+
<!-- 顺序很重要 -->
176+
<path>
177+
<groupId>org.projectlombok</groupId>
178+
<artifactId>lombok</artifactId>
179+
<version>${lombok.version}</version>
180+
</path>
181+
<path>
182+
<groupId>org.projectlombok</groupId>
183+
<artifactId>lombok-mapstruct-binding</artifactId>
184+
<version>${lombok-mapstruct-binding.version}</version>
185+
</path>
186+
<path>
187+
<groupId>org.mapstruct</groupId>
188+
<artifactId>mapstruct-processor</artifactId>
189+
<version>${mapstruct.version}</version>
190+
</path>
191+
</annotationProcessorPaths>
192+
<compilerArgs>
193+
<arg>-parameters</arg>
194+
<arg>-Amapstruct.defaultComponentModel=spring</arg>
195+
</compilerArgs>
196+
</configuration>
197+
</plugin>
167198
</plugins>
168199
</build>
169200
</project>

backend/services/data-collection-service/src/main/java/com/dataengine/collection/application/service/CollectionTaskService.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import org.springframework.transaction.annotation.Transactional;
1414

1515
import java.time.LocalDateTime;
16-
import java.util.*;
16+
import java.util.HashMap;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.Objects;
1720

1821
@Slf4j
1922
@Service
@@ -29,13 +32,17 @@ public CollectionTask create(CollectionTask task) {
2932
task.setCreatedAt(LocalDateTime.now());
3033
task.setUpdatedAt(LocalDateTime.now());
3134
taskMapper.insert(task);
35+
executeTaskNow(task);
36+
return task;
37+
}
38+
39+
private void executeTaskNow(CollectionTask task) {
3240
if (Objects.equals(task.getSyncMode(), SyncMode.ONCE.getValue())) {
3341
TaskExecution exec = dataxExecutionService.createExecution(task);
3442
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
3543
dataxExecutionService.runAsync(task, exec.getId(), timeout);
3644
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
3745
}
38-
return task;
3946
}
4047

4148
@Transactional
@@ -63,18 +70,7 @@ public List<CollectionTask> list(Integer page, Integer size, String status, Stri
6370

6471
@Transactional
6572
public TaskExecution startExecution(CollectionTask task) {
66-
TaskExecution exec = new TaskExecution();
67-
exec.setId(UUID.randomUUID().toString());
68-
exec.setTaskId(task.getId());
69-
exec.setTaskName(task.getName());
70-
exec.setStatus(TaskStatus.RUNNING);
71-
exec.setProgress(0.0);
72-
exec.setStartedAt(LocalDateTime.now());
73-
exec.setCreatedAt(LocalDateTime.now());
74-
executionMapper.insert(exec);
75-
taskMapper.updateLastExecution(task.getId(), exec.getId());
76-
taskMapper.updateStatus(task.getId(), TaskStatus.RUNNING.name());
77-
return exec;
73+
return dataxExecutionService.createExecution(task);
7874
}
7975

8076
// ---- Template related merged methods ----

backend/services/data-collection-service/src/main/java/com/dataengine/collection/application/service/DataxExecutionService.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,15 @@ public class DataxExecutionService {
2828

2929
private final DataxJobBuilder jobBuilder;
3030
private final DataxProcessRunner processRunner;
31-
private final DataxProperties props;
3231
private final TaskExecutionMapper executionMapper;
3332
private final CollectionTaskMapper taskMapper;
3433

3534

3635
@Transactional
3736
public TaskExecution createExecution(CollectionTask task) {
38-
39-
TaskExecution exec = new TaskExecution();
40-
exec.setId(UUID.randomUUID().toString());
37+
TaskExecution exec = TaskExecution.initTaskExecution();
4138
exec.setTaskId(task.getId());
4239
exec.setTaskName(task.getName());
43-
exec.setStatus(TaskStatus.RUNNING);
44-
exec.setProgress(0.0);
45-
exec.setStartedAt(LocalDateTime.now());
46-
exec.setCreatedAt(LocalDateTime.now());
4740
executionMapper.insert(exec);
4841
taskMapper.updateLastExecution(task.getId(), exec.getId());
4942
taskMapper.updateStatus(task.getId(), TaskStatus.RUNNING.name());

backend/services/data-collection-service/src/main/java/com/dataengine/collection/domain/model/CollectionTask.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
package com.dataengine.collection.domain.model;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.core.type.TypeReference;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
36
import lombok.Data;
47

58
import java.time.LocalDateTime;
9+
import java.util.Collections;
10+
import java.util.Map;
611

712
@Data
813
public class CollectionTask {
@@ -22,4 +27,19 @@ public class CollectionTask {
2227
private LocalDateTime updatedAt;
2328
private String createdBy;
2429
private String updatedBy;
30+
31+
public void addPath() {
32+
try {
33+
ObjectMapper objectMapper = new ObjectMapper();
34+
Map<String, Object> parameter = objectMapper.readValue(
35+
config,
36+
new TypeReference<>() {}
37+
);
38+
parameter.put("destPath", "/dataset/local/" + id);
39+
parameter.put("filePaths", Collections.singletonList(parameter.get("destPath")));
40+
config = objectMapper.writeValueAsString(parameter);
41+
} catch (JsonProcessingException e) {
42+
throw new RuntimeException(e);
43+
}
44+
}
2545
}

backend/services/data-collection-service/src/main/java/com/dataengine/collection/domain/model/TaskExecution.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import lombok.Data;
44

55
import java.time.LocalDateTime;
6+
import java.util.UUID;
67

78
@Data
89
public class TaskExecution {
@@ -25,4 +26,14 @@ public class TaskExecution {
2526
private String config;
2627
private String result;
2728
private LocalDateTime createdAt;
29+
30+
public static TaskExecution initTaskExecution() {
31+
TaskExecution exec = new TaskExecution();
32+
exec.setId(UUID.randomUUID().toString());
33+
exec.setStatus(TaskStatus.RUNNING);
34+
exec.setProgress(0.0);
35+
exec.setStartedAt(LocalDateTime.now());
36+
exec.setCreatedAt(LocalDateTime.now());
37+
return exec;
38+
}
2839
}

backend/services/data-collection-service/src/main/java/com/dataengine/collection/infrastructure/runtime/datax/DataxJobBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.nio.file.Files;
1414
import java.nio.file.Path;
1515
import java.nio.file.Paths;
16+
import java.util.Collections;
1617
import java.util.HashMap;
1718
import java.util.List;
1819
import java.util.Map;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.dataengine.collection.interfaces.converter;
2+
3+
import com.dataengine.collection.domain.model.CollectionTask;
4+
import com.dataengine.collection.domain.model.DataxTemplate;
5+
import com.dataengine.collection.interfaces.dto.*;
6+
import com.dataengine.common.infrastructure.exception.BusinessException;
7+
import com.dataengine.common.infrastructure.exception.SystemErrorCode;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.mapstruct.Mapper;
11+
import org.mapstruct.Mapping;
12+
import org.mapstruct.Named;
13+
import org.mapstruct.factory.Mappers;
14+
15+
import java.util.Map;
16+
17+
@Mapper
18+
public interface CollectionTaskConverter {
19+
CollectionTaskConverter INSTANCE = Mappers.getMapper(CollectionTaskConverter.class);
20+
21+
@Mapping(source = "config", target = "config", qualifiedByName = "parseJsonToMap")
22+
CollectionTaskResponse toResponse(CollectionTask task);
23+
24+
CollectionTaskSummary toSummary(CollectionTask task);
25+
26+
DataxTemplateSummary toTemplateSummary(DataxTemplate template);
27+
28+
@Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString")
29+
CollectionTask toCollectionTask(CreateCollectionTaskRequest request);
30+
31+
@Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString")
32+
CollectionTask toCollectionTask(UpdateCollectionTaskRequest request);
33+
34+
@Named("parseJsonToMap")
35+
default Map<String, Object> parseJsonToMap(String json) {
36+
try {
37+
ObjectMapper objectMapper = new ObjectMapper();
38+
return objectMapper.readValue(json, Map.class);
39+
} catch (Exception e) {
40+
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
41+
}
42+
}
43+
44+
@Named("mapToJsonString")
45+
default String mapToJsonString(Map<String, Object> map) {
46+
try {
47+
ObjectMapper objectMapper = new ObjectMapper();
48+
return objectMapper.writeValueAsString(map != null ? map : Map.of());
49+
} catch (Exception e) {
50+
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
51+
}
52+
}
53+
}

backend/services/data-collection-service/src/main/java/com/dataengine/collection/interfaces/facade/CollectionTaskController.java

Lines changed: 0 additions & 167 deletions
This file was deleted.

0 commit comments

Comments
 (0)