Skip to content

Commit 784989a

Browse files
authored
[Feature-4518][admin] Support for automatically restarting failed jobs (#4519)
1 parent a938dc1 commit 784989a

File tree

19 files changed

+167
-8
lines changed

19 files changed

+167
-8
lines changed

.github/workflows/backend.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ jobs:
205205
localhost:5000/dinky/flink:flink
206206
- name: Init Env Jar
207207
run: |
208-
mkdir O e2e_test/docker-compose-env/dinky/jars
208+
mkdir -p e2e_test/docker-compose-env/dinky/jars
209209
wget -O e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar &&
210210
wget -O e2e_test/docker-compose-env/flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar &&
211211
wget -O e2e_test/docker-compose-env/dinky/javax.ws.rs-api-2.1.1.jar https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.jar

dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,14 @@ public class TaskDTO extends AbstractStatementDTO {
157157
notes = "ID of the version associated with the task")
158158
private Integer versionId;
159159

160+
@ApiModelProperty(
161+
value = "Auto Restart",
162+
dataType = "Boolean",
163+
example = "false",
164+
notes =
165+
"Whether to automatically restart the job from the latest checkpoint when it fails or becomes UNKNOWN")
166+
private Boolean autoRestart;
167+
160168
@ApiModelProperty(value = "Enabled", required = true, dataType = "Boolean", example = "true")
161169
private Boolean enabled;
162170

dinky-admin/src/main/java/org/dinky/data/dto/TaskSaveDTO.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,13 @@ public class TaskSaveDTO {
165165
@TableField(typeHandler = ListTypeHandler.class)
166166
private List<Integer> secondLevelOwners;
167167

168+
@ApiModelProperty(
169+
value = "Auto Restart",
170+
dataType = "Boolean",
171+
example = "false",
172+
notes = "Auto Restart option for the task")
173+
private Boolean autoRestart;
174+
168175
public Task toTaskEntity() {
169176
Task task = new Task();
170177
BeanUtil.copyProperties(this, task);

dinky-admin/src/main/java/org/dinky/data/model/Task.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,14 @@ public class Task extends SuperEntity<Task> {
161161
notes = "ID of the version associated with the task")
162162
private Integer versionId;
163163

164+
@ApiModelProperty(
165+
value = "Auto Restart",
166+
dataType = "Boolean",
167+
example = "false",
168+
notes =
169+
"Whether to automatically restart the job from the latest checkpoint when it fails or becomes UNKNOWN")
170+
private Boolean autoRestart;
171+
164172
@ApiModelProperty(value = "Enabled", dataType = "Boolean", example = "true", notes = "Whether the task is enabled")
165173
private Boolean enabled;
166174

dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.dinky.data.model.ClusterInstance;
4242
import org.dinky.data.model.SystemConfiguration;
4343
import org.dinky.data.model.ext.JobInfoDetail;
44+
import org.dinky.data.model.job.History;
4445
import org.dinky.data.model.job.JobInstance;
4546
import org.dinky.gateway.Gateway;
4647
import org.dinky.gateway.config.GatewayConfig;
@@ -52,6 +53,7 @@
5253
import org.dinky.service.HistoryService;
5354
import org.dinky.service.JobHistoryService;
5455
import org.dinky.service.JobInstanceService;
56+
import org.dinky.service.TaskService;
5557
import org.dinky.utils.JsonUtils;
5658
import org.dinky.utils.TimeUtil;
5759

@@ -83,12 +85,14 @@ public class JobRefreshHandler {
8385
private static final JobHistoryService jobHistoryService;
8486
private static final ClusterInstanceService clusterInstanceService;
8587
private static final HistoryService historyService;
88+
private static final TaskService taskService;
8689

8790
static {
8891
jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
8992
jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class);
9093
clusterInstanceService = SpringContextUtils.getBean("clusterInstanceServiceImpl", ClusterInstanceService.class);
9194
historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);
95+
taskService = SpringContextUtils.getBean("taskServiceImpl", TaskService.class);
9296
}
9397

9498
/**
@@ -170,7 +174,9 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave)
170174

171175
boolean isTransition = false;
172176

173-
if (JobStatus.isTransition(jobInstance.getStatus())) {
177+
if (JobStatus.isTransition(
178+
jobInstance.getStatus(),
179+
Asserts.isNull(jobDataDto.getJob()) ? null : jobDataDto.getJob().getEndTime())) {
174180
Long finishTime = TimeUtil.localDateTimeToLong(jobInstance.getFinishTime());
175181
long duration = Duration.between(jobInstance.getFinishTime(), LocalDateTime.now())
176182
.toMinutes();
@@ -217,6 +223,10 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave)
217223
if (isDone) {
218224
try {
219225
log.debug("Job is done: {}->{}", jobInstance.getId(), jobInstance.getName());
226+
// Check whether the job needs to be auto-restarted
227+
if (shouldAutoRestart(jobInstance, jobInfoDetail)) {
228+
tryAutoRestart(jobInstance, jobInfoDetail);
229+
}
220230
handleJobDone(jobInfoDetail);
221231
} catch (Exception e) {
222232
log.error("failed handel job done:", e);
@@ -346,6 +356,94 @@ private static void handleJobDone(JobInfoDetail jobInfoDetail) {
346356
}
347357
}
348358

359+
/**
360+
* Decide whether a finished job qualifies for an automatic restart.
361+
* Returns true only when the instance status equals the FAILED or UNKNOWN value and the job's history config has autoRestart set to TRUE.
362+
* @param jobInstance The job instance whose status (and id, for logging) is read.
363+
* @param jobInfoDetail The job info detail whose History and its config JSON are read.
364+
* @return True if the job should be auto-restarted; false for any other status, a missing history/config, or any exception raised while reading the config.
365+
*/
366+
private static boolean shouldAutoRestart(JobInstance jobInstance, JobInfoDetail jobInfoDetail) {
367+
String status = jobInstance.getStatus();
368+
// Only the FAILED and UNKNOWN statuses are eligible for auto restart
369+
if (!JobStatus.FAILED.getValue().equals(status)
370+
&& !JobStatus.UNKNOWN.getValue().equals(status)) {
371+
return false;
372+
}
373+
374+
// Check whether auto restart is enabled in the job config stored with the history record
375+
try {
376+
History history = jobInfoDetail.getHistory();
377+
if (Asserts.isNull(history) || Asserts.isNull(history.getConfigJson())) {
378+
return false;
379+
}
380+
381+
JobConfig jobConfig = history.getConfigJson();
382+
Boolean autoRestart = jobConfig.getAutoRestart();
383+
return Boolean.TRUE.equals(autoRestart);
384+
} catch (Exception e) {
385+
log.warn("Failed to check auto restart config for job {}: {}", jobInstance.getId(), e.getMessage());
386+
return false;
387+
}
388+
}
389+
390+
/**
391+
* Try to auto-restart the job's task, passing the latest checkpoint path found for it.
392+
* Does nothing except log a warning when the instance has no task id; any exception from the restart is logged at error level and not rethrown.
393+
* @param jobInstance The job instance; its task id identifies the task to restart and its id is used in log messages.
394+
* @param jobInfoDetail The job info detail; its JobDataDto is searched for the checkpoint path.
395+
*/
396+
private static void tryAutoRestart(JobInstance jobInstance, JobInfoDetail jobInfoDetail) {
397+
if (Asserts.isNull(jobInstance.getTaskId())) {
398+
log.warn("Cannot auto restart job {}: taskId is null", jobInstance.getId());
399+
return;
400+
}
401+
402+
try {
403+
// Look up the latest checkpoint path; this may be null when none is found.
// NOTE(review): presumably restartTask tolerates a null path and starts without state - confirm.
404+
String checkpointPath = getLatestCheckpointPath(jobInfoDetail.getJobDataDto());
405+
log.info("Auto restarting job {} from checkpoint: {}", jobInstance.getId(), checkpointPath);
406+
taskService.restartTask(jobInstance.getTaskId(), checkpointPath);
407+
log.info("Auto restart job {} triggered successfully", jobInstance.getId());
408+
} catch (Exception e) {
409+
log.error("Failed to auto restart job {}: {}", jobInstance.getId(), e.getMessage(), e);
410+
}
411+
}
412+
413+
/**
413+
* Get the latest checkpoint path from JobDataDto.
414+
* The completed checkpoint's external path is preferred; the savepoint's external path is the fallback.
415+
* @param jobDataDto The job data DTO; may be null.
416+
* @return The external path of the latest completed checkpoint, else of the latest savepoint, or null if the DTO, its checkpoint overview, its latest-checkpoints section, or both paths are missing.
417+
*/
418+
private static String getLatestCheckpointPath(JobDataDto jobDataDto) {
419+
if (Asserts.isNull(jobDataDto) || Asserts.isNull(jobDataDto.getCheckpoints())) {
420+
return null;
421+
}
422+
423+
CheckPointOverView checkpoints = jobDataDto.getCheckpoints();
424+
CheckPointOverView.LatestCheckpoints latestCheckpoints = checkpoints.getLatestCheckpoints();
425+
if (Asserts.isNull(latestCheckpoints)) {
426+
return null;
427+
}
428+
429+
// Prefer the latest completed checkpoint
430+
CheckPointOverView.CompletedCheckpointStatistics completedCheckpoint =
431+
latestCheckpoints.getCompletedCheckpointStatistics();
432+
if (Asserts.isNotNull(completedCheckpoint) && Asserts.isNotNullString(completedCheckpoint.getExternalPath())) {
433+
return completedCheckpoint.getExternalPath();
434+
}
435+
436+
// If no completed checkpoint path is available, fall back to the latest savepoint
437+
CheckPointOverView.CompletedCheckpointStatistics savepointStatistics =
438+
latestCheckpoints.getSavepointStatistics();
439+
if (Asserts.isNotNull(savepointStatistics) && Asserts.isNotNullString(savepointStatistics.getExternalPath())) {
440+
return savepointStatistics.getExternalPath();
441+
}
442+
443+
return null;
444+
}
446+
349447
/**
350448
* In a YARN cluster with HA mode enabled,
351449
* if the jobManagerHost cannot be connected,

dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,8 @@ public String buildEnvSql(AbstractStatementDTO task) {
326326
sql += envTask.getStatement() + CommonConstant.LineSep;
327327
}
328328
}
329-
log.info("Initializing data permissions...");
330-
userService.buildRowPermission();
329+
/*log.info("Initializing data permissions...");
330+
userService.buildRowPermission();*/
331331
log.info("Finish initialize FlinkSQLEnv.");
332332
return sql;
333333
}

dinky-admin/src/main/resources/db/migration/h2/V20250301.1.3.0__release.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,6 @@ insert into `dinky_sys_menu` (`id`, `parent_id`, `name`, `path`, `component`, `p
3838
values (178, 177, '编辑', '/settings/globalsetting/approval/edit', null, 'settings:globalsetting:approval:edit',
3939
'EditOutlined', 'F', 0, 171, '2024-12-30 23:45:30', '2024-12-30 23:45:30', null);
4040

41+
ALTER TABLE `dinky_task` ADD COLUMN `auto_restart` tinyint(1) DEFAULT 0 COMMENT '是否自动重启,当作业失败或UNKNOWN时自动从最新checkpoint重启';
42+
4143
SET FOREIGN_KEY_CHECKS = 1;

dinky-admin/src/main/resources/db/migration/mysql/V20250301.1.3.0__release.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,7 @@ insert into `dinky_sys_menu` (`id`, `parent_id`, `name`, `path`, `component`, `p
7070
values (178, 177, '编辑', '/settings/globalsetting/approval/edit', null, 'settings:globalsetting:approval:edit',
7171
'EditOutlined', 'F', 0, 171, '2024-12-30 23:45:30', '2024-12-30 23:45:30', null);
7272

73+
ALTER TABLE `dinky_task` ADD COLUMN `auto_restart` tinyint(1) NULL DEFAULT 0 COMMENT '是否自动重启,当作业失败或UNKNOWN时自动从最新checkpoint重启';
74+
7375
SET
7476
FOREIGN_KEY_CHECKS = 1;

dinky-admin/src/main/resources/db/migration/postgresql/V20250301.1.3.0__release.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,6 @@ insert into public.dinky_sys_menu (id, parent_id, name, path, component, perms,
7979
order_num, create_time, update_time, note)
8080
values (178, 177, '编辑', '/settings/globalsetting/approval/edit', null, 'settings:globalsetting:approval:edit',
8181
'EditOutlined', 'F', 0, 171, '2024-12-30 23:45:30', '2024-12-30 23:45:30', null);
82+
83+
ALTER TABLE public.dinky_task ADD COLUMN IF NOT EXISTS auto_restart boolean DEFAULT FALSE;
84+
COMMENT ON COLUMN public.dinky_task.auto_restart IS '是否自动重启,当作业失败或UNKNOWN时自动从最新checkpoint重启';

dinky-common/src/main/java/org/dinky/data/enums/JobStatus.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,10 @@ public static boolean isDone(String value) {
109109
}
110110
}
111111

112-
public static boolean isTransition(String value) {
112+
public static boolean isTransition(String value, Long endTime) {
113113
switch (get(value)) {
114114
case FAILED:
115+
return endTime == -1;
115116
case RECONNECTING:
116117
return true;
117118
default:

0 commit comments

Comments
 (0)