Skip to content

Commit caa57ad

Browse files
committed
refactor clean up notification and local docker task bean
1 parent e89bbc6 commit caa57ad

File tree

17 files changed

+134
-179
lines changed

17 files changed

+134
-179
lines changed

core/src/main/java/com/flowci/core/flow/domain/SaveNotify.java

Lines changed: 0 additions & 31 deletions
This file was deleted.

core/src/main/java/com/flowci/core/flow/service/YmlServiceImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import com.flowci.core.flow.domain.Yml;
2424
import com.flowci.core.flow.event.FlowDeletedEvent;
2525
import com.flowci.core.plugin.event.GetPluginEvent;
26-
import com.flowci.domain.Notification;
26+
import com.flowci.domain.LocalTask;
2727
import com.flowci.domain.Vars;
2828
import com.flowci.exception.ArgumentException;
2929
import com.flowci.exception.NotFoundException;
@@ -146,8 +146,8 @@ private Optional<RuntimeException> verifyPlugins(FlowNode flowNode) {
146146
}
147147
}
148148

149-
for (Notification n : flowNode.getNotifications()) {
150-
plugins.add(n.getPlugin());
149+
for (LocalTask t : flowNode.getNotifications()) {
150+
plugins.add(t.getPlugin());
151151
}
152152

153153
for (String plugin : plugins) {

core/src/main/java/com/flowci/core/job/domain/LocalDockerTask.java

Lines changed: 0 additions & 15 deletions
This file was deleted.

core/src/main/java/com/flowci/core/job/domain/LocalTask.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

core/src/main/java/com/flowci/core/job/event/StartAsyncLocalTaskEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.flowci.core.job.event;
22

33
import com.flowci.core.common.event.AbstractSyncEvent;
4-
import com.flowci.core.job.domain.LocalTask;
4+
import com.flowci.domain.LocalTask;
55
import lombok.Getter;
66

77
/**

core/src/main/java/com/flowci/core/job/manager/DockerManager.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33
import com.flowci.core.api.adviser.ApiAuth;
44
import com.flowci.core.common.domain.Variables;
55
import com.flowci.core.common.helper.ThreadHelper;
6-
import com.flowci.core.job.domain.LocalDockerTask;
76
import com.flowci.domain.StringVars;
7+
import com.flowci.domain.Vars;
8+
import com.flowci.util.StringHelper;
89
import com.github.dockerjava.api.DockerClient;
910
import com.github.dockerjava.api.async.ResultCallback;
1011
import com.github.dockerjava.api.command.CreateContainerCmd;
1112
import com.github.dockerjava.api.command.CreateContainerResponse;
1213
import com.github.dockerjava.api.command.InspectContainerResponse;
1314
import com.github.dockerjava.api.model.*;
15+
import lombok.Getter;
16+
import lombok.Setter;
1417
import lombok.extern.log4j.Log4j2;
1518
import org.springframework.beans.factory.annotation.Autowired;
1619
import org.springframework.stereotype.Component;
@@ -58,20 +61,20 @@ public boolean pullImage(String image) throws InterruptedException {
5861
return callback.isSuccess;
5962
}
6063

61-
public String createAndStartContainer(LocalDockerTask task) {
64+
public String createAndStartContainer(Option option) {
6265
StringVars defaultEnv = new StringVars(4);
6366
defaultEnv.put(Variables.App.Url, serverUrl);
6467
defaultEnv.put(Variables.Agent.Token, ApiAuth.LocalTaskToken);
6568
defaultEnv.put(Variables.Agent.Workspace, "/ws/");
6669
defaultEnv.put(Variables.Agent.PluginDir, "/ws/.plugins");
6770

68-
CreateContainerCmd createContainerCmd = client.createContainerCmd(task.getImage())
69-
.withEnv(defaultEnv.merge(task.getInputs()).toList())
70-
.withCmd("/bin/bash", "-c", task.getScript());
71+
CreateContainerCmd createContainerCmd = client.createContainerCmd(option.image)
72+
.withEnv(defaultEnv.merge(option.inputs).toList())
73+
.withCmd("/bin/bash", "-c", option.script);
7174

72-
if (task.hasPlugin()) {
75+
if (option.hasPlugin()) {
7376
createContainerCmd.withBinds(
74-
new Bind(task.getPluginDir().toString(), new Volume("/ws/.plugins/" + task.getPlugin())));
77+
new Bind(option.pluginDir, new Volume("/ws/.plugins/" + option.plugin)));
7578
}
7679

7780
CreateContainerResponse container = createContainerCmd.exec();
@@ -132,6 +135,25 @@ public void removeContainer(String cid) {
132135
log.debug("Container {} been removed", cid);
133136
}
134137

138+
@Getter
139+
@Setter
140+
public static final class Option {
141+
142+
private String image;
143+
144+
private Vars<String> inputs;
145+
146+
private String script;
147+
148+
private String plugin;
149+
150+
private String pluginDir;
151+
152+
private boolean hasPlugin() {
153+
return StringHelper.hasValue(plugin) && StringHelper.hasValue(pluginDir) ;
154+
}
155+
}
156+
135157
private static abstract class DockerCallback<T> implements ResultCallback<T> {
136158

137159
protected CountDownLatch counter = new CountDownLatch(1);

core/src/main/java/com/flowci/core/job/service/JobActionServiceImpl.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.flowci.core.job.dao.JobDao;
99
import com.flowci.core.job.domain.ExecutedCmd;
1010
import com.flowci.core.job.domain.Job;
11-
import com.flowci.core.job.domain.LocalDockerTask;
1211
import com.flowci.core.job.event.JobReceivedEvent;
1312
import com.flowci.core.job.event.JobStatusChangeEvent;
1413
import com.flowci.core.job.manager.CmdManager;
@@ -807,21 +806,8 @@ private Consumer<JobSmContext> notificationConsumer() {
807806
Job job = context.job;
808807
NodeTree tree = ymlManager.getTree(job);
809808

810-
for (Notification n : tree.getRoot().getNotifications()) {
811-
if (!n.isEnabled()) {
812-
continue;
813-
}
814-
815-
StringVars input = new StringVars(job.getContext());
816-
input.merge(n.getInputs());
817-
818-
LocalDockerTask task = new LocalDockerTask();
819-
task.setName(n.getPlugin()); // plugin name as task name
820-
task.setPlugin(n.getPlugin());
821-
task.setJobId(job.getId());
822-
task.setInputs(input);
823-
824-
localTaskService.executeAsync(task);
809+
for (LocalTask t : tree.getRoot().getNotifications()) {
810+
localTaskService.executeAsync(job, t);
825811
}
826812
};
827813
}

core/src/main/java/com/flowci/core/job/service/LocalTaskService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.flowci.core.job.domain.ExecutedLocalTask;
44
import com.flowci.core.job.domain.Job;
5-
import com.flowci.core.job.domain.LocalDockerTask;
5+
import com.flowci.domain.LocalTask;
66

77
import java.util.List;
88

@@ -12,7 +12,7 @@ public interface LocalTaskService {
1212

1313
List<ExecutedLocalTask> list(Job job);
1414

15-
void executeAsync(LocalDockerTask task);
15+
void executeAsync(Job job, LocalTask task);
1616

17-
ExecutedLocalTask execute(LocalDockerTask task);
17+
ExecutedLocalTask execute(Job job, LocalTask task);
1818
}

core/src/main/java/com/flowci/core/job/service/LocalTaskServiceImpl.java

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
import com.flowci.core.job.domain.Executed;
66
import com.flowci.core.job.domain.ExecutedLocalTask;
77
import com.flowci.core.job.domain.Job;
8-
import com.flowci.core.job.domain.LocalDockerTask;
98
import com.flowci.core.job.manager.DockerManager;
109
import com.flowci.core.job.manager.YmlManager;
1110
import com.flowci.core.plugin.domain.Plugin;
1211
import com.flowci.core.plugin.event.GetPluginAndVerifySetContext;
1312
import com.flowci.core.plugin.event.GetPluginEvent;
14-
import com.flowci.domain.Notification;
13+
import com.flowci.domain.LocalTask;
14+
import com.flowci.domain.StringVars;
1515
import com.flowci.exception.NotAvailableException;
1616
import com.flowci.exception.StatusException;
1717
import com.flowci.tree.NodeTree;
@@ -30,6 +30,9 @@
3030
@Service
3131
public class LocalTaskServiceImpl implements LocalTaskService {
3232

33+
private final static String DefaultImage = "flowci/plugin-runtime";
34+
private final static int DefaultTimeout = 60; // seconds
35+
3336
@Autowired
3437
private ExecutedLocalTaskDao executedLocalTaskDao;
3538

@@ -48,19 +51,15 @@ public class LocalTaskServiceImpl implements LocalTaskService {
4851
@Override
4952
public void init(Job job) {
5053
NodeTree tree = ymlManager.getTree(job);
51-
List<Notification> list = tree.getRoot().getNotifications();
54+
List<LocalTask> list = tree.getRoot().getNotifications();
5255
if (list.isEmpty()) {
5356
return;
5457
}
5558

5659
List<ExecutedLocalTask> tasks = new ArrayList<>(list.size());
57-
for (Notification n : list) {
58-
if (!n.isEnabled()) {
59-
continue;
60-
}
61-
60+
for (LocalTask n : list) {
6261
ExecutedLocalTask t = new ExecutedLocalTask();
63-
t.setName(n.getPlugin());
62+
t.setName(n.getPlugin()); // name is plugin name
6463
t.setJobId(job.getId());
6564
tasks.add(t);
6665
}
@@ -73,14 +72,14 @@ public List<ExecutedLocalTask> list(Job job) {
7372
}
7473

7574
@Override
76-
public void executeAsync(LocalDockerTask task) {
77-
localTaskExecutor.execute(() -> execute(task));
75+
public void executeAsync(Job job, LocalTask task) {
76+
localTaskExecutor.execute(() -> execute(job, task));
7877
}
7978

8079
@Override
81-
public ExecutedLocalTask execute(LocalDockerTask task) {
80+
public ExecutedLocalTask execute(Job job, LocalTask task) {
8281
Optional<ExecutedLocalTask> optional =
83-
executedLocalTaskDao.findByJobIdAndAndName(task.getJobId(), task.getName());
82+
executedLocalTaskDao.findByJobIdAndAndName(job.getId(), task.getPlugin());
8483

8584
if (!optional.isPresent()) {
8685
throw new StatusException("Executed local task should be init() before execute");
@@ -91,47 +90,55 @@ public ExecutedLocalTask execute(LocalDockerTask task) {
9190
exec.setStatus(Executed.Status.RUNNING);
9291
executedLocalTaskDao.save(exec);
9392

93+
DockerManager.Option option = new DockerManager.Option();
94+
option.setImage(DefaultImage);
95+
option.setInputs(new StringVars(job.getContext()).merge(task.getEnvs()));
96+
9497
if (task.hasPlugin()) {
9598
String name = task.getPlugin();
96-
GetPluginEvent event = eventManager.publish(new GetPluginAndVerifySetContext(this, name, task.getInputs()));
99+
GetPluginEvent event = eventManager.publish(new GetPluginAndVerifySetContext(this, name, option.getInputs()));
97100

98101
if (event.hasError()) {
99-
exec.setError(event.getError().getMessage());
102+
String message = event.getError().getMessage();
103+
log.warn(message);
104+
105+
exec.setError(message);
100106
exec.setStatus(Executed.Status.EXCEPTION);
101107
exec.setFinishAt(new Date());
102108
executedLocalTaskDao.save(exec);
103109
return exec;
104110
}
105111

106112
Plugin plugin = event.getFetched();
107-
task.setScript(plugin.getScript());
108-
task.setPluginDir(event.getDir());
113+
114+
option.setScript(plugin.getScript());
115+
option.setPlugin(plugin.getName());
116+
option.setPluginDir(event.getDir().toString());
109117

110118
// apply docker image only from plugin if it's specified
111119
ObjectsHelper.ifNotNull(plugin.getDocker(), (docker) -> {
112-
task.setImage(plugin.getDocker().getImage());
120+
option.setImage(plugin.getDocker().getImage());
113121
});
114122
}
115123

116-
log.info("Start local task {} image = {} for job {}", task.getName(), task.getImage(), task.getJobId());
117-
executedLocalTaskDao.save(runDockerTask(task, exec));
124+
log.info("Start local task {} image = {} for job {}", task.getPlugin(), option.getImage(), job.getId());
125+
executedLocalTaskDao.save(runDockerTask(option, exec));
118126
return exec;
119127
}
120128

121-
private ExecutedLocalTask runDockerTask(LocalDockerTask task, ExecutedLocalTask r) {
129+
private ExecutedLocalTask runDockerTask(DockerManager.Option option, ExecutedLocalTask r) {
122130
try {
123-
String image = task.getImage();
124-
131+
String image = option.getImage();
125132
boolean isSuccess = dockerManager.pullImage(image);
126133
if (!isSuccess) {
127134
throw new NotAvailableException("Docker image {0} not available", image);
128135
}
129136

130-
String cid = dockerManager.createAndStartContainer(task);
137+
String cid = dockerManager.createAndStartContainer(option);
131138
r.setContainerId(cid);
132139
dockerManager.printContainerLog(cid);
133140

134-
if (!dockerManager.waitContainer(cid, task.getTimeoutInSecond())) {
141+
if (!dockerManager.waitContainer(cid, DefaultTimeout)) {
135142
dockerManager.killContainer(cid);
136143
}
137144

core/src/test/java/com/flowci/core/test/job/JobServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public void should_finish_whole_job() throws InterruptedException, IOException {
225225
Mockito.doAnswer((Answer<Void>) invocation -> {
226226
localTaskCountDown.countDown();
227227
return null;
228-
}).when(localTaskManager).executeAsync(Mockito.any());
228+
}).when(localTaskManager).executeAsync(Mockito.any(), Mockito.any());
229229

230230
addEventListener((ApplicationListener<StartAsyncLocalTaskEvent>) event -> localTaskCountDown.countDown());
231231

0 commit comments

Comments
 (0)