Skip to content

Commit 92c1406

Browse files
committed
refactor and send task status change event
1 parent caa57ad commit 92c1406

File tree

6 files changed

+73
-30
lines changed

6 files changed

+73
-30
lines changed

core/src/main/java/com/flowci/core/job/domain/Executed.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ public interface Executed {
1919
Status.SKIPPED
2020
);
2121

22+
Set<Status> FinishStatus = ImmutableSet.of(
23+
Status.SUCCESS,
24+
Status.SKIPPED,
25+
Status.EXCEPTION,
26+
Status.KILLED,
27+
Status.TIMEOUT
28+
);
29+
2230
enum Status {
2331

2432
PENDING(-1),
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.flowci.core.job.event;
2+
3+
import com.flowci.core.job.domain.ExecutedLocalTask;
4+
import lombok.Getter;
5+
import org.springframework.context.ApplicationEvent;
6+
7+
import java.util.List;
8+
9+
@Getter
10+
public class TaskStatusChangeEvent extends ApplicationEvent {
11+
12+
private final String jobId;
13+
14+
private final List<ExecutedLocalTask> tasks;
15+
16+
public TaskStatusChangeEvent(Object source, String jobId, List<ExecutedLocalTask> tasks) {
17+
super(source);
18+
this.jobId = jobId;
19+
this.tasks = tasks;
20+
}
21+
}

core/src/main/java/com/flowci/core/job/service/JobActionServiceImpl.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -804,11 +804,7 @@ private void releaseLock(InterLock lock, String jobId) {
804804
private Consumer<JobSmContext> notificationConsumer() {
805805
return context -> {
806806
Job job = context.job;
807-
NodeTree tree = ymlManager.getTree(job);
808-
809-
for (LocalTask t : tree.getRoot().getNotifications()) {
810-
localTaskService.executeAsync(job, t);
811-
}
807+
localTaskService.executeAsync(job);
812808
};
813809
}
814810

core/src/main/java/com/flowci/core/job/service/LocalTaskService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public interface LocalTaskService {
1212

1313
List<ExecutedLocalTask> list(Job job);
1414

15-
void executeAsync(Job job, LocalTask task);
15+
void executeAsync(Job job);
1616

1717
ExecutedLocalTask execute(Job job, LocalTask task);
1818
}

core/src/main/java/com/flowci/core/job/service/LocalTaskServiceImpl.java

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.flowci.core.job.domain.Executed;
66
import com.flowci.core.job.domain.ExecutedLocalTask;
77
import com.flowci.core.job.domain.Job;
8+
import com.flowci.core.job.event.TaskStatusChangeEvent;
89
import com.flowci.core.job.manager.DockerManager;
910
import com.flowci.core.job.manager.YmlManager;
1011
import com.flowci.core.plugin.domain.Plugin;
@@ -72,8 +73,13 @@ public List<ExecutedLocalTask> list(Job job) {
7273
}
7374

7475
@Override
75-
public void executeAsync(Job job, LocalTask task) {
76-
localTaskExecutor.execute(() -> execute(job, task));
76+
public void executeAsync(Job job) {
77+
localTaskExecutor.execute(() -> {
78+
NodeTree tree = ymlManager.getTree(job);
79+
for (LocalTask t : tree.getRoot().getNotifications()) {
80+
execute(job, t);
81+
}
82+
});
7783
}
7884

7985
@Override
@@ -86,9 +92,7 @@ public ExecutedLocalTask execute(Job job, LocalTask task) {
8692
}
8793

8894
ExecutedLocalTask exec = optional.get();
89-
exec.setStartAt(new Date());
90-
exec.setStatus(Executed.Status.RUNNING);
91-
executedLocalTaskDao.save(exec);
95+
updateStatusTimeAndSave(exec, Executed.Status.RUNNING, null);
9296

9397
DockerManager.Option option = new DockerManager.Option();
9498
option.setImage(DefaultImage);
@@ -101,11 +105,7 @@ public ExecutedLocalTask execute(Job job, LocalTask task) {
101105
if (event.hasError()) {
102106
String message = event.getError().getMessage();
103107
log.warn(message);
104-
105-
exec.setError(message);
106-
exec.setStatus(Executed.Status.EXCEPTION);
107-
exec.setFinishAt(new Date());
108-
executedLocalTaskDao.save(exec);
108+
updateStatusTimeAndSave(exec, Executed.Status.EXCEPTION, message);;
109109
return exec;
110110
}
111111

@@ -121,12 +121,41 @@ public ExecutedLocalTask execute(Job job, LocalTask task) {
121121
});
122122
}
123123

124-
log.info("Start local task {} image = {} for job {}", task.getPlugin(), option.getImage(), job.getId());
125-
executedLocalTaskDao.save(runDockerTask(option, exec));
124+
try {
125+
log.info("Start local task {} image = {} for job {}", task.getPlugin(), option.getImage(), job.getId());
126+
runDockerTask(option, exec);
127+
updateStatusTimeAndSave(exec, Executed.Status.SUCCESS, null);
128+
} catch (Exception e) {
129+
log.warn(e.getMessage());
130+
updateStatusTimeAndSave(exec, Executed.Status.EXCEPTION, e.getMessage());
131+
}
132+
126133
return exec;
127134
}
128135

129-
private ExecutedLocalTask runDockerTask(DockerManager.Option option, ExecutedLocalTask r) {
136+
private void updateStatusTimeAndSave(ExecutedLocalTask t, Executed.Status status, String error) {
137+
if (t.getStatus() == status) {
138+
return;
139+
}
140+
141+
if (Executed.Status.RUNNING == status) {
142+
t.setStartAt(new Date());
143+
}
144+
145+
if (Executed.FinishStatus.contains(status)) {
146+
t.setFinishAt(new Date());
147+
}
148+
149+
t.setStatus(status);
150+
t.setError(error);
151+
executedLocalTaskDao.save(t);
152+
153+
String jobId = t.getJobId();
154+
List<ExecutedLocalTask> list = executedLocalTaskDao.findAllByJobId(jobId);
155+
eventManager.publish(new TaskStatusChangeEvent(this, t.getJobId(), list));
156+
}
157+
158+
private void runDockerTask(DockerManager.Option option, ExecutedLocalTask r) throws InterruptedException, RuntimeException {
130159
try {
131160
String image = option.getImage();
132161
boolean isSuccess = dockerManager.pullImage(image);
@@ -143,19 +172,10 @@ private ExecutedLocalTask runDockerTask(DockerManager.Option option, ExecutedLoc
143172
}
144173

145174
r.setCode(dockerManager.getContainerExitCode(cid));
146-
} catch (InterruptedException | RuntimeException e) {
147-
log.warn(e.getMessage());
148-
r.setError(e.getMessage());
149-
r.setStatus(Executed.Status.EXCEPTION);
150175
} finally {
151-
r.setStatus(Executed.Status.SUCCESS);
152-
r.setFinishAt(new Date());
153-
154176
if (r.hasContainerId()) {
155177
dockerManager.removeContainer(r.getContainerId());
156178
}
157179
}
158-
159-
return r;
160180
}
161181
}

core/src/main/java/com/flowci/core/job/service/StepServiceImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,6 @@ public ExecutedCmd toStatus(ExecutedCmd entity, Executed.Status status, String e
135135
return entity;
136136
}
137137

138-
// TODO: status check
139-
140138
entity.setStatus(status);
141139
entity.setError(err);
142140
executedCmdDao.save(entity);

0 commit comments

Comments
 (0)