Skip to content

Commit 8f6edb9

Browse files
committed
delete & create agent
2 parents 8dc1857 + 78bdf13 commit 8f6edb9

File tree

54 files changed

+1250
-470
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1250
-470
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
.idea/
22
*.iml
33
.DS_Store
4+
deploy.sh
45

56
node_modules/
67
apidoc/

docker/app-cc.properties

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ zone.default.default_cmd_timeout = 600
2525

2626
### rabbitmq config ###
2727
mq.host = amqp://localhost:5672
28-
mq.queue.cmd.name = flow-cmd-queue-default
2928
mq.management.host = http://localhost:15672
3029

3130
#### cmd queue settings ###
31+
queue.cmd.retry.enable = false
32+
queue.cmd.rabbit.enable = false
33+
queue.cmd.rabbit.name = flow-cmd-queue-default
3234
queue.cmd.idle_agent.timeout = 30
3335
queue.cmd.idle_agent.period = 5
3436

platform-agent/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,21 @@
9797

9898
<build>
9999
<finalName>flow-agent-${client.version}</finalName>
100+
101+
<resources>
102+
<resource>
103+
<directory>src/main/resources</directory>
104+
<filtering>true</filtering>
105+
</resource>
106+
</resources>
107+
108+
<testResources>
109+
<testResource>
110+
<directory>src/test/resources</directory>
111+
<filtering>true</filtering>
112+
</testResource>
113+
</testResources>
114+
100115
<plugins>
101116
<plugin>
102117
<groupId>org.apache.maven.plugins</groupId>

platform-agent/src/main/java/com/flow/platform/agent/CmdManager.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,28 @@
1717
package com.flow.platform.agent;
1818

1919
import com.flow.platform.cmd.CmdExecutor;
20+
import com.flow.platform.cmd.Log;
21+
import com.flow.platform.cmd.Log.Type;
2022
import com.flow.platform.cmd.ProcListener;
2123
import com.flow.platform.domain.Cmd;
2224
import com.flow.platform.domain.CmdResult;
2325
import com.flow.platform.domain.CmdStatus;
2426
import com.flow.platform.domain.CmdType;
27+
import com.flow.platform.domain.Jsonable;
2528
import com.flow.platform.util.Logger;
2629
import com.google.common.collect.Lists;
2730
import com.google.common.collect.Maps;
28-
2931
import java.time.ZonedDateTime;
3032
import java.util.ArrayList;
31-
import java.util.Date;
33+
import java.util.HashMap;
3234
import java.util.List;
3335
import java.util.Map;
34-
import java.util.concurrent.*;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.LinkedBlockingQueue;
39+
import java.util.concurrent.ThreadFactory;
40+
import java.util.concurrent.ThreadPoolExecutor;
41+
import java.util.concurrent.TimeUnit;
3542

3643
/**
3744
* Singleton class to handle command
@@ -166,7 +173,6 @@ public void run() {
166173
ProcEventHandler procEventHandler =
167174
new ProcEventHandler(getCmd(), extraProcEventListeners, running, finished);
168175

169-
170176
CmdExecutor executor;
171177
try {
172178
executor = new CmdExecutor(
@@ -194,6 +200,15 @@ public void run() {
194200
return;
195201
}
196202

203+
if (cmd.getType() == CmdType.SYSTEM_INFO) {
204+
LogEventHandler logListener = new LogEventHandler(cmd);
205+
Log log = new Log(Type.STDERR, collectionAgentInfo());
206+
logListener.onLog(log);
207+
logListener.onFinish();
208+
cmd.setStatus(CmdStatus.EXECUTED);
209+
return;
210+
}
211+
197212
// kill current running proc
198213
if (cmd.getType() == CmdType.KILL) {
199214
defaultExecutor.execute(this::kill);
@@ -214,6 +229,29 @@ public void run() {
214229
}
215230
}
216231

232+
/**
233+
* collect agent info
234+
* @return
235+
*/
236+
private String collectionAgentInfo() {
237+
String javaVersion = System.getProperty("java.version");
238+
String osName = System.getProperty("os.name");
239+
Runtime runtime = Runtime.getRuntime();
240+
int kb = 1024;
241+
long total = runtime.totalMemory();
242+
long free = runtime.freeMemory();
243+
long use = total - free;
244+
Map<String, String> dic = new HashMap<>(7);
245+
dic.put("javaVersion", javaVersion);
246+
dic.put("osName", osName);
247+
dic.put("totalMemory", total / kb + "MB");
248+
dic.put("useMemory", use / kb + "MB");
249+
dic.put("zone", Config.zone());
250+
dic.put("name", Config.name());
251+
dic.put("agentVersion", Config.getProperty("version"));
252+
return Jsonable.GSON_CONFIG.toJson(dic);
253+
}
254+
217255
/**
218256
* Kill all current running process
219257
*/

platform-agent/src/main/java/com/flow/platform/agent/Config.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,26 @@
1818

1919
import com.flow.platform.domain.AgentSettings;
2020
import com.flow.platform.domain.Jsonable;
21+
import com.flow.platform.util.ExceptionUtil;
22+
import com.flow.platform.util.Logger;
2123
import com.flow.platform.util.http.HttpClient;
2224
import com.flow.platform.util.http.HttpResponse;
2325
import com.flow.platform.util.zk.ZKClient;
26+
import java.io.FileInputStream;
27+
import java.io.InputStream;
28+
import java.net.URL;
2429
import java.nio.file.Path;
2530
import java.nio.file.Paths;
31+
import java.util.Properties;
2632
import org.apache.curator.utils.ZKPaths;
2733

2834
/**
2935
3036
*/
3137
public class Config {
3238

39+
private final static Logger LOGGER = new Logger(Config.class);
40+
3341
public final static String ZK_ROOT = "flow-agents";
3442

3543
/* Config properties by using -Dxxx.xxx = xxx as JVM parameter */
@@ -50,6 +58,8 @@ public class Config {
5058
public static String ZONE;
5159
public static String NAME;
5260

61+
private static Properties properties;
62+
5363
public static boolean isDebug() {
5464
String boolStr = System.getProperty(PROP_IS_DEBUG, "false");
5565
return Boolean.parseBoolean(boolStr);
@@ -60,6 +70,29 @@ public static int zkTimeout() {
6070
return Integer.parseInt(intStr);
6171
}
6272

73+
/**
74+
* get property from application.properties
75+
* @param name
76+
* @return
77+
*/
78+
public static String getProperty(String name) {
79+
String value;
80+
URL resource = Config.class.getClassLoader().getResource("application.properties");
81+
if (properties == null) {
82+
try (InputStream fileInputStream = new FileInputStream(resource.getFile())) {
83+
properties = new Properties();
84+
properties.load(fileInputStream);
85+
} catch (Throwable e) {
86+
LOGGER.warn("get property from application.properties error %s",
87+
ExceptionUtil.findRootCause(e).getMessage());
88+
}
89+
}
90+
91+
value = properties.getProperty(name);
92+
93+
return value;
94+
}
95+
6396
/**
6497
* Is delete cmd log after uploaded
6598
*/
@@ -104,7 +137,7 @@ public static Path logDir() {
104137
}
105138

106139
public static int concurrentThreadNum() {
107-
String intStr = System.getProperty(PROP_CONCURRENT_THREAD, "1");
140+
String intStr = System.getProperty(PROP_CONCURRENT_THREAD, "2");
108141
return Integer.parseInt(intStr);
109142
}
110143

platform-agent/src/main/java/com/flow/platform/agent/LogEventHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,10 @@ public void onFinish() {
121121
}
122122

123123
public String websocketLogFormat(Log log) {
124-
return String.format("%s#%s#%s#%s#%s", log.getNumber(), cmd.getZoneName(), cmd.getAgentName(), cmd.getId(),
125-
log.getContent());
124+
return String
125+
.format("%s#%s#%s#%s#%s#%s", cmd.getType(), log.getNumber(), cmd.getZoneName(), cmd.getAgentName(),
126+
cmd.getId(),
127+
log.getContent());
126128
}
127129

128130
private void initWebSocketSession(String url, int wsConnectionTimeout) throws Exception {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
version=${client.version}

platform-agent/src/test/java/com/flow/platform/agent/test/CmdManagerTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.flow.platform.cmd.ProcListener;
2222
import com.flow.platform.domain.Cmd;
2323
import com.flow.platform.domain.CmdResult;
24+
import com.flow.platform.domain.CmdStatus;
2425
import com.flow.platform.domain.CmdType;
2526
import java.io.IOException;
2627
import java.util.Map;
@@ -213,4 +214,56 @@ public void onException(CmdResult result) {
213214
Assert.assertNotNull(result.getExitValue());
214215
Assert.assertEquals(CmdResult.EXIT_VALUE_FOR_KILL, result.getExitValue());
215216
}
217+
218+
219+
@Test
220+
public void should_success_run_sys_cmd() throws InterruptedException {
221+
222+
Cmd cmd = new Cmd("zone1", "agent1", CmdType.RUN_SHELL, resourcePath);
223+
cmd.setId(UUID.randomUUID().toString());
224+
CountDownLatch finishCountDownLatch = new CountDownLatch(1);
225+
CountDownLatch startCountDownLatch = new CountDownLatch(1);
226+
227+
cmdManager.getExtraProcEventListeners().add(new ProcListener() {
228+
@Override
229+
public void onStarted(CmdResult result) {
230+
startCountDownLatch.countDown();
231+
try {
232+
Thread.sleep(3000);
233+
} catch (Throwable e) {
234+
}
235+
}
236+
237+
@Override
238+
public void onLogged(CmdResult result) {
239+
finishCountDownLatch.countDown();
240+
}
241+
242+
@Override
243+
public void onExecuted(CmdResult result) {
244+
245+
}
246+
247+
@Override
248+
public void onException(CmdResult result) {
249+
250+
}
251+
});
252+
253+
// when: start and kill task immediately
254+
cmdManager.execute(cmd);
255+
256+
startCountDownLatch.await();
257+
Assert.assertEquals(1, cmdManager.getRunning().size());
258+
259+
Cmd cmdSys = new Cmd("zone1", "agent1", CmdType.SYSTEM_INFO, "");
260+
cmdSys.setId(UUID.randomUUID().toString());
261+
262+
cmdManager.execute(cmdSys);
263+
Assert.assertEquals(CmdStatus.EXECUTED, cmdSys.getStatus());
264+
265+
finishCountDownLatch.await();
266+
Assert.assertEquals(1, cmdManager.getFinished().size());
267+
268+
}
216269
}

platform-agent/src/test/java/com/flow/platform/agent/test/LogEventHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public void should_get_correct_format_websocket() throws Throwable {
5858

5959
// then:
6060
String expect = String
61-
.format("%s#%s#%s#%s#%s", null, cmd.getZoneName(), cmd.getAgentName(), cmd.getId(), mockLogContent);
61+
.format("%s#%s#%s#%s#%s#%s", CmdType.RUN_SHELL, null, cmd.getZoneName(), cmd.getAgentName(), cmd.getId(),
62+
mockLogContent);
6263
Assert.assertEquals(expect, socketIoData);
6364
}
6465
}

platform-api/src/main/java/com/flow/platform/api/config/AppConfig.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.flow.platform.api.util.PlatformURL;
2222
import com.flow.platform.core.config.AppConfigBase;
2323
import com.flow.platform.core.config.DatabaseConfig;
24+
import com.flow.platform.core.queue.InMemoryQueue;
25+
import com.flow.platform.core.queue.PlatformQueue;
2426
import com.flow.platform.core.util.ThreadUtil;
2527
import com.flow.platform.util.Logger;
2628
import java.io.IOException;
@@ -68,6 +70,9 @@ public class AppConfig extends AppConfigBase {
6870
public final static String DEFAULT_USER_NAME = "admin";
6971
public final static String DEFAULT_USER_PASSWORD = "123456";
7072

73+
private final static ThreadPoolTaskExecutor executor =
74+
ThreadUtil.createTaskExecutor(ASYNC_POOL_SIZE, ASYNC_POOL_SIZE / 10, 100, THREAD_NAME_PREFIX);
75+
7176
@Value("${api.workspace}")
7277
private String workspace;
7378

@@ -98,12 +103,15 @@ public ThreadLocal<String> currentNodePath() {
98103
@Bean
99104
@Override
100105
public ThreadPoolTaskExecutor taskExecutor() {
101-
return ThreadUtil.createTaskExecutor(ASYNC_POOL_SIZE, ASYNC_POOL_SIZE / 10, 100, THREAD_NAME_PREFIX);
106+
return executor;
102107
}
103108

109+
/**
110+
* Queue to process cmd callback task
111+
*/
104112
@Bean
105-
public BlockingQueue<CmdCallbackQueueItem> cmdBaseBlockingQueue() {
106-
return new LinkedBlockingQueue<>(50);
113+
public PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue() {
114+
return new InMemoryQueue<>(executor, 50);
107115
}
108116

109117
@Override

0 commit comments

Comments
 (0)