Skip to content

Commit 30d4210

Browse files
authored
Merge pull request #360 from FlowCI/feature/1469
Feature/1469
2 parents c5fb5ca + 1c8ca44 commit 30d4210

File tree

21 files changed

+228
-204
lines changed

21 files changed

+228
-204
lines changed

core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@
6464
<artifactId>minio</artifactId>
6565
</dependency>
6666

67+
<dependency>
68+
<groupId>org.codehaus.groovy</groupId>
69+
<artifactId>groovy-all</artifactId>
70+
<type>pom</type>
71+
</dependency>
72+
6773
<dependency>
6874
<groupId>com.github.ben-manes.caffeine</groupId>
6975
<artifactId>caffeine</artifactId>

core/src/main/java/com/flowci/core/agent/domain/ShellIn.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.flowci.core.agent.domain;
22

3+
import com.fasterxml.jackson.annotation.JsonIgnore;
34
import com.flowci.domain.DockerOption;
45
import com.flowci.domain.StringVars;
56
import com.flowci.domain.Vars;
7+
import com.flowci.util.StringHelper;
68
import com.google.common.base.Strings;
79
import lombok.Getter;
810
import lombok.Setter;
@@ -31,6 +33,9 @@ public final class ShellIn extends CmdIn {
3133

3234
private List<DockerOption> dockers;
3335

36+
@JsonIgnore
37+
private String condition;
38+
3439
private List<String> scripts = new LinkedList<>();
3540

3641
private int timeout = 1800;
@@ -57,4 +62,9 @@ public void addEnvFilters(Set<String> exports) {
5762
public void addInputs(StringVars vars) {
5863
inputs.putAll(vars);
5964
}
65+
66+
@JsonIgnore
67+
public boolean hasCondition() {
68+
return StringHelper.hasValue(condition);
69+
}
6070
}

core/src/main/java/com/flowci/core/job/config/JobConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.flowci.core.common.config.AppProperties;
2020
import com.flowci.core.common.helper.CacheHelper;
21+
import com.flowci.core.common.helper.ThreadHelper;
2122
import com.flowci.core.job.domain.Step;
2223
import com.flowci.tree.NodeTree;
2324
import com.flowci.util.FileHelper;
@@ -26,11 +27,13 @@
2627
import org.springframework.beans.factory.annotation.Autowired;
2728
import org.springframework.context.annotation.Bean;
2829
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2931

3032
import java.io.IOException;
3133
import java.nio.file.Path;
3234
import java.nio.file.Paths;
3335
import java.util.List;
36+
import java.util.concurrent.ThreadPoolExecutor;
3437

3538
/**
3639
* @author yang
@@ -58,4 +61,9 @@ public Path pluginDir() throws IOException {
5861
Path pluginDir = Paths.get(workspace, "repos");
5962
return FileHelper.createDirectory(pluginDir);
6063
}
64+
65+
@Bean("jobConditionExecutor")
66+
public ThreadPoolTaskExecutor jobConditionExecutor() {
67+
return ThreadHelper.createTaskExecutor(20, 20, 100, "job-cond-");
68+
}
6169
}

core/src/main/java/com/flowci/core/job/domain/Step.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
)
4949
public class Step implements Executed {
5050

51+
public static final String MessageSkippedOnCondition = "Skipped due to condition";
52+
5153
@Id
5254
private String id;
5355

core/src/main/java/com/flowci/core/job/manager/CmdManagerImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public CmdIn createShellCmd(Job job, Step step, NodeTree tree) {
5656
.setId(step.getId())
5757
.setFlowId(job.getFlowId())
5858
.setJobId(job.getId())
59+
.setCondition(node.getCondition())
5960
.setAllowFailure(node.isAllowFailure())
6061
.setDockers(node.getDockers())
6162
.setTimeout(job.getTimeout());
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.flowci.core.job.manager;
2+
3+
import com.flowci.core.agent.domain.CmdIn;
4+
import groovy.util.ScriptException;
5+
6+
public interface ConditionManager {
7+
8+
boolean run(CmdIn in) throws ScriptException;
9+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.flowci.core.job.manager;
2+
3+
import com.flowci.core.agent.domain.CmdIn;
4+
import com.flowci.core.agent.domain.ShellIn;
5+
import groovy.lang.Binding;
6+
import groovy.lang.GroovyRuntimeException;
7+
import groovy.lang.GroovyShell;
8+
import groovy.util.ScriptException;
9+
import lombok.extern.log4j.Log4j2;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
12+
import org.springframework.stereotype.Component;
13+
14+
import java.util.concurrent.*;
15+
16+
@Log4j2
17+
@Component
18+
public class ConditionManagerImpl implements ConditionManager {
19+
20+
private final int DefaultTimeout = 2; // seconds
21+
22+
@Autowired
23+
private ThreadPoolTaskExecutor jobConditionExecutor;
24+
25+
@Override
26+
public boolean run(CmdIn in) throws ScriptException {
27+
if (!(in instanceof ShellIn)) {
28+
return true;
29+
}
30+
31+
ShellIn shell = (ShellIn) in;
32+
if (!shell.hasCondition()) {
33+
return true;
34+
}
35+
36+
Future<Boolean> submit = jobConditionExecutor.submit(() -> {
37+
Binding binding = new Binding();
38+
shell.getInputs().forEach(binding::setVariable);
39+
40+
try {
41+
GroovyShell groovy = new GroovyShell(binding);
42+
Object value = groovy.evaluate(shell.getCondition());
43+
if (value instanceof Boolean) {
44+
return (Boolean) value;
45+
}
46+
throw new Exception("The return type is not boolean");
47+
} catch (GroovyRuntimeException e) {
48+
throw new Exception(e.getMessage());
49+
}
50+
});
51+
52+
try {
53+
return submit.get(DefaultTimeout, TimeUnit.SECONDS);
54+
} catch (InterruptedException e) {
55+
throw new ScriptException("Condition script interrupted");
56+
} catch (ExecutionException e) {
57+
throw new ScriptException(e.getMessage());
58+
} catch (TimeoutException e) {
59+
submit.cancel(true);
60+
throw new ScriptException("Condition script timeout");
61+
}
62+
}
63+
}

core/src/main/java/com/flowci/core/job/manager/JobActionManagerImpl.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.flowci.core.common.manager.SpringEventManager;
88
import com.flowci.core.common.rabbit.RabbitOperations;
99
import com.flowci.core.job.dao.JobDao;
10+
import com.flowci.core.job.domain.Executed;
1011
import com.flowci.core.job.domain.Job;
1112
import com.flowci.core.job.domain.Step;
1213
import com.flowci.core.job.event.JobReceivedEvent;
@@ -29,6 +30,7 @@
2930
import com.flowci.zookeeper.InterLock;
3031
import com.flowci.zookeeper.ZookeeperClient;
3132
import com.google.common.base.Strings;
33+
import groovy.util.ScriptException;
3234
import lombok.Getter;
3335
import lombok.Setter;
3436
import lombok.experimental.Accessors;
@@ -42,10 +44,7 @@
4244
import java.nio.file.Files;
4345
import java.nio.file.Path;
4446
import java.nio.file.Paths;
45-
import java.util.Base64;
46-
import java.util.List;
47-
import java.util.Objects;
48-
import java.util.Optional;
47+
import java.util.*;
4948
import java.util.function.Consumer;
5049
import java.util.function.Function;
5150

@@ -120,6 +119,9 @@ public class JobActionManagerImpl implements JobActionManager {
120119
@Autowired
121120
private RabbitOperations jobsQueueManager;
122121

122+
@Autowired
123+
private ConditionManager conditionManager;
124+
123125
@Autowired
124126
private LocalTaskService localTaskService;
125127

@@ -328,7 +330,7 @@ public void accept(JobSmContext context) {
328330

329331
Sm.add(QueuedToRunning, new Action<JobSmContext>() {
330332
@Override
331-
public void accept(JobSmContext context) {
333+
public void accept(JobSmContext context) throws Exception {
332334
Job job = context.job;
333335
eventManager.publish(new JobReceivedEvent(this, job));
334336

@@ -387,7 +389,7 @@ public boolean canRun(JobSmContext context) {
387389
}
388390

389391
@Override
390-
public void accept(JobSmContext context) {
392+
public void accept(JobSmContext context) throws Exception {
391393
Job job = context.job;
392394
log.debug("Job {} is locked", job.getId());
393395

@@ -618,7 +620,7 @@ private SimpleSecret getSimpleSecret(String credentialName) {
618620
/**
619621
* Dispatch job to agent when Queue to Running
620622
*/
621-
private void dispatch(Job job, Agent agent) {
623+
private void dispatch(Job job, Agent agent) throws ScriptException {
622624
NodeTree tree = ymlManager.getTree(job);
623625
StepNode next = tree.next(NodePath.create(job.getCurrentPath()));
624626

@@ -636,13 +638,30 @@ private void dispatch(Job job, Agent agent) {
636638
job.setAgentId(agent.getId());
637639
job.setAgentSnapshot(agent);
638640

639-
// set executed cmd step to running
640-
Step nextStep = stepService.toStatus(job.getId(), nextPath, Step.Status.RUNNING, null);
641-
642641
// dispatch job to agent queue
642+
Step nextStep = stepService.get(job.getId(), nextPath);
643643
CmdIn cmd = cmdManager.createShellCmd(job, nextStep, tree);
644-
setJobStatusAndSave(job, Job.Status.RUNNING, null);
644+
boolean canExecute = conditionManager.run(cmd);
645+
646+
if (!canExecute) {
647+
nextStep.setStartAt(new Date());
648+
nextStep.setFinishAt(new Date());
649+
updateJobTime(job, nextStep, tree, next);
650+
651+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
652+
stepService.toStatus(nextStep, Executed.Status.SKIPPED, Step.MessageSkippedOnCondition);
653+
654+
JobSmContext context = new JobSmContext();
655+
context.job = job;
656+
context.step = nextStep;
657+
context.agentId = agent.getId();
658+
659+
this.toNextStep(context);
660+
return;
661+
}
645662

663+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
664+
stepService.toStatus(nextStep, Executed.Status.RUNNING, null);
646665
agentService.dispatch(cmd, agent);
647666
logInfo(job, "send to agent: step={}, agent={}", next.getName(), agent.getName());
648667
}
@@ -652,9 +671,9 @@ private void dispatch(Job job, Agent agent) {
652671
*
653672
* @return true if next step dispatched, false if no more steps or failure
654673
*/
655-
private boolean toNextStep(JobSmContext context) {
674+
private boolean toNextStep(JobSmContext context) throws ScriptException {
656675
Job job = context.job;
657-
Step step = context.step;
676+
Step step = context.step; // current step
658677

659678
// save executed cmd
660679
stepService.resultUpdate(step);
@@ -677,15 +696,29 @@ private boolean toNextStep(JobSmContext context) {
677696
Optional<StepNode> next = findNext(tree, node, step.isSuccess());
678697
if (next.isPresent()) {
679698
String nextPath = next.get().getPathAsString();
680-
job.setCurrentPath(nextPath);
681-
682-
Step nextStep = stepService.toStatus(job.getId(), nextPath, Step.Status.RUNNING, null);
699+
Step nextStep = stepService.get(job.getId(), nextPath);
683700
context.setStep(nextStep);
684701

702+
job.setCurrentPath(nextPath);
703+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
704+
685705
Agent agent = agentService.get(job.getAgentId());
686706
CmdIn cmd = cmdManager.createShellCmd(job, nextStep, tree);
687-
setJobStatusAndSave(job, Job.Status.RUNNING, null);
707+
boolean canExecute = conditionManager.run(cmd);
708+
709+
if (!canExecute) {
710+
nextStep.setStartAt(new Date());
711+
nextStep.setFinishAt(new Date());
712+
updateJobTime(job, nextStep, tree, next.get());
688713

714+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
715+
stepService.toStatus(nextStep, Step.Status.SKIPPED, Step.MessageSkippedOnCondition);
716+
return toNextStep(context);
717+
}
718+
719+
// TODO: run condition
720+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
721+
stepService.toStatus(nextStep, Step.Status.RUNNING, null);
689722
agentService.dispatch(cmd, agent);
690723
logInfo(job, "send to agent: step={}, agent={}", next.get().getName(), agent.getName());
691724
return true;
@@ -722,11 +755,11 @@ private synchronized void setJobStatusAndSave(Job job, Job.Status newStatus, Str
722755
logInfo(job, "status = {}", job.getStatus());
723756
}
724757

725-
private void updateJobTime(Job job, Step execCmd, NodeTree tree, StepNode node) {
758+
private void updateJobTime(Job job, Step step, NodeTree tree, StepNode node) {
726759
if (tree.isFirst(node.getPath())) {
727-
job.setStartAt(execCmd.getStartAt());
760+
job.setStartAt(step.getStartAt());
728761
}
729-
job.setFinishAt(execCmd.getFinishAt());
762+
job.setFinishAt(step.getFinishAt());
730763
}
731764

732765
private void updateJobContextAndLatestStatus(Job job, StepNode node, Step cmd) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.flowci.core.test.job;
2+
3+
import com.flowci.core.agent.domain.ShellIn;
4+
import com.flowci.core.job.manager.ConditionManager;
5+
import com.flowci.core.test.SpringScenario;
6+
import groovy.util.ScriptException;
7+
import org.junit.Test;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.testng.Assert;
10+
11+
public class ConditionManagerTest extends SpringScenario {
12+
13+
@Autowired
14+
private ConditionManager conditionManager;
15+
16+
@Test
17+
public void should_run_groovy_condition() throws ScriptException {
18+
ShellIn in = new ShellIn();
19+
in.getInputs().put("foo", "helloword");
20+
in.setCondition(
21+
"println \"$foo\"; "
22+
+ "return true;"
23+
);
24+
25+
Assert.assertTrue(conditionManager.run(in));
26+
}
27+
28+
@Test(expected = ScriptException.class)
29+
public void should_throw_exception_if_wrong_return_type() throws ScriptException {
30+
ShellIn in = new ShellIn();
31+
in.getInputs().put("foo", "helloword");
32+
in.setCondition(
33+
"println \"$foo\"; "
34+
+ "hello = \"1234\";"
35+
+ "return hello;"
36+
);
37+
38+
conditionManager.run(in);
39+
}
40+
41+
@Test(expected = ScriptException.class)
42+
public void should_throw_exception_if_timeout() throws ScriptException {
43+
ShellIn in = new ShellIn();
44+
in.getInputs().put("foo", "helloword");
45+
in.setCondition(
46+
"sleep(6000); "
47+
+ "println \"$foo\"; "
48+
+ "hello = \"1234\";"
49+
+ "return true;"
50+
);
51+
52+
conditionManager.run(in);
53+
}
54+
}

core/src/test/resources/flow-with-before.yml renamed to core/src/test/resources/flow-with-condition.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ steps:
1010
- envs:
1111
FLOW_WORKSPACE: "echo step"
1212
FLOW_VERSION: "echo step version"
13-
before: |
13+
condition: |
1414
return false
1515
script: |
1616
echo hello

0 commit comments

Comments
 (0)