Skip to content

Commit c0b4f7c

Browse files
author
yang.guo
authored
Merge pull request #176 from FlowCI/develop
Develop
2 parents d3218cd + 5e4cf40 commit c0b4f7c

File tree

59 files changed

+795
-557
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+795
-557
lines changed

config/app-api.properties

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ system.email = [email protected]
2929
system.username = admin
3030
system.password = 123456
3131

32-
task.job.toggle.execution_timeout = false
33-
## 6s expire job
34-
task.job.toggle.execution_create_session_duration = 600
35-
## 1h expire job
32+
task.job.toggle.execution_timeout = true
33+
## session creation expires after 1800 seconds
34+
task.job.toggle.execution_create_session_duration = 1800
35+
## a running job expires after 3600 seconds
3636
task.job.toggle.execution_running_duration = 3600

platform-agent/src/main/java/com/flow/platform/agent/AgentManager.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.flow.platform.domain.Cmd;
2020
import com.flow.platform.domain.Jsonable;
2121
import com.flow.platform.util.Logger;
22-
import com.flow.platform.util.zk.*;
23-
22+
import com.flow.platform.util.zk.ZKClient;
23+
import com.flow.platform.util.zk.ZkException;
2424
import java.io.IOException;
2525
import java.util.LinkedList;
2626
import java.util.List;
@@ -48,6 +48,9 @@ public class AgentManager implements Runnable, TreeCacheListener, AutoCloseable
4848
private int zkTimeout;
4949
private ZKClient zkClient;
5050

51+
// whether this agent may remove its zookeeper node on close, default true
52+
private Boolean canDeleted = true;
53+
5154
private String zone; // agent running zone
5255
private String name; // agent name, can be machine name
5356

@@ -92,6 +95,10 @@ public void stop() {
9295
public void run() {
9396
// init zookeeper
9497
zkClient.start();
98+
99+
// exit if the node already exists
100+
checkNodePathExistAndExit();
101+
95102
registerZkNodeAndWatch();
96103

97104
synchronized (STATUS_LOCKER) {
@@ -103,6 +110,21 @@ public void run() {
103110
}
104111
}
105112

113+
/**
114+
* Exit if the node path already exists in zookeeper
115+
*/
116+
private void checkNodePathExistAndExit() {
117+
if (this.zkClient.exist(this.nodePath)) {
118+
exit();
119+
}
120+
}
121+
122+
private void exit(){
123+
this.canDeleted = false;
124+
LOGGER.info("One Agent is running in other place. Please first to stop another agent, thx!");
125+
Runtime.getRuntime().exit(1);
126+
}
127+
106128
@Override
107129
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
108130
ChildData eventData = event.getData();
@@ -143,7 +165,11 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc
143165

144166
@Override
145167
public void close() throws IOException {
146-
removeZkNode();
168+
// remove the zookeeper node only if this agent is allowed to delete it
169+
if (this.canDeleted) {
170+
removeZkNode();
171+
}
172+
147173
stop();
148174
}
149175

@@ -194,8 +220,13 @@ private void onDataChanged(String path) {
194220
* @return path of zookeeper or null if failure
195221
*/
196222
private String registerZkNodeAndWatch() {
197-
String path = zkClient.createEphemeral(nodePath, null);
198-
zkClient.watchTree(path, this);
223+
String path = null;
224+
try {
225+
path = zkClient.createEphemeral(nodePath);
226+
zkClient.watchTree(path, this);
227+
} catch (ZkException e) {
228+
exit();
229+
}
199230
return path;
200231
}
201232

platform-agent/src/main/java/com/flow/platform/agent/CmdManager.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -173,27 +173,23 @@ public void run() {
173173
ProcEventHandler procEventHandler =
174174
new ProcEventHandler(getCmd(), extraProcEventListeners, running, finished);
175175

176-
CmdExecutor executor;
177176
try {
178-
executor = new CmdExecutor(
177+
CmdExecutor executor = new CmdExecutor(
179178
procEventHandler,
180179
logListener,
181180
cmd.getInputs(),
182181
cmd.getWorkingDir(),
183182
cmd.getOutputEnvFilter(),
184183
cmd.getTimeout(),
185184
Lists.newArrayList(getCmd().getCmd()));
185+
186+
executor.run();
186187
} catch (Throwable e) {
187188
LOGGER.errorMarker("execute", "Cannot init CmdExecutor for cmd " + cmd, e);
188-
189189
CmdResult result = new CmdResult();
190190
result.getExceptions().add(e);
191191
procEventHandler.onException(result);
192-
193-
return;
194192
}
195-
196-
executor.run();
197193
}
198194
});
199195

@@ -262,6 +258,7 @@ public synchronized void kill() {
262258
for (Map.Entry<Cmd, CmdResult> entry : running.entrySet()) {
263259
CmdResult r = entry.getValue();
264260
Cmd cmd = entry.getKey();
261+
finished.put(cmd, r);
265262

266263
r.getProcess().destroy();
267264

@@ -270,12 +267,9 @@ public synchronized void kill() {
270267
}
271268

272269
try {
273-
if (!cmdExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
274-
cmdExecutor.shutdownNow();
275-
LOGGER.warn("Force to terminate CmdExecutor since been waiting 10 seconds");
276-
}
277-
} catch (Throwable e) {
278-
LOGGER.error("Exception while waiting for all cmd thread finish", e);
270+
cmdExecutor.shutdownNow();
271+
} catch (Throwable ignore) {
272+
279273
} finally {
280274
cmdExecutor = createExecutor(); // reset cmd executor
281275
LOGGER.trace("Cmd thread terminated");

platform-agent/src/test/java/com/flow/platform/agent/test/AgentManagerTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,15 @@
2121
import com.flow.platform.domain.Cmd;
2222
import com.flow.platform.domain.CmdType;
2323
import com.flow.platform.util.zk.ZKClient;
24+
import com.flow.platform.util.zk.ZkException;
25+
import java.util.concurrent.ArrayBlockingQueue;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.ThreadPoolExecutor;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
2430
import org.apache.curator.test.TestingServer;
2531
import org.apache.curator.utils.ZKPaths;
32+
import org.apache.zookeeper.ZKUtil;
2633
import org.junit.After;
2734
import org.junit.AfterClass;
2835
import org.junit.Assert;
@@ -32,10 +39,6 @@
3239
import org.junit.Test;
3340
import org.junit.runners.MethodSorters;
3441

35-
import static org.junit.Assert.assertEquals;
36-
import static org.junit.Assert.assertNotNull;
37-
import static org.junit.Assert.assertNull;
38-
3942
/**
4043
4144
*/
@@ -102,6 +105,7 @@ public void should_receive_command() throws Throwable {
102105

103106
@After
104107
public void after() throws Throwable {
108+
zkClient.delete(ZKPaths.makePath(ZK_ROOT, ZONE, MACHINE), true);
105109
zkClient.close();
106110
}
107111

platform-api/src/main/java/com/flow/platform/api/config/AppConfig.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.flow.platform.core.config.DatabaseConfig;
2424
import com.flow.platform.core.queue.InMemoryQueue;
2525
import com.flow.platform.core.queue.PlatformQueue;
26+
import com.flow.platform.core.queue.PriorityMessage;
2627
import com.flow.platform.core.util.ThreadUtil;
2728
import com.flow.platform.util.Logger;
2829
import java.io.IOException;
@@ -34,15 +35,12 @@
3435
import org.apache.velocity.app.VelocityEngine;
3536
import org.apache.velocity.runtime.RuntimeConstants;
3637
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
37-
import org.springframework.beans.factory.annotation.Autowired;
3838
import org.springframework.beans.factory.annotation.Value;
3939
import org.springframework.context.annotation.Bean;
4040
import org.springframework.context.annotation.Configuration;
4141
import org.springframework.context.annotation.Import;
4242
import org.springframework.context.event.ApplicationEventMulticaster;
4343
import org.springframework.context.event.SimpleApplicationEventMulticaster;
44-
import org.springframework.scheduling.TaskScheduler;
45-
import org.springframework.scheduling.annotation.EnableScheduling;
4644
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4745

4846
/**
@@ -136,7 +134,7 @@ public ThreadPoolTaskExecutor taskExecutor() {
136134
* Queue to process cmd callback task
137135
*/
138136
@Bean
139-
public PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue() {
137+
public PlatformQueue<PriorityMessage> cmdCallbackQueue() {
140138
return new InMemoryQueue<>(executor, 50, "CmdCallbackQueue");
141139
}
142140

platform-api/src/main/java/com/flow/platform/api/consumer/CmdCallbackQueueConsumer.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818

1919
import com.flow.platform.api.domain.CmdCallbackQueueItem;
2020
import com.flow.platform.api.service.job.JobService;
21+
import com.flow.platform.core.exception.FlowException;
2122
import com.flow.platform.core.exception.NotFoundException;
2223
import com.flow.platform.core.queue.PlatformQueue;
24+
import com.flow.platform.core.queue.PriorityMessage;
2325
import com.flow.platform.core.queue.QueueListener;
26+
import com.flow.platform.core.util.ThreadUtil;
2427
import com.flow.platform.util.Logger;
2528
import javax.annotation.PostConstruct;
2629
import org.springframework.beans.factory.annotation.Autowired;
@@ -30,12 +33,15 @@
3033
* @author yh@firim
3134
*/
3235
@Component
33-
public class CmdCallbackQueueConsumer implements QueueListener<CmdCallbackQueueItem> {
36+
public class CmdCallbackQueueConsumer implements QueueListener<PriorityMessage> {
3437

3538
private final static Logger LOGGER = new Logger(CmdCallbackQueueConsumer.class);
3639

40+
// delay before re-enqueueing, in milliseconds (1 second)
41+
private final static int REQUEUE_DELAY_TIME = 1000;
42+
3743
@Autowired
38-
private PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue;
44+
private PlatformQueue<PriorityMessage> cmdCallbackQueue;
3945

4046
@Autowired
4147
private JobService jobService;
@@ -46,28 +52,45 @@ public void init() {
4652
}
4753

4854
@Override
49-
public void onQueueItem(CmdCallbackQueueItem item) {
50-
if (item == null) {
55+
public void onQueueItem(PriorityMessage message) {
56+
if (message == null) {
5157
return;
5258
}
59+
60+
CmdCallbackQueueItem item = CmdCallbackQueueItem.parse(message.getBody(), CmdCallbackQueueItem.class);
61+
5362
try {
5463
jobService.callback(item);
5564
} catch (NotFoundException notFoundException) {
5665

66+
// check whether the retry limit has been reached
67+
detectRetryTimes(item);
68+
5769
// re-enqueue cmd callback if job not found since transaction problem
58-
reEnqueueJobCallback(item, 1000);
70+
reEnqueueJobCallback(item, REQUEUE_DELAY_TIME, message.getMessageProperties().getPriority());
5971

6072
} catch (Throwable throwable) {
6173
LOGGER.traceMarker("onQueueItem", String.format("exception - %s", throwable));
6274
}
6375
}
6476

65-
private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait) {
66-
try {
67-
Thread.sleep(wait);
68-
} catch (Throwable ignore) {
77+
private void detectRetryTimes(CmdCallbackQueueItem item) {
78+
if (item.getRetryTimes() <= 0) {
79+
throw new FlowException("retry times has reach the limit");
6980
}
81+
}
82+
83+
private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait, int priority) {
84+
85+
// wait before re-enqueueing
86+
ThreadUtil.sleep(wait);
87+
88+
// decrement the remaining retry times
89+
item.setRetryTimes(item.getRetryTimes() - 1);
90+
91+
// increase the priority by 1
92+
priority = priority + 1;
7093

71-
jobService.enterQueue(item);
94+
jobService.enterQueue(item, priority);
7295
}
7396
}

platform-api/src/main/java/com/flow/platform/api/controller/CmdWebhookController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void execute(@RequestBody Cmd cmd, @RequestParam String identifier) {
6464
cmd.getStatus(),
6565
cmd.getId(),
6666
cmd.getCmdResult());
67-
jobService.enterQueue(new CmdCallbackQueueItem(jobId, cmd));
67+
jobService.enterQueue(new CmdCallbackQueueItem(jobId, cmd), 1);
6868
} catch (NumberFormatException warn) {
6969
LOGGER.warn("Invalid job id format");
7070
}

platform-api/src/main/java/com/flow/platform/api/dao/job/NodeResultDaoImpl.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import javax.persistence.criteria.Predicate;
2929
import javax.persistence.criteria.Root;
3030
import org.hibernate.Session;
31-
import org.hibernate.query.Query;
3231
import org.springframework.stereotype.Repository;
3332

3433
/**
@@ -100,11 +99,8 @@ public int update(BigInteger jobId, NodeStatus target) {
10099

101100
@Override
102101
public void delete(List<BigInteger> jobIds) {
103-
execute((Session session) -> {
104-
String delete = String.format("delete from NodeResult where job_id in (:list)");
105-
Query query = session.createQuery(delete);
106-
query.setParameterList("list", jobIds);
107-
return true;
108-
});
102+
execute((Session session) -> session.createQuery("delete from NodeResult where key.jobId in ( :jobIds )")
103+
.setParameterList("jobIds", jobIds)
104+
.executeUpdate());
109105
}
110106
}

platform-api/src/main/java/com/flow/platform/api/domain/CmdCallbackQueueItem.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,33 @@
1717
package com.flow.platform.api.domain;
1818

1919
import com.flow.platform.domain.Cmd;
20+
import com.flow.platform.domain.Jsonable;
2021
import java.math.BigInteger;
2122

2223
/**
2324
* @author yh@firim
2425
*/
25-
public class CmdCallbackQueueItem {
26+
public class CmdCallbackQueueItem extends Jsonable {
2627

2728
private final BigInteger jobId;
2829

2930
private final String path; // node path
3031

3132
private final Cmd cmd;
3233

33-
private Integer retryTimes = 0;
34+
// remaining retry times, 5 by default
35+
private Integer retryTimes = 5;
3436

3537
public CmdCallbackQueueItem(BigInteger jobId, Cmd cmd) {
3638
this.jobId = jobId;
3739
this.cmd = cmd;
3840
this.path = cmd.getExtra();
3941
}
4042

43+
public void setRetryTimes(Integer retryTimes) {
44+
this.retryTimes = retryTimes;
45+
}
46+
4147
public BigInteger getJobId() {
4248
return jobId;
4349
}

0 commit comments

Comments
 (0)