Skip to content

Commit e7e32a3

Browse files
author
yang.guo
authored
Merge pull request #166 from FlowCI/fix/agent/164
Fix/agent/164
2 parents 28a79eb + 4df4974 commit e7e32a3

File tree

8 files changed

+68
-51
lines changed

8 files changed

+68
-51
lines changed

platform-agent/src/main/java/com/flow/platform/agent/CmdManager.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -173,27 +173,23 @@ public void run() {
173173
ProcEventHandler procEventHandler =
174174
new ProcEventHandler(getCmd(), extraProcEventListeners, running, finished);
175175

176-
CmdExecutor executor;
177176
try {
178-
executor = new CmdExecutor(
177+
CmdExecutor executor = new CmdExecutor(
179178
procEventHandler,
180179
logListener,
181180
cmd.getInputs(),
182181
cmd.getWorkingDir(),
183182
cmd.getOutputEnvFilter(),
184183
cmd.getTimeout(),
185184
Lists.newArrayList(getCmd().getCmd()));
185+
186+
executor.run();
186187
} catch (Throwable e) {
187188
LOGGER.errorMarker("execute", "Cannot init CmdExecutor for cmd " + cmd, e);
188-
189189
CmdResult result = new CmdResult();
190190
result.getExceptions().add(e);
191191
procEventHandler.onException(result);
192-
193-
return;
194192
}
195-
196-
executor.run();
197193
}
198194
});
199195

@@ -270,12 +266,9 @@ public synchronized void kill() {
270266
}
271267

272268
try {
273-
if (!cmdExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
274-
cmdExecutor.shutdownNow();
275-
LOGGER.warn("Force to terminate CmdExecutor since been waiting 10 seconds");
276-
}
277-
} catch (Throwable e) {
278-
LOGGER.error("Exception while waiting for all cmd thread finish", e);
269+
cmdExecutor.shutdownNow();
270+
} catch (Throwable ignore) {
271+
279272
} finally {
280273
cmdExecutor = createExecutor(); // reset cmd executor
281274
LOGGER.trace("Cmd thread terminated");

platform-agent/src/test/java/com/flow/platform/agent/test/AgentManagerTest.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
import com.flow.platform.util.zk.ZKClient;
2424
import com.flow.platform.util.zk.ZkException;
2525
import java.util.concurrent.ArrayBlockingQueue;
26+
import java.util.concurrent.CountDownLatch;
2627
import java.util.concurrent.ThreadPoolExecutor;
2728
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
2830
import org.apache.curator.test.TestingServer;
2931
import org.apache.curator.utils.ZKPaths;
32+
import org.apache.zookeeper.ZKUtil;
3033
import org.junit.After;
3134
import org.junit.AfterClass;
3235
import org.junit.Assert;
@@ -82,27 +85,6 @@ public void should_agent_registered() throws Throwable {
8285
agent.stop();
8386
}
8487

85-
@Test
86-
public void should_zookeeper_create_node_atom() {
87-
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS,
88-
new ArrayBlockingQueue<>(5));
89-
final int[] size = {0};
90-
String agentNodePath = ZKPaths.makePath(ZK_ROOT, "flow");
91-
zkClient.delete(agentNodePath, false);
92-
for (int i = 0; i < 5; i++) {
93-
threadPoolExecutor.execute(() -> {
94-
try {
95-
zkClient.createEphemeral(agentNodePath);
96-
size[0] = size[0] + 1;
97-
} catch (ZkException e) {
98-
System.out.println(e);
99-
}
100-
});
101-
}
102-
103-
Assert.assertEquals(1, size[0]);
104-
}
105-
10688
@Test
10789
public void should_receive_command() throws Throwable {
10890
AgentManager agent = new AgentManager(server.getConnectString(), 20000, ZONE, MACHINE);
@@ -123,6 +105,7 @@ public void should_receive_command() throws Throwable {
123105

124106
@After
125107
public void after() throws Throwable {
108+
zkClient.delete(ZKPaths.makePath(ZK_ROOT, ZONE, MACHINE), true);
126109
zkClient.close();
127110
}
128111

platform-cmd-runner/src/main/java/com/flow/platform/cmd/CmdExecutor.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.flow.platform.domain.CmdResult;
2020
import com.flow.platform.util.DateUtil;
21+
import com.flow.platform.util.Logger;
2122
import com.flow.platform.util.SystemUtil;
2223
import com.google.common.base.Strings;
2324
import com.google.common.collect.Lists;
@@ -82,6 +83,8 @@ public void onFinish() {
8283
}
8384
}
8485

86+
private final static Logger LOGGER = new Logger(CmdExecutor.class);
87+
8588
private final static File DEFAULT_WORKING_DIR = new File(
8689
System.getProperty("user.home", System.getProperty("user.dir")));
8790

@@ -226,7 +229,7 @@ public CmdResult run() {
226229

227230
outputResult.setExecutedTime(DateUtil.now());
228231
procListener.onExecuted(outputResult);
229-
System.out.println(String.format("====== 1. Process executed : %s ======", outputResult.getExitValue()));
232+
LOGGER.trace("====== 1. Process executed : %s ======", outputResult.getExitValue());
230233

231234
// wait for log thread with max 30 seconds to continue upload log
232235
logThreadCountDown.await(DEFAULT_LOGGING_WAITTING_SECONDS, TimeUnit.SECONDS);
@@ -239,16 +242,17 @@ public CmdResult run() {
239242

240243
outputResult.setFinishTime(DateUtil.now());
241244
procListener.onLogged(outputResult);
245+
LOGGER.trace("====== 2. Logging executed ======");
242246

243-
System.out.println(String.format("====== 2. Logging executed ======"));
244-
247+
} catch (InterruptedException ignore) {
248+
LOGGER.warn(ignore.getMessage());
245249
} catch (Throwable e) {
246250
outputResult.getExceptions().add(e);
247251
outputResult.setFinishTime(DateUtil.now());
248252
procListener.onException(outputResult);
249-
e.printStackTrace();
253+
LOGGER.warn(e.getMessage());
250254
} finally {
251-
System.out.println("====== 3. Process Done ======");
255+
LOGGER.trace("====== 3. Process Done ======");
252256
}
253257

254258
return outputResult;
@@ -290,7 +294,7 @@ private Runnable createCmdListExec(final OutputStream outputStream, final List<S
290294
}
291295

292296
} catch (IOException e) {
293-
System.out.println("Exception on write cmd: " + e.getMessage());
297+
LOGGER.warn("Exception on write cmd: " + e.getMessage());
294298
}
295299
};
296300
}
@@ -316,7 +320,7 @@ private Runnable createCmdLoggingReader() {
316320
} finally {
317321
logListener.onFinish();
318322
logThreadCountDown.countDown();
319-
System.out.println(" ===== Logging Reader Thread Finish =====");
323+
LOGGER.trace(" ===== Logging Reader Thread Finish =====");
320324
}
321325
};
322326
}
@@ -340,7 +344,7 @@ private Runnable createStdStreamReader(final Log.Type type, final InputStream is
340344

341345
} finally {
342346
stdThreadCountDown.countDown();
343-
System.out.println(String.format(" ===== %s Stream Reader Thread Finish =====", type));
347+
LOGGER.trace(" ===== %s Stream Reader Thread Finish =====", type);
344348
}
345349
};
346350
}

platform-control-center/src/main/java/com/flow/platform/cc/service/AgentServiceImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ public void saveWithStatus(Agent agent, AgentStatus status) {
165165

166166
agent.setStatus(status);
167167
agentDao.update(agent);
168+
LOGGER.trace("Agent status been updated to '%s'", status);
168169

169170
// send webhook if status changed
170171
if (statusIsChanged) {

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdDispatchServiceImpl.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -323,15 +323,12 @@ public void doExec(Agent target, Cmd cmd) {
323323
return;
324324
}
325325

326-
List<Cmd> runningCmdForSession = getRunningCmd(target.getSessionId());
327-
328-
// kill current running cmd
329-
for (Cmd runningCmd : runningCmdForSession) {
326+
// kill current running cmd and update agent status from cmd callback
327+
for (Cmd runningCmd : getRunningCmd(target.getSessionId())) {
330328
Cmd killCmd = cmdService.create(new CmdInfo(runningCmd.getAgentPath(), CmdType.KILL, null));
331329
handler.get(CmdType.KILL).exec(killCmd);
332330
}
333331

334-
// release session from target
335332
target.setSessionId(null);
336333
agentService.saveWithStatus(target, AgentStatus.IDLE);
337334
}

platform-control-center/src/main/java/com/flow/platform/cc/service/CmdServiceImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.time.ZonedDateTime;
5656
import java.util.EnumSet;
5757
import java.util.HashMap;
58+
import java.util.LinkedList;
5859
import java.util.List;
5960
import java.util.Map;
6061
import java.util.Set;
@@ -229,6 +230,8 @@ public void updateStatus(CmdStatusItem statusItem, boolean inQueue) {
229230
//TODO: missing unit test
230231
// set cmd status in sequence
231232
if (!cmd.addStatus(statusItem.getStatus())) {
233+
LOGGER.warn("Cannot add cmd '%s' from '%s' status to '%s'",
234+
cmd.getId(), cmd.getStatus(), statusItem.getStatus());
232235
return;
233236
}
234237

@@ -273,20 +276,19 @@ public void saveLog(String cmdId, MultipartFile file) {
273276

274277
/**
275278
* Update agent status when report cmd status and result
279+
* - DONOT update agent status if cmd with session, since it controlled by session cmd
276280
* - busy or idle by Cmd.Type.RUN_SHELL while report cmd status
277281
*
278282
* @param cmd Cmd object
279283
*/
280284
private void updateAgentStatusFromCmd(Cmd cmd) {
281-
// do not update agent status duration session
282-
String sessionId = cmd.getSessionId();
283-
if (sessionId != null && agentService.find(sessionId) != null) {
285+
if (cmd.hasSession()) {
284286
return;
285287
}
286288

287-
// update agent status by cmd status
288289
AgentPath agentPath = cmd.getAgentPath();
289290
boolean isAgentBusy = false;
291+
290292
for (Cmd tmp : listByAgentPath(agentPath)) {
291293
if (tmp.getType() != CmdType.RUN_SHELL) {
292294
continue;

platform-control-center/src/test/java/com/flow/platform/cc/test/service/CmdDispatchServiceTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@
3232
import com.flow.platform.domain.AgentStatus;
3333
import com.flow.platform.domain.Cmd;
3434
import com.flow.platform.domain.CmdInfo;
35+
import com.flow.platform.domain.CmdReport;
3536
import com.flow.platform.domain.CmdResult;
3637
import com.flow.platform.domain.CmdStatus;
3738
import com.flow.platform.domain.CmdType;
3839
import com.flow.platform.domain.Zone;
3940
import java.util.List;
41+
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.TimeUnit;
4043
import org.junit.After;
4144
import org.junit.Assert;
4245
import org.junit.Before;
@@ -173,6 +176,10 @@ public void should_running_cmd_been_killed_when_delete_session() throws Throwabl
173176
Assert.assertEquals(CmdType.KILL, killCmd.getType());
174177
Assert.assertNotEquals(cmd.getId(), killCmd.getId());
175178

179+
// when: mock the KILLED cmd been reported from agent
180+
CmdReport mockKilledReport = new CmdReport(cmd.getId(), CmdStatus.KILLED, new CmdResult(147));
181+
cmdService.updateStatus(new CmdStatusItem(mockKilledReport, true, true), false);
182+
176183
// then: verify agent status
177184
Agent sessionShouldReleased = agentService.find(cmd.getAgentPath());
178185
Assert.assertNull(sessionShouldReleased.getSessionId());

platform-util-zk/src/test/java/com/flow/platform/util/zk/test/ZkClientTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.flow.platform.util.zk.test;
1818

1919
import com.flow.platform.util.zk.ZKClient;
20+
import com.flow.platform.util.zk.ZkException;
21+
import java.util.concurrent.ArrayBlockingQueue;
2022
import java.util.concurrent.CountDownLatch;
2123
import java.util.concurrent.Executor;
2224
import java.util.concurrent.Executors;
@@ -76,6 +78,34 @@ public void should_return_false_if_node_not_exist() throws Throwable {
7678
Assert.assertFalse(zkClient.exist("/hello/not-exit"));
7779
}
7880

81+
@Test
82+
public void should_create_node_atom() throws Throwable {
83+
ThreadPoolExecutor threadPoolExecutor =
84+
new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
85+
86+
final AtomicInteger size = new AtomicInteger(0);
87+
final CountDownLatch latch = new CountDownLatch(5);
88+
89+
String agentNodePath = ZKPaths.makePath("/flow-agent", "flow-atom");
90+
zkClient.delete(agentNodePath, false);
91+
92+
for (int i = 0; i < 5; i++) {
93+
threadPoolExecutor.execute(() -> {
94+
try {
95+
zkClient.createEphemeral(agentNodePath);
96+
size.incrementAndGet();
97+
} catch (ZkException e) {
98+
System.out.println(e);
99+
} finally {
100+
latch.countDown();
101+
}
102+
});
103+
}
104+
105+
latch.await(30, TimeUnit.SECONDS);
106+
Assert.assertEquals(1, size.get());
107+
}
108+
79109
@Test
80110
public void should_create_and_delete_zk_node() throws Throwable {
81111
// init:

0 commit comments

Comments
 (0)