Skip to content

Commit 0f6f368

Browse files
author
yang.guo
authored
Merge pull request #125 from FlowCI/develop
Develop
2 parents 25b9012 + 5923d3e commit 0f6f368

File tree

25 files changed

+290
-50
lines changed

25 files changed

+290
-50
lines changed

platform-api/src/main/java/com/flow/platform/api/config/AppConfig.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.springframework.context.annotation.Bean;
3939
import org.springframework.context.annotation.Configuration;
4040
import org.springframework.context.annotation.Import;
41+
import org.springframework.context.event.ApplicationEventMulticaster;
42+
import org.springframework.context.event.SimpleApplicationEventMulticaster;
4143
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4244

4345
/**
@@ -49,7 +51,7 @@ public class AppConfig extends AppConfigBase {
4951

5052
public final static String NAME = "API";
5153

52-
public final static String VERSION = "alpha-0.1";
54+
public final static String VERSION = "0.1.0";
5355

5456
public final static String DEFAULT_YML_FILE = ".flow.yml";
5557

@@ -61,13 +63,22 @@ public class AppConfig extends AppConfigBase {
6163

6264
private final static String THREAD_NAME_PREFIX = "async-task-";
6365

66+
private final static String MULTICASTER_THREAD_NAME_PREFIX = "multi_async-task-";
67+
68+
private final static int MULTICASTER_ASYNC_POOL_SIZE = 1;
69+
6470
public final static String DEFAULT_USER_EMAIL = "[email protected]";
6571
public final static String DEFAULT_USER_NAME = "admin";
72+
6673
public final static String DEFAULT_USER_PASSWORD = "123456";
6774

6875
private final static ThreadPoolTaskExecutor executor =
6976
ThreadUtil.createTaskExecutor(ASYNC_POOL_SIZE, ASYNC_POOL_SIZE / 10, 100, THREAD_NAME_PREFIX);
7077

78+
private final static ThreadPoolTaskExecutor multicasterExecutor =
79+
ThreadUtil.createTaskExecutor(MULTICASTER_ASYNC_POOL_SIZE, MULTICASTER_ASYNC_POOL_SIZE, 1000,
80+
MULTICASTER_THREAD_NAME_PREFIX);
81+
7182
@Value("${api.workspace}")
7283
private String workspace;
7384

@@ -85,6 +96,14 @@ public Path workspace() {
8596
}
8697
}
8798

99+
@Bean(name = "applicationEventMulticaster")
100+
public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
101+
multicasterExecutor.initialize();
102+
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
103+
eventMulticaster.setTaskExecutor(multicasterExecutor);
104+
return eventMulticaster;
105+
}
106+
88107
@Bean
89108
public ThreadLocal<User> currentUser() {
90109
return new ThreadLocal<>();

platform-api/src/main/java/com/flow/platform/api/config/WebConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void addCorsMappings(CorsRegistry registry) {
8383
.allowedOrigins("*")
8484
.allowedMethods("*")
8585
.allowCredentials(true)
86-
.allowedHeaders("origin", "content-type", "accept", "x-requested-with", "X-Authorization", "authenticate", "library");
86+
.allowedHeaders("origin", "content-type", "accept", "x-requested-with", "X-Authorization", "authenticate",
87+
"library");
8788
}
8889

8990
@Bean

platform-api/src/main/java/com/flow/platform/api/consumer/CmdLoggingConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected void handleTextMessage(WebSocketSession session, TextMessage message)
8484
*/
8585
private void sendCmdLog(String cmdId, String content, String number) {
8686
String event = String.format("/topic/cmd/%s", cmdId);
87-
template.convertAndSend(event, "{\"number\": \"" + number + "\", \"content\": \"" + content + "\"}");
87+
template.convertAndSend(event, "{\"number\": " + number + ", \"content\": \"" + content + "\"}");
8888
}
8989

9090
/**

platform-api/src/main/java/com/flow/platform/api/consumer/JobEventPushHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.flow.platform.api.domain.job.Job;
2121
import com.flow.platform.api.push.PushHandler;
2222
import com.flow.platform.api.service.job.JobService;
23+
import com.flow.platform.core.exception.NotFoundException;
2324
import java.math.BigInteger;
2425
import org.springframework.beans.factory.annotation.Autowired;
2526

@@ -37,4 +38,15 @@ protected void push(BigInteger jobId) {
3738
super.push(jobTopic, job);
3839
}
3940

41+
//TODO: because transaction not found job, add READ_UNCOMMITTED also no use
42+
protected void push(Job job) {
43+
Job jobConsist;
44+
try {
45+
jobConsist = jobService.find(job.getId());
46+
} catch (NotFoundException e) {
47+
jobConsist = job;
48+
}
49+
String jobTopic = String.format("%s/%s", WebSocketConfig.TOPIC_FOR_JOB, jobConsist.getNodePath());
50+
super.push(jobTopic, jobConsist);
51+
}
4052
}

platform-api/src/main/java/com/flow/platform/api/consumer/JobStatusEventConsumer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
import com.flow.platform.api.domain.job.JobStatus;
2121
import com.flow.platform.api.events.JobStatusChangeEvent;
2222
import com.flow.platform.api.service.MessageService;
23-
import com.flow.platform.util.ExceptionUtil;
2423
import com.flow.platform.util.Logger;
25-
import java.math.BigInteger;
2624
import org.springframework.beans.factory.annotation.Autowired;
2725
import org.springframework.context.ApplicationListener;
2826
import org.springframework.core.task.TaskExecutor;
@@ -44,9 +42,10 @@ public class JobStatusEventConsumer extends JobEventPushHandler implements Appli
4442

4543
@Override
4644
public void onApplicationEvent(JobStatusChangeEvent event) {
47-
LOGGER.debug("Job %s status change event from %s to %s", event.getJob().getId(), event.getFrom(), event.getTo());
45+
LOGGER
46+
.debug("Job %s status change event from %s to %s", event.getJob().getId(), event.getFrom(), event.getTo());
4847

49-
push(event.getJob().getId());
48+
push(event.getJob());
5049

5150
// async send message TODO:// only send failure message
5251
if (Job.FAILURE_STATUS.contains(event.getTo())) {

platform-api/src/main/java/com/flow/platform/api/consumer/NodeStatusEventConsumer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public void onApplicationEvent(NodeStatusChangeEvent event) {
3333
NodeResultKey resultKey = event.getResultKey();
3434
LOGGER.debug("Node result %s status change event from %s to %s",
3535
resultKey.getPath(), event.getFrom(), event.getTo());
36-
3736
push(resultKey.getJobId());
3837
}
3938
}

platform-api/src/main/java/com/flow/platform/api/service/AgentServiceImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.flow.platform.api.service.job.JobService;
2727
import com.flow.platform.api.util.PlatformURL;
2828
import com.flow.platform.core.exception.HttpException;
29+
import com.flow.platform.core.exception.IllegalParameterException;
2930
import com.flow.platform.core.exception.IllegalStatusException;
3031
import com.flow.platform.core.service.ApplicationEventService;
3132
import com.flow.platform.domain.Agent;
@@ -38,6 +39,7 @@
3839
import com.flow.platform.domain.Jsonable;
3940
import com.flow.platform.util.CollectionUtil;
4041
import com.flow.platform.util.Logger;
42+
import com.flow.platform.util.StringUtil;
4143
import com.flow.platform.util.http.HttpClient;
4244
import com.flow.platform.util.http.HttpResponse;
4345
import com.flow.platform.util.http.HttpURL;
@@ -144,6 +146,10 @@ public Boolean shutdown(String zone, String name, String password) {
144146

145147
@Override
146148
public AgentWithFlow create(AgentPath agentPath) {
149+
if (StringUtil.hasSpace(agentPath.getZone()) || StringUtil.hasSpace(agentPath.getName())) {
150+
throw new IllegalParameterException("Zone name or agent name cannot contain empty space");
151+
}
152+
147153
try {
148154
AgentPathWithWebhook pathWithWebhook = new AgentPathWithWebhook(agentPath, buildAgentWebhook());
149155

@@ -158,9 +164,7 @@ public AgentWithFlow create(AgentPath agentPath) {
158164
}
159165

160166
Agent agent = Agent.parse(response.getBody(), Agent.class);
161-
162-
AgentWithFlow agentWithFlow = new AgentWithFlow(agent, null);
163-
return agentWithFlow;
167+
return new AgentWithFlow(agent, null);
164168

165169
} catch (UnsupportedEncodingException | JsonSyntaxException e) {
166170
throw new IllegalStatusException("Unable to create agent", e);

platform-api/src/main/java/com/flow/platform/api/service/job/JobNodeServiceImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,12 @@
2525
import org.springframework.cache.Cache;
2626
import org.springframework.cache.CacheManager;
2727
import org.springframework.stereotype.Service;
28-
import org.springframework.transaction.annotation.Transactional;
2928

3029
/**
3130
* @author lhl
3231
*/
3332

3433
@Service
35-
@Transactional
3634
public class JobNodeServiceImpl implements JobNodeService {
3735

3836
private final Logger LOGGER = new Logger(JobYml.class);

platform-api/src/main/java/com/flow/platform/api/service/job/JobSearchServiceImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public List<Job> match(SearchCondition searchCondition, List<Job> jobs) {
8282
for (Job job : jobs) {
8383
if (job.getNumber().toString().equals(words)) { // compare job number
8484
copyJobs.add(job);
85-
} else if (words.equals(
85+
} else if (job.getRootResult() != null && words.equals(
8686
job.getRootResult().getOutputs().get(GitEnvs.FLOW_GIT_BRANCH.toString()))) { //compare branch
8787
copyJobs.add(job);
8888
}
@@ -105,7 +105,8 @@ public List<Job> match(SearchCondition searchCondition, List<Job> jobs) {
105105

106106
List<Job> copyJobs = new LinkedList<>();
107107
for (Job job : jobs) {
108-
if (branch.equals(job.getRootResult().getOutputs().get(GitEnvs.FLOW_GIT_BRANCH.toString()))) {
108+
if (job.getRootResult() != null && branch
109+
.equals(job.getRootResult().getOutputs().get(GitEnvs.FLOW_GIT_BRANCH.toString()))) {
109110
copyJobs.add(job);
110111
}
111112
}

platform-api/src/main/java/com/flow/platform/api/service/job/JobServiceImpl.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,21 +74,16 @@
7474
import org.springframework.beans.factory.annotation.Value;
7575
import org.springframework.scheduling.annotation.Scheduled;
7676
import org.springframework.stereotype.Service;
77-
import org.springframework.transaction.annotation.Isolation;
78-
import org.springframework.transaction.annotation.Propagation;
7977
import org.springframework.transaction.annotation.Transactional;
8078

8179
/**
8280
* @author yh@firim
8381
*/
84-
@Service(value = "jobService")
85-
@Transactional(isolation = Isolation.REPEATABLE_READ)
82+
@Service
8683
public class JobServiceImpl extends ApplicationEventService implements JobService {
8784

8885
private static Logger LOGGER = new Logger(JobService.class);
8986

90-
private Integer RETRY_TIMEs = 5;
91-
9287
private final Integer createSessionRetryTimes = 5;
9388

9489
@Value("${task.job.toggle.execution_timeout}")
@@ -268,6 +263,7 @@ public void callback(CmdCallbackQueueItem cmdQueueItem) {
268263
}
269264

270265
@Override
266+
@Transactional
271267
public void deleteJob(String path) {
272268
List<BigInteger> jobIds = jobDao.findJobIdsByPath(path);
273269
// TODO : Late optimization and paging jobIds
@@ -392,7 +388,6 @@ private void onRunShellCallback(String path, Cmd cmd, Job job) {
392388
}
393389

394390
@Override
395-
@Transactional(propagation = Propagation.NOT_SUPPORTED)
396391
public void enterQueue(CmdCallbackQueueItem cmdQueueItem) {
397392
cmdCallbackQueue.enqueue(cmdQueueItem);
398393
}

0 commit comments

Comments
 (0)