Skip to content

Commit 492655c

Browse files
committed
handle condition on job
1 parent 94526fe commit 492655c

File tree

4 files changed

+58
-24
lines changed

4 files changed

+58
-24
lines changed

core/src/main/java/com/flowci/core/job/manager/CmdManagerImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public CmdIn createShellCmd(Job job, Step step, NodeTree tree) {
5656
.setId(step.getId())
5757
.setFlowId(job.getFlowId())
5858
.setJobId(job.getId())
59+
.setCondition(node.getCondition())
5960
.setAllowFailure(node.isAllowFailure())
6061
.setDockers(node.getDockers())
6162
.setTimeout(job.getTimeout());

core/src/main/java/com/flowci/core/job/manager/JobActionManagerImpl.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.flowci.core.common.manager.SpringEventManager;
88
import com.flowci.core.common.rabbit.RabbitOperations;
99
import com.flowci.core.job.dao.JobDao;
10+
import com.flowci.core.job.domain.Executed;
1011
import com.flowci.core.job.domain.Job;
1112
import com.flowci.core.job.domain.Step;
1213
import com.flowci.core.job.event.JobReceivedEvent;
@@ -29,6 +30,7 @@
2930
import com.flowci.zookeeper.InterLock;
3031
import com.flowci.zookeeper.ZookeeperClient;
3132
import com.google.common.base.Strings;
33+
import groovy.util.ScriptException;
3234
import lombok.Getter;
3335
import lombok.Setter;
3436
import lombok.experimental.Accessors;
@@ -42,10 +44,7 @@
4244
import java.nio.file.Files;
4345
import java.nio.file.Path;
4446
import java.nio.file.Paths;
45-
import java.util.Base64;
46-
import java.util.List;
47-
import java.util.Objects;
48-
import java.util.Optional;
47+
import java.util.*;
4948
import java.util.function.Consumer;
5049
import java.util.function.Function;
5150

@@ -120,6 +119,9 @@ public class JobActionManagerImpl implements JobActionManager {
120119
@Autowired
121120
private RabbitOperations jobsQueueManager;
122121

122+
@Autowired
123+
private ConditionManager conditionManager;
124+
123125
@Autowired
124126
private LocalTaskService localTaskService;
125127

@@ -328,7 +330,7 @@ public void accept(JobSmContext context) {
328330

329331
Sm.add(QueuedToRunning, new Action<JobSmContext>() {
330332
@Override
331-
public void accept(JobSmContext context) {
333+
public void accept(JobSmContext context) throws Exception {
332334
Job job = context.job;
333335
eventManager.publish(new JobReceivedEvent(this, job));
334336

@@ -387,7 +389,7 @@ public boolean canRun(JobSmContext context) {
387389
}
388390

389391
@Override
390-
public void accept(JobSmContext context) {
392+
public void accept(JobSmContext context) throws Exception {
391393
Job job = context.job;
392394
log.debug("Job {} is locked", job.getId());
393395

@@ -618,7 +620,7 @@ private SimpleSecret getSimpleSecret(String credentialName) {
618620
/**
619621
* Dispatch job to agent when Queue to Running
620622
*/
621-
private void dispatch(Job job, Agent agent) {
623+
private void dispatch(Job job, Agent agent) throws ScriptException {
622624
NodeTree tree = ymlManager.getTree(job);
623625
StepNode next = tree.next(NodePath.create(job.getCurrentPath()));
624626

@@ -636,13 +638,30 @@ private void dispatch(Job job, Agent agent) {
636638
job.setAgentId(agent.getId());
637639
job.setAgentSnapshot(agent);
638640

639-
// set executed cmd step to running
640-
Step nextStep = stepService.toStatus(job.getId(), nextPath, Step.Status.RUNNING, null);
641-
642641
// dispatch job to agent queue
642+
Step nextStep = stepService.get(job.getId(), nextPath);
643643
CmdIn cmd = cmdManager.createShellCmd(job, nextStep, tree);
644-
setJobStatusAndSave(job, Job.Status.RUNNING, null);
644+
boolean canExecute = conditionManager.run(cmd);
645+
646+
if (!canExecute) {
647+
nextStep.setStartAt(new Date());
648+
nextStep.setFinishAt(new Date());
649+
updateJobTime(job, nextStep, tree, next);
650+
651+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
652+
stepService.toStatus(nextStep, Executed.Status.SKIPPED, null);
653+
654+
JobSmContext context = new JobSmContext();
655+
context.job = job;
656+
context.step = nextStep;
657+
context.agentId = agent.getId();
658+
659+
this.toNextStep(context);
660+
return;
661+
}
645662

663+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
664+
stepService.toStatus(nextStep, Executed.Status.RUNNING, null);
646665
agentService.dispatch(cmd, agent);
647666
logInfo(job, "send to agent: step={}, agent={}", next.getName(), agent.getName());
648667
}
@@ -652,9 +671,9 @@ private void dispatch(Job job, Agent agent) {
652671
*
653672
* @return true if next step dispatched, false if no more steps or failure
654673
*/
655-
private boolean toNextStep(JobSmContext context) {
674+
private boolean toNextStep(JobSmContext context) throws ScriptException {
656675
Job job = context.job;
657-
Step step = context.step;
676+
Step step = context.step; // current step
658677

659678
// save executed cmd
660679
stepService.resultUpdate(step);
@@ -677,15 +696,29 @@ private boolean toNextStep(JobSmContext context) {
677696
Optional<StepNode> next = findNext(tree, node, step.isSuccess());
678697
if (next.isPresent()) {
679698
String nextPath = next.get().getPathAsString();
680-
job.setCurrentPath(nextPath);
681-
682-
Step nextStep = stepService.toStatus(job.getId(), nextPath, Step.Status.RUNNING, null);
699+
Step nextStep = stepService.get(job.getId(), nextPath);
683700
context.setStep(nextStep);
684701

702+
job.setCurrentPath(nextPath);
703+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
704+
685705
Agent agent = agentService.get(job.getAgentId());
686706
CmdIn cmd = cmdManager.createShellCmd(job, nextStep, tree);
687-
setJobStatusAndSave(job, Job.Status.RUNNING, null);
707+
boolean canExecute = conditionManager.run(cmd);
708+
709+
if (!canExecute) {
710+
nextStep.setStartAt(new Date());
711+
nextStep.setFinishAt(new Date());
712+
updateJobTime(job, nextStep, tree, next.get());
688713

714+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
715+
stepService.toStatus(nextStep, Step.Status.SKIPPED, null);
716+
return toNextStep(context);
717+
}
718+
719+
// TODO: run condition
720+
setJobStatusAndSave(job, Job.Status.RUNNING, null);
721+
stepService.toStatus(nextStep, Step.Status.RUNNING, null);
689722
agentService.dispatch(cmd, agent);
690723
logInfo(job, "send to agent: step={}, agent={}", next.get().getName(), agent.getName());
691724
return true;
@@ -722,11 +755,11 @@ private synchronized void setJobStatusAndSave(Job job, Job.Status newStatus, Str
722755
logInfo(job, "status = {}", job.getStatus());
723756
}
724757

725-
private void updateJobTime(Job job, Step execCmd, NodeTree tree, StepNode node) {
758+
private void updateJobTime(Job job, Step step, NodeTree tree, StepNode node) {
726759
if (tree.isFirst(node.getPath())) {
727-
job.setStartAt(execCmd.getStartAt());
760+
job.setStartAt(step.getStartAt());
728761
}
729-
job.setFinishAt(execCmd.getFinishAt());
762+
job.setFinishAt(step.getFinishAt());
730763
}
731764

732765
private void updateJobContextAndLatestStatus(Job job, StepNode node, Step cmd) {

core/src/test/resources/flow-with-before.yml renamed to core/src/test/resources/flow-with-condition.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ steps:
1010
- envs:
1111
FLOW_WORKSPACE: "echo step"
1212
FLOW_VERSION: "echo step version"
13-
before: |
13+
condition: |
1414
return false
1515
script: |
1616
echo hello

sm/src/main/java/com/flowci/sm/Action.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package com.flowci.sm;
22

3-
import java.util.function.Consumer;
4-
5-
public abstract class Action<T extends Context> implements Consumer<T> {
3+
public abstract class Action<T extends Context> {
64

75
public boolean canRun(T context) {
86
return true;
97
}
108

9+
public abstract void accept(T t) throws Exception;
10+
1111
public void onException(Throwable e, T context) {
1212
// ignore by default
1313
}

0 commit comments

Comments
 (0)