|
27 | 27 | import java.util.HashMap; |
28 | 28 | import java.util.List; |
29 | 29 | import java.util.Map; |
| 30 | +import java.util.Objects; |
30 | 31 | import java.util.Set; |
31 | 32 | import java.util.concurrent.ConcurrentHashMap; |
32 | 33 | import java.util.concurrent.ExecutorService; |
@@ -758,15 +759,15 @@ public void notifyMonitorsOfNewlyAddedHost(long hostId) { |
758 | 759 | } |
759 | 760 | } |
760 | 761 |
|
761 | | - protected AgentAttache notifyMonitorsOfConnection(final AgentAttache attache, final StartupCommand[] cmd, final boolean forRebalance) throws ConnectionException { |
| 762 | + protected AgentAttache notifyMonitorsOfConnection(final AgentAttache attache, final StartupCommand[] cmds, final boolean forRebalance) throws ConnectionException { |
762 | 763 | final long hostId = attache.getId(); |
763 | 764 | final HostVO host = _hostDao.findById(hostId); |
764 | 765 | for (final Pair<Integer, Listener> monitor : _hostMonitors) { |
765 | 766 | logger.debug("Sending Connect to listener: {}, for rebalance: {}", monitor.second().getClass().getSimpleName(), forRebalance); |
766 | | - for (int i = 0; i < cmd.length; i++) { |
| 767 | + for (StartupCommand cmd : cmds) { |
767 | 768 | try { |
768 | | - logger.debug("process connection to issue: {} for host: {}, forRebalance: {}, connection transferred: {}", ReflectionToStringBuilderUtils.reflectCollection(cmd[i]), hostId, forRebalance, cmd[i].isConnectionTransferred()); |
769 | | - monitor.second().processConnect(host, cmd[i], forRebalance); |
| 769 | + logger.debug("process connection to issue: {} for host: {}, forRebalance: {}", ReflectionToStringBuilderUtils.reflectOnlySelectedFields(cmd, "id", "type", "msHostList", "connectionTransferred"), hostId, forRebalance); |
| 770 | + monitor.second().processConnect(host, cmd, forRebalance); |
770 | 771 | } catch (final ConnectionException ce) { |
771 | 772 | if (ce.isSetupError()) { |
772 | 773 | logger.warn("Monitor {} says there is an error in the connect process for {} due to {}", monitor.second().getClass().getSimpleName(), hostId, ce.getMessage()); |
@@ -1040,39 +1041,50 @@ protected Status getNextStatusOnDisconnection(Host host, final Status.Event even |
1040 | 1041 |
|
1041 | 1042 | protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) { |
1042 | 1043 | final long hostId = attache.getId(); |
1043 | | - |
| 1044 | + final HostVO host = _hostDao.findById(hostId); |
1044 | 1045 | boolean result = false; |
1045 | 1046 | GlobalLock joinLock = getHostJoinLock(hostId); |
1046 | | - if (joinLock.lock(60)) { |
1047 | | - try { |
1048 | | - logger.info("Host {} is disconnecting with event {}", |
1049 | | - attache, event); |
1050 | | - Status nextStatus; |
1051 | | - final HostVO host = _hostDao.findById(hostId); |
1052 | | - if (host == null) { |
1053 | | - logger.warn("Can't find host with {} ({})", hostId, attache); |
1054 | | - nextStatus = Status.Removed; |
1055 | | - } else { |
1056 | | - nextStatus = getNextStatusOnDisconnection(host, event); |
1057 | | - caService.purgeHostCertificate(host); |
1058 | | - } |
1059 | | - logger.debug("Deregistering link for {} with state {}", attache, nextStatus); |
1060 | | - |
1061 | | - removeAgent(attache, nextStatus); |
1062 | | - |
1063 | | - if (host != null && transitState) { |
1064 | | - // update the state for host in DB as per the event |
1065 | | - disconnectAgent(host, event, _nodeId); |
1066 | | - } |
1067 | | - } finally { |
1068 | | - joinLock.unlock(); |
| 1047 | + try { |
| 1048 | + if (!joinLock.lock(60)) { |
| 1049 | + logger.debug("Unable to acquire lock on host {} to process agent disconnection", Objects.toString(host, String.valueOf(hostId))); |
| 1050 | + return result; |
1069 | 1051 | } |
| 1052 | + |
| 1053 | + logger.debug("Acquired lock on host {}, to process agent disconnection", Objects.toString(host, String.valueOf(hostId))); |
| 1054 | + disconnectHostAgent(attache, event, host, transitState, joinLock); |
1070 | 1055 | result = true; |
| 1056 | + } finally { |
| 1057 | + joinLock.releaseRef(); |
1071 | 1058 | } |
1072 | | - joinLock.releaseRef(); |
| 1059 | + |
1073 | 1060 | return result; |
1074 | 1061 | } |
1075 | 1062 |
|
| 1063 | + private void disconnectHostAgent(final AgentAttache attache, final Status.Event event, final HostVO host, final boolean transitState, final GlobalLock joinLock) { |
| 1064 | + try { |
| 1065 | + logger.info("Host {} is disconnecting with event {}", attache, event); |
| 1066 | + final long hostId = attache.getId(); |
| 1067 | + Status nextStatus; |
| 1068 | + if (host == null) { |
| 1069 | + logger.warn("Can't find host with {} ({})", hostId, attache); |
| 1070 | + nextStatus = Status.Removed; |
| 1071 | + } else { |
| 1072 | + nextStatus = getNextStatusOnDisconnection(host, event); |
| 1073 | + caService.purgeHostCertificate(host); |
| 1074 | + } |
| 1075 | + logger.debug("Deregistering link for {} with state {}", attache, nextStatus); |
| 1076 | + |
| 1077 | + removeAgent(attache, nextStatus); |
| 1078 | + |
| 1079 | + if (host != null && transitState) { |
| 1080 | + // update the state for host in DB as per the event |
| 1081 | + disconnectAgent(host, event, _nodeId); |
| 1082 | + } |
| 1083 | + } finally { |
| 1084 | + joinLock.unlock(); |
| 1085 | + } |
| 1086 | + } |
| 1087 | + |
1076 | 1088 | protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) { |
1077 | 1089 | final long hostId = attache.getId(); |
1078 | 1090 | HostVO host = _hostDao.findById(hostId); |
@@ -1341,45 +1353,58 @@ protected AgentAttache createAttacheForConnect(final HostVO host, final Link lin |
1341 | 1353 | return attache; |
1342 | 1354 | } |
1343 | 1355 |
|
1344 | | - private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startup) throws ConnectionException { |
1345 | | - final List<String> agentMSHostList = new ArrayList<>(); |
1346 | | - String lbAlgorithm = null; |
1347 | | - if (startup != null && startup.length > 0) { |
1348 | | - final String agentMSHosts = startup[0].getMsHostList(); |
1349 | | - if (StringUtils.isNotEmpty(agentMSHosts)) { |
1350 | | - String[] msHosts = agentMSHosts.split("@"); |
1351 | | - if (msHosts.length > 1) { |
1352 | | - lbAlgorithm = msHosts[1]; |
1353 | | - } |
1354 | | - agentMSHostList.addAll(Arrays.asList(msHosts[0].split(","))); |
1355 | | - } |
1356 | | - } |
1357 | | - ready.setArch(host.getArch().getType()); |
| 1356 | + private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startupCmds) throws ConnectionException { |
1358 | 1357 | AgentAttache attache; |
1359 | 1358 | GlobalLock joinLock = getHostJoinLock(host.getId()); |
1360 | | - if (joinLock.lock(60)) { |
1361 | | - try { |
| 1359 | + try { |
| 1360 | + if (!joinLock.lock(60)) { |
| 1361 | + throw new ConnectionException(true, String.format("Unable to acquire lock on host %s, to process agent connection", host)); |
| 1362 | + } |
| 1363 | + |
| 1364 | + logger.debug("Acquired lock on host {}, to process agent connection", host); |
| 1365 | + attache = connectHostAgent(host, ready, link, startupCmds, joinLock); |
| 1366 | + } finally { |
| 1367 | + joinLock.releaseRef(); |
| 1368 | + } |
1362 | 1369 |
|
1363 | | - if (!indirectAgentLB.compareManagementServerList(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) { |
1364 | | - final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null); |
1365 | | - ready.setMsHostList(newMSList); |
1366 | | - final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs(); |
1367 | | - ready.setAvoidMsHostList(avoidMsList); |
1368 | | - ready.setLbAlgorithm(indirectAgentLB.getLBAlgorithmName()); |
1369 | | - ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId())); |
1370 | | - logger.debug("Agent's management server host list is not up to date, sending list update: {}", newMSList); |
| 1370 | + return attache; |
| 1371 | + } |
| 1372 | + |
| 1373 | + private AgentAttache connectHostAgent(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startupCmds, GlobalLock joinLock) throws ConnectionException { |
| 1374 | + AgentAttache attache; |
| 1375 | + try { |
| 1376 | + final List<String> agentMSHostList = new ArrayList<>(); |
| 1377 | + String lbAlgorithm = null; |
| 1378 | + if (startupCmds != null && startupCmds.length > 0) { |
| 1379 | + final String agentMSHosts = startupCmds[0].getMsHostList(); |
| 1380 | + if (StringUtils.isNotEmpty(agentMSHosts)) { |
| 1381 | + String[] msHosts = agentMSHosts.split("@"); |
| 1382 | + if (msHosts.length > 1) { |
| 1383 | + lbAlgorithm = msHosts[1]; |
| 1384 | + } |
| 1385 | + agentMSHostList.addAll(Arrays.asList(msHosts[0].split(","))); |
1371 | 1386 | } |
| 1387 | + } |
1372 | 1388 |
|
1373 | | - attache = createAttacheForConnect(host, link); |
1374 | | - attache = notifyMonitorsOfConnection(attache, startup, false); |
1375 | | - } finally { |
1376 | | - joinLock.unlock(); |
| 1389 | + if (!indirectAgentLB.compareManagementServerListAndLBAlgorithm(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) { |
| 1390 | + final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null); |
| 1391 | + ready.setMsHostList(newMSList); |
| 1392 | + String newLBAlgorithm = indirectAgentLB.getLBAlgorithmName(); |
| 1393 | + ready.setLbAlgorithm(newLBAlgorithm); |
| 1394 | + logger.debug("Agent's management server host list or lb algorithm is not up to date, sending list and algorithm update: {}, {}", newMSList, newLBAlgorithm); |
1377 | 1395 | } |
1378 | | - } else { |
1379 | | - throw new ConnectionException(true, |
1380 | | - String.format("Unable to acquire lock on host %s", host)); |
| 1396 | + |
| 1397 | + final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs(); |
| 1398 | + ready.setAvoidMsHostList(avoidMsList); |
| 1399 | + ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId())); |
| 1400 | + ready.setArch(host.getArch().getType()); |
| 1401 | + |
| 1402 | + attache = createAttacheForConnect(host, link); |
| 1403 | + attache = notifyMonitorsOfConnection(attache, startupCmds, false); |
| 1404 | + } finally { |
| 1405 | + joinLock.unlock(); |
1381 | 1406 | } |
1382 | | - joinLock.releaseRef(); |
| 1407 | + |
1383 | 1408 | return attache; |
1384 | 1409 | } |
1385 | 1410 |
|
@@ -1666,7 +1691,7 @@ protected void processRequest(final Link link, final Request request) { |
1666 | 1691 | logger.debug("Not processing {} for agent id={}; can't find the host in the DB", PingRoutingCommand.class.getSimpleName(), cmdHostId); |
1667 | 1692 | } |
1668 | 1693 | } |
1669 | | - if (host!= null && host.getStatus() != Status.Up && gatewayAccessible) { |
| 1694 | + if (host != null && host.getStatus() != Status.Up && gatewayAccessible) { |
1670 | 1695 | requestStartupCommand = true; |
1671 | 1696 | } |
1672 | 1697 | final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs(); |
@@ -1821,11 +1846,11 @@ protected boolean isHostOwnerSwitched(final HostVO host) { |
1821 | 1846 | return false; |
1822 | 1847 | } |
1823 | 1848 |
|
1824 | | - private void disconnectInternal(final long hostId, final Status.Event event, final boolean invstigate) { |
| 1849 | + private void disconnectInternal(final long hostId, final Status.Event event, final boolean investigate) { |
1825 | 1850 | final AgentAttache attache = findAttache(hostId); |
1826 | 1851 |
|
1827 | 1852 | if (attache != null) { |
1828 | | - if (!invstigate) { |
| 1853 | + if (!investigate) { |
1829 | 1854 | disconnectWithoutInvestigation(attache, event); |
1830 | 1855 | } else { |
1831 | 1856 | disconnectWithInvestigation(attache, event); |
|
0 commit comments