Skip to content

Commit 3b11663

Browse files
authored
Fix failure on agent reconnection (#8089)
1 parent 6ae3b73 commit 3b11663

File tree

4 files changed

+215
-56
lines changed

4 files changed

+215
-56
lines changed

agent/src/main/java/com/cloud/agent/Agent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import javax.naming.ConfigurationException;
4242

43+
import com.cloud.agent.api.PingAnswer;
4344
import com.cloud.utils.NumbersUtil;
4445
import org.apache.cloudstack.agent.lb.SetupMSListAnswer;
4546
import org.apache.cloudstack.agent.lb.SetupMSListCommand;
@@ -822,6 +823,9 @@ public void processResponse(final Response response, final Link link) {
822823
listener.processControlResponse(response, (AgentControlAnswer)answer);
823824
}
824825
}
826+
} else if (answer instanceof PingAnswer && (((PingAnswer) answer).isSendStartup()) && _reconnectAllowed) {
827+
s_logger.info("Management server requested startup command to reinitialize the agent");
828+
sendStartup(link);
825829
} else {
826830
setLastPingResponseTime();
827831
}

core/src/main/java/com/cloud/agent/api/PingAnswer.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,26 @@
2222
public class PingAnswer extends Answer {
2323
private PingCommand _command = null;
2424

25+
private boolean sendStartup = false;
26+
2527
protected PingAnswer() {
2628
}
2729

28-
public PingAnswer(PingCommand cmd) {
30+
public PingAnswer(PingCommand cmd, boolean sendStartup) {
2931
super(cmd);
3032
_command = cmd;
33+
this.sendStartup = sendStartup;
3134
}
3235

3336
public PingCommand getCommand() {
3437
return _command;
3538
}
39+
40+
public boolean isSendStartup() {
41+
return sendStartup;
42+
}
43+
44+
public void setSendStartup(boolean sendStartup) {
45+
this.sendStartup = sendStartup;
46+
}
3647
}

engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java

Lines changed: 97 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import com.cloud.configuration.Config;
4242
import com.cloud.utils.NumbersUtil;
43+
import com.cloud.utils.db.GlobalLock;
4344
import org.apache.cloudstack.agent.lb.IndirectAgentLB;
4445
import org.apache.cloudstack.ca.CAManager;
4546
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
@@ -798,49 +799,65 @@ public boolean stop() {
798799
return true;
799800
}
800801

802+
protected Status getNextStatusOnDisconnection(Host host, final Status.Event event) {
803+
final Status currentStatus = host.getStatus();
804+
Status nextStatus;
805+
if (currentStatus == Status.Down || currentStatus == Status.Alert || currentStatus == Status.Removed) {
806+
if (s_logger.isDebugEnabled()) {
807+
s_logger.debug(String.format("Host %s is already %s", host.getUuid(), currentStatus));
808+
}
809+
nextStatus = currentStatus;
810+
} else {
811+
try {
812+
nextStatus = currentStatus.getNextStatus(event);
813+
} catch (final NoTransitionException e) {
814+
final String err = String.format("Cannot find next status for %s as current status is %s for agent %s", event, currentStatus, host.getUuid());
815+
s_logger.debug(err);
816+
throw new CloudRuntimeException(err);
817+
}
818+
819+
if (s_logger.isDebugEnabled()) {
820+
s_logger.debug(String.format("The next status of agent %s is %s, current status is %s", host.getUuid(), nextStatus, currentStatus));
821+
}
822+
}
823+
return nextStatus;
824+
}
825+
801826
protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) {
802827
final long hostId = attache.getId();
803828

804-
s_logger.info("Host " + hostId + " is disconnecting with event " + event);
805-
Status nextStatus = null;
806-
final HostVO host = _hostDao.findById(hostId);
807-
if (host == null) {
808-
s_logger.warn("Can't find host with " + hostId);
809-
nextStatus = Status.Removed;
810-
} else {
811-
final Status currentStatus = host.getStatus();
812-
if (currentStatus == Status.Down || currentStatus == Status.Alert || currentStatus == Status.Removed) {
813-
if (s_logger.isDebugEnabled()) {
814-
s_logger.debug("Host " + hostId + " is already " + currentStatus);
815-
}
816-
nextStatus = currentStatus;
817-
} else {
818-
try {
819-
nextStatus = currentStatus.getNextStatus(event);
820-
} catch (final NoTransitionException e) {
821-
final String err = "Cannot find next status for " + event + " as current status is " + currentStatus + " for agent " + hostId;
822-
s_logger.debug(err);
823-
throw new CloudRuntimeException(err);
829+
boolean result = false;
830+
GlobalLock joinLock = getHostJoinLock(hostId);
831+
if (joinLock.lock(60)) {
832+
try {
833+
s_logger.info(String.format("Host %d is disconnecting with event %s", hostId, event));
834+
Status nextStatus = null;
835+
final HostVO host = _hostDao.findById(hostId);
836+
if (host == null) {
837+
s_logger.warn(String.format("Can't find host with %d", hostId));
838+
nextStatus = Status.Removed;
839+
} else {
840+
nextStatus = getNextStatusOnDisconnection(host, event);
841+
caService.purgeHostCertificate(host);
824842
}
825843

826844
if (s_logger.isDebugEnabled()) {
827-
s_logger.debug("The next status of agent " + hostId + "is " + nextStatus + ", current status is " + currentStatus);
845+
s_logger.debug(String.format("Deregistering link for %d with state %s", hostId, nextStatus));
828846
}
829-
}
830-
caService.purgeHostCertificate(host);
831-
}
832847

833-
if (s_logger.isDebugEnabled()) {
834-
s_logger.debug("Deregistering link for " + hostId + " with state " + nextStatus);
835-
}
848+
removeAgent(attache, nextStatus);
836849

837-
removeAgent(attache, nextStatus);
838-
// update the DB
839-
if (host != null && transitState) {
840-
disconnectAgent(host, event, _nodeId);
850+
if (host != null && transitState) {
851+
// update the state for host in DB as per the event
852+
disconnectAgent(host, event, _nodeId);
853+
}
854+
} finally {
855+
joinLock.unlock();
856+
}
857+
result = true;
841858
}
842-
843-
return true;
859+
joinLock.releaseRef();
860+
return result;
844861
}
845862

846863
protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) {
@@ -1101,26 +1118,23 @@ protected AgentAttache createAttacheForConnect(final HostVO host, final Link lin
11011118
return attache;
11021119
}
11031120

1104-
private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, final Request request) {
1105-
AgentAttache attache = null;
1106-
ReadyCommand ready = null;
1107-
try {
1108-
final List<String> agentMSHostList = new ArrayList<>();
1109-
String lbAlgorithm = null;
1110-
if (startup != null && startup.length > 0) {
1111-
final String agentMSHosts = startup[0].getMsHostList();
1112-
if (StringUtils.isNotEmpty(agentMSHosts)) {
1113-
String[] msHosts = agentMSHosts.split("@");
1114-
if (msHosts.length > 1) {
1115-
lbAlgorithm = msHosts[1];
1116-
}
1117-
agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
1121+
private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startup) throws ConnectionException {
1122+
final List<String> agentMSHostList = new ArrayList<>();
1123+
String lbAlgorithm = null;
1124+
if (startup != null && startup.length > 0) {
1125+
final String agentMSHosts = startup[0].getMsHostList();
1126+
if (StringUtils.isNotEmpty(agentMSHosts)) {
1127+
String[] msHosts = agentMSHosts.split("@");
1128+
if (msHosts.length > 1) {
1129+
lbAlgorithm = msHosts[1];
11181130
}
1131+
agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
11191132
}
1120-
1121-
final HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup);
1122-
if (host != null) {
1123-
ready = new ReadyCommand(host.getDataCenterId(), host.getId(), NumbersUtil.enableHumanReadableSizes);
1133+
}
1134+
AgentAttache attache = null;
1135+
GlobalLock joinLock = getHostJoinLock(host.getId());
1136+
if (joinLock.lock(60)) {
1137+
try {
11241138

11251139
if (!indirectAgentLB.compareManagementServerList(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) {
11261140
final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null);
@@ -1132,6 +1146,24 @@ private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[
11321146

11331147
attache = createAttacheForConnect(host, link);
11341148
attache = notifyMonitorsOfConnection(attache, startup, false);
1149+
} finally {
1150+
joinLock.unlock();
1151+
}
1152+
} else {
1153+
throw new ConnectionException(true, "Unable to acquire lock on host " + host.getUuid());
1154+
}
1155+
joinLock.releaseRef();
1156+
return attache;
1157+
}
1158+
1159+
private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, final Request request) {
1160+
AgentAttache attache = null;
1161+
ReadyCommand ready = null;
1162+
try {
1163+
final HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup);
1164+
if (host != null) {
1165+
ready = new ReadyCommand(host.getDataCenterId(), host.getId(), NumbersUtil.enableHumanReadableSizes);
1166+
attache = sendReadyAndGetAttache(host, ready, link, startup);
11351167
}
11361168
} catch (final Exception e) {
11371169
s_logger.debug("Failed to handle host connection: ", e);
@@ -1265,6 +1297,8 @@ protected void processRequest(final Link link, final Request request) {
12651297
connectAgent(link, cmds, request);
12661298
}
12671299
return;
1300+
} else if (cmd instanceof StartupCommand) {
1301+
connectAgent(link, cmds, request);
12681302
}
12691303

12701304
final long hostId = attache.getId();
@@ -1318,13 +1352,14 @@ protected void processRequest(final Link link, final Request request) {
13181352
handleCommands(attache, request.getSequence(), new Command[] {cmd});
13191353
if (cmd instanceof PingCommand) {
13201354
final long cmdHostId = ((PingCommand)cmd).getHostId();
1355+
boolean requestStartupCommand = false;
13211356

1357+
final HostVO host = _hostDao.findById(Long.valueOf(cmdHostId));
1358+
boolean gatewayAccessible = true;
13221359
// if the router is sending a ping, verify the
13231360
// gateway was pingable
13241361
if (cmd instanceof PingRoutingCommand) {
1325-
final boolean gatewayAccessible = ((PingRoutingCommand)cmd).isGatewayAccessible();
1326-
final HostVO host = _hostDao.findById(Long.valueOf(cmdHostId));
1327-
1362+
gatewayAccessible = ((PingRoutingCommand)cmd).isGatewayAccessible();
13281363
if (host != null) {
13291364
if (!gatewayAccessible) {
13301365
// alert that host lost connection to
@@ -1342,7 +1377,10 @@ protected void processRequest(final Link link, final Request request) {
13421377
s_logger.debug("Not processing " + PingRoutingCommand.class.getSimpleName() + " for agent id=" + cmdHostId + "; can't find the host in the DB");
13431378
}
13441379
}
1345-
answer = new PingAnswer((PingCommand)cmd);
1380+
if (host!= null && host.getStatus() != Status.Up && gatewayAccessible) {
1381+
requestStartupCommand = true;
1382+
}
1383+
answer = new PingAnswer((PingCommand)cmd, requestStartupCommand);
13461384
} else if (cmd instanceof ReadyAnswer) {
13471385
final HostVO host = _hostDao.findById(attache.getId());
13481386
if (host == null) {
@@ -1864,4 +1902,8 @@ public void propagateChangeToAgents(Map<String, String> params) {
18641902
sendCommandToAgents(hostsPerZone, params);
18651903
}
18661904
}
1905+
1906+
private GlobalLock getHostJoinLock(Long hostId) {
1907+
return GlobalLock.getInternLock(String.format("%s-%s", "Host-Join", hostId));
1908+
}
18671909
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
""" Check state transition of host from Alert to Up on Ping
18+
"""
19+
20+
# Import Local Modules
21+
from marvin.cloudstackTestCase import *
22+
from marvin.lib.common import *
23+
from marvin.lib.utils import *
24+
from nose.plugins.attrib import attr
25+
26+
_multiprocess_shared_ = False
27+
28+
29+
class TestHostPing(cloudstackTestCase):
    """Checks that a host forced into the Alert state is reported as Up again."""

    def setUp(self, handler=None):
        """Prepare the logger, API client, DB connection, zone and pod used by the test.

        :param handler: optional logging handler; a new ``logging.StreamHandler`` is
            created when none is given.
        """
        self.logger = logging.getLogger('TestHM')
        # Build the handler per call: an instance used as a default argument would be
        # created once, when the method is defined, and shared by every test.
        self.stream_handler = handler if handler is not None else logging.StreamHandler()
        self.logger.setLevel(logging.DEBUG)
        self.logger.addHandler(self.stream_handler)
        self.apiclient = self.testClient.getApiClient()
        self.hypervisor = self.testClient.getHypervisorInfo()
        self.mgtSvrDetails = self.config.__dict__["mgtSvr"][0].__dict__
        self.dbConnection = self.testClient.getDbConnection()
        self.services = self.testClient.getParsedTestDataConfig()
        self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
        self.pod = get_pod(self.apiclient, self.zone.id)
        self.cleanup = []

    def tearDown(self):
        """Detach the handler added in setUp, then run the base class teardown."""
        # Without this, every test would leave one more handler on the shared logger.
        self.logger.removeHandler(self.stream_handler)
        super(TestHostPing, self).tearDown()

    def checkHostStateInCloudstack(self, state, host_id):
        """Report whether the given host is currently in ``state``.

        Written as a ``wait_until`` callback: it always returns a two-element tuple
        whose first item is the boolean outcome. Any exception, including a failed
        assertion on the list response, is logged and reported as ``False`` so the
        caller can retry.

        :param state: expected host state, e.g. ``"Up"``
        :param host_id: id of the host to look up
        :return: ``(True, 1)`` if the host is in ``state``, otherwise ``(False, 1)``
        """
        try:
            listHost = Host.list(
                self.apiclient,
                type='Routing',
                zoneid=self.zone.id,
                podid=self.pod.id,
                id=host_id
            )
            self.assertEqual(
                isinstance(listHost, list),
                True,
                "Check if listHost returns a valid response"
            )

            self.assertEqual(
                len(listHost),
                1,
                "Check if listHost returns a host"
            )
            self.logger.debug(" Host state is %s " % listHost[0].state)
            return listHost[0].state == state, 1
        except Exception as e:
            self.logger.debug("Got exception %s" % e)
            return False, 1

    @attr(
        tags=[
            "advanced",
            "advancedns",
            "smoke",
            "basic"],
        required_hardware="true")
    def test_01_host_ping_on_alert(self):
        """Set a host to Alert directly in the DB and expect it to be reported Up again."""
        listHost = Host.list(
            self.apiclient,
            type='Routing',
            zoneid=self.zone.id,
            podid=self.pod.id,
        )
        # Fail with a clear message instead of an iteration/index error when the
        # zone/pod has no routing host (or the list call returned nothing).
        self.assertTrue(
            isinstance(listHost, list) and len(listHost) > 0,
            "Check if listHost returns at least one routing host"
        )
        for host in listHost:
            self.logger.debug('Hypervisor = {}'.format(host.id))

        hostToTest = listHost[0]
        sql_query = "UPDATE host SET status = 'Alert' WHERE uuid = '" + hostToTest.id + "'"
        self.dbConnection.execute(sql_query)

        waitResult = wait_until(30, 8, self.checkHostStateInCloudstack, "Up", hostToTest.id)
        # NOTE(review): wait_until appears to return a (succeeded, value) tuple - confirm.
        # A non-empty tuple is always truthy, so unpack the boolean before testing it;
        # a plain boolean result is still accepted.
        hostUpInCloudstack = waitResult[0] if isinstance(waitResult, tuple) else waitResult

        if not hostUpInCloudstack:
            # fail() raises on its own; no explicit raise is needed.
            self.fail("Host is not up %s, in cloudstack so failing test " % (hostToTest.ipaddress))
        return

0 commit comments

Comments
 (0)