Skip to content

Commit 515e996

Browse files
refactored code / addressed comments
1 parent 9ef1c12 commit 515e996

File tree

5 files changed

+82
-73
lines changed

5 files changed

+82
-73
lines changed

agent/src/main/java/com/cloud/agent/Agent.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,16 +1011,20 @@ public void processResponse(final Response response, final Link link) {
10111011
listener.processControlResponse(response, (AgentControlAnswer)answer);
10121012
}
10131013
} else if (answer instanceof PingAnswer) {
1014-
if ((((PingAnswer) answer).isSendStartup()) && reconnectAllowed) {
1015-
logger.info("Management server requested startup command to reinitialize the agent");
1016-
sendStartup(link);
1017-
}
1018-
shell.setAvoidHosts(((PingAnswer) answer).getAvoidMsList());
1014+
processPingAnswer((PingAnswer) answer);
10191015
} else {
10201016
updateLastPingResponseTime();
10211017
}
10221018
}
10231019

1020+
private void processPingAnswer(final PingAnswer answer) {
1021+
if ((answer.isSendStartup()) && reconnectAllowed) {
1022+
logger.info("Management server requested startup command to reinitialize the agent");
1023+
sendStartup(link);
1024+
}
1025+
shell.setAvoidHosts(answer.getAvoidMsList());
1026+
}
1027+
10241028
public void processReadyCommand(final Command cmd) {
10251029
final ReadyCommand ready = (ReadyCommand)cmd;
10261030
// Set human readable sizes;

engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,7 @@ public boolean configure(final String name, final Map<String, Object> params) th
255255

256256
_executor = new ThreadPoolExecutor(agentTaskThreads, agentTaskThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentTaskPool"));
257257

258-
_connectExecutor = new ThreadPoolExecutor(100, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentConnectTaskPool"));
259-
// allow core threads to time out even when there are no items in the queue
260-
_connectExecutor.allowCoreThreadTimeOut(true);
258+
initConnectExecutor();
261259

262260
maxConcurrentNewAgentConnections = RemoteAgentMaxConcurrentNewConnections.value();
263261

@@ -273,10 +271,6 @@ public boolean configure(final String name, final Map<String, Object> params) th
273271
logger.debug("Created DirectAgentAttache pool with size: {}.", directAgentPoolSize);
274272
_directAgentThreadCap = Math.round(directAgentPoolSize * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0
275273

276-
_monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor"));
277-
278-
newAgentConnectionsMonitor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NewAgentConnectionsMonitor"));
279-
280274
initializeCommandTimeouts();
281275

282276
return true;
@@ -388,10 +382,8 @@ public void onManagementServerMaintenance() {
388382
public void onManagementServerCancelMaintenance() {
389383
logger.debug("Management server maintenance disabled");
390384
if (_connectExecutor.isShutdown()) {
391-
_connectExecutor = new ThreadPoolExecutor(100, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentConnectTaskPool"));
392-
_connectExecutor.allowCoreThreadTimeOut(true);
385+
initConnectExecutor();
393386
}
394-
395387
startDirectlyConnectedHosts(true);
396388
if (_connection != null) {
397389
try {
@@ -402,16 +394,30 @@ public void onManagementServerCancelMaintenance() {
402394
}
403395

404396
if (_monitorExecutor.isShutdown()) {
405-
_monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor"));
406-
_monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS);
397+
initAndScheduleMonitorExecutor();
407398
}
408399
if (newAgentConnectionsMonitor.isShutdown()) {
409-
final int cleanupTimeInSecs = Wait.value();
410-
newAgentConnectionsMonitor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NewAgentConnectionsMonitor"));
411-
newAgentConnectionsMonitor.scheduleAtFixedRate(new AgentNewConnectionsMonitorTask(), cleanupTimeInSecs, cleanupTimeInSecs, TimeUnit.SECONDS);
400+
initAndScheduleAgentConnectionsMonitor();
412401
}
413402
}
414403

404+
private void initConnectExecutor() {
405+
_connectExecutor = new ThreadPoolExecutor(100, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentConnectTaskPool"));
406+
// allow core threads to time out even when there are no items in the queue
407+
_connectExecutor.allowCoreThreadTimeOut(true);
408+
}
409+
410+
private void initAndScheduleMonitorExecutor() {
411+
_monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor"));
412+
_monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS);
413+
}
414+
415+
private void initAndScheduleAgentConnectionsMonitor() {
416+
final int cleanupTimeInSecs = Wait.value();
417+
newAgentConnectionsMonitor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NewAgentConnectionsMonitor"));
418+
newAgentConnectionsMonitor.scheduleAtFixedRate(new AgentNewConnectionsMonitorTask(), cleanupTimeInSecs, cleanupTimeInSecs, TimeUnit.SECONDS);
419+
}
420+
415421
private AgentControlAnswer handleControlCommand(final AgentAttache attache, final AgentControlCommand cmd) {
416422
AgentControlAnswer answer;
417423

@@ -805,12 +811,8 @@ public boolean start() {
805811
}
806812
}
807813

808-
_monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS);
809-
810-
final int cleanupTimeInSecs = Wait.value();
811-
newAgentConnectionsMonitor.scheduleAtFixedRate(new AgentNewConnectionsMonitorTask(), cleanupTimeInSecs,
812-
cleanupTimeInSecs, TimeUnit.SECONDS);
813-
814+
initAndScheduleMonitorExecutor();
815+
initAndScheduleAgentConnectionsMonitor();
814816
return true;
815817
}
816818

plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImpl.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,7 @@ public ManagementServerMaintenanceResponse prepareForShutdown(PrepareForShutdown
343343
throw new CloudRuntimeException("Management server is not in the right state to prepare for shutdown");
344344
}
345345

346-
final List<ManagementServerHostVO> preparingForMaintenanceOrShutDownMsList = msHostDao.listBy(State.PreparingForMaintenance, State.PreparingForShutDown);
347-
if (CollectionUtils.isNotEmpty(preparingForMaintenanceOrShutDownMsList)) {
348-
throw new CloudRuntimeException("Cannot prepare for shutdown, there are other management servers preparing for maintenance/shutdown");
349-
}
346+
checkAnyMsInPreparingStates("prepare for shutdown");
350347

351348
final Command[] cmds = new Command[1];
352349
cmds[0] = new PrepareForShutdownManagementServerHostCommand(msHost.getMsid());
@@ -370,10 +367,7 @@ public ManagementServerMaintenanceResponse triggerShutdown(TriggerShutdownCmd cm
370367
}
371368

372369
if (State.Up.equals(msHost.getState())) {
373-
final List<ManagementServerHostVO> preparingForMaintenanceOrShutDownMsList = msHostDao.listBy(State.PreparingForMaintenance, State.PreparingForShutDown);
374-
if (CollectionUtils.isNotEmpty(preparingForMaintenanceOrShutDownMsList)) {
375-
throw new CloudRuntimeException("Cannot trigger shutdown now, there are other management servers preparing for maintenance/shutdown");
376-
}
370+
checkAnyMsInPreparingStates("trigger shutdown");
377371
msHostDao.updateState(msHost.getId(), State.PreparingForShutDown);
378372
}
379373

@@ -430,10 +424,7 @@ public ManagementServerMaintenanceResponse prepareForMaintenance(PrepareForMaint
430424
throw new CloudRuntimeException("Management server is not in the right state to prepare for maintenance");
431425
}
432426

433-
final List<ManagementServerHostVO> preparingForMaintenanceOrShutDownMsList = msHostDao.listBy(State.PreparingForMaintenance, State.PreparingForShutDown);
434-
if (CollectionUtils.isNotEmpty(preparingForMaintenanceOrShutDownMsList)) {
435-
throw new CloudRuntimeException("Cannot prepare for maintenance, there are other management servers preparing for maintenance/shutdown");
436-
}
427+
checkAnyMsInPreparingStates("prepare for maintenance");
437428

438429
if (indirectAgentLB.haveAgentBasedHosts(msHost.getMsid())) {
439430
List<String> indirectAgentMsList = indirectAgentLB.getManagementServerList();
@@ -505,6 +496,13 @@ public void cancelPreparingForMaintenance(ManagementServerHostVO msHost) {
505496
msHostDao.updateState(msHost.getId(), State.Up);
506497
}
507498

499+
private void checkAnyMsInPreparingStates(String operation) {
500+
final List<ManagementServerHostVO> preparingForMaintenanceOrShutDownMsList = msHostDao.listBy(State.PreparingForMaintenance, State.PreparingForShutDown);
501+
if (CollectionUtils.isNotEmpty(preparingForMaintenanceOrShutDownMsList)) {
502+
throw new CloudRuntimeException(String.format("Cannot %s, there are other management servers preparing for maintenance/shutdown", operation));
503+
}
504+
}
505+
508506
private ManagementServerMaintenanceResponse prepareMaintenanceResponse(Long managementServerId) {
509507
ManagementServerHostVO msHost;
510508
Long[] msIds;

server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -488,26 +488,32 @@ public boolean migrateAgents(String fromMsUuid, long fromMsId, String lbAlgorith
488488

489489
List<DataCenterVO> dataCenterList = dcDao.listAll();
490490
for (DataCenterVO dc : dataCenterList) {
491-
List<Long> orderedHostIdList = getOrderedHostIdList(dc.getId());
492-
if (!migrateNonRoutingHostAgentsInZone(fromMsUuid, fromMsId, dc, migrationStartTimeInMs,
493-
timeoutDurationInMs, avoidMsList, lbAlgorithm, lbAlgorithmChanged, orderedHostIdList)) {
491+
if (!migrateAgentsInZone(dc, fromMsUuid, fromMsId, avoidMsList, lbAlgorithm, lbAlgorithmChanged,
492+
migrationStartTimeInMs, timeoutDurationInMs)) {
494493
return false;
495494
}
496-
List<Long> clusterIds = clusterDao.listAllClusterIds(dc.getId());
497-
if (CollectionUtils.isEmpty(clusterIds)) {
498-
continue;
499-
}
500-
for (Long clusterId : clusterIds) {
501-
if (!migrateRoutingHostAgentsInCluster(clusterId, fromMsUuid, fromMsId, dc, migrationStartTimeInMs,
502-
timeoutDurationInMs, avoidMsList, lbAlgorithm, lbAlgorithmChanged, orderedHostIdList)) {
503-
return false;
504-
}
505-
}
506495
}
507496

508497
return true;
509498
}
510499

500+
private boolean migrateAgentsInZone(DataCenterVO dc, String fromMsUuid, long fromMsId, List<String> avoidMsList,
501+
String lbAlgorithm, boolean lbAlgorithmChanged, long migrationStartTimeInMs, long timeoutDurationInMs) {
502+
List<Long> orderedHostIdList = getOrderedHostIdList(dc.getId());
503+
if (!migrateNonRoutingHostAgentsInZone(fromMsUuid, fromMsId, dc, migrationStartTimeInMs,
504+
timeoutDurationInMs, avoidMsList, lbAlgorithm, lbAlgorithmChanged, orderedHostIdList)) {
505+
return false;
506+
}
507+
List<Long> clusterIds = clusterDao.listAllClusterIds(dc.getId());
508+
for (Long clusterId : clusterIds) {
509+
if (!migrateRoutingHostAgentsInCluster(clusterId, fromMsUuid, fromMsId, dc, migrationStartTimeInMs,
510+
timeoutDurationInMs, avoidMsList, lbAlgorithm, lbAlgorithmChanged, orderedHostIdList)) {
511+
return false;
512+
}
513+
}
514+
return true;
515+
}
516+
511517
private final class MigrateAgentConnectionTask extends ManagedContextRunnable {
512518
private long fromMsId;
513519
Long hostId;

utils/src/main/java/com/cloud/utils/nio/NioConnection.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,8 @@ public NioConnection(final String name, final int port, final int workers, final
9494
_workers = workers;
9595
_factory = factory;
9696
this.factoryMaxNewConnectionsCount = factory.getMaxConcurrentNewConnectionsCount();
97-
_executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, TimeUnit.DAYS,
98-
new LinkedBlockingQueue<>(5 * _workers), new NamedThreadFactory(_name + "-Handler"),
99-
new ThreadPoolExecutor.AbortPolicy());
100-
String sslHandshakeHandlerName = _name + "-SSLHandshakeHandler";
101-
if (factoryMaxNewConnectionsCount > 0) {
102-
_sslHandshakeExecutor = new ThreadPoolExecutor(0, this.factoryMaxNewConnectionsCount, 30,
103-
TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory(sslHandshakeHandlerName),
104-
new ThreadPoolExecutor.AbortPolicy());
105-
} else {
106-
_sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(sslHandshakeHandlerName));
107-
}
97+
initWorkersExecutor();
98+
initSSLHandshakeExecutor();
10899
}
109100

110101
public void setCAService(final CAService caService) {
@@ -129,19 +120,10 @@ public void start() throws NioConnectionException {
129120
_isStartup = true;
130121

131122
if (_executor.isShutdown()) {
132-
_executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, TimeUnit.DAYS,
133-
new LinkedBlockingQueue<>(5 * _workers), new NamedThreadFactory(_name + "-Handler"),
134-
new ThreadPoolExecutor.AbortPolicy());
123+
initWorkersExecutor();
135124
}
136125
if (_sslHandshakeExecutor.isShutdown()) {
137-
String sslHandshakeHandlerName = _name + "-SSLHandshakeHandler";
138-
if (factoryMaxNewConnectionsCount > 0) {
139-
_sslHandshakeExecutor = new ThreadPoolExecutor(0, this.factoryMaxNewConnectionsCount, 30,
140-
TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory(sslHandshakeHandlerName),
141-
new ThreadPoolExecutor.AbortPolicy());
142-
} else {
143-
_sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(sslHandshakeHandlerName));
144-
}
126+
initSSLHandshakeExecutor();
145127
}
146128
_threadExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(this._name + "-NioConnectionHandler"));
147129
_isRunning = true;
@@ -160,6 +142,23 @@ public void stop() {
160142
}
161143
}
162144

145+
private void initWorkersExecutor() {
146+
_executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, TimeUnit.DAYS,
147+
new LinkedBlockingQueue<>(5 * _workers), new NamedThreadFactory(_name + "-Handler"),
148+
new ThreadPoolExecutor.AbortPolicy());
149+
}
150+
151+
private void initSSLHandshakeExecutor() {
152+
String sslHandshakeHandlerName = _name + "-SSLHandshakeHandler";
153+
if (factoryMaxNewConnectionsCount > 0) {
154+
_sslHandshakeExecutor = new ThreadPoolExecutor(0, this.factoryMaxNewConnectionsCount, 30,
155+
TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory(sslHandshakeHandlerName),
156+
new ThreadPoolExecutor.AbortPolicy());
157+
} else {
158+
_sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(sslHandshakeHandlerName));
159+
}
160+
}
161+
163162
public boolean isRunning() {
164163
return !_futureTask.isDone();
165164
}

0 commit comments

Comments
 (0)