Skip to content

Commit 95489b8

Browse files
Direct agents rebalance improvements with multiple management server nodes (apache#10674)
Sometimes hypervisor hosts (direct agents) get stuck in the Disconnected state during agent rebalancing across multiple management server nodes. This issue was noticed during frequent restarts of the management server nodes in the cluster. When there are multiple management server nodes in a cluster and one or more nodes are shut down, started, or restarted, CloudStack rebalances the hosts among the remaining nodes or moves hosts to the newly joined management server nodes. During the rebalancing period multiple operations can happen, including: - DirectAgentScan, run at the configured direct.agent.scan.interval - AgentRebalanceScan, to identify and schedule agents for rebalancing - TransferAgentScan, to transfer a host from its original owner to its future owner **Current Rebalance behavior** 1. For hosts that have an AgentAttache that is not forForward but are in the Disconnected state, CloudStack simply ignores these hosts without trying to ping them again or update their status. 2. For hosts that have an AgentAttache that is forForward, CloudStack removes the agent but still tries to loadDirectlyConnectedHost. **Improved Rebalance behavior** During DirectAgentScan, scanDirectAgentToLoad() identifies self-managed hosts that are in the Disconnected state (disconnected after the ping timeout). 1. For hosts that have an AgentAttache that is forForward, CloudStack should remove the agent. 2. For hosts that have an AgentAttache that is not forForward but are in the Disconnected state, CloudStack should investigate and update the status to Up if the host is pingable. 3. For hosts that don't have an AgentAttache, CloudStack should try to loadDirectlyConnectedHost.
1 parent 0648d00 commit 95489b8

File tree

2 files changed

+183
-23
lines changed

2 files changed

+183
-23
lines changed

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ private void runDirectAgentScanTimerTask() {
191191
scanDirectAgentToLoad();
192192
}
193193

194-
private void scanDirectAgentToLoad() {
194+
protected void scanDirectAgentToLoad() {
195195
logger.trace("Begin scanning directly connected hosts");
196196

197197
// for agents that are self-managed, threshold to be considered as disconnected after pingtimeout
@@ -212,11 +212,21 @@ private void scanDirectAgentToLoad() {
212212
logger.info("{} is detected down, but we have a forward attache running, disconnect this one before launching the host", host);
213213
removeAgent(agentattache, Status.Disconnected);
214214
} else {
215-
continue;
215+
logger.debug("Host {} status is {} but has an AgentAttache which is not forForward, try to load directly", host, host.getStatus());
216+
Status hostStatus = investigate(agentattache);
217+
if (Status.Up == hostStatus) {
218+
/* Got ping response from host, bring it back */
219+
logger.info("After investigation, Agent for host {} is determined to be up and running", host);
220+
agentStatusTransitTo(host, Event.Ping, _nodeId);
221+
} else {
222+
logger.debug("After investigation, AgentAttache is not null but host status is {}, try to load directly {}", hostStatus, host);
223+
loadDirectlyConnectedHost(host, false);
224+
}
216225
}
226+
} else {
227+
logger.debug("AgentAttache is null, loading directly connected {}", host);
228+
loadDirectlyConnectedHost(host, false);
217229
}
218-
logger.debug("Loading directly connected {}", host);
219-
loadDirectlyConnectedHost(host, false);
220230
} catch (final Throwable e) {
221231
logger.warn(" can not load directly connected {} due to ", host, e);
222232
}
@@ -362,20 +372,20 @@ public void reconnect(final long hostId) throws CloudRuntimeException, AgentUnav
362372
return;
363373
}
364374
if (!result) {
365-
throw new CloudRuntimeException("Failed to propagate agent change request event:" + Event.ShutdownRequested + " to host:" + hostId);
375+
throw new CloudRuntimeException(String.format("Failed to propagate agent change request event: %s to host: %s", Event.ShutdownRequested, hostId));
366376
}
367377
}
368378

369379
public void notifyNodesInCluster(final AgentAttache attache) {
370380
logger.debug("Notifying other nodes of to disconnect");
371-
final Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)};
381+
final Command[] cmds = new Command[]{new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)};
372382
_clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds));
373383
}
374384

375385
// notifies MS peers to schedule a host scan task immediately, triggered during addHost operation
376386
public void notifyNodesInClusterToScheduleHostScanTask() {
377387
logger.debug("Notifying other MS nodes to run host scan task");
378-
final Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()};
388+
final Command[] cmds = new Command[]{new ScheduleHostScanTaskCommand()};
379389
_clusterMgr.broadcast(0, _gson.toJson(cmds));
380390
}
381391

@@ -416,7 +426,7 @@ public boolean routeToPeer(final String peer, final byte[] bytes) {
416426
}
417427
try {
418428
logD(bytes, "Routing to peer");
419-
Link.write(ch, new ByteBuffer[] {ByteBuffer.wrap(bytes)}, sslEngine);
429+
Link.write(ch, new ByteBuffer[]{ByteBuffer.wrap(bytes)}, sslEngine);
420430
return true;
421431
} catch (final IOException e) {
422432
try {
@@ -625,7 +635,7 @@ protected void doTask(final Task task) throws TaskExecutionException {
625635
}
626636
final Request req = Request.parse(data);
627637
final Command[] cmds = req.getCommands();
628-
final CancelCommand cancel = (CancelCommand)cmds[0];
638+
final CancelCommand cancel = (CancelCommand) cmds[0];
629639
logD(data, "Cancel request received");
630640
agent.cancel(cancel.getSequence());
631641
final Long current = agent._currentSequence;
@@ -652,7 +662,7 @@ protected void doTask(final Task task) throws TaskExecutionException {
652662
return;
653663
} else {
654664
if (agent instanceof Routable) {
655-
final Routable cluster = (Routable)agent;
665+
final Routable cluster = (Routable) agent;
656666
cluster.routeToAgent(data);
657667
} else {
658668
agent.send(Request.parse(data));
@@ -669,7 +679,7 @@ protected void doTask(final Task task) throws TaskExecutionException {
669679
if (mgmtId != -1 && mgmtId != _nodeId) {
670680
routeToPeer(Long.toString(mgmtId), data);
671681
if (Request.requiresSequentialExecution(data)) {
672-
final AgentAttache attache = (AgentAttache)link.attachment();
682+
final AgentAttache attache = (AgentAttache) link.attachment();
673683
if (attache != null) {
674684
attache.sendNext(Request.getSequence(data));
675685
}
@@ -933,7 +943,7 @@ protected void runInContext() {
933943
if (_agentToTransferIds.size() > 0) {
934944
logger.debug("Found {} agents to transfer", _agentToTransferIds.size());
935945
// for (Long hostId : _agentToTransferIds) {
936-
for (final Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
946+
for (final Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext(); ) {
937947
final Long hostId = iterator.next();
938948
final AgentAttache attache = findAttache(hostId);
939949

@@ -1074,7 +1084,7 @@ protected void finishRebalance(final long hostId, final long futureOwnerId, fina
10741084
return;
10751085
}
10761086

1077-
final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
1087+
final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) attache;
10781088

10791089
if (success) {
10801090

@@ -1125,10 +1135,10 @@ protected boolean startRebalance(final long hostId) {
11251135
}
11261136

11271137
synchronized (_agents) {
1128-
final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
1138+
final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache) _agents.get(hostId);
11291139
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
11301140
handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true);
1131-
final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(host);
1141+
final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) createAttache(host);
11321142
if (forwardAttache == null) {
11331143
logger.warn("Unable to create a forward attache for the host {} as a part of rebalance process", host);
11341144
return false;
@@ -1232,7 +1242,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12321242
}
12331243

12341244
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { // intercepted
1235-
final ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
1245+
final ChangeAgentCommand cmd = (ChangeAgentCommand) cmds[0];
12361246

12371247
logger.debug("Intercepting command for agent change: agent {} event: {}", cmd.getAgentId(), cmd.getEvent());
12381248
boolean result = false;
@@ -1249,7 +1259,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12491259
answers[0] = new ChangeAgentAnswer(cmd, result);
12501260
return _gson.toJson(answers);
12511261
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
1252-
final TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
1262+
final TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
12531263

12541264
logger.debug("Intercepting command for agent rebalancing: agent {} event: {}", cmd.getAgentId(), cmd.getEvent());
12551265
boolean result = false;
@@ -1268,7 +1278,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12681278
answers[0] = new Answer(cmd, result, null);
12691279
return _gson.toJson(answers);
12701280
} else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
1271-
final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
1281+
final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0];
12721282

12731283
logger.debug("Intercepting command to propagate event {} for host {} ({})", () -> cmd.getEvent().name(), cmd::getHostId, () -> _hostDao.findById(cmd.getHostId()));
12741284

@@ -1285,10 +1295,10 @@ public String dispatch(final ClusterServicePdu pdu) {
12851295
answers[0] = new Answer(cmd, result, null);
12861296
return _gson.toJson(answers);
12871297
} else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
1288-
final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
1298+
final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0];
12891299
return handleScheduleHostScanTaskCommand(cmd);
12901300
} else if (cmds.length == 1 && cmds[0] instanceof BaseShutdownManagementServerHostCommand) {
1291-
final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand)cmds[0];
1301+
final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand) cmds[0];
12921302
return handleShutdownManagementServerHostCommand(cmd);
12931303
}
12941304

@@ -1323,7 +1333,7 @@ private String handleShutdownManagementServerHostCommand(BaseShutdownManagementS
13231333
try {
13241334
shutdownManager.prepareForShutdown();
13251335
return "Successfully prepared for shutdown";
1326-
} catch(CloudRuntimeException e) {
1336+
} catch (CloudRuntimeException e) {
13271337
return e.getMessage();
13281338
}
13291339
}
@@ -1332,7 +1342,7 @@ private String handleShutdownManagementServerHostCommand(BaseShutdownManagementS
13321342
try {
13331343
shutdownManager.triggerShutdown();
13341344
return "Successfully triggered shutdown";
1335-
} catch(CloudRuntimeException e) {
1345+
} catch (CloudRuntimeException e) {
13361346
return e.getMessage();
13371347
}
13381348
}
@@ -1341,7 +1351,7 @@ private String handleShutdownManagementServerHostCommand(BaseShutdownManagementS
13411351
try {
13421352
shutdownManager.cancelShutdown();
13431353
return "Successfully prepared for shutdown";
1344-
} catch(CloudRuntimeException e) {
1354+
} catch (CloudRuntimeException e) {
13451355
return e.getMessage();
13461356
}
13471357
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package com.cloud.agent.manager;
19+
20+
import com.cloud.configuration.ManagementServiceConfiguration;
21+
import com.cloud.ha.HighAvailabilityManagerImpl;
22+
import com.cloud.host.HostVO;
23+
import com.cloud.host.Status;
24+
import com.cloud.host.dao.HostDao;
25+
import com.cloud.resource.ResourceManagerImpl;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.junit.runner.RunWith;
29+
import org.mockito.Mock;
30+
import org.mockito.Mockito;
31+
import org.mockito.junit.MockitoJUnitRunner;
32+
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
36+
import static org.mockito.ArgumentMatchers.any;
37+
import static org.mockito.ArgumentMatchers.anyBoolean;
38+
import static org.mockito.ArgumentMatchers.anyLong;
39+
import static org.mockito.Mockito.doReturn;
40+
import static org.mockito.Mockito.mock;
41+
import static org.mockito.Mockito.never;
42+
import static org.mockito.Mockito.verify;
43+
import static org.mockito.Mockito.when;
44+
45+
@RunWith(MockitoJUnitRunner.class)
46+
public class ClusteredAgentManagerImplTest {
47+
48+
private HostDao _hostDao;
49+
@Mock
50+
ManagementServiceConfiguration _mgmtServiceConf;
51+
52+
@Before
53+
public void setUp() throws Exception {
54+
_hostDao = mock(HostDao.class);
55+
}
56+
57+
@Test
58+
public void scanDirectAgentToLoadNoHostsTest() {
59+
ClusteredAgentManagerImpl clusteredAgentManagerImpl = mock(ClusteredAgentManagerImpl.class);
60+
clusteredAgentManagerImpl._hostDao = _hostDao;
61+
clusteredAgentManagerImpl.scanDirectAgentToLoad();
62+
verify(clusteredAgentManagerImpl, never()).findAttache(anyLong());
63+
verify(clusteredAgentManagerImpl, never()).loadDirectlyConnectedHost(any(), anyBoolean());
64+
}
65+
66+
@Test
67+
public void scanDirectAgentToLoadHostWithoutAttacheTest() {
68+
// Arrange
69+
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
70+
HostVO hostVO = mock(HostVO.class);
71+
clusteredAgentManagerImpl._hostDao = _hostDao;
72+
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
73+
clusteredAgentManagerImpl._resourceMgr = mock(ResourceManagerImpl.class);
74+
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
75+
when(hostVO.getId()).thenReturn(1L);
76+
List hosts = new ArrayList<>();
77+
hosts.add(hostVO);
78+
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
79+
AgentAttache agentAttache = mock(AgentAttache.class);
80+
doReturn(Boolean.TRUE).when(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
81+
clusteredAgentManagerImpl.scanDirectAgentToLoad();
82+
verify(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
83+
}
84+
85+
@Test
86+
public void scanDirectAgentToLoadHostWithForwardAttacheTest() {
87+
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
88+
HostVO hostVO = mock(HostVO.class);
89+
clusteredAgentManagerImpl._hostDao = _hostDao;
90+
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
91+
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
92+
when(hostVO.getId()).thenReturn(1L);
93+
List hosts = new ArrayList<>();
94+
hosts.add(hostVO);
95+
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
96+
AgentAttache agentAttache = mock(AgentAttache.class);
97+
when(agentAttache.forForward()).thenReturn(Boolean.TRUE);
98+
when(clusteredAgentManagerImpl.findAttache(1L)).thenReturn(agentAttache);
99+
100+
clusteredAgentManagerImpl.scanDirectAgentToLoad();
101+
verify(clusteredAgentManagerImpl).removeAgent(agentAttache, Status.Disconnected);
102+
}
103+
104+
@Test
105+
public void scanDirectAgentToLoadHostWithNonForwardAttacheTest() {
106+
// Arrange
107+
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(new ClusteredAgentManagerImpl());
108+
HostVO hostVO = mock(HostVO.class);
109+
clusteredAgentManagerImpl._hostDao = _hostDao;
110+
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
111+
clusteredAgentManagerImpl._haMgr = mock(HighAvailabilityManagerImpl.class);
112+
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
113+
when(hostVO.getId()).thenReturn(0L);
114+
List hosts = new ArrayList<>();
115+
hosts.add(hostVO);
116+
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
117+
118+
AgentAttache agentAttache = mock(AgentAttache.class);
119+
when(agentAttache.forForward()).thenReturn(Boolean.FALSE);
120+
when(clusteredAgentManagerImpl.findAttache(0L)).thenReturn(agentAttache);
121+
doReturn(Boolean.TRUE).when(clusteredAgentManagerImpl).agentStatusTransitTo(hostVO, Status.Event.Ping, clusteredAgentManagerImpl._nodeId);
122+
doReturn(Status.Up).when(clusteredAgentManagerImpl).investigate(agentAttache);
123+
124+
clusteredAgentManagerImpl.scanDirectAgentToLoad();
125+
verify(clusteredAgentManagerImpl).investigate(agentAttache);
126+
verify(clusteredAgentManagerImpl).agentStatusTransitTo(hostVO, Status.Event.Ping, clusteredAgentManagerImpl._nodeId);
127+
}
128+
129+
@Test
130+
public void scanDirectAgentToLoadHostWithNonForwardAttacheAndDisconnectedTest() {
131+
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
132+
HostVO hostVO = mock(HostVO.class);
133+
clusteredAgentManagerImpl._hostDao = _hostDao;
134+
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
135+
clusteredAgentManagerImpl._haMgr = mock(HighAvailabilityManagerImpl.class);
136+
clusteredAgentManagerImpl._resourceMgr = mock(ResourceManagerImpl.class);
137+
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
138+
when(hostVO.getId()).thenReturn(0L);
139+
List hosts = new ArrayList<>();
140+
hosts.add(hostVO);
141+
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
142+
AgentAttache agentAttache = mock(AgentAttache.class);
143+
when(agentAttache.forForward()).thenReturn(Boolean.FALSE);
144+
when(clusteredAgentManagerImpl.findAttache(0L)).thenReturn(agentAttache);
145+
doReturn(Boolean.TRUE).when(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
146+
clusteredAgentManagerImpl.scanDirectAgentToLoad();
147+
verify(clusteredAgentManagerImpl).investigate(agentAttache);
148+
verify(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
149+
}
150+
}

0 commit comments

Comments
 (0)