Skip to content

Commit 28a79eb

Browse files
author
yang.guo
authored
Merge pull request #165 from FlowCI/feature/api/agent_dup_error
Feature/api/agent dup error
2 parents c0f84d7 + e7301b0 commit 28a79eb

File tree

3 files changed

+74
-9
lines changed

3 files changed

+74
-9
lines changed

platform-agent/src/main/java/com/flow/platform/agent/AgentManager.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.flow.platform.domain.Cmd;
2020
import com.flow.platform.domain.Jsonable;
2121
import com.flow.platform.util.Logger;
22-
import com.flow.platform.util.zk.*;
23-
22+
import com.flow.platform.util.zk.ZKClient;
23+
import com.flow.platform.util.zk.ZkException;
2424
import java.io.IOException;
2525
import java.util.LinkedList;
2626
import java.util.List;
@@ -48,6 +48,9 @@ public class AgentManager implements Runnable, TreeCacheListener, AutoCloseable
4848
private int zkTimeout;
4949
private ZKClient zkClient;
5050

51+
// node delete or not, default true
52+
private Boolean canDeleted = true;
53+
5154
private String zone; // agent running zone
5255
private String name; // agent name, can be machine name
5356

@@ -92,6 +95,10 @@ public void stop() {
9295
public void run() {
9396
// init zookeeper
9497
zkClient.start();
98+
99+
// if node is exists, exit
100+
checkNodePathExistAndExit();
101+
95102
registerZkNodeAndWatch();
96103

97104
synchronized (STATUS_LOCKER) {
@@ -103,6 +110,21 @@ public void run() {
103110
}
104111
}
105112

113+
/**
114+
* if node exist , exit
115+
*/
116+
private void checkNodePathExistAndExit() {
117+
if (this.zkClient.exist(this.nodePath)) {
118+
exit();
119+
}
120+
}
121+
122+
private void exit(){
123+
this.canDeleted = false;
124+
LOGGER.info("One Agent is running in other place. Please first to stop another agent, thx!");
125+
Runtime.getRuntime().exit(1);
126+
}
127+
106128
@Override
107129
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
108130
ChildData eventData = event.getData();
@@ -143,7 +165,11 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc
143165

144166
@Override
145167
public void close() throws IOException {
146-
removeZkNode();
168+
// only this node can delete
169+
if (this.canDeleted) {
170+
removeZkNode();
171+
}
172+
147173
stop();
148174
}
149175

@@ -194,8 +220,13 @@ private void onDataChanged(String path) {
194220
* @return path of zookeeper or null if failure
195221
*/
196222
private String registerZkNodeAndWatch() {
197-
String path = zkClient.createEphemeral(nodePath, null);
198-
zkClient.watchTree(path, this);
223+
String path = null;
224+
try {
225+
path = zkClient.createEphemeral(nodePath);
226+
zkClient.watchTree(path, this);
227+
} catch (ZkException e) {
228+
exit();
229+
}
199230
return path;
200231
}
201232

platform-agent/src/test/java/com/flow/platform/agent/test/AgentManagerTest.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import com.flow.platform.domain.Cmd;
2222
import com.flow.platform.domain.CmdType;
2323
import com.flow.platform.util.zk.ZKClient;
24+
import com.flow.platform.util.zk.ZkException;
25+
import java.util.concurrent.ArrayBlockingQueue;
26+
import java.util.concurrent.ThreadPoolExecutor;
27+
import java.util.concurrent.TimeUnit;
2428
import org.apache.curator.test.TestingServer;
2529
import org.apache.curator.utils.ZKPaths;
2630
import org.junit.After;
@@ -32,10 +36,6 @@
3236
import org.junit.Test;
3337
import org.junit.runners.MethodSorters;
3438

35-
import static org.junit.Assert.assertEquals;
36-
import static org.junit.Assert.assertNotNull;
37-
import static org.junit.Assert.assertNull;
38-
3939
/**
4040
4141
*/
@@ -82,6 +82,27 @@ public void should_agent_registered() throws Throwable {
8282
agent.stop();
8383
}
8484

85+
@Test
86+
public void should_zookeeper_create_node_atom() {
87+
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS,
88+
new ArrayBlockingQueue<>(5));
89+
final int[] size = {0};
90+
String agentNodePath = ZKPaths.makePath(ZK_ROOT, "flow");
91+
zkClient.delete(agentNodePath, false);
92+
for (int i = 0; i < 5; i++) {
93+
threadPoolExecutor.execute(() -> {
94+
try {
95+
zkClient.createEphemeral(agentNodePath);
96+
size[0] = size[0] + 1;
97+
} catch (ZkException e) {
98+
System.out.println(e);
99+
}
100+
});
101+
}
102+
103+
Assert.assertEquals(1, size[0]);
104+
}
105+
85106
@Test
86107
public void should_receive_command() throws Throwable {
87108
AgentManager agent = new AgentManager(server.getConnectString(), 20000, ZONE, MACHINE);

platform-util-zk/src/main/java/com/flow/platform/util/zk/ZKClient.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,19 @@ public String createEphemeral(String path, byte[] data) {
149149
return path;
150150
}
151151

152+
return createEphemeralPrivate(path, data);
153+
}
154+
155+
/**
156+
* if not create node, will throw exception
157+
* @param path
158+
* @return
159+
*/
160+
public String createEphemeral(String path) {
161+
return createEphemeralPrivate(path, null);
162+
}
163+
164+
private String createEphemeralPrivate(String path, byte[] data) {
152165
try {
153166
return client.create()
154167
.withMode(CreateMode.EPHEMERAL)

0 commit comments

Comments
 (0)