Skip to content

Commit 4a3d32c

Browse files
author
yuluozyf
committed
【fix】修复数据归集问题
1 parent cc6114f commit 4a3d32c

35 files changed

+774
-2198
lines changed

backend/openapi/specs/data-collection.yaml

Lines changed: 150 additions & 448 deletions
Large diffs are not rendered by default.

backend/openapi/specs/data-management.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,14 @@ info:
33
title: Data Management Service API
44
description: |
55
数据管理服务API,提供数据集的创建、管理和文件操作功能。
6-
6+
77
主要功能:
88
- 数据集的创建和管理
99
- 多种数据集类型支持(图像、文本、音频、视频、多模态等)
1010
- 数据集文件管理
1111
- 数据集标签和元数据管理
1212
- 数据集统计信息
1313
version: 1.0.0
14-
contact:
15-
name: Data Engine Platform Team
16-
1714

1815
servers:
1916
- url: http://localhost:8092/api/v1/data-management

backend/services/data-collection-service/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@
148148
<useTags>true</useTags>
149149
<useSpringBoot3>true</useSpringBoot3>
150150
<documentationProvider>springdoc</documentationProvider>
151+
<dateLibrary>java8-localdatetime</dateLibrary>
152+
<java8>true</java8>
151153
</configOptions>
152154
</configuration>
153155
</execution>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.dataengine.collection.application.scheduler;
2+
3+
import com.dataengine.collection.application.service.DataxExecutionService;
4+
import com.dataengine.collection.domain.model.CollectionTask;
5+
import com.dataengine.collection.domain.model.TaskStatus;
6+
import com.dataengine.collection.domain.model.TaskExecution;
7+
import com.dataengine.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
8+
import com.dataengine.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
9+
import lombok.RequiredArgsConstructor;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.springframework.scheduling.annotation.Scheduled;
12+
import org.springframework.scheduling.support.CronExpression;
13+
import org.springframework.stereotype.Component;
14+
import org.springframework.util.StringUtils;
15+
16+
import java.time.LocalDateTime;
17+
import java.util.List;
18+
19+
@Slf4j
20+
@Component
21+
@RequiredArgsConstructor
22+
public class TaskSchedulerInitializer {
23+
24+
private final CollectionTaskMapper taskMapper;
25+
private final TaskExecutionMapper executionMapper;
26+
private final DataxExecutionService dataxExecutionService;
27+
28+
// 定期扫描激活的采集任务,根据 Cron 判断是否到期执行
29+
@Scheduled(fixedDelayString = "${dataengine.data-collection.scheduler.scan-interval-ms:10000}")
30+
public void scanAndTrigger() {
31+
List<CollectionTask> tasks = taskMapper.selectActiveTasks();
32+
if (tasks == null || tasks.isEmpty()) {
33+
return;
34+
}
35+
LocalDateTime now = LocalDateTime.now();
36+
for (CollectionTask task : tasks) {
37+
String cronExpr = task.getScheduleExpression();
38+
if (!StringUtils.hasText(cronExpr)) {
39+
continue;
40+
}
41+
try {
42+
// 如果最近一次执行仍在运行,则跳过
43+
TaskExecution latest = executionMapper.selectLatestByTaskId(task.getId());
44+
if (latest != null && latest.getStatus() == TaskStatus.RUNNING) {
45+
continue;
46+
}
47+
48+
CronExpression cron = CronExpression.parse(cronExpr);
49+
LocalDateTime base = latest != null && latest.getStartedAt() != null
50+
? latest.getStartedAt()
51+
: now.minusYears(1); // 没有历史记录时,拉长基准时间确保到期判定
52+
LocalDateTime nextTime = cron.next(base);
53+
54+
if (nextTime != null && !nextTime.isAfter(now)) {
55+
// 到期,触发一次执行
56+
TaskExecution exec = dataxExecutionService.createExecution(task);
57+
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
58+
dataxExecutionService.runAsync(task, exec.getId(), timeout);
59+
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), now, exec.getId());
60+
}
61+
} catch (Exception ex) {
62+
log.warn("Skip task {} due to invalid cron or scheduling error: {}", task.getId(), ex.getMessage());
63+
}
64+
}
65+
}
66+
}

backend/services/data-collection-service/src/main/java/com/dataengine/collection/application/service/CollectionTaskService.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import com.dataengine.collection.domain.model.CollectionTask;
44
import com.dataengine.collection.domain.model.TaskExecution;
55
import com.dataengine.collection.domain.model.TaskStatus;
6-
import com.dataengine.collection.domain.model.ExecutionStatus;
6+
import com.dataengine.collection.domain.model.DataxTemplate;
77
import com.dataengine.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
88
import com.dataengine.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
99
import lombok.RequiredArgsConstructor;
@@ -22,7 +22,7 @@ public class CollectionTaskService {
2222
@Transactional
2323
public CollectionTask create(CollectionTask task) {
2424
task.setId(UUID.randomUUID().toString());
25-
task.setStatus(TaskStatus.DRAFT);
25+
task.setStatus(TaskStatus.READY);
2626
task.setCreatedAt(LocalDateTime.now());
2727
task.setUpdatedAt(LocalDateTime.now());
2828
taskMapper.insert(task);
@@ -41,36 +41,24 @@ public CollectionTask update(CollectionTask task) {
4141

4242
/**
 * Looks up a collection task by its identifier.
 *
 * @param id the task identifier
 * @return whatever {@code taskMapper.selectById} yields for {@code id}
 */
public CollectionTask get(String id) {
    return taskMapper.selectById(id);
}
4343

44-
public List<CollectionTask> list(Integer page, Integer size, String status, String name,
45-
String sourceDataSourceId, String targetDataSourceId) {
44+
public List<CollectionTask> list(Integer page, Integer size, String status, String name) {
4645
Map<String, Object> p = new HashMap<>();
4746
p.put("status", status);
4847
p.put("name", name);
49-
p.put("sourceDataSourceId", sourceDataSourceId);
50-
p.put("targetDataSourceId", targetDataSourceId);
5148
if (page != null && size != null) {
5249
p.put("offset", page * size);
5350
p.put("limit", size);
5451
}
5552
return taskMapper.selectAll(p);
5653
}
5754

58-
public long count(String status, String name, String sourceDataSourceId, String targetDataSourceId) {
59-
Map<String, Object> p = new HashMap<>();
60-
p.put("status", status);
61-
p.put("name", name);
62-
p.put("sourceDataSourceId", sourceDataSourceId);
63-
p.put("targetDataSourceId", targetDataSourceId);
64-
return taskMapper.count(p);
65-
}
66-
6755
@Transactional
6856
public TaskExecution startExecution(CollectionTask task) {
6957
TaskExecution exec = new TaskExecution();
7058
exec.setId(UUID.randomUUID().toString());
7159
exec.setTaskId(task.getId());
7260
exec.setTaskName(task.getName());
73-
exec.setStatus(ExecutionStatus.RUNNING);
61+
exec.setStatus(TaskStatus.RUNNING);
7462
exec.setProgress(0.0);
7563
exec.setStartedAt(LocalDateTime.now());
7664
exec.setCreatedAt(LocalDateTime.now());
@@ -79,4 +67,14 @@ public TaskExecution startExecution(CollectionTask task) {
7967
taskMapper.updateStatus(task.getId(), TaskStatus.RUNNING.name());
8068
return exec;
8169
}
70+
71+
// ---- Template related merged methods ----
72+
public List<DataxTemplate> listTemplates(String sourceType, String targetType, int page, int size) {
73+
int offset = page * size;
74+
return taskMapper.selectList(sourceType, targetType, offset, size);
75+
}
76+
77+
public int countTemplates(String sourceType, String targetType) {
78+
return taskMapper.countTemplates(sourceType, targetType);
79+
}
8280
}

backend/services/data-collection-service/src/main/java/com/dataengine/collection/application/service/DataSourceService.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

backend/services/data-collection-service/src/main/java/com/dataengine/collection/application/service/DataxExecutionService.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
import com.dataengine.collection.domain.model.CollectionTask;
44
import com.dataengine.collection.domain.model.TaskExecution;
55
import com.dataengine.collection.domain.model.TaskStatus;
6-
import com.dataengine.collection.domain.model.ExecutionStatus;
6+
import com.dataengine.collection.domain.model.TaskStatus;
77
import com.dataengine.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
8-
import com.dataengine.collection.infrastructure.persistence.mapper.TaskExecutionLogMapper;
98
import com.dataengine.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
109
import com.dataengine.collection.infrastructure.runtime.datax.DataxJobBuilder;
1110
import com.dataengine.collection.infrastructure.runtime.datax.DataxProcessRunner;
@@ -41,7 +40,7 @@ public TaskExecution createExecution(CollectionTask task) {
4140
exec.setId(UUID.randomUUID().toString());
4241
exec.setTaskId(task.getId());
4342
exec.setTaskName(task.getName());
44-
exec.setStatus(ExecutionStatus.RUNNING);
43+
exec.setStatus(TaskStatus.RUNNING);
4544
exec.setProgress(0.0);
4645
exec.setStartedAt(LocalDateTime.now());
4746
exec.setCreatedAt(LocalDateTime.now());
@@ -59,12 +58,12 @@ public void runAsync(CollectionTask task, String executionId, int timeoutSeconds
5958
int code = processRunner.runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
6059
log.info("DataX finished with code {} for execution {}", code, executionId);
6160
// 简化:成功即完成
62-
executionMapper.completeExecution(executionId, ExecutionStatus.SUCCESS.name(), LocalDateTime.now(),
61+
executionMapper.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
6362
0, 0L, 0L, 0L, null, null);
64-
taskMapper.updateStatus(task.getId(), TaskStatus.COMPLETED.name());
63+
taskMapper.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
6564
} catch (Exception e) {
6665
log.error("DataX execution failed", e);
67-
executionMapper.completeExecution(executionId, ExecutionStatus.FAILED.name(), LocalDateTime.now(),
66+
executionMapper.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
6867
0, 0L, 0L, 0L, e.getMessage(), null);
6968
taskMapper.updateStatus(task.getId(), TaskStatus.FAILED.name());
7069
}

backend/services/data-collection-service/src/main/java/com/dataengine/collection/application/service/TaskExecutionService.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.dataengine.collection.domain.model.CollectionTask;
44
import com.dataengine.collection.domain.model.TaskExecution;
55
import com.dataengine.collection.domain.model.TaskStatus;
6-
import com.dataengine.collection.domain.model.ExecutionStatus;
76
import com.dataengine.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
87
import com.dataengine.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
98
import lombok.RequiredArgsConstructor;
@@ -45,15 +44,40 @@ public long count(String taskId, String status, LocalDateTime startDate, LocalDa
4544
return executionMapper.count(p);
4645
}
4746

47+
// --- Added convenience methods ---
48+
/**
 * Looks up a task execution by its identifier.
 *
 * @param id the execution identifier
 * @return whatever {@code executionMapper.selectById} yields for {@code id}
 */
public TaskExecution get(String id) {
    return executionMapper.selectById(id);
}
49+
/**
 * Looks up the most recent execution of a task via
 * {@code executionMapper.selectLatestByTaskId}.
 *
 * @param taskId the identifier of the owning task
 * @return whatever the mapper yields for {@code taskId}
 */
public TaskExecution getLatestByTaskId(String taskId) {
    return executionMapper.selectLatestByTaskId(taskId);
}
50+
4851
@Transactional
4952
public void complete(String executionId, boolean success, long successCount, long failedCount,
5053
long dataSizeBytes, String errorMessage, String resultJson) {
5154
LocalDateTime now = LocalDateTime.now();
5255
TaskExecution exec = executionMapper.selectById(executionId);
56+
if (exec == null) { return; }
5357
int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds();
54-
executionMapper.completeExecution(executionId, success ? ExecutionStatus.SUCCESS.name() : ExecutionStatus.FAILED.name(),
58+
executionMapper.completeExecution(executionId, success ? TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name(),
5559
now, duration, successCount, failedCount, dataSizeBytes, errorMessage, resultJson);
5660
CollectionTask task = taskMapper.selectById(exec.getTaskId());
57-
taskMapper.updateStatus(task.getId(), success ? TaskStatus.COMPLETED.name() : TaskStatus.FAILED.name());
61+
if (task != null) {
62+
taskMapper.updateStatus(task.getId(), success ? TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name());
63+
}
64+
}
65+
66+
@Transactional
67+
public void stop(String executionId) {
68+
TaskExecution exec = executionMapper.selectById(executionId);
69+
if (exec == null || exec.getStatus() != TaskStatus.RUNNING) { return; }
70+
LocalDateTime now = LocalDateTime.now();
71+
int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds();
72+
// Reuse completeExecution to persist STOPPED status and timing info
73+
executionMapper.completeExecution(exec.getId(), TaskStatus.STOPPED.name(), now, duration,
74+
exec.getRecordsSuccess(), exec.getRecordsFailed(), exec.getDataSizeBytes(), null, exec.getResult());
75+
taskMapper.updateStatus(exec.getTaskId(), TaskStatus.STOPPED.name());
76+
}
77+
78+
@Transactional
79+
public void stopLatestByTaskId(String taskId) {
80+
TaskExecution latest = executionMapper.selectLatestByTaskId(taskId);
81+
if (latest != null) { stop(latest.getId()); }
5882
}
5983
}

backend/services/data-collection-service/src/main/java/com/dataengine/collection/domain/model/CollectionStatistics.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

backend/services/data-collection-service/src/main/java/com/dataengine/collection/domain/model/CollectionTask.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@ public class CollectionTask {
99
private String id;
1010
private String name;
1111
private String description;
12-
private String sourceDataSourceId;
13-
private String targetDataSourceId;
14-
private String config; // DataX JSON 配置
12+
private String config; // DataX JSON 配置,包含源端和目标端配置信息
1513
private TaskStatus status;
14+
private String syncMode; // ONCE / SCHEDULED
1615
private String scheduleExpression;
1716
private Integer retryCount;
1817
private Integer timeoutSeconds;

0 commit comments

Comments
 (0)