Skip to content

Commit 5d12aaf

Browse files
authored
Merge pull request #376 from FlowCI/develop
Develop
2 parents 08929a3 + aa7f1b3 commit 5d12aaf

35 files changed

+376
-217
lines changed

.run/Application - 1.run.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<env name="FLOWCI_SERVER_PORT" value="8080" />
1515
<env name="FLOWCI_TEMPLATES" value="https://raw.githubusercontent.com/FlowCI/templates/master/templates.json" />
1616
<env name="FLOWCI_AGENT_IMAGE" value="flowci/agent:dev" />
17+
<env name="FLOWCI_SERVER_URL" value="http://192.168.0.4:8080" />
1718
</envs>
1819
<method v="2">
1920
<option name="Make" enabled="true" />

core/src/main/java/com/flowci/core/agent/domain/ShellIn.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package com.flowci.core.agent.domain;
22

3-
import com.fasterxml.jackson.annotation.JsonIgnore;
43
import com.flowci.domain.DockerOption;
54
import com.flowci.domain.StringVars;
65
import com.flowci.domain.Vars;
7-
import com.flowci.util.StringHelper;
86
import com.google.common.base.Strings;
97
import lombok.Getter;
108
import lombok.Setter;
@@ -37,14 +35,13 @@ public enum ShellType {
3735

3836
private List<DockerOption> dockers;
3937

40-
@JsonIgnore
41-
private String condition;
42-
4338
private List<String> bash;
4439

4540
private List<String> pwsh;
4641

47-
private int timeout = 1800;
42+
private int timeout = 1800; // from StepNode.timeout
43+
44+
private int retry; // from StepNode.retry
4845

4946
private Vars<String> inputs;
5047

@@ -75,9 +72,4 @@ public void addEnvFilters(Set<String> exports) {
7572
public void addInputs(StringVars vars) {
7673
inputs.putAll(vars);
7774
}
78-
79-
@JsonIgnore
80-
public boolean hasCondition() {
81-
return StringHelper.hasValue(condition);
82-
}
8375
}

core/src/main/java/com/flowci/core/agent/manager/AgentEventManager.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,19 @@ public <T> void writeMessage(String token, ResponseMessage<T> msg) {
5454

5555
try {
5656
byte[] bytes = objectMapper.writeValueAsBytes(msg);
57-
session.sendMessage(new BinaryMessage(bytes));
57+
writeMessage(session, bytes);
5858
} catch (IOException e) {
5959
log.warn("Unable to write response message for agent {}: {}", token, e.getMessage());
6060
}
6161
}
6262

63-
public void writeMessage(String token, byte[] bytes) throws IOException {
63+
public void writeMessage(String token, byte[] bytes) {
6464
WebSocketSession session = agentSessionStore.get(token);
6565
if (session == null) {
6666
log.warn("Agent {} not connected", token);
6767
return;
6868
}
69-
70-
session.sendMessage(new BinaryMessage(bytes));
69+
writeMessage(session, bytes);
7170
}
7271

7372
@Override
@@ -123,12 +122,22 @@ public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
123122
private <T> void writeMessage(WebSocketSession session, ResponseMessage<T> msg) {
124123
try {
125124
byte[] bytes = objectMapper.writeValueAsBytes(msg);
126-
session.sendMessage(new BinaryMessage(bytes));
125+
writeMessage(session, bytes);
127126
} catch (IOException e) {
128127
log.warn(e);
129128
}
130129
}
131130

131+
private void writeMessage(WebSocketSession session, byte[] body) {
132+
synchronized (session) {
133+
try {
134+
session.sendMessage(new BinaryMessage(body));
135+
} catch (IOException e) {
136+
log.warn(e);
137+
}
138+
}
139+
}
140+
132141
private void onConnected(WebSocketSession session, String token, byte[] body) {
133142
try {
134143
AgentInit init = objectMapper.readValue(body, AgentInit.class);

core/src/main/java/com/flowci/core/common/config/AppConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.core.ResolvableType;
3535
import org.springframework.core.task.TaskExecutor;
3636
import org.springframework.scheduling.annotation.EnableScheduling;
37+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
3738

3839
import javax.annotation.PostConstruct;
3940
import java.io.IOException;
@@ -80,8 +81,10 @@ public ObjectMapper objectMapper() {
8081
}
8182

8283
@Bean("appTaskExecutor")
83-
public TaskExecutor getAppTaskExecutor() {
84-
return ThreadHelper.createTaskExecutor(100, 100, 50, "app-task-");
84+
public ThreadPoolTaskExecutor getAppTaskExecutor() {
85+
int corePoolSize = appProperties.getCorePoolSize();
86+
int maxPoolSize = appProperties.getMaxPoolSize();
87+
return ThreadHelper.createTaskExecutor(maxPoolSize, corePoolSize, 10, "app-task-");
8588
}
8689

8790
@Bean(name = "applicationEventMulticaster")

core/src/main/java/com/flowci/core/common/config/AppProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public class AppProperties {
5757

5858
private boolean socketContainer;
5959

60+
private int corePoolSize;
61+
62+
private int maxPoolSize;
63+
6064
@Bean("zkProperties")
6165
@ConfigurationProperties(prefix = "app.zookeeper")
6266
public Zookeeper zk() {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.flowci.core.common.manager;
2+
3+
import com.flowci.domain.Vars;
4+
import groovy.util.ScriptException;
5+
6+
import javax.annotation.Nullable;
7+
8+
public interface ConditionManager {
9+
10+
/**
11+
* Verify the input condition is groovy script with boolean return or not
12+
*/
13+
void verify(@Nullable String condition) throws ScriptException;
14+
15+
boolean run(@Nullable String groovyScript, @Nullable Vars<String> envs) throws ScriptException;
16+
}

core/src/main/java/com/flowci/core/job/manager/ConditionManagerImpl.java renamed to core/src/main/java/com/flowci/core/common/manager/ConditionManagerImpl.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
package com.flowci.core.job.manager;
1+
package com.flowci.core.common.manager;
22

3-
import com.flowci.core.agent.domain.CmdIn;
4-
import com.flowci.core.agent.domain.ShellIn;
3+
import com.flowci.domain.Vars;
4+
import com.google.common.base.Strings;
55
import groovy.lang.Binding;
66
import groovy.lang.GroovyRuntimeException;
77
import groovy.lang.GroovyShell;
@@ -11,7 +11,11 @@
1111
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
1212
import org.springframework.stereotype.Component;
1313

14-
import java.util.concurrent.*;
14+
import javax.annotation.Nullable;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.Future;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.TimeoutException;
1519

1620
@Log4j2
1721
@Component
@@ -23,23 +27,29 @@ public class ConditionManagerImpl implements ConditionManager {
2327
private ThreadPoolTaskExecutor jobConditionExecutor;
2428

2529
@Override
26-
public boolean run(CmdIn in) throws ScriptException {
27-
if (!(in instanceof ShellIn)) {
28-
return true;
30+
public void verify(@Nullable String condition) throws ScriptException {
31+
try {
32+
new GroovyShell().parse(condition);
33+
} catch (Exception e) {
34+
throw new ScriptException("Invalid groovy condition: " + e.getMessage());
2935
}
36+
}
3037

31-
ShellIn shell = (ShellIn) in;
32-
if (!shell.hasCondition()) {
38+
@Override
39+
public boolean run(String groovyScript, Vars<String> envs) throws ScriptException {
40+
if (Strings.isNullOrEmpty(groovyScript)) {
3341
return true;
3442
}
3543

3644
Future<Boolean> submit = jobConditionExecutor.submit(() -> {
3745
Binding binding = new Binding();
38-
shell.getInputs().forEach(binding::setVariable);
46+
if (envs != null) {
47+
envs.forEach(binding::setVariable);
48+
}
3949

4050
try {
4151
GroovyShell groovy = new GroovyShell(binding);
42-
Object value = groovy.evaluate(shell.getCondition());
52+
Object value = groovy.evaluate(groovyScript);
4353
if (value instanceof Boolean) {
4454
return (Boolean) value;
4555
}

core/src/main/java/com/flowci/core/common/service/SettingServiceImpl.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,21 @@ public class SettingServiceImpl implements SettingService {
3535
public void setDefaultValue() {
3636
taskManager.run("init-default-settings", () -> {
3737
Optional<Settings> optional = settingsDao.findById(Settings.DefaultId);
38-
if (!optional.isPresent()) {
3938

40-
String address = serverProperties.getAddress().toString().replace("/", "");
41-
String serverUrl = environment.getProperty(
42-
Variables.App.ServerUrl,
43-
String.format("http://%s:%s", address, serverProperties.getPort())
44-
);
39+
String address = serverProperties.getAddress().toString().replace("/", "");
40+
String serverUrl = environment.getProperty(
41+
Variables.App.ServerUrl,
42+
String.format("http://%s:%s", address, serverProperties.getPort())
43+
);
4544

46-
Settings s = new Settings();
47-
s.setServerUrl(serverUrl);
48-
settingsDao.save(s);
45+
Settings s = new Settings();
46+
47+
if (optional.isPresent()) {
48+
s = optional.get();
4949
}
50+
51+
s.setServerUrl(serverUrl);
52+
settingsDao.save(s);
5053
});
5154
}
5255

core/src/main/java/com/flowci/core/flow/service/FlowServiceImpl.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,41 @@
1717
package com.flowci.core.flow.service;
1818

1919
import com.flowci.core.common.domain.Variables;
20+
import com.flowci.core.common.manager.ConditionManager;
2021
import com.flowci.core.common.manager.SessionManager;
2122
import com.flowci.core.common.manager.SpringEventManager;
2223
import com.flowci.core.flow.dao.FlowDao;
2324
import com.flowci.core.flow.dao.FlowUserDao;
25+
import com.flowci.core.flow.dao.YmlDao;
2426
import com.flowci.core.flow.domain.ConfirmOption;
2527
import com.flowci.core.flow.domain.Flow;
2628
import com.flowci.core.flow.domain.Flow.Status;
29+
import com.flowci.core.flow.domain.WebhookStatus;
30+
import com.flowci.core.flow.domain.Yml;
2731
import com.flowci.core.flow.event.FlowCreatedEvent;
2832
import com.flowci.core.flow.event.FlowDeletedEvent;
2933
import com.flowci.core.flow.event.FlowInitEvent;
34+
import com.flowci.core.job.domain.Job;
35+
import com.flowci.core.job.event.CreateNewJobEvent;
3036
import com.flowci.core.secret.domain.Secret;
3137
import com.flowci.core.secret.event.CreateAuthEvent;
3238
import com.flowci.core.secret.event.CreateRsaEvent;
3339
import com.flowci.core.secret.event.GetSecretEvent;
40+
import com.flowci.core.trigger.domain.GitPingTrigger;
41+
import com.flowci.core.trigger.domain.GitTrigger;
42+
import com.flowci.core.trigger.event.GitHookEvent;
3443
import com.flowci.core.user.event.UserDeletedEvent;
3544
import com.flowci.domain.*;
3645
import com.flowci.exception.ArgumentException;
3746
import com.flowci.exception.DuplicateException;
3847
import com.flowci.exception.NotFoundException;
3948
import com.flowci.exception.StatusException;
4049
import com.flowci.store.FileManager;
50+
import com.flowci.tree.FlowNode;
51+
import com.flowci.tree.YmlParser;
4152
import com.flowci.util.ObjectsHelper;
4253
import com.google.common.collect.Sets;
54+
import groovy.util.ScriptException;
4355
import lombok.extern.log4j.Log4j2;
4456
import org.springframework.beans.factory.annotation.Autowired;
4557
import org.springframework.context.event.ContextRefreshedEvent;
@@ -60,6 +72,9 @@ public class FlowServiceImpl implements FlowService {
6072
@Autowired
6173
private FlowDao flowDao;
6274

75+
@Autowired
76+
private YmlDao ymlDao;
77+
6378
@Autowired
6479
private FlowUserDao flowUserDao;
6580

@@ -72,6 +87,9 @@ public class FlowServiceImpl implements FlowService {
7287
@Autowired
7388
private FileManager fileManager;
7489

90+
@Autowired
91+
private ConditionManager conditionManager;
92+
7593
@Autowired
7694
private YmlService ymlService;
7795

@@ -293,10 +311,64 @@ public void deleteUserFromFlow(UserDeletedEvent event) {
293311
// TODO:
294312
}
295313

314+
@EventListener
315+
public void onGitHookEvent(GitHookEvent event) {
316+
GitTrigger trigger = event.getTrigger();
317+
Flow flow = flowDao.findByName(event.getFlow());
318+
319+
if (event.isPingEvent()) {
320+
GitPingTrigger ping = (GitPingTrigger) trigger;
321+
322+
WebhookStatus ws = new WebhookStatus();
323+
ws.setAdded(true);
324+
ws.setCreatedAt(ping.getCreatedAt());
325+
ws.setEvents(ping.getEvents());
326+
327+
flow.setWebhookStatus(ws);
328+
flowDao.save(flow);
329+
return;
330+
}
331+
332+
if (trigger.isSkip()) {
333+
log.info("Ignore git trigger {} since skip message", trigger);
334+
return;
335+
}
336+
337+
Optional<Yml> optional = ymlDao.findById(flow.getId());
338+
if (!optional.isPresent()) {
339+
log.warn("No available yml for flow {}", flow.getName());
340+
return;
341+
}
342+
343+
Yml yml = optional.get();
344+
FlowNode root = YmlParser.load(flow.getName(), yml.getRaw());
345+
if (!canStartJob(root, trigger)) {
346+
log.debug("Cannot start job, condition not match: {}", root.getCondition());
347+
return;
348+
}
349+
350+
StringVars gitInput = trigger.toVariableMap();
351+
Job.Trigger jobTrigger = trigger.toJobTrigger();
352+
353+
eventManager.publish(new CreateNewJobEvent(this, flow, yml.getRaw(), jobTrigger, gitInput));
354+
}
355+
356+
296357
// ====================================================================
297358
// %% Utils
298359
// ====================================================================
299360

361+
private boolean canStartJob(FlowNode root, GitTrigger trigger) {
362+
try {
363+
String groovy = root.getCondition();
364+
Vars<String> envs = trigger.toVariableMap();
365+
return conditionManager.run(groovy, envs);
366+
} catch (ScriptException e) {
367+
log.warn("Illegal groovy script at condition section", e);
368+
return false;
369+
}
370+
}
371+
300372
private void setupDefaultVars(Flow flow) {
301373
Vars<VarValue> localVars = flow.getLocally();
302374
localVars.put(Variables.Flow.Name, VarValue.of(flow.getName(), VarType.STRING, false));

0 commit comments

Comments
 (0)