Skip to content

Commit b06c3b3

Browse files
author
yang.guo
authored
Merge pull request #100 from FlowCI/feature/agent/zookeeper_refactor
Feature/agent/zookeeper refactor
2 parents 666908e + 6526212 commit b06c3b3

File tree

33 files changed

+335
-184
lines changed

33 files changed

+335
-184
lines changed

platform-agent/src/main/java/com/flow/platform/agent/AgentManager.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.flow.platform.util.Logger;
2222
import com.flow.platform.util.zk.*;
2323

24-
import java.io.Closeable;
2524
import java.io.IOException;
2625
import java.util.LinkedList;
2726
import java.util.List;
@@ -41,7 +40,9 @@ public class AgentManager implements Runnable, TreeCacheListener, AutoCloseable
4140

4241
// Zk root path /flow-agents/{zone}/{name}
4342
private final static Object STATUS_LOCKER = new Object();
44-
private final static int ZK_RECONNECT_TIME = 5;
43+
44+
private final static int ZK_RECONNECT_TIME = 1;
45+
private final static int ZK_RETRY_PERIOD = 500;
4546

4647
private String zkHost;
4748
private int zkTimeout;
@@ -59,7 +60,7 @@ public AgentManager(String zkHost, int zkTimeout, String zone, String name) thro
5960
this.zkHost = zkHost;
6061
this.zkTimeout = zkTimeout;
6162

62-
this.zkClient = new ZKClient(zkHost);
63+
this.zkClient = new ZKClient(zkHost, ZK_RETRY_PERIOD, ZK_RECONNECT_TIME);
6364
this.zone = zone;
6465
this.name = name;
6566
this.zonePath = ZKPaths.makePath(Config.ZK_ROOT, this.zone);
@@ -97,7 +98,7 @@ public void run() {
9798
try {
9899
STATUS_LOCKER.wait();
99100
} catch (InterruptedException e) {
100-
e.printStackTrace();
101+
LOGGER.warn("InterrupatdException : " + e.getMessage());
101102
}
102103
}
103104
}
@@ -106,13 +107,18 @@ public void run() {
106107
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
107108
ChildData eventData = event.getData();
108109

110+
if (event.getType() == Type.CONNECTION_LOST) {
111+
LOGGER.traceMarker("ZK-Event", "========= Connection lost from zk server =========");
112+
return;
113+
}
114+
109115
if (event.getType() == Type.INITIALIZED) {
110-
LOGGER.trace("========= Connected to zookeeper server =========");
116+
LOGGER.traceMarker("ZK-Event", "========= Connected to zk server =========");
111117
return;
112118
}
113119

114120
if (event.getType() == Type.NODE_ADDED) {
115-
LOGGER.trace("========= Node been created: %s =========", eventData.getPath());
121+
LOGGER.traceMarker("ZK-Event", "========= Node been created: %s =========", eventData.getPath());
116122
return;
117123
}
118124

@@ -176,6 +182,6 @@ private String registerZkNodeAndWatch() {
176182
}
177183

178184
private void removeZkNode() {
179-
zkClient.delete(nodePath, false);
185+
zkClient.deleteWithoutGuaranteed(nodePath, false);
180186
}
181187
}

platform-agent/src/main/java/com/flow/platform/agent/App.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,26 @@ public static void main(String args[]) {
4343
token = args[1];
4444
}
4545

46-
LOGGER.trace("========= Run agent =========");
46+
LOGGER.trace("========= Flow Agent Started =========");
47+
LOGGER.trace("=== Server: " + baseUrl);
48+
LOGGER.trace("=== Token: " + token);
49+
4750
Runtime.getRuntime().addShutdownHook(new ShutdownHook());
4851

4952
try {
50-
LOGGER.trace("========= Init config =========");
53+
LOGGER.trace("=== Start to load configuration");
5154

5255
Config.AGENT_SETTINGS = Config.loadAgentConfig(baseUrl, token);
53-
LOGGER.trace(" -- Settings: %s", Config.agentSettings());
56+
LOGGER.trace("====== Settings: %s", Config.agentSettings());
5457

5558
Config.ZK_URL = Config.AGENT_SETTINGS.getZookeeperUrl();
56-
LOGGER.trace(" -- Zookeeper host: %s", Config.zkUrl());
59+
LOGGER.trace("====== Zookeeper host: %s", Config.zkUrl());
5760

5861
Config.ZONE = Config.AGENT_SETTINGS.getAgentPath().getZone();
59-
LOGGER.trace(" -- Working zone: %s", Config.zone());
62+
LOGGER.trace("====== Working zone: %s", Config.zone());
6063

6164
Config.NAME = Config.AGENT_SETTINGS.getAgentPath().getName();
62-
LOGGER.trace(" -- Agent agent: %s", Config.name());
65+
LOGGER.trace("====== Agent agent: %s", Config.name());
6366

6467
LOGGER.trace("========= Config initialized =========");
6568
} catch (Throwable e) {

platform-agent/src/main/java/com/flow/platform/agent/ReportManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public boolean cmdLogUploadSync(final String cmdId, final Path path) {
115115
.retry(5)
116116
.bodyAsString();
117117

118-
if (response.hasSuccess()) {
118+
if (!response.hasSuccess()) {
119119
LOGGER.warn("Fail to upload zipped cmd log to : %s ", url);
120120
return false;
121121
}

platform-api/src/main/java/com/flow/platform/api/controller/FlowController.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ public Node delete() {
163163
@PostMapping("/{root}/env")
164164
@WebSecurity(action = Actions.FLOW_SET_ENV)
165165
public Node addFlowEnv(@RequestBody Map<String, String> envs) {
166-
return nodeService.addFlowEnv(currentNodePath.get(), envs);
166+
Flow flow = nodeService.findFlow(currentNodePath.get());
167+
return nodeService.addFlowEnv(flow, envs);
167168
}
168169

169170
/**
@@ -192,7 +193,8 @@ public Node addFlowEnv(@RequestBody Map<String, String> envs) {
192193
@DeleteMapping("/{root}/env")
193194
@WebSecurity(action = Actions.FLOW_SET_ENV)
194195
public Node delFlowEnv(@RequestBody Set<String> envKeys) {
195-
return nodeService.delFlowEnv(currentNodePath.get(), envKeys);
196+
Flow flow = nodeService.findFlow(currentNodePath.get());
197+
return nodeService.delFlowEnv(flow, envKeys);
196198
}
197199

198200
/**

platform-api/src/main/java/com/flow/platform/api/controller/GitWebHookController.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818

1919
import com.flow.platform.api.domain.envs.FlowEnvs;
2020
import com.flow.platform.api.domain.envs.FlowEnvs.YmlStatusValue;
21+
import com.flow.platform.api.domain.node.Flow;
2122
import com.flow.platform.api.domain.user.User;
22-
import com.flow.platform.api.git.GitEventDataExtractor;
23+
import com.flow.platform.api.git.GitEventEnvConverter;
2324
import com.flow.platform.api.git.GitWebhookTriggerFinishEvent;
2425
import com.flow.platform.api.service.job.JobService;
2526
import com.flow.platform.api.service.node.NodeService;
@@ -78,10 +79,11 @@ public void onEventReceived(@RequestHeader HttpHeaders headers, HttpServletReque
7879
LOGGER.trace("Git Webhook received: %s", hookEvent.toString());
7980

8081
// reset flow yml status to not found otherwise yml cannot start to load
81-
nodeService.addFlowEnv(path, EnvUtil.build(FlowEnvs.FLOW_YML_STATUS, YmlStatusValue.NOT_FOUND));
82+
Flow flow = nodeService.findFlow(path);
83+
nodeService.addFlowEnv(flow, EnvUtil.build(FlowEnvs.FLOW_YML_STATUS, YmlStatusValue.NOT_FOUND));
8284

8385
// extract git related env variables from event, and temporary set to node for git loading
84-
final Map<String, String> gitEnvs = GitEventDataExtractor.extract(hookEvent);
86+
final Map<String, String> gitEnvs = GitEventEnvConverter.convert(hookEvent);
8587

8688
// get user email from git event
8789
final User user = new User(hookEvent.getUserEmail(), StringUtil.EMPTY, StringUtil.EMPTY);

platform-api/src/main/java/com/flow/platform/api/controller/JobController.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import com.flow.platform.util.git.model.GitEventType;
3131
import com.google.common.collect.Lists;
3232
import java.util.Collection;
33+
import java.util.Collections;
34+
import java.util.LinkedHashMap;
3335
import java.util.List;
3436
import java.util.Map;
3537
import javax.servlet.http.HttpServletResponse;
@@ -95,7 +97,7 @@ public void setLocale(@RequestParam(required = false) String locale) {
9597
@PostMapping(path = "/{root}")
9698
public void create() {
9799
String path = currentNodePath.get();
98-
jobService.createJobAndYmlLoad(path, GitEventType.MANUAL, null, currentUser.get(), null);
100+
jobService.createJobAndYmlLoad(path, GitEventType.MANUAL, new LinkedHashMap<>(), currentUser.get(), null);
99101
}
100102

101103
/**

platform-api/src/main/java/com/flow/platform/api/controller/UserController.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,14 @@ public UserListResponse list() {
8181
*
8282
* @apiSuccessExample {String} Success-Response:
8383
* HTTP/1.1 200 OK
84-
* eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbkBmaXIuaW0iLCJleHAiOjE1MDM3MTk0NjF9.Lv3vSvQTv_qgpuFD8e59t60YbAWafZO6W5cjYMx5lcw
84+
*
85+
* xxx.xxx.xxx
8586
*
8687
* @apiErrorExample {json} Error-Response:
8788
* HTTP/1.1 400 Bad Request
8889
* {
8990
* "message": "Illegal login request parameter: username format false"
9091
* }
91-
*
92-
* @apiErrorExample {json} Error-Response:
93-
* HTTP/1.1 500 Internal Server Error
94-
* {
95-
* "message": "JSON parse error: java.io.EOFException: End of input at line 4 column 1 path $.password; nested exception is com.google.gson.JsonSyntaxException: java.io.EOFException: End of input at line 4 column 1 path $.password"
96-
* }
9792
*/
9893
@PostMapping("/login")
9994
public String login(@RequestBody LoginParam loginForm) {
@@ -127,12 +122,6 @@ public String login(@RequestBody LoginParam loginForm) {
127122
* {
128123
* "message": "Illegal register request parameter: email already exist"
129124
* }
130-
*
131-
* @apiErrorExample {json} Error-Response:
132-
* HTTP/1.1 500 Internal Server Error
133-
* {
134-
* "message": "JSON parse error: java.io.EOFException: End of input at line 6 column 1 path $.roleId; nested exception is com.google.gson.JsonSyntaxException: java.io.EOFException: End of input at line 6 column 1 path $.roleId"
135-
* }
136125
*/
137126
@PostMapping("/register")
138127
public void register(@RequestBody RegisterUserParam registerUserParam) {
@@ -156,9 +145,9 @@ public void register(@RequestBody RegisterUserParam registerUserParam) {
156145
* HTTP/1.1 200 OK
157146
*
158147
* @apiErrorExample {json} Error-Response:
159-
* HTTP/1.1 500 Internal Server Error
148+
* HTTP/1.1 400
160149
* {
161-
* "message": "JSON parse error: java.io.EOFException: End of input at line 4 column 1 path $[2]; nested exception is com.google.gson.JsonSyntaxException: java.io.EOFException: End of input at line 4 column 1 path $[2]"
150+
* "message": xxx
162151
* }
163152
*/
164153
@PostMapping(path = "/delete")
@@ -185,9 +174,10 @@ public void delete(@RequestBody ListParam<String> listParam) {
185174
* HTTP/1.1 200 OK
186175
*
187176
* @apiErrorExample {json} Error-Response:
188-
* HTTP/1.1 500 Internal Server Error
177+
* HTTP/1.1 400
178+
*
189179
* {
190-
* "message": "JSON parse error: java.io.EOFException: End of input at line 6 column 1 path $.roleId; nested exception is com.google.gson.JsonSyntaxException: java.io.EOFException: End of input at line 6 column 1 path $.roleId"
180+
* "message": xxx
191181
* }
192182
*/
193183
@PostMapping("/role/update")

platform-api/src/main/java/com/flow/platform/api/domain/EnvObject.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,37 @@ public void setEnvs(Map<String, String> envs) {
4343
this.envs = envs;
4444
}
4545

46+
/**
47+
* Get env value by EnvKey
48+
*/
4649
public String getEnv(EnvKey key) {
4750
return envs.get(key.name());
4851
}
4952

53+
/**
54+
* Get env value by EnvKey
55+
* Return default value if env value is null
56+
*/
57+
public String getEnv(EnvKey key, String defaultValue) {
58+
return getEnv(key.name(), defaultValue);
59+
}
60+
61+
/**
62+
* Get env value by string key
63+
*/
5064
public String getEnv(String key) {
5165
return envs.get(key);
5266
}
5367

68+
/**
69+
* Get env value by string key
70+
* Return default value if env value is null
71+
*/
72+
public String getEnv(String key, String defaultValue) {
73+
String value = envs.get(key);
74+
return value == null ? defaultValue : value;
75+
}
76+
5477
public void putEnv(EnvKey key, EnvValue value) {
5578
envs.put(key.name(), value.value());
5679
}

platform-api/src/main/java/com/flow/platform/api/git/GitEventDataExtractor.java renamed to platform-api/src/main/java/com/flow/platform/api/git/GitEventEnvConverter.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import com.flow.platform.api.domain.envs.GitEnvs;
2020
import com.flow.platform.core.exception.IllegalParameterException;
21+
import com.flow.platform.util.git.model.GitCommit;
2122
import com.flow.platform.util.git.model.GitEvent;
2223
import com.flow.platform.util.git.model.GitPullRequestEvent;
2324
import com.flow.platform.util.git.model.GitPullRequestEvent.State;
2425
import com.flow.platform.util.git.model.GitPushTagEvent;
26+
import java.util.Collections;
2527
import java.util.HashMap;
2628
import java.util.Map;
2729

@@ -30,9 +32,21 @@
3032
*
3133
* @author yang
3234
*/
33-
public class GitEventDataExtractor {
35+
public class GitEventEnvConverter {
3436

35-
public static Map<String, String> extract(GitEvent event) {
37+
public static Map<String, String> convert(GitCommit commit) {
38+
if (commit == null) {
39+
return Collections.emptyMap();
40+
}
41+
42+
Map<String, String> info = new HashMap<>(3);
43+
info.put(GitEnvs.FLOW_GIT_COMMIT_ID.name(), commit.getId());
44+
info.put(GitEnvs.FLOW_GIT_AUTHOR.name(), commit.getAuthor());
45+
info.put(GitEnvs.FLOW_GIT_CHANGELOG.name(), commit.getMessage());
46+
return info;
47+
}
48+
49+
public static Map<String, String> convert(GitEvent event) {
3650
if (event instanceof GitPullRequestEvent) {
3751
GitPullRequestEvent pr = (GitPullRequestEvent) event;
3852
Map<String, String> info = new HashMap<>(6);

platform-api/src/main/java/com/flow/platform/api/service/AgentServiceImpl.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.flow.platform.util.CollectionUtil;
3333
import com.flow.platform.util.Logger;
3434
import com.flow.platform.util.http.HttpClient;
35+
import com.flow.platform.util.http.HttpResponse;
3536
import com.google.common.base.Strings;
3637
import com.google.gson.JsonSyntaxException;
3738
import java.io.UnsupportedEncodingException;
@@ -72,16 +73,16 @@ public class AgentServiceImpl implements AgentService {
7273

7374
@Override
7475
public List<AgentWithFlow> list() {
75-
String res = HttpClient.build(platformURL.getAgentUrl())
76+
HttpResponse<String> response = HttpClient.build(platformURL.getAgentUrl())
7677
.get()
7778
.retry(httpRetryTimes)
78-
.bodyAsString().getBody();
79+
.bodyAsString();
7980

80-
if (Strings.isNullOrEmpty(res)) {
81+
if (!response.hasSuccess()) {
8182
throw new HttpException("Unable to load agent list");
8283
}
8384

84-
Agent[] agents = Jsonable.GSON_CONFIG.fromJson(res, Agent[].class);
85+
Agent[] agents = Jsonable.GSON_CONFIG.fromJson(response.getBody(), Agent[].class);
8586

8687
// get all session id from agent collection
8788
List<String> sessionIds = CollectionUtil.toPropertyList("sessionId", agents);
@@ -135,17 +136,17 @@ public Agent create(AgentPath agentPath) {
135136
try {
136137
AgentPathWithWebhook pathWithWebhook = new AgentPathWithWebhook(agentPath, buildAgentWebhook());
137138

138-
final String agentJson = HttpClient.build(platformURL.getAgentCreateUrl())
139+
HttpResponse<String> response = HttpClient.build(platformURL.getAgentCreateUrl())
139140
.post(pathWithWebhook.toJson())
140141
.withContentType(ContentType.APPLICATION_JSON)
141142
.retry(httpRetryTimes)
142-
.bodyAsString().getBody();
143+
.bodyAsString();
143144

144-
if (Strings.isNullOrEmpty(agentJson)) {
145+
if (!response.hasSuccess()) {
145146
throw new HttpException("Unable to create agent via control center");
146147
}
147148

148-
return Agent.parse(agentJson, Agent.class);
149+
return Agent.parse(response.getBody(), Agent.class);
149150

150151
} catch (UnsupportedEncodingException | JsonSyntaxException e) {
151152
throw new IllegalStatusException("Unable to create agent", e);
@@ -155,16 +156,16 @@ public Agent create(AgentPath agentPath) {
155156
@Override
156157
public AgentSettings settings(String token) {
157158
String url = platformURL.getAgentSettingsUrl() + "?" + "token=" + token;
158-
String settingsJson = HttpClient.build(url)
159+
HttpResponse<String> response = HttpClient.build(url)
159160
.get()
160161
.retry(httpRetryTimes)
161-
.bodyAsString().getBody();
162+
.bodyAsString();
162163

163-
if (Strings.isNullOrEmpty(settingsJson)) {
164+
if (!response.hasSuccess()) {
164165
throw new HttpException("Unable to get agent settings from control center");
165166
}
166167

167-
return AgentSettings.parse(settingsJson, AgentSettings.class);
168+
return AgentSettings.parse(response.getBody(), AgentSettings.class);
168169
}
169170

170171
private String buildAgentWebhook() {

0 commit comments

Comments
 (0)