diff --git a/agent/src/main/java/com/cloud/agent/Agent.java b/agent/src/main/java/com/cloud/agent/Agent.java index 0a76bfbb4f8e..ad480fef4e51 100644 --- a/agent/src/main/java/com/cloud/agent/Agent.java +++ b/agent/src/main/java/com/cloud/agent/Agent.java @@ -342,7 +342,7 @@ public void start() { logger.info("Attempted to connect to the server, but received an unexpected exception, trying again...", e); } } - shell.updateConnectedHost(); + shell.updateConnectedHost(((NioClient)connection).getHost()); scavengeOldAgentObjects(); } @@ -617,15 +617,11 @@ public Task create(final Task.Type type, final Link link, final byte[] data) { } protected void reconnect(final Link link) { - reconnect(link, null, null, false); + reconnect(link, null, false); } - protected void reconnect(final Link link, String preferredHost, List avoidHostList, boolean forTransfer) { + protected void reconnect(final Link link, String preferredMSHost, boolean forTransfer) { if (!(forTransfer || reconnectAllowed)) { - return; - } - - if (!reconnectAllowed) { logger.debug("Reconnect requested but it is not allowed {}", () -> getLinkLog(link)); return; } @@ -637,19 +633,26 @@ protected void reconnect(final Link link, String preferredHost, List avo serverResource.disconnected(); logger.info("Lost connection to host: {}. Attempting reconnection while we still have {} commands in progress.", shell.getConnectedHost(), commandsInProgress.get()); stopAndCleanupConnection(true); + String host = preferredMSHost; + if (org.apache.commons.lang3.StringUtils.isBlank(host)) { + host = shell.getNextHost(); + } + List avoidMSHostList = shell.getAvoidHosts(); do { - final String host = shell.getNextHost(); - connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(), shell.getSslHandshakeTimeout(), this); - logger.info("Reconnecting to host: {}", host); - try { - connection.start(); - } catch (final NioConnectionException e) { - logger.info("Attempted to re-connect to the server, but received an unexpected exception, trying again...", e); - stopAndCleanupConnection(false); + if (CollectionUtils.isEmpty(avoidMSHostList) || !avoidMSHostList.contains(host)) { + connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(), shell.getSslHandshakeTimeout(), this); + logger.info("Reconnecting to host: {}", host); + try { + connection.start(); + } catch (final NioConnectionException e) { + logger.info("Attempted to re-connect to the server, but received an unexpected exception, trying again...", e); + stopAndCleanupConnection(false); + } } shell.getBackoffAlgorithm().waitBeforeRetry(); + host = shell.getNextHost(); } while (!connection.isStartup()); - shell.updateConnectedHost(); + shell.updateConnectedHost(((NioClient)connection).getHost()); logger.info("Connected to the host: {}", shell.getConnectedHost()); } @@ -922,7 +925,7 @@ private Answer setupAgentCertificate(final SetupCertificateCommand cmd) { return new SetupCertificateAnswer(true); } - private void processManagementServerList(final List msList, final String lbAlgorithm, final Long lbCheckInterval) { + private void processManagementServerList(final List msList, final List avoidMsList, final String lbAlgorithm, final Long lbCheckInterval) { if (CollectionUtils.isNotEmpty(msList) && StringUtils.isNotEmpty(lbAlgorithm)) { try { final String newMSHosts = String.format("%s%s%s", com.cloud.utils.StringUtils.toCSVList(msList), IAgentShell.hostLbAlgorithmSeparator, lbAlgorithm); @@ -934,6 +937,7 @@ private void processManagementServerList(final List msList, final String throw new CloudRuntimeException("Could not persist received management servers list", e); } } + shell.setAvoidHosts(avoidMsList); if ("shuffle".equals(lbAlgorithm)) { scheduleHostLBCheckerTask(0); } else { @@ -942,16 +946,18 @@ private void processManagementServerList(final List msList, final String } private Answer setupManagementServerList(final SetupMSListCommand cmd) { - processManagementServerList(cmd.getMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval()); + processManagementServerList(cmd.getMsList(), cmd.getAvoidMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval()); return new SetupMSListAnswer(true); } private Answer migrateAgentToOtherMS(final MigrateAgentConnectionCommand cmd) { try { if (CollectionUtils.isNotEmpty(cmd.getMsList())) { - processManagementServerList(cmd.getMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval()); + processManagementServerList(cmd.getMsList(), cmd.getAvoidMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval()); } - migrateAgentConnection(cmd.getAvoidMsList()); + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("MigrateAgentConnection-Job")).schedule(() -> { + migrateAgentConnection(cmd.getAvoidMsList()); + }, 3, TimeUnit.SECONDS); } catch (Exception e) { String errMsg = "Migrate agent connection failed, due to " + e.getMessage(); logger.debug(errMsg, e); @@ -972,25 +978,26 @@ private void migrateAgentConnection(List avoidMsList) { throw new CloudRuntimeException("No other Management Server hosts to migrate"); } - String preferredHost = null; + String preferredMSHost = null; for (String msHost : msHostsList) { try (final Socket socket = new Socket()) { socket.connect(new InetSocketAddress(msHost, shell.getPort()), 5000); - preferredHost = msHost; + preferredMSHost = msHost; break; } catch (final IOException e) { throw new CloudRuntimeException("Management server host: " + msHost + " is not reachable, to migrate connection"); } } - if (preferredHost == null) { + if (preferredMSHost == null) { throw new CloudRuntimeException("Management server host(s) are not reachable, to migrate connection"); } - logger.debug("Management server host " + preferredHost + " is found to be reachable, trying to reconnect"); + logger.debug("Management server host " + preferredMSHost + " is found to be reachable, trying to reconnect"); shell.resetHostCounter(); + shell.setAvoidHosts(avoidMsList); shell.setConnectionTransfer(true); - reconnect(link, preferredHost, avoidMsList, true); + reconnect(link, preferredMSHost, true); } public void processResponse(final Response response, final Link link) { @@ -1003,14 +1010,21 @@ public void processResponse(final Response response, final Link link) { for (final IAgentControlListener listener : controlListeners) { listener.processControlResponse(response, (AgentControlAnswer)answer); } - } else if (answer instanceof PingAnswer && (((PingAnswer) answer).isSendStartup()) && reconnectAllowed) { - logger.info("Management server requested startup command to reinitialize the agent"); - sendStartup(link); + } else if (answer instanceof PingAnswer) { + processPingAnswer((PingAnswer) answer); } else { updateLastPingResponseTime(); } } + private void processPingAnswer(final PingAnswer answer) { + if ((answer.isSendStartup()) && reconnectAllowed) { + logger.info("Management server requested startup command to reinitialize the agent"); + sendStartup(link); + } + shell.setAvoidHosts(answer.getAvoidMsList()); + } + public void processReadyCommand(final Command cmd) { final ReadyCommand ready = (ReadyCommand)cmd; // Set human readable sizes; @@ -1027,7 +1041,7 @@ public void processReadyCommand(final Command cmd) { } verifyAgentArch(ready.getArch()); - processManagementServerList(ready.getMsHostList(), ready.getLbAlgorithm(), ready.getLbCheckInterval()); + processManagementServerList(ready.getMsHostList(), ready.getAvoidMsHostList(), ready.getLbAlgorithm(), ready.getLbCheckInterval()); logger.info("Ready command is processed for agent [id: {}, uuid: {}, name: {}]", getId(), getUuid(), getName()); } @@ -1374,26 +1388,26 @@ protected void runInContext() { if (msList == null || msList.length < 1) { return; } - final String preferredHost = msList[0]; + final String preferredMSHost = msList[0]; final String connectedHost = shell.getConnectedHost(); logger.debug("Running preferred host checker task, connected host={}, preferred host={}", - connectedHost, preferredHost); - if (preferredHost == null || preferredHost.equals(connectedHost) || link == null) { + connectedHost, preferredMSHost); + if (preferredMSHost == null || preferredMSHost.equals(connectedHost) || link == null) { return; } boolean isHostUp = false; try (final Socket socket = new Socket()) { - socket.connect(new InetSocketAddress(preferredHost, shell.getPort()), 5000); + socket.connect(new InetSocketAddress(preferredMSHost, shell.getPort()), 5000); isHostUp = true; } catch (final IOException e) { - logger.debug("Host: {} is not reachable", preferredHost); + logger.debug("Host: {} is not reachable", preferredMSHost); } if (isHostUp && link != null && commandsInProgress.get() == 0) { if (logger.isDebugEnabled()) { - logger.debug("Preferred host {} is found to be reachable, trying to reconnect", preferredHost); + logger.debug("Preferred host {} is found to be reachable, trying to reconnect", preferredMSHost); } shell.resetHostCounter(); - reconnect(link); + reconnect(link, preferredMSHost, false); } } catch (Throwable t) { logger.error("Error caught while attempting to connect to preferred host", t); diff --git a/agent/src/main/java/com/cloud/agent/AgentShell.java b/agent/src/main/java/com/cloud/agent/AgentShell.java index aea7fd3a8de8..4862e7e001e3 100644 --- a/agent/src/main/java/com/cloud/agent/AgentShell.java +++ b/agent/src/main/java/com/cloud/agent/AgentShell.java @@ -66,6 +66,7 @@ public class AgentShell implements IAgentShell, Daemon { private String _zone; private String _pod; private String _host; + private List _avoidHosts; private String _privateIp; private int _port; private int _proxyPort; @@ -76,7 +77,6 @@ public class AgentShell implements IAgentShell, Daemon { private volatile boolean _exit = false; private int _pingRetries; private final List _agents = new ArrayList(); - private String hostToConnect; private String connectedHost; private Long preferredHostCheckInterval; private boolean connectionTransfer = false; @@ -121,7 +121,7 @@ public String getNextHost() { if (_hostCounter >= hosts.length) { _hostCounter = 0; } - hostToConnect = hosts[_hostCounter % hosts.length]; + String hostToConnect = hosts[_hostCounter % hosts.length]; _hostCounter++; return hostToConnect; } @@ -143,11 +143,10 @@ public long getLbCheckerInterval(final Long receivedLbInterval) { } @Override - public void updateConnectedHost() { - connectedHost = hostToConnect; + public void updateConnectedHost(String connectedHost) { + this.connectedHost = connectedHost; } - @Override public void resetHostCounter() { _hostCounter = 0; @@ -166,6 +165,16 @@ public void setHosts(final String host) { } } + @Override + public void setAvoidHosts(List avoidHosts) { + _avoidHosts = avoidHosts; + } + + @Override + public List getAvoidHosts() { + return _avoidHosts; + } + @Override public String getPrivateIp() { return _privateIp; diff --git a/agent/src/main/java/com/cloud/agent/IAgentShell.java b/agent/src/main/java/com/cloud/agent/IAgentShell.java index c0ecd90ae69d..9eefa6d2eeee 100644 --- a/agent/src/main/java/com/cloud/agent/IAgentShell.java +++ b/agent/src/main/java/com/cloud/agent/IAgentShell.java @@ -16,6 +16,7 @@ // under the License. package com.cloud.agent; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -63,9 +64,13 @@ public interface IAgentShell { String[] getHosts(); + void setAvoidHosts(List hosts); + + List getAvoidHosts(); + long getLbCheckerInterval(Long receivedLbInterval); - void updateConnectedHost(); + void updateConnectedHost(String connectedHost); String getConnectedHost(); diff --git a/agent/src/main/java/com/cloud/agent/properties/AgentProperties.java b/agent/src/main/java/com/cloud/agent/properties/AgentProperties.java index 61cd27fff772..feb1845d84b2 100644 --- a/agent/src/main/java/com/cloud/agent/properties/AgentProperties.java +++ b/agent/src/main/java/com/cloud/agent/properties/AgentProperties.java @@ -816,7 +816,7 @@ public Property getWorkers() { * Data type: Integer.
* Default value: null */ - public static final Property SSL_HANDSHAKE_TIMEOUT = new Property<>("ssl.handshake.timeout", null, Integer.class); + public static final Property SSL_HANDSHAKE_TIMEOUT = new Property<>("ssl.handshake.timeout", 30, Integer.class); public static class Property { private String name; diff --git a/agent/src/test/java/com/cloud/agent/AgentShellTest.java b/agent/src/test/java/com/cloud/agent/AgentShellTest.java index 6d9758cc3dc8..d8def24a603a 100644 --- a/agent/src/test/java/com/cloud/agent/AgentShellTest.java +++ b/agent/src/test/java/com/cloud/agent/AgentShellTest.java @@ -358,7 +358,7 @@ public void updateAndGetConnectedHost() { AgentShell shell = new AgentShell(); shell.setHosts("test"); shell.getNextHost(); - shell.updateConnectedHost(); + shell.updateConnectedHost("test"); Assert.assertEquals(expected, shell.getConnectedHost()); } diff --git a/api/src/main/java/com/cloud/event/EventTypes.java b/api/src/main/java/com/cloud/event/EventTypes.java index 862a6e21fa82..815bd2363d5a 100644 --- a/api/src/main/java/com/cloud/event/EventTypes.java +++ b/api/src/main/java/com/cloud/event/EventTypes.java @@ -739,6 +739,13 @@ public class EventTypes { //Purge resources public static final String EVENT_PURGE_EXPUNGED_RESOURCES = "PURGE.EXPUNGED.RESOURCES"; + // Management Server + public static final String EVENT_MS_MAINTENANCE_PREPARE = "MS.MAINTENANCE.PREPARE"; + public static final String EVENT_MS_MAINTENANCE_CANCEL = "MS.MAINTENANCE.CANCEL"; + public static final String EVENT_MS_SHUTDOWN_PREPARE = "MS.SHUTDOWN.PREPARE"; + public static final String EVENT_MS_SHUTDOWN_CANCEL = "MS.SHUTDOWN.CANCEL"; + public static final String EVENT_MS_SHUTDOWN = "MS.SHUTDOWN"; + // OBJECT STORE public static final String EVENT_OBJECT_STORE_CREATE = "OBJECT.STORE.CREATE"; public static final String EVENT_OBJECT_STORE_DELETE = "OBJECT.STORE.DELETE"; @@ -1233,6 +1240,12 @@ public class EventTypes { entityEventDetails.put(EVENT_UPDATE_IMAGE_STORE_ACCESS_STATE, ImageStore.class); entityEventDetails.put(EVENT_LIVE_PATCH_SYSTEMVM, "SystemVMs"); + entityEventDetails.put(EVENT_MS_MAINTENANCE_PREPARE, "ManagementServer"); + entityEventDetails.put(EVENT_MS_MAINTENANCE_CANCEL, "ManagementServer"); + entityEventDetails.put(EVENT_MS_SHUTDOWN_PREPARE, "ManagementServer"); + entityEventDetails.put(EVENT_MS_SHUTDOWN_CANCEL, "ManagementServer"); + entityEventDetails.put(EVENT_MS_SHUTDOWN, "ManagementServer"); + //Object Store entityEventDetails.put(EVENT_OBJECT_STORE_CREATE, ObjectStore.class); entityEventDetails.put(EVENT_OBJECT_STORE_UPDATE, ObjectStore.class); diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java index 3e8b329cac78..627e7395e1e1 100644 --- a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java +++ b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java @@ -1150,6 +1150,7 @@ public class ApiConstants { public static final String PENDING_JOBS_COUNT = "pendingjobscount"; public static final String AGENTS_COUNT = "agentscount"; public static final String AGENTS = "agents"; + public static final String LAST_AGENTS = "lastagents"; public static final String PUBLIC_MTU = "publicmtu"; public static final String PRIVATE_MTU = "privatemtu"; diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiErrorCode.java b/api/src/main/java/org/apache/cloudstack/api/ApiErrorCode.java index d4fdeddc9a95..03dc37325d4b 100644 --- a/api/src/main/java/org/apache/cloudstack/api/ApiErrorCode.java +++ b/api/src/main/java/org/apache/cloudstack/api/ApiErrorCode.java @@ -30,6 +30,7 @@ public enum ApiErrorCode { UNSUPPORTED_ACTION_ERROR(432), API_LIMIT_EXCEED(429), + SERVICE_UNAVAILABLE(503), INTERNAL_ERROR(530), ACCOUNT_ERROR(531), ACCOUNT_RESOURCE_LIMIT_ERROR(532), diff --git a/api/src/main/java/org/apache/cloudstack/api/response/ManagementServerResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/ManagementServerResponse.java index 729fb5ff3bca..e6cad482fe56 100644 --- a/api/src/main/java/org/apache/cloudstack/api/response/ManagementServerResponse.java +++ b/api/src/main/java/org/apache/cloudstack/api/response/ManagementServerResponse.java @@ -82,6 +82,14 @@ public class ManagementServerResponse extends BaseResponse { @Param(description = "the Management Server Peers") private List peers; + @SerializedName(ApiConstants.LAST_AGENTS) + @Param(description = "the last agents this Management Server is responsible for, before shutdown or preparing for maintenance", since = "4.21.0.0") + private List lastAgents; + + @SerializedName(ApiConstants.AGENTS) + @Param(description = "the agents this Management Server is responsible for", since = "4.21.0.0") + private List agents; + @SerializedName(ApiConstants.AGENTS_COUNT) @Param(description = "the number of host agents this Management Server is responsible for", since = "4.21.0.0") private Long agentsCount; @@ -134,6 +142,14 @@ public String getIpAddress() { return ipAddress; } + public List getLastAgents() { + return lastAgents; + } + + public List getAgents() { + return agents; + } + public Long getAgentsCount() { return this.agentsCount; } @@ -190,6 +206,14 @@ public void setIpAddress(String ipAddress) { this.ipAddress = ipAddress; } + public void setLastAgents(List lastAgents) { + this.lastAgents = lastAgents; + } + + public void setAgents(List agents) { + this.agents = agents; + } + public void setAgentsCount(Long agentsCount) { this.agentsCount = agentsCount; } diff --git a/core/src/main/java/com/cloud/agent/api/PingAnswer.java b/core/src/main/java/com/cloud/agent/api/PingAnswer.java index 6353b121583a..3a40ad3925f2 100644 --- a/core/src/main/java/com/cloud/agent/api/PingAnswer.java +++ b/core/src/main/java/com/cloud/agent/api/PingAnswer.java @@ -19,18 +19,22 @@ package com.cloud.agent.api; +import java.util.List; + public class PingAnswer extends Answer { private PingCommand _command = null; private boolean sendStartup = false; + private List avoidMsList; protected PingAnswer() { } - public PingAnswer(PingCommand cmd, boolean sendStartup) { + public PingAnswer(PingCommand cmd, List avoidMsList, boolean sendStartup) { super(cmd); _command = cmd; this.sendStartup = sendStartup; + this.avoidMsList = avoidMsList; } public PingCommand getCommand() { @@ -44,4 +48,8 @@ public boolean isSendStartup() { public void setSendStartup(boolean sendStartup) { this.sendStartup = sendStartup; } + + public List getAvoidMsList() { + return avoidMsList; + } } diff --git a/core/src/main/java/com/cloud/agent/api/ReadyCommand.java b/core/src/main/java/com/cloud/agent/api/ReadyCommand.java index e2d974e38786..49768297ad59 100644 --- a/core/src/main/java/com/cloud/agent/api/ReadyCommand.java +++ b/core/src/main/java/com/cloud/agent/api/ReadyCommand.java @@ -35,6 +35,7 @@ public ReadyCommand() { private String hostUuid; private String hostName; private List msHostList; + private List avoidMsHostList; private String lbAlgorithm; private Long lbCheckInterval; private Boolean enableHumanReadableSizes; @@ -90,6 +91,14 @@ public void setMsHostList(List msHostList) { this.msHostList = msHostList; } + public List getAvoidMsHostList() { + return avoidMsHostList; + } + + public void setAvoidMsHostList(List msHostList) { + this.avoidMsHostList = avoidMsHostList; + } + public String getLbAlgorithm() { return lbAlgorithm; } diff --git a/core/src/main/java/org/apache/cloudstack/agent/lb/SetupMSListCommand.java b/core/src/main/java/org/apache/cloudstack/agent/lb/SetupMSListCommand.java index 50cf956c9e75..32f436434c17 100644 --- a/core/src/main/java/org/apache/cloudstack/agent/lb/SetupMSListCommand.java +++ b/core/src/main/java/org/apache/cloudstack/agent/lb/SetupMSListCommand.java @@ -26,12 +26,14 @@ public class SetupMSListCommand extends Command { private List msList; + private List avoidMsList; private String lbAlgorithm; private Long lbCheckInterval; - public SetupMSListCommand(final List msList, final String lbAlgorithm, final Long lbCheckInterval) { + public SetupMSListCommand(final List msList, final List avoidMsList, final String lbAlgorithm, final Long lbCheckInterval) { super(); this.msList = msList; + this.avoidMsList = avoidMsList; this.lbAlgorithm = lbAlgorithm; this.lbCheckInterval = lbCheckInterval; } @@ -40,6 +42,10 @@ public List getMsList() { return msList; } + public List getAvoidMsList() { + return avoidMsList; + } + public String getLbAlgorithm() { return lbAlgorithm; } diff --git a/engine/components-api/src/main/java/com/cloud/agent/AgentManager.java b/engine/components-api/src/main/java/com/cloud/agent/AgentManager.java index 82e2d29f407c..c01345ca21b1 100644 --- a/engine/components-api/src/main/java/com/cloud/agent/AgentManager.java +++ b/engine/components-api/src/main/java/com/cloud/agent/AgentManager.java @@ -16,7 +16,6 @@ // under the License. package com.cloud.agent; -import java.util.List; import java.util.Map; import org.apache.cloudstack.framework.config.ConfigKey; @@ -173,8 +172,4 @@ public enum TapAgentsAction { void propagateChangeToAgents(Map params); boolean transferDirectAgentsFromMS(String fromMsUuid, long fromMsId, long timeoutDurationInMs); - - List getLastAgents(); - - void setLastAgents(List lastAgents); } diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index ca56446631cd..6d4bcb7b0d9b 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -214,13 +214,13 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected final ConfigKey Workers = new ConfigKey<>("Advanced", Integer.class, "workers", "5", "Number of worker threads handling remote agent connections.", false); - protected final ConfigKey Port = new ConfigKey<>("Advanced", Integer.class, "port", "8250", "Port to listen on for remote agent connections.", false); + protected final ConfigKey Port = new ConfigKey<>("Advanced", Integer.class, "port", "8250", "Port to listen on for remote (indirect) agent connections.", false); protected final ConfigKey RemoteAgentSslHandshakeTimeout = new ConfigKey<>("Advanced", Integer.class, "agent.ssl.handshake.timeout", "30", - "Seconds after which SSL handshake times out during remote agent connections.", false); + "Seconds after which SSL handshake times out during remote (indirect) agent connections.", false); protected final ConfigKey RemoteAgentMaxConcurrentNewConnections = new ConfigKey<>("Advanced", Integer.class, "agent.max.concurrent.new.connections", "0", - "Number of maximum concurrent new connections server allows for remote agents. " + + "Number of maximum concurrent new connections server allows for remote (indirect) agents. " + "If set to zero (default value) then no limit will be enforced on concurrent new connections", false); protected final ConfigKey AlertWait = new ConfigKey<>("Advanced", Integer.class, "alert.wait", "1800", @@ -255,9 +255,7 @@ public boolean configure(final String name, final Map params) th _executor = new ThreadPoolExecutor(agentTaskThreads, agentTaskThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentTaskPool")); - _connectExecutor = new ThreadPoolExecutor(100, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentConnectTaskPool")); - // allow core threads to time out even when there are no items in the queue - _connectExecutor.allowCoreThreadTimeOut(true); + initConnectExecutor(); maxConcurrentNewAgentConnections = RemoteAgentMaxConcurrentNewConnections.value(); @@ -273,10 +271,6 @@ public boolean configure(final String name, final Map params) th logger.debug("Created DirectAgentAttache pool with size: {}.", directAgentPoolSize); _directAgentThreadCap = Math.round(directAgentPoolSize * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0 - _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); - - newAgentConnectionsMonitor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NewAgentConnectionsMonitor")); - initializeCommandTimeouts(); return true; @@ -351,10 +345,27 @@ public void unregisterForHostEvents(final int id) { _hostMonitors.remove(id); } + @Override + public void onManagementServerPreparingForMaintenance() { + logger.debug("Management server preparing for maintenance"); + if (_connection != null) { + _connection.block(); + } + } + + @Override + public void onManagementServerCancelPreparingForMaintenance() { + logger.debug("Management server cancel preparing for maintenance"); + if (_connection != null) { + _connection.unblock(); + } + } + @Override public void onManagementServerMaintenance() { logger.debug("Management server maintenance enabled"); _monitorExecutor.shutdownNow(); + newAgentConnectionsMonitor.shutdownNow(); if (_connection != null) { _connection.stop(); @@ -371,10 +382,8 @@ public void onManagementServerMaintenance() { public void onManagementServerCancelMaintenance() { logger.debug("Management server maintenance disabled"); if (_connectExecutor.isShutdown()) { - _connectExecutor = new ThreadPoolExecutor(100, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentConnectTaskPool")); - _connectExecutor.allowCoreThreadTimeOut(true); + initConnectExecutor(); } - startDirectlyConnectedHosts(true); if (_connection != null) { try { @@ -385,11 +394,30 @@ public void onManagementServerCancelMaintenance() { } if (_monitorExecutor.isShutdown()) { - _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); - _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS); + initAndScheduleMonitorExecutor(); + } + if (newAgentConnectionsMonitor.isShutdown()) { + initAndScheduleAgentConnectionsMonitor(); } } + private void initConnectExecutor() { + _connectExecutor = new ThreadPoolExecutor(100, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentConnectTaskPool")); + // allow core threads to time out even when there are no items in the queue + _connectExecutor.allowCoreThreadTimeOut(true); + } + + private void initAndScheduleMonitorExecutor() { + _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); + _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS); + } + + private void initAndScheduleAgentConnectionsMonitor() { + final int cleanupTimeInSecs = Wait.value(); + newAgentConnectionsMonitor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NewAgentConnectionsMonitor")); + newAgentConnectionsMonitor.scheduleAtFixedRate(new AgentNewConnectionsMonitorTask(), cleanupTimeInSecs, cleanupTimeInSecs, TimeUnit.SECONDS); + } + private AgentControlAnswer handleControlCommand(final AgentAttache attache, final AgentControlCommand cmd) { AgentControlAnswer answer; @@ -426,16 +454,6 @@ public AgentAttache findAttache(final long hostId) { return attache; } - @Override - public List getLastAgents() { - return lastAgents; - } - - @Override - public void setLastAgents(List lastAgents) { - this.lastAgents = lastAgents; - } - @Override public Answer sendTo(final Long dcId, final HypervisorType type, final Command cmd) { final List clusters = _clusterDao.listByDcHyType(dcId, type.toString()); @@ -779,6 +797,7 @@ public boolean start() { ManagementServerHostVO msHost = _mshostDao.findByMsid(_nodeId); if (msHost != null && (ManagementServerHost.State.Maintenance.equals(msHost.getState()) || ManagementServerHost.State.PreparingForMaintenance.equals(msHost.getState()))) { _monitorExecutor.shutdownNow(); + newAgentConnectionsMonitor.shutdownNow(); return true; } @@ -792,12 +811,8 @@ public boolean start() { } } - _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS); - - final int cleanupTime = Wait.value(); - newAgentConnectionsMonitor.scheduleAtFixedRate(new AgentNewConnectionsMonitorTask(), cleanupTime, - cleanupTime, TimeUnit.MINUTES); - + initAndScheduleMonitorExecutor(); + initAndScheduleAgentConnectionsMonitor(); return true; } @@ -1304,6 +1319,8 @@ private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Lin if (!indirectAgentLB.compareManagementServerList(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) { final List newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null); ready.setMsHostList(newMSList); + final List avoidMsList = _mshostDao.listNonUpStateMsIPs(); + ready.setAvoidMsHostList(avoidMsList); ready.setLbAlgorithm(indirectAgentLB.getLBAlgorithmName()); ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId())); logger.debug("Agent's management server host list is not up to date, sending list update: {}", newMSList); @@ -1608,7 +1625,8 @@ protected void processRequest(final Link link, final Request request) { if (host!= null && host.getStatus() != Status.Up && gatewayAccessible) { requestStartupCommand = true; } - answer = new PingAnswer((PingCommand)cmd, requestStartupCommand); + final List avoidMsList = _mshostDao.listNonUpStateMsIPs(); + answer = new PingAnswer((PingCommand)cmd, avoidMsList, requestStartupCommand); } else if (cmd instanceof ReadyAnswer) { final HostVO host = _hostDao.findById(attache.getId()); if (host == null) { @@ -1929,25 +1947,19 @@ protected void runInContext() { logger.trace("Agent New Connections Monitor is started."); final int cleanupTime = Wait.value(); Set> entrySet = newAgentConnections.entrySet(); - long cutOff = System.currentTimeMillis() - (cleanupTime * 60 * 1000L); - if (logger.isDebugEnabled()) { - List expiredConnections = newAgentConnections.entrySet() - .stream() - .filter(e -> e.getValue() <= cutOff) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - logger.debug("Currently {} active new connections, of which {} have expired - {}", - entrySet.size(), - expiredConnections.size(), - StringUtils.join(expiredConnections)); - } - for (Map.Entry entry : entrySet) { - if (entry.getValue() <= cutOff) { - if (logger.isTraceEnabled()) { - logger.trace("Cleaning up new agent connection for {}", entry.getKey()); - } - newAgentConnections.remove(entry.getKey()); - } + long cutOff = System.currentTimeMillis() - (cleanupTime * 1000L); + List expiredConnections = newAgentConnections.entrySet() + .stream() + .filter(e -> e.getValue() <= cutOff) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + logger.debug("Currently {} active new connections, of which {} have expired - {}", + entrySet.size(), + expiredConnections.size(), + StringUtils.join(expiredConnections)); + for (String connection : expiredConnections) { + logger.trace("Cleaning up new agent connection for {}", connection); + newAgentConnections.remove(connection); } } } diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index c667df5412e0..dad7d401b940 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -151,11 +151,11 @@ protected ClusteredAgentManagerImpl() { super(); } - protected final ConfigKey EnableLB = new ConfigKey<>(Boolean.class, "agent.lb.enabled", "Advanced", "false", "Enable agent load balancing between management server nodes", true); + protected final ConfigKey EnableLB = new ConfigKey<>(Boolean.class, "agent.lb.enabled", "Advanced", "false", "Enable direct agents load balancing between management server nodes", true); protected final ConfigKey ConnectedAgentThreshold = new ConfigKey<>(Double.class, "agent.load.threshold", "Advanced", "0.7", - "What percentage of the agents can be held by one management server before load balancing happens", true, EnableLB.key()); - protected final ConfigKey LoadSize = new ConfigKey<>(Integer.class, "direct.agent.load.size", "Advanced", "16", "How many agents to connect to in each round", true); - protected final ConfigKey ScanInterval = new ConfigKey<>(Integer.class, "direct.agent.scan.interval", "Advanced", "90", "Interval between scans to load agents", false, + "What percentage of the direct agents can be held by one management server before load balancing happens", true, EnableLB.key()); + protected final ConfigKey LoadSize = new ConfigKey<>(Integer.class, "direct.agent.load.size", "Advanced", "16", "How many direct agents to connect to in each round", true); + protected final ConfigKey ScanInterval = new ConfigKey<>(Integer.class, "direct.agent.scan.interval", "Advanced", "90", "Interval between scans to load direct agents", false, ConfigKey.Scope.Global, 1000); @Override @@ -1395,7 +1395,7 @@ public boolean transferDirectAgentsFromMS(String fromMsUuid, long fromMsId, long return false; } - long transferStartTime = System.currentTimeMillis(); + long transferStartTimeInMs = System.currentTimeMillis(); if (CollectionUtils.isEmpty(getDirectAgentHosts(fromMsId))) { logger.info("No direct agent hosts available on management server node {} (id: {}), to transfer", fromMsId, fromMsUuid); return true; @@ -1417,7 +1417,7 @@ public boolean transferDirectAgentsFromMS(String fromMsUuid, long fromMsId, long } logger.debug("Transferring {} direct agents from management server node {} (id: {}) of zone {}", directAgentHostsInDc.size(), fromMsId, fromMsUuid, dc); for (HostVO host : directAgentHostsInDc) { - long transferElapsedTimeInMs = System.currentTimeMillis() - transferStartTime; + long transferElapsedTimeInMs = System.currentTimeMillis() - transferStartTimeInMs; if (transferElapsedTimeInMs >= timeoutDurationInMs) { logger.debug("Stop transferring remaining direct agents from management server node {} (id: {}), timed out", fromMsId, fromMsUuid); return false; @@ -1486,6 +1486,18 @@ private void updateLastManagementServer(long hostId, long msId) { } } + @Override + public void onManagementServerPreparingForMaintenance() { + logger.debug("Management server preparing for maintenance"); + super.onManagementServerPreparingForMaintenance(); + } + + @Override + public void onManagementServerCancelPreparingForMaintenance() { + logger.debug("Management server cancel preparing for maintenance"); + super.onManagementServerPreparingForMaintenance(); + } + @Override public void onManagementServerMaintenance() { logger.debug("Management server maintenance enabled"); diff --git a/engine/schema/src/main/java/com/cloud/configuration/ManagementServiceConfiguration.java b/engine/schema/src/main/java/com/cloud/configuration/ManagementServiceConfiguration.java index 51b7f62f56de..841447de5fd8 100644 --- a/engine/schema/src/main/java/com/cloud/configuration/ManagementServiceConfiguration.java +++ b/engine/schema/src/main/java/com/cloud/configuration/ManagementServiceConfiguration.java @@ -21,7 +21,7 @@ public interface ManagementServiceConfiguration extends Configurable { ConfigKey PingInterval = new ConfigKey("Advanced", Integer.class, "ping.interval", "60", - "Interval to send application level pings to make sure the connection is still working", false); + "Interval in seconds to send application level pings to make sure the connection is still working", false); ConfigKey PingTimeout = new ConfigKey("Advanced", Float.class, "ping.timeout", "2.5", "Multiplier to ping.interval before announcing an agent has timed out", true); public int getPingInterval(); diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java b/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java index cfd75b1a94b0..d44e842db8bf 100644 --- a/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java +++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java @@ -183,6 +183,13 @@ public interface HostDao extends GenericDao, StateDao listByMs(long msId); + /** + * Retrieves the last host ids/agents this {@see ManagementServer} has responsibility over. + * @param msId the id of the {@see ManagementServer} + * @return the last host ids/agents this {@see ManagementServer} has responsibility over + */ + List listByLastMs(long msId); + /** * Retrieves the hypervisor versions of the hosts in the datacenter which are in Up state in ascending order * @param datacenterId data center id @@ -200,7 +207,7 @@ public interface HostDao extends GenericDao, StateDao findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(final Long zoneId, final Long clusterId, - final List resourceStates, final List types, + final Long msId, final List resourceStates, final List types, final List hypervisorTypes); List listDistinctHypervisorTypes(final Long zoneId); diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java index 54146e55049f..fac895400f32 100644 --- a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java +++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java @@ -129,6 +129,7 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao protected SearchBuilder ResponsibleMsSearch; protected SearchBuilder ResponsibleMsDcSearch; protected GenericSearchBuilder ResponsibleMsIdSearch; + protected GenericSearchBuilder LastMsIdSearch; protected SearchBuilder HostTypeClusterCountSearch; protected SearchBuilder HostTypeZoneCountSearch; protected SearchBuilder ClusterStatusSearch; @@ -209,6 +210,11 @@ public void init() { ResponsibleMsIdSearch.and("managementServerId", ResponsibleMsIdSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); ResponsibleMsIdSearch.done(); + LastMsIdSearch = createSearchBuilder(String.class); + LastMsIdSearch.selectFields(LastMsIdSearch.entity().getUuid()); + LastMsIdSearch.and("lastManagementServerId", LastMsIdSearch.entity().getLastManagementServerId(), SearchCriteria.Op.EQ); + LastMsIdSearch.done(); + HostTypeClusterCountSearch = createSearchBuilder(); HostTypeClusterCountSearch.and("cluster", HostTypeClusterCountSearch.entity().getClusterId(), SearchCriteria.Op.EQ); HostTypeClusterCountSearch.and("type", HostTypeClusterCountSearch.entity().getType(), SearchCriteria.Op.EQ); @@ -1569,6 +1575,13 @@ public List listByMs(long msId) { return customSearch(sc, null); } + @Override + public List listByLastMs(long msId) { + SearchCriteria sc = LastMsIdSearch.create(); + sc.addAnd("lastManagementServerId", SearchCriteria.Op.EQ, msId); + return customSearch(sc, null); + } + @Override public List listOrderedHostsHypervisorVersionsInDatacenter(long datacenterId, HypervisorType hypervisorType) { PreparedStatement pstmt; @@ -1745,13 +1758,15 @@ public boolean isHostUp(long hostId) { } @Override - public List findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(final Long zoneId, final Long clusterId, + public List findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(final Long zoneId, + final Long clusterId, final Long managementServerId, final List resourceStates, final List types, final List hypervisorTypes) { GenericSearchBuilder sb = createSearchBuilder(Long.class); sb.selectFields(sb.entity().getId()); sb.and("zoneId", sb.entity().getDataCenterId(), SearchCriteria.Op.EQ); sb.and("clusterId", sb.entity().getClusterId(), SearchCriteria.Op.EQ); + sb.and("msId", sb.entity().getManagementServerId(), SearchCriteria.Op.EQ); sb.and("resourceState", sb.entity().getResourceState(), SearchCriteria.Op.IN); sb.and("type", sb.entity().getType(), SearchCriteria.Op.IN); if (CollectionUtils.isNotEmpty(hypervisorTypes)) { @@ -1767,6 +1782,9 @@ public List findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(fin if (clusterId != null) { sc.setParameters("clusterId", clusterId); } + if (managementServerId != null) { + sc.setParameters("msId", managementServerId); + } if (CollectionUtils.isNotEmpty(hypervisorTypes)) { sc.setParameters("hypervisorTypes", hypervisorTypes.toArray()); } diff --git a/engine/schema/src/test/java/com/cloud/host/dao/HostDaoImplTest.java b/engine/schema/src/test/java/com/cloud/host/dao/HostDaoImplTest.java index 81163321c6b8..8f41162f2429 100644 --- a/engine/schema/src/test/java/com/cloud/host/dao/HostDaoImplTest.java +++ b/engine/schema/src/test/java/com/cloud/host/dao/HostDaoImplTest.java @@ -104,6 +104,7 @@ public void testIsHostUp() { public void testFindHostIdsByZoneClusterResourceStateTypeAndHypervisorType() { Long zoneId = 1L; Long clusterId = 2L; + Long msId = 1L; List resourceStates = List.of(ResourceState.Enabled); List types = List.of(Host.Type.Routing); List hypervisorTypes = List.of(Hypervisor.HypervisorType.KVM); @@ -117,10 +118,11 @@ public void testFindHostIdsByZoneClusterResourceStateTypeAndHypervisorType() { Mockito.doReturn(sb).when(hostDao).createSearchBuilder(Long.class); Mockito.doReturn(mockResults).when(hostDao).customSearch(Mockito.any(SearchCriteria.class), Mockito.any()); List hostIds = hostDao.findHostIdsByZoneClusterResourceStateTypeAndHypervisorType( - zoneId, clusterId, resourceStates, types, hypervisorTypes); + zoneId, clusterId, msId, resourceStates, types, hypervisorTypes); Assert.assertEquals(mockResults, hostIds); Mockito.verify(sc).setParameters("zoneId", zoneId); Mockito.verify(sc).setParameters("clusterId", clusterId); + Mockito.verify(sc).setParameters("msId", msId); Mockito.verify(sc).setParameters("resourceState", resourceStates.toArray()); Mockito.verify(sc).setParameters("type", types.toArray()); Mockito.verify(sc).setParameters("hypervisorTypes", hypervisorTypes.toArray()); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 448a4eb219cb..41af291bd696 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -237,7 +237,7 @@ private void checkAsyncJobAllowed(AsyncJob job) { } } - throw new CloudRuntimeException("Maintenance or Shutdown has been initiated on this management server. Can not accept new jobs"); + throw new CloudRuntimeException("Maintenance or Shutdown has been initiated on this management server. Can not accept new async jobs"); } private boolean checkSyncQueueItemAllowed(SyncQueueItemVO item) { diff --git a/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceListener.java b/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceListener.java index bd82d1b257df..e0fe49a19ac6 100644 --- a/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceListener.java +++ b/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceListener.java @@ -18,6 +18,10 @@ package org.apache.cloudstack.maintenance; public interface ManagementServerMaintenanceListener { + void onManagementServerPreparingForMaintenance(); + + void onManagementServerCancelPreparingForMaintenance(); + void onManagementServerMaintenance(); void onManagementServerCancelMaintenance(); diff --git a/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManager.java b/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManager.java index d474f7188269..3af19164cc93 100644 --- a/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManager.java +++ b/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManager.java @@ -44,6 +44,10 @@ public interface ManagementServerMaintenanceManager { void unregisterListener(ManagementServerMaintenanceListener listener); + void onPreparingForMaintenance(); + + void onCancelPreparingForMaintenance(); + void onMaintenance(); void onCancelMaintenance(); diff --git a/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImpl.java b/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImpl.java index 0af8a7c114d2..fcfa32d6ce88 100644 --- a/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImpl.java +++ b/plugins/maintenance/src/main/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImpl.java @@ -53,6 +53,9 @@ import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ManagementServerHostVO; import com.cloud.cluster.dao.ManagementServerHostDao; +import com.cloud.event.ActionEvent; +import com.cloud.event.EventTypes; +import com.cloud.host.HostVO; import com.cloud.host.dao.HostDao; import com.cloud.serializer.GsonHelper; import com.cloud.utils.StringUtils; @@ -108,6 +111,25 @@ public boolean start() { return true; } + @Override + public boolean stop() { + ManagementServerHostVO msHost = msHostDao.findByMsid(ManagementServerNode.getManagementServerId()); + if (msHost != null) { + updateLastManagementServerForHosts(msHost.getMsid()); + } + return true; + } + + private void updateLastManagementServerForHosts(long msId) { + List hosts = hostDao.listHostsByMs(msId); + for (HostVO host : hosts) { + if (host != null) { + host.setLastManagementServerId(msId); + hostDao.update(host.getId(), host); + } + } + } + @Override public void registerListener(ManagementServerMaintenanceListener listener) { synchronized (_listeners) { @@ -124,6 +146,26 @@ public void unregisterListener(ManagementServerMaintenanceListener listener) { } } + @Override + public void onPreparingForMaintenance() { + synchronized (_listeners) { + for (final ManagementServerMaintenanceListener listener : _listeners) { + logger.info("Invoke, on preparing for maintenance for listener " + listener.getClass()); + listener.onManagementServerPreparingForMaintenance(); + } + } + } + + @Override + public void onCancelPreparingForMaintenance() { + synchronized (_listeners) { + for (final ManagementServerMaintenanceListener listener : _listeners) { + logger.info("Invoke, on cancel preparing for maintenance for listener " + listener.getClass()); + listener.onManagementServerCancelPreparingForMaintenance(); + } + } + } + @Override public void onMaintenance() { synchronized (_listeners) { @@ -243,6 +285,7 @@ public void prepareForMaintenance(String lbAlorithm) { this.maintenanceStartTime = System.currentTimeMillis(); this.lbAlgorithm = lbAlorithm; jobManager.disableAsyncJobs(); + onPreparingForMaintenance(); waitForPendingJobs(); } @@ -257,8 +300,13 @@ public void cancelMaintenance() { jobManager.enableAsyncJobs(); cancelWaitForPendingJobs(); ManagementServerHostVO msHost = msHostDao.findByMsid(ManagementServerNode.getManagementServerId()); - if (msHost != null && State.Maintenance.equals(msHost.getState())) { - onCancelMaintenance(); + if (msHost != null) { + if (State.PreparingForMaintenance.equals(msHost.getState())) { + onCancelPreparingForMaintenance(); + } + if (State.Maintenance.equals(msHost.getState())) { + onCancelMaintenance(); + } } } @@ -284,6 +332,7 @@ public ManagementServerMaintenanceResponse readyForShutdown(ReadyForShutdownCmd } @Override + @ActionEvent(eventType = EventTypes.EVENT_MS_SHUTDOWN_PREPARE, eventDescription = "preparing for shutdown") public ManagementServerMaintenanceResponse prepareForShutdown(PrepareForShutdownCmd cmd) { ManagementServerHostVO msHost = msHostDao.findById(cmd.getManagementServerId()); if (msHost == null) { @@ -294,19 +343,18 @@ public ManagementServerMaintenanceResponse prepareForShutdown(PrepareForShutdown throw new CloudRuntimeException("Management server is not in the right state to prepare for shutdown"); } + checkAnyMsInPreparingStates("prepare for shutdown"); + final Command[] cmds = new Command[1]; cmds[0] = new PrepareForShutdownManagementServerHostCommand(msHost.getMsid()); - String result = clusterManager.execute(String.valueOf(msHost.getMsid()), 0, gson.toJson(cmds), true); - logger.info("PrepareForShutdownCmd result : " + result); - if (!result.startsWith("Success")) { - throw new CloudRuntimeException(result); - } + executeCmd(msHost, cmds); msHostDao.updateState(msHost.getId(), State.PreparingForShutDown); return prepareMaintenanceResponse(cmd.getManagementServerId()); } @Override + @ActionEvent(eventType = EventTypes.EVENT_MS_SHUTDOWN, eventDescription = "triggering shutdown") public ManagementServerMaintenanceResponse triggerShutdown(TriggerShutdownCmd cmd) { ManagementServerHostVO msHost = msHostDao.findById(cmd.getManagementServerId()); if (msHost == null) { @@ -319,22 +367,20 @@ public ManagementServerMaintenanceResponse triggerShutdown(TriggerShutdownCmd cm } if (State.Up.equals(msHost.getState())) { + checkAnyMsInPreparingStates("trigger shutdown"); msHostDao.updateState(msHost.getId(), State.PreparingForShutDown); } final Command[] cmds = new Command[1]; cmds[0] = new TriggerShutdownManagementServerHostCommand(msHost.getMsid()); - String result = clusterManager.execute(String.valueOf(msHost.getMsid()), 0, gson.toJson(cmds), true); - logger.info("TriggerShutdownCmd result : " + result); - if (!result.startsWith("Success")) { - throw new CloudRuntimeException(result); - } + executeCmd(msHost, cmds); msHostDao.updateState(msHost.getId(), State.ShuttingDown); return prepareMaintenanceResponse(cmd.getManagementServerId()); } @Override + @ActionEvent(eventType = EventTypes.EVENT_MS_SHUTDOWN_CANCEL, eventDescription = "cancelling shutdown") public ManagementServerMaintenanceResponse cancelShutdown(CancelShutdownCmd cmd) { ManagementServerHostVO msHost = msHostDao.findById(cmd.getManagementServerId()); if (msHost == null) { @@ -347,17 +393,14 @@ public ManagementServerMaintenanceResponse cancelShutdown(CancelShutdownCmd cmd) final Command[] cmds = new Command[1]; cmds[0] = new CancelShutdownManagementServerHostCommand(msHost.getMsid()); - String result = clusterManager.execute(String.valueOf(msHost.getMsid()), 0, gson.toJson(cmds), true); - logger.info("CancelShutdownCmd result : " + result); - if (!result.startsWith("Success")) { - throw new CloudRuntimeException(result); - } + executeCmd(msHost, cmds); msHostDao.updateState(msHost.getId(), State.Up); return prepareMaintenanceResponse(cmd.getManagementServerId()); } @Override + @ActionEvent(eventType = EventTypes.EVENT_MS_MAINTENANCE_PREPARE, eventDescription = "preparing for maintenance") public ManagementServerMaintenanceResponse prepareForMaintenance(PrepareForMaintenanceCmd cmd) { if (StringUtils.isNotBlank(cmd.getAlgorithm())) { indirectAgentLB.checkLBAlgorithmName(cmd.getAlgorithm()); @@ -381,10 +424,7 @@ public ManagementServerMaintenanceResponse prepareForMaintenance(PrepareForMaint throw new CloudRuntimeException("Management server is not in the right state to prepare for maintenance"); } - final List preparingForMaintenanceMsList = msHostDao.listBy(State.PreparingForMaintenance); - if (CollectionUtils.isNotEmpty(preparingForMaintenanceMsList)) { - throw new CloudRuntimeException("Cannot prepare for maintenance, there are other management servers preparing for maintenance"); - } + checkAnyMsInPreparingStates("prepare for maintenance"); if (indirectAgentLB.haveAgentBasedHosts(msHost.getMsid())) { List indirectAgentMsList = indirectAgentLB.getManagementServerList(); @@ -396,23 +436,16 @@ public ManagementServerMaintenanceResponse prepareForMaintenance(PrepareForMaint } } - List lastAgents = hostDao.listByMs(cmd.getManagementServerId()); - agentMgr.setLastAgents(lastAgents); - final Command[] cmds = new Command[1]; cmds[0] = new PrepareForMaintenanceManagementServerHostCommand(msHost.getMsid(), cmd.getAlgorithm()); - String result = clusterManager.execute(String.valueOf(msHost.getMsid()), 0, gson.toJson(cmds), true); - logger.info("PrepareForMaintenanceCmd result : " + result); - if (!result.startsWith("Success")) { - agentMgr.setLastAgents(null); - throw new CloudRuntimeException(result); - } + executeCmd(msHost, cmds); msHostDao.updateState(msHost.getId(), State.PreparingForMaintenance); return prepareMaintenanceResponse(cmd.getManagementServerId()); } @Override + @ActionEvent(eventType = EventTypes.EVENT_MS_MAINTENANCE_CANCEL, eventDescription = "cancelling maintenance") public ManagementServerMaintenanceResponse cancelMaintenance(CancelMaintenanceCmd cmd) { ManagementServerHostVO msHost = msHostDao.findById(cmd.getManagementServerId()); if (msHost == null) { @@ -425,17 +458,31 @@ public ManagementServerMaintenanceResponse cancelMaintenance(CancelMaintenanceCm final Command[] cmds = new Command[1]; cmds[0] = new CancelMaintenanceManagementServerHostCommand(msHost.getMsid()); - String result = clusterManager.execute(String.valueOf(msHost.getMsid()), 0, gson.toJson(cmds), true); - logger.info("CancelMaintenanceCmd result : " + result); - if (!result.startsWith("Success")) { - throw new CloudRuntimeException(result); - } + executeCmd(msHost, cmds); msHostDao.updateState(msHost.getId(), State.Up); - agentMgr.setLastAgents(null); return prepareMaintenanceResponse(cmd.getManagementServerId()); } + private void executeCmd(ManagementServerHostVO msHost, Command[] cmds) { + if (msHost == null) { + throw new CloudRuntimeException("Management server node not specified, to execute the cmd"); + } + if (cmds == null || cmds.length <= 0) { + throw new CloudRuntimeException(String.format("Cmd not specified, to execute on the management server node %s", msHost)); + } + String result = clusterManager.execute(String.valueOf(msHost.getMsid()), 0, gson.toJson(cmds), false); + if (result == null) { + String msg = String.format("Unable to reach or execute %s on the management server node: %s", cmds[0], msHost); + logger.warn(msg); + throw new CloudRuntimeException(msg); + } + logger.info(String.format("Cmd %s - result: %s", cmds[0], result)); + if (!result.startsWith("Success")) { + throw new CloudRuntimeException(result); + } + } + @Override public void cancelPreparingForMaintenance(ManagementServerHostVO msHost) { resetPreparingForMaintenance(); @@ -445,9 +492,17 @@ public void cancelPreparingForMaintenance(ManagementServerHostVO msHost) { if (msHost == null) { msHost = msHostDao.findByMsid(ManagementServerNode.getManagementServerId()); } + onCancelPreparingForMaintenance(); msHostDao.updateState(msHost.getId(), State.Up); } + private void checkAnyMsInPreparingStates(String operation) { + final List preparingForMaintenanceOrShutDownMsList = msHostDao.listBy(State.PreparingForMaintenance, State.PreparingForShutDown); + if (CollectionUtils.isNotEmpty(preparingForMaintenanceOrShutDownMsList)) { + throw new CloudRuntimeException(String.format("Cannot %s, there are other management servers preparing for maintenance/shutdown", operation)); + } + } + private ManagementServerMaintenanceResponse prepareMaintenanceResponse(Long managementServerId) { ManagementServerHostVO msHost; Long[] msIds; @@ -465,8 +520,8 @@ private ManagementServerMaintenanceResponse prepareMaintenanceResponse(Long mana boolean maintenanceInitiatedForMS = Arrays.asList(maintenanceStates).contains(msHost.getState()); boolean shutdownTriggeredForMS = Arrays.asList(shutdownStates).contains(msHost.getState()); msIds = new Long[]{msHost.getMsid()}; - List agents = hostDao.listByMs(managementServerId); - long agentsCount = hostDao.countByMs(managementServerId); + List agents = hostDao.listByMs(msHost.getMsid()); + long agentsCount = agents.size(); long pendingJobCount = countPendingJobs(msIds); return new ManagementServerMaintenanceResponse(msHost.getUuid(), msHost.getState(), maintenanceInitiatedForMS, shutdownTriggeredForMS, pendingJobCount == 0, pendingJobCount, agentsCount, agents); } @@ -535,7 +590,6 @@ protected void runInContext() { // No more pending jobs. Good to terminate if (managementServerMaintenanceManager.isShutdownTriggered()) { logger.info("MS is Shutting Down Now"); - // update state to down ? System.exit(0); } if (managementServerMaintenanceManager.isPreparingForMaintenance()) { diff --git a/plugins/maintenance/src/test/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImplTest.java b/plugins/maintenance/src/test/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImplTest.java index 8e1c09bf9959..dc14124d0182 100644 --- a/plugins/maintenance/src/test/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImplTest.java +++ b/plugins/maintenance/src/test/java/org/apache/cloudstack/maintenance/ManagementServerMaintenanceManagerImplTest.java @@ -17,7 +17,23 @@ package org.apache.cloudstack.maintenance; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cloudstack.agent.lb.IndirectAgentLB; +import org.apache.cloudstack.api.command.CancelMaintenanceCmd; +import org.apache.cloudstack.api.command.CancelShutdownCmd; +import org.apache.cloudstack.api.command.PrepareForMaintenanceCmd; +import org.apache.cloudstack.api.command.PrepareForShutdownCmd; +import org.apache.cloudstack.api.command.TriggerShutdownCmd; import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.management.ManagementServerHost; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -27,6 +43,11 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import com.cloud.agent.AgentManager; +import com.cloud.cluster.ClusterManager; +import com.cloud.cluster.ManagementServerHostVO; +import com.cloud.cluster.dao.ManagementServerHostDao; +import com.cloud.host.dao.HostDao; import com.cloud.utils.exception.CloudRuntimeException; @@ -40,6 +61,21 @@ public class ManagementServerMaintenanceManagerImplTest { @Mock AsyncJobManager jobManagerMock; + @Mock + IndirectAgentLB indirectAgentLBMock; + + @Mock + AgentManager agentManagerMock; + + @Mock + ClusterManager clusterManagerMock; + + @Mock + HostDao hostDao; + + @Mock + ManagementServerHostDao msHostDao; + private long prepareCountPendingJobs() { long expectedCount = 1L; Mockito.doReturn(expectedCount).when(jobManagerMock).countPendingNonPseudoJobs(1L); @@ -53,6 +89,21 @@ public void countPendingJobs() { Assert.assertEquals(expectedCount, count); } + @Test + public void prepareForShutdown() { + Mockito.doNothing().when(jobManagerMock).disableAsyncJobs(); + spy.prepareForShutdown(); + Mockito.verify(jobManagerMock).disableAsyncJobs(); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForShutdown(); + }); + + Mockito.doNothing().when(jobManagerMock).enableAsyncJobs(); + spy.cancelShutdown(); + Mockito.verify(jobManagerMock).enableAsyncJobs(); + } + @Test public void cancelShutdown() { Assert.assertThrows(CloudRuntimeException.class, () -> { @@ -61,17 +112,454 @@ public void cancelShutdown() { } @Test - public void prepareForShutdown() { + public void triggerShutdown() { Mockito.doNothing().when(jobManagerMock).disableAsyncJobs(); - spy.prepareForShutdown(); + Mockito.lenient().when(spy.isShutdownTriggered()).thenReturn(false); + spy.triggerShutdown(); Mockito.verify(jobManagerMock).disableAsyncJobs(); Assert.assertThrows(CloudRuntimeException.class, () -> { - spy.prepareForShutdown(); + spy.triggerShutdown(); + }); + } + + @Test + public void prepareForShutdownCmdNoMsHost() { + Mockito.when(msHostDao.findById(1L)).thenReturn(null); + PrepareForShutdownCmd cmd = mock(PrepareForShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForShutdown(cmd); + }); + } + + @Test + public void prepareForShutdownCmdMsHostWithNonUpState() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.Maintenance); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + PrepareForShutdownCmd cmd = mock(PrepareForShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForShutdown(cmd); + }); + } + + @Test + public void prepareForShutdownCmdOtherMsHostsInPreparingState() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + Mockito.when(msHost1.getState()).thenReturn(ManagementServerHost.State.Up); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost2); + Mockito.when(msHostDao.listBy(any())).thenReturn(msHostList); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost1); + PrepareForShutdownCmd cmd = mock(PrepareForShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForShutdown(cmd); + }); + } + + @Test + public void prepareForShutdownCmdNullResponseFromClusterManager() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.Up); + List msHostList = new ArrayList<>(); + Mockito.when(msHostDao.listBy(any())).thenReturn(msHostList); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + PrepareForShutdownCmd cmd = mock(PrepareForShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn(null); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForShutdown(cmd); + }); + } + + @Test + public void prepareForShutdownCmdFailedResponseFromClusterManager() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.Up); + List msHostList = new ArrayList<>(); + Mockito.when(msHostDao.listBy(any())).thenReturn(msHostList); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + PrepareForShutdownCmd cmd = mock(PrepareForShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn("Failed"); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForShutdown(cmd); + }); + } + + @Test + public void prepareForShutdownCmdSuccessResponseFromClusterManager() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.Up); + Mockito.when(msHostDao.listBy(any())).thenReturn(new ArrayList<>()); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + Mockito.when(hostDao.listByMs(anyLong())).thenReturn(new ArrayList<>()); + PrepareForShutdownCmd cmd = mock(PrepareForShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn("Success"); + + spy.prepareForShutdown(cmd); + Mockito.verify(clusterManagerMock, Mockito.times(1)).execute(anyString(), anyLong(), anyString(), anyBoolean()); + } + + @Test + public void cancelShutdownCmdNoMsHost() { + Mockito.when(msHostDao.findById(1L)).thenReturn(null); + CancelShutdownCmd cmd = mock(CancelShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.cancelShutdown(cmd); + }); + } + + @Test + public void cancelShutdownCmdMsHostNotInShutdownState() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.Up); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + CancelShutdownCmd cmd = mock(CancelShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.cancelShutdown(cmd); + }); + } + + @Test + public void cancelShutdownCmd() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.ReadyToShutDown); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + CancelShutdownCmd cmd = mock(CancelShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn("Success"); + + spy.cancelShutdown(cmd); + Mockito.verify(clusterManagerMock, Mockito.times(1)).execute(anyString(), anyLong(), anyString(), anyBoolean()); + } + + @Test + public void triggerShutdownCmdNoMsHost() { + Mockito.when(msHostDao.findById(1L)).thenReturn(null); + TriggerShutdownCmd cmd = mock(TriggerShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.triggerShutdown(cmd); + }); + } + + @Test + public void triggerShutdownCmdMsHostWithNotRightState() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.PreparingForMaintenance); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + TriggerShutdownCmd cmd = mock(TriggerShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.triggerShutdown(cmd); + }); + } + + @Test + public void triggerShutdownCmdMsInUpStateAndOtherMsHostsInPreparingState() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + Mockito.when(msHost1.getState()).thenReturn(ManagementServerHost.State.Up); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost2); + Mockito.when(msHostDao.listBy(any())).thenReturn(msHostList); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost1); + TriggerShutdownCmd cmd = mock(TriggerShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.triggerShutdown(cmd); + }); + } + + @Test + public void triggerShutdownCmd() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.ReadyToShutDown); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + TriggerShutdownCmd cmd = mock(TriggerShutdownCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn("Success"); + + spy.triggerShutdown(cmd); + Mockito.verify(clusterManagerMock, Mockito.times(1)).execute(anyString(), anyLong(), anyString(), anyBoolean()); + } + + @Test + public void prepareForMaintenanceAndCancelFromMaintenanceState() { + Mockito.doNothing().when(jobManagerMock).disableAsyncJobs(); + spy.prepareForMaintenance("static"); + Mockito.verify(jobManagerMock).disableAsyncJobs(); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance("static"); }); + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.Maintenance); + Mockito.when(msHostDao.findByMsid(anyLong())).thenReturn(msHost); Mockito.doNothing().when(jobManagerMock).enableAsyncJobs(); - spy.cancelShutdown(); + spy.cancelMaintenance(); + Mockito.verify(jobManagerMock).enableAsyncJobs(); + Mockito.verify(spy, Mockito.times(1)).onCancelMaintenance(); + } + + @Test + public void prepareForMaintenanceAndCancelFromPreparingForMaintenanceState() { + Mockito.doNothing().when(jobManagerMock).disableAsyncJobs(); + spy.prepareForMaintenance("static"); + Mockito.verify(jobManagerMock).disableAsyncJobs(); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance("static"); + }); + + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.PreparingForMaintenance); + Mockito.when(msHostDao.findByMsid(anyLong())).thenReturn(msHost); + Mockito.doNothing().when(jobManagerMock).enableAsyncJobs(); + spy.cancelMaintenance(); + Mockito.verify(jobManagerMock).enableAsyncJobs(); + Mockito.verify(spy, Mockito.times(1)).onCancelPreparingForMaintenance(); + } + + @Test + public void cancelMaintenance() { + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.cancelMaintenance(); + }); + } + + @Test + public void cancelPreparingForMaintenance() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHostDao.findByMsid(anyLong())).thenReturn(msHost); + + spy.cancelPreparingForMaintenance(null); Mockito.verify(jobManagerMock).enableAsyncJobs(); + Mockito.verify(spy, Mockito.times(1)).onCancelPreparingForMaintenance(); + } + + @Test + public void prepareForMaintenanceCmdNoOtherMsHostsWithUpState() { + Mockito.when(msHostDao.listBy(any())).thenReturn(new ArrayList<>()); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getAlgorithm()).thenReturn("test algorithm"); + Mockito.doNothing().when(indirectAgentLBMock).checkLBAlgorithmName(anyString()); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance(cmd); + }); + } + + @Test + public void prepareForMaintenanceCmdOnlyOneMsHostsWithUpState() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.Up)).thenReturn(msHostList); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getAlgorithm()).thenReturn("test algorithm"); + Mockito.doNothing().when(indirectAgentLBMock).checkLBAlgorithmName(anyString()); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance(cmd); + }); + } + + @Test + public void prepareForMaintenanceCmdNoMsHost() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost1); + msHostList.add(msHost2); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.Up)).thenReturn(msHostList); + Mockito.when(msHostDao.findById(1L)).thenReturn(null); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance(cmd); + }); + } + + @Test + public void prepareForMaintenanceCmdMsHostWithNonUpState() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + Mockito.when(msHost1.getState()).thenReturn(ManagementServerHost.State.Maintenance); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost1); + msHostList.add(msHost2); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.Up)).thenReturn(msHostList); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost1); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance(cmd); + }); + } + + @Test + public void prepareForMaintenanceCmdOtherMsHostsInPreparingState() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + Mockito.when(msHost1.getState()).thenReturn(ManagementServerHost.State.Up); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList1 = new ArrayList<>(); + msHostList1.add(msHost1); + msHostList1.add(msHost2); + ManagementServerHostVO msHost3 = mock(ManagementServerHostVO.class); + List msHostList2 = new ArrayList<>(); + msHostList2.add(msHost3); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.Up)).thenReturn(msHostList1); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.PreparingForMaintenance, ManagementServerHost.State.PreparingForShutDown)).thenReturn(msHostList2); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost1); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance(cmd); + }); + } + + @Test + public void prepareForMaintenanceCmdNoIndirectMsHosts() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + Mockito.when(msHost1.getState()).thenReturn(ManagementServerHost.State.Up); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost1); + msHostList.add(msHost2); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.Up)).thenReturn(msHostList); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.PreparingForMaintenance, ManagementServerHost.State.PreparingForShutDown)).thenReturn(new ArrayList<>()); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost1); + Mockito.when(msHostDao.listNonUpStateMsIPs()).thenReturn(new ArrayList<>()); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(indirectAgentLBMock.haveAgentBasedHosts(anyLong())).thenReturn(true); + Mockito.when(indirectAgentLBMock.getManagementServerList()).thenReturn(new ArrayList<>()); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance(cmd); + }); + } + + @Test + public void prepareForMaintenanceCmdNullResponseFromClusterManager() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + Mockito.when(msHost1.getState()).thenReturn(ManagementServerHost.State.Up); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost1); + msHostList.add(msHost2); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.Up)).thenReturn(msHostList); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.PreparingForMaintenance, ManagementServerHost.State.PreparingForShutDown)).thenReturn(new ArrayList<>()); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost1); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(indirectAgentLBMock.haveAgentBasedHosts(anyLong())).thenReturn(false); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn(null); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance(cmd); + }); + } + + @Test + public void prepareForMaintenanceCmdFailedResponseFromClusterManager() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + Mockito.when(msHost1.getState()).thenReturn(ManagementServerHost.State.Up); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost1); + msHostList.add(msHost2); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.Up)).thenReturn(msHostList); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.PreparingForMaintenance, ManagementServerHost.State.PreparingForShutDown)).thenReturn(new ArrayList<>()); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost1); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(indirectAgentLBMock.haveAgentBasedHosts(anyLong())).thenReturn(false); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn("Failed"); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.prepareForMaintenance(cmd); + }); + } + + @Test + public void prepareForMaintenanceCmdSuccessResponseFromClusterManager() { + ManagementServerHostVO msHost1 = mock(ManagementServerHostVO.class); + Mockito.when(msHost1.getState()).thenReturn(ManagementServerHost.State.Up); + ManagementServerHostVO msHost2 = mock(ManagementServerHostVO.class); + List msHostList = new ArrayList<>(); + msHostList.add(msHost1); + msHostList.add(msHost2); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.Up)).thenReturn(msHostList); + Mockito.when(msHostDao.listBy(ManagementServerHost.State.PreparingForMaintenance, ManagementServerHost.State.PreparingForShutDown)).thenReturn(new ArrayList<>()); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost1); + PrepareForMaintenanceCmd cmd = mock(PrepareForMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(indirectAgentLBMock.haveAgentBasedHosts(anyLong())).thenReturn(false); + Mockito.when(hostDao.listByMs(anyLong())).thenReturn(new ArrayList<>()); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn("Success"); + + spy.prepareForMaintenance(cmd); + Mockito.verify(clusterManagerMock, Mockito.times(1)).execute(anyString(), anyLong(), anyString(), anyBoolean()); + } + + @Test + public void cancelMaintenanceCmdNoMsHost() { + Mockito.when(msHostDao.findById(1L)).thenReturn(null); + CancelMaintenanceCmd cmd = mock(CancelMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.cancelMaintenance(cmd); + }); + } + + @Test + public void cancelMaintenanceCmdMsHostNotInMaintenanceState() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.Up); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + CancelMaintenanceCmd cmd = mock(CancelMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + + Assert.assertThrows(CloudRuntimeException.class, () -> { + spy.cancelMaintenance(cmd); + }); + } + + @Test + public void cancelMaintenanceCmd() { + ManagementServerHostVO msHost = mock(ManagementServerHostVO.class); + Mockito.when(msHost.getState()).thenReturn(ManagementServerHost.State.Maintenance); + Mockito.when(msHostDao.findById(1L)).thenReturn(msHost); + CancelMaintenanceCmd cmd = mock(CancelMaintenanceCmd.class); + Mockito.when(cmd.getManagementServerId()).thenReturn(1L); + Mockito.when(clusterManagerMock.execute(anyString(), anyLong(), anyString(), anyBoolean())).thenReturn("Success"); + + spy.cancelMaintenance(cmd); + Mockito.verify(clusterManagerMock, Mockito.times(1)).execute(anyString(), anyLong(), anyString(), anyBoolean()); } } diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/response/ManagementServerMetricsResponse.java b/plugins/metrics/src/main/java/org/apache/cloudstack/response/ManagementServerMetricsResponse.java index d96f5b14f0da..83c6f3dc7d4d 100644 --- a/plugins/metrics/src/main/java/org/apache/cloudstack/response/ManagementServerMetricsResponse.java +++ b/plugins/metrics/src/main/java/org/apache/cloudstack/response/ManagementServerMetricsResponse.java @@ -31,11 +31,11 @@ public class ManagementServerMetricsResponse extends ManagementServerResponse { private Integer availableProcessors; @SerializedName(MetricConstants.LAST_AGENTS) - @Param(description = "the last agents this Management Server is responsible for, before preparing for maintenance", since = "4.18.1") + @Param(description = "the last agents this Management Server is responsible for, before shutdown or preparing for maintenance", since = "4.21.0.0") private List lastAgents; @SerializedName(MetricConstants.AGENTS) - @Param(description = "the agents this Management Server is responsible for", since = "4.18.1") + @Param(description = "the agents this Management Server is responsible for", since = "4.21.0.0") private List agents; @SerializedName(MetricConstants.AGENT_COUNT) diff --git a/server/src/main/java/com/cloud/api/ApiDispatcher.java b/server/src/main/java/com/cloud/api/ApiDispatcher.java index 6a43ff10f31a..90cbb6afc8e5 100644 --- a/server/src/main/java/com/cloud/api/ApiDispatcher.java +++ b/server/src/main/java/com/cloud/api/ApiDispatcher.java @@ -94,7 +94,7 @@ public void dispatchCreateCmd(final BaseAsyncCreateCmd cmd, final Map para // BaseAsyncCreateCmd: cmd params are processed and create() is called, then same workflow as BaseAsyncCmd. // BaseAsyncCmd: cmd is processed and submitted as an AsyncJob, job related info is serialized and returned. if (cmdObj instanceof BaseAsyncCmd) { + if (!asyncMgr.isAsyncJobsEnabled()) { + String msg = "Maintenance or Shutdown has been initiated on this management server. Can not accept new jobs"; + logger.warn(msg); + throw new ServerApiException(ApiErrorCode.SERVICE_UNAVAILABLE, msg); + } Long objectId = null; String objectUuid = null; if (cmdObj instanceof BaseAsyncCreateCmd) { diff --git a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java index 846fc1921992..d32b444a9d24 100644 --- a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java +++ b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java @@ -5436,7 +5436,11 @@ protected ManagementServerResponse createManagementServerResponse(ManagementServ mgmtResponse.addPeer(createPeerManagementServerNodeResponse(peer)); } } - mgmtResponse.setAgentsCount((long) hostDao.countByMs(mgmt.getMsid())); + List lastAgents = hostDao.listByLastMs(mgmt.getMsid()); + mgmtResponse.setLastAgents(lastAgents); + List agents = hostDao.listByMs(mgmt.getMsid()); + mgmtResponse.setAgents(agents); + mgmtResponse.setAgentsCount((long) agents.size()); mgmtResponse.setPendingJobsCount(jobManager.countPendingNonPseudoJobs(mgmt.getMsid())); mgmtResponse.setIpAddress(mgmt.getServiceIP()); mgmtResponse.setObjectName("managementserver"); diff --git a/server/src/main/java/com/cloud/server/StatsCollector.java b/server/src/main/java/com/cloud/server/StatsCollector.java index 31d8c80329c7..9ad6f3e2cbd1 100644 --- a/server/src/main/java/com/cloud/server/StatsCollector.java +++ b/server/src/main/java/com/cloud/server/StatsCollector.java @@ -752,21 +752,21 @@ protected void runInContext() { logger.debug(String.format("%s is running...", this.getClass().getSimpleName())); long msid = ManagementServerNode.getManagementServerId(); ManagementServerHostVO mshost = null; - ManagementServerHostStatsEntry hostStatsEntry = null; + ManagementServerHostStatsEntry msHostStatsEntry = null; try { mshost = managementServerHostDao.findByMsid(msid); // get local data - hostStatsEntry = getDataFrom(mshost); - managementServerHostStats.put(mshost.getUuid(), hostStatsEntry); + msHostStatsEntry = getDataFrom(mshost); + managementServerHostStats.put(mshost.getUuid(), msHostStatsEntry); // send to other hosts - clusterManager.publishStatus(gson.toJson(hostStatsEntry)); + clusterManager.publishStatus(gson.toJson(msHostStatsEntry)); } catch (Throwable t) { // pokemon catch to make sure the thread stays running logger.error("Error trying to retrieve management server host statistics", t); } try { // send to DB - storeStatus(hostStatsEntry, mshost); + storeStatus(msHostStatsEntry, mshost); } catch (Throwable t) { // pokemon catch to make sure the thread stays running logger.error("Error trying to store management server host statistics", t); @@ -834,11 +834,11 @@ private void retrieveSession(ManagementServerHostStatsEntry newEntry) { } private void getDataBaseStatistics(ManagementServerHostStatsEntry newEntry, long msid) { - newEntry.setLastAgents(_agentMgr.getLastAgents()); + List lastAgents = _hostDao.listByLastMs(msid); + newEntry.setLastAgents(lastAgents); List agents = _hostDao.listByMs(msid); newEntry.setAgents(agents); - int count = _hostDao.countByMs(msid); - newEntry.setAgentCount(count); + newEntry.setAgentCount(agents.size()); } private void getMemoryData(@NotNull ManagementServerHostStatsEntry newEntry) { diff --git a/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java b/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java index 1f0f439d8191..3336d44dba81 100644 --- a/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java +++ b/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java @@ -19,10 +19,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.EnumSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.naming.ConfigurationException; @@ -33,6 +36,8 @@ import org.apache.cloudstack.config.ApiServiceConfiguration; import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.Configurable; +import org.apache.cloudstack.managed.context.ManagedContextRunnable; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import com.cloud.agent.AgentManager; @@ -40,6 +45,7 @@ import com.cloud.agent.api.MigrateAgentConnectionCommand; import com.cloud.cluster.ManagementServerHostVO; import com.cloud.cluster.dao.ManagementServerHostDao; +import com.cloud.dc.DataCenter; import com.cloud.dc.DataCenterVO; import com.cloud.dc.dao.ClusterDao; import com.cloud.dc.dao.DataCenterDao; @@ -49,20 +55,20 @@ import com.cloud.hypervisor.Hypervisor; import com.cloud.resource.ResourceState; import com.cloud.utils.component.ComponentLifecycleBase; +import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.exception.CloudRuntimeException; -import org.apache.commons.collections.CollectionUtils; - public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase implements IndirectAgentLB, Configurable { public static final ConfigKey IndirectAgentLBAlgorithm = new ConfigKey<>(String.class, "indirect.agent.lb.algorithm", "Advanced", "static", - "The algorithm to be applied on the provided 'host' management server list that is sent to indirect agents. Allowed values are: static, roundrobin and shuffle.", + "The algorithm to be applied on the provided management server list in the 'host' config that that is sent to indirect agents. Allowed values are: static, roundrobin and shuffle.", true, ConfigKey.Scope.Global, null, null, null, null, null, ConfigKey.Kind.Select, "static,roundrobin,shuffle"); public static final ConfigKey IndirectAgentLBCheckInterval = new ConfigKey<>("Advanced", Long.class, "indirect.agent.lb.check.interval", "0", - "The interval in seconds after which agent should check and try to connect to its preferred host. Set 0 to disable it.", + "The interval in seconds after which indirect agent should check and try to connect to its preferred host (the first management server from the propagated list provided in the 'host' config)." + + " Set 0 to disable it.", true, ConfigKey.Scope.Cluster); private static Map algorithmMap = new HashMap<>(); @@ -85,6 +91,8 @@ public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase implement ResourceState.ErrorInMaintenance, ResourceState.PrepareForMaintenance); private static final List agentValidHostTypes = List.of(Host.Type.Routing, Host.Type.ConsoleProxy, Host.Type.SecondaryStorage, Host.Type.SecondaryStorageVM); + private static final List agentNonRoutingHostTypes = List.of(Host.Type.ConsoleProxy, + Host.Type.SecondaryStorage, Host.Type.SecondaryStorageVM); private static final List agentValidHypervisorTypes = List.of( Hypervisor.HypervisorType.KVM, Hypervisor.HypervisorType.LXC); @@ -246,8 +254,18 @@ private void conditionallyAddHost(List agentBasedHosts, Host host) { agentBasedHosts.add(host); } + private List getAllAgentBasedNonRoutingHostsFromDB(final Long zoneId, final Long msId) { + return hostDao.findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(zoneId, null, msId, + agentValidResourceStates, agentNonRoutingHostTypes, agentValidHypervisorTypes); + } + + private List getAllAgentBasedRoutingHostsFromDB(final Long zoneId, final Long clusterId, final Long msId) { + return hostDao.findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(zoneId, clusterId, msId, + agentValidResourceStates, List.of(Host.Type.Routing), agentValidHypervisorTypes); + } + private List getAllAgentBasedHostsFromDB(final Long zoneId, final Long clusterId) { - return hostDao.findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(zoneId, clusterId, + return hostDao.findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(zoneId, clusterId, null, agentValidResourceStates, agentValidHostTypes, agentValidHypervisorTypes); } @@ -287,31 +305,159 @@ public void checkLBAlgorithmName(String lbAlgorithm) { @Override public void propagateMSListToAgents() { logger.debug("Propagating management server list update to agents"); + ExecutorService setupMSListExecutorService = Executors.newFixedThreadPool(10, new NamedThreadFactory("SetupMSList-Worker")); final String lbAlgorithm = getLBAlgorithmName(); + final Long globalLbCheckInterval = getLBPreferredHostCheckInterval(null); List zones = dataCenterDao.listAll(); for (DataCenterVO zone : zones) { List zoneHostIds = new ArrayList<>(); + List nonRoutingHostIds = getAllAgentBasedNonRoutingHostsFromDB(zone.getId(), null); + zoneHostIds.addAll(nonRoutingHostIds); Map> clusterHostIdsMap = new HashMap<>(); List clusterIds = clusterDao.listAllClusterIds(zone.getId()); for (Long clusterId : clusterIds) { - List hostIds = getAllAgentBasedHostsFromDB(zone.getId(), clusterId); + List hostIds = getAllAgentBasedRoutingHostsFromDB(zone.getId(), clusterId, null); clusterHostIdsMap.put(clusterId, hostIds); zoneHostIds.addAll(hostIds); } zoneHostIds.sort(Comparator.comparingLong(x -> x)); + final List avoidMsList = mshostDao.listNonUpStateMsIPs(); + for (Long nonRoutingHostId : nonRoutingHostIds) { + setupMSListExecutorService.submit(new SetupMSListTask(nonRoutingHostId, zone.getId(), zoneHostIds, avoidMsList, lbAlgorithm, globalLbCheckInterval)); + } for (Long clusterId : clusterIds) { - final Long lbCheckInterval = getLBPreferredHostCheckInterval(clusterId); + final Long clusterLbCheckInterval = getLBPreferredHostCheckInterval(clusterId); List hostIds = clusterHostIdsMap.get(clusterId); for (Long hostId : hostIds) { - final List msList = getManagementServerList(hostId, zone.getId(), zoneHostIds); - final SetupMSListCommand cmd = new SetupMSListCommand(msList, lbAlgorithm, lbCheckInterval); - final Answer answer = agentManager.easySend(hostId, cmd); - if (answer == null || !answer.getResult()) { - logger.warn("Failed to setup management servers list to the agent of ID: {}", hostId); - } + setupMSListExecutorService.submit(new SetupMSListTask(hostId, zone.getId(), zoneHostIds, avoidMsList, lbAlgorithm, clusterLbCheckInterval)); } } } + + setupMSListExecutorService.shutdown(); + try { + if (!setupMSListExecutorService.awaitTermination(300, TimeUnit.SECONDS)) { + setupMSListExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + setupMSListExecutorService.shutdownNow(); + logger.debug(String.format("Force shutdown setup ms list service as it did not shutdown in the desired time due to: %s", e.getMessage())); + } + } + + private final class SetupMSListTask extends ManagedContextRunnable { + private Long hostId; + private Long dcId; + private List orderedHostIdList; + private List avoidMsList; + private String lbAlgorithm; + private Long lbCheckInterval; + + public SetupMSListTask(Long hostId, Long dcId, List orderedHostIdList, List avoidMsList, + String lbAlgorithm, Long lbCheckInterval) { + this.hostId = hostId; + this.dcId = dcId; + this.orderedHostIdList = orderedHostIdList; + this.avoidMsList = avoidMsList; + this.lbAlgorithm = lbAlgorithm; + this.lbCheckInterval = lbCheckInterval; + } + + @Override + protected void runInContext() { + final List msList = getManagementServerList(hostId, dcId, orderedHostIdList); + final SetupMSListCommand cmd = new SetupMSListCommand(msList, avoidMsList, lbAlgorithm, lbCheckInterval); + cmd.setWait(60); + final Answer answer = agentManager.easySend(hostId, cmd); + if (answer == null || !answer.getResult()) { + logger.warn(String.format("Failed to setup management servers list to the agent of ID: %d", hostId)); + } + } + } + + protected boolean migrateNonRoutingHostAgentsInZone(String fromMsUuid, long fromMsId, DataCenter dc, + long migrationStartTimeInMs, long timeoutDurationInMs, final List avoidMsList, String lbAlgorithm, + boolean lbAlgorithmChanged, List orderedHostIdList) { + List systemVmAgentsInDc = getAllAgentBasedNonRoutingHostsFromDB(dc.getId(), fromMsId); + if (CollectionUtils.isEmpty(systemVmAgentsInDc)) { + return true; + } + logger.debug(String.format("Migrating %d non-routing host agents from management server node %d (id: %s) of zone %s", + systemVmAgentsInDc.size(), fromMsId, fromMsUuid, dc)); + ExecutorService migrateAgentsExecutorService = Executors.newFixedThreadPool(5, new NamedThreadFactory("MigrateNonRoutingHostAgent-Worker")); + Long lbCheckInterval = getLBPreferredHostCheckInterval(null); + boolean stopMigration = false; + for (final Long hostId : systemVmAgentsInDc) { + long migrationElapsedTimeInMs = System.currentTimeMillis() - migrationStartTimeInMs; + if (migrationElapsedTimeInMs >= timeoutDurationInMs) { + logger.debug(String.format("Stop migrating remaining non-routing host agents from management server node %d (id: %s), timed out", fromMsId, fromMsUuid)); + stopMigration = true; + break; + } + + migrateAgentsExecutorService.submit(new MigrateAgentConnectionTask(fromMsId, hostId, dc.getId(), orderedHostIdList, avoidMsList, lbCheckInterval, lbAlgorithm, lbAlgorithmChanged)); + } + + if (stopMigration) { + migrateAgentsExecutorService.shutdownNow(); + return false; + } + + migrateAgentsExecutorService.shutdown(); + long pendingTimeoutDurationInMs = timeoutDurationInMs - (System.currentTimeMillis() - migrationStartTimeInMs); + try { + if (pendingTimeoutDurationInMs <= 0 || !migrateAgentsExecutorService.awaitTermination(pendingTimeoutDurationInMs, TimeUnit.MILLISECONDS)) { + migrateAgentsExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + migrateAgentsExecutorService.shutdownNow(); + logger.debug(String.format("Force shutdown migrate non-routing agents service as it did not shutdown in the desired time due to: %s", e.getMessage())); + } + + return true; + } + + protected boolean migrateRoutingHostAgentsInCluster(long clusterId, String fromMsUuid, long fromMsId, DataCenter dc, + long migrationStartTimeInMs, long timeoutDurationInMs, final List avoidMsList, String lbAlgorithm, + boolean lbAlgorithmChanged, List orderedHostIdList) { + + List agentBasedHostsOfMsInDcAndCluster = getAllAgentBasedRoutingHostsFromDB(dc.getId(), clusterId, fromMsId); + if (CollectionUtils.isEmpty(agentBasedHostsOfMsInDcAndCluster)) { + return true; + } + logger.debug(String.format("Migrating %d indirect routing host agents from management server node %d (id: %s) of zone %s, " + + "cluster ID: %d", agentBasedHostsOfMsInDcAndCluster.size(), fromMsId, fromMsUuid, dc, clusterId)); + ExecutorService migrateAgentsExecutorService = Executors.newFixedThreadPool(10, new NamedThreadFactory("MigrateRoutingHostAgent-Worker")); + Long lbCheckInterval = getLBPreferredHostCheckInterval(clusterId); + boolean stopMigration = false; + for (final Long hostId : agentBasedHostsOfMsInDcAndCluster) { + long migrationElapsedTimeInMs = System.currentTimeMillis() - migrationStartTimeInMs; + if (migrationElapsedTimeInMs >= timeoutDurationInMs) { + logger.debug(String.format("Stop migrating remaining indirect routing host agents from management server node %d (id: %s), timed out", fromMsId, fromMsUuid)); + stopMigration = true; + break; + } + + migrateAgentsExecutorService.submit(new MigrateAgentConnectionTask(fromMsId, hostId, dc.getId(), orderedHostIdList, avoidMsList, lbCheckInterval, lbAlgorithm, lbAlgorithmChanged)); + } + + if (stopMigration) { + migrateAgentsExecutorService.shutdownNow(); + return false; + } + + migrateAgentsExecutorService.shutdown(); + long pendingTimeoutDurationInMs = timeoutDurationInMs - (System.currentTimeMillis() - migrationStartTimeInMs); + try { + if (pendingTimeoutDurationInMs <= 0 || !migrateAgentsExecutorService.awaitTermination(pendingTimeoutDurationInMs, TimeUnit.MILLISECONDS)) { + migrateAgentsExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + migrateAgentsExecutorService.shutdownNow(); + logger.debug(String.format("Force shutdown migrate routing agents service as it did not shutdown in the desired time due to: %s", e.getMessage())); + } + + return true; } @Override @@ -322,7 +468,7 @@ public boolean migrateAgents(String fromMsUuid, long fromMsId, String lbAlgorith } logger.debug(String.format("Migrating indirect agents from management server node %d (id: %s) to other nodes", fromMsId, fromMsUuid)); - long migrationStartTime = System.currentTimeMillis(); + long migrationStartTimeInMs = System.currentTimeMillis(); if (!haveAgentBasedHosts(fromMsId)) { logger.info(String.format("No indirect agents available on management server node %d (id: %s), to migrate", fromMsId, fromMsUuid)); return true; @@ -342,35 +488,73 @@ public boolean migrateAgents(String fromMsUuid, long fromMsId, String lbAlgorith List dataCenterList = dcDao.listAll(); for (DataCenterVO dc : dataCenterList) { - Long dcId = dc.getId(); - List orderedHostIdList = getOrderedHostIdList(dcId); - List agentBasedHostsOfMsInDc = getAllAgentBasedHostsInDc(fromMsId, dcId); - if (CollectionUtils.isEmpty(agentBasedHostsOfMsInDc)) { - continue; + if (!migrateAgentsInZone(dc, fromMsUuid, fromMsId, avoidMsList, lbAlgorithm, lbAlgorithmChanged, + migrationStartTimeInMs, timeoutDurationInMs)) { + return false; } - logger.debug(String.format("Migrating %d indirect agents from management server node %d (id: %s) of zone %s", agentBasedHostsOfMsInDc.size(), fromMsId, fromMsUuid, dc)); - for (final Host host : agentBasedHostsOfMsInDc) { - long migrationElapsedTimeInMs = System.currentTimeMillis() - migrationStartTime; - if (migrationElapsedTimeInMs >= timeoutDurationInMs) { - logger.debug(String.format("Stop migrating remaining indirect agents from management server node %d (id: %s), timed out", fromMsId, fromMsUuid)); - return false; - } + } + + return true; + } + private boolean migrateAgentsInZone(DataCenterVO dc, String fromMsUuid, long fromMsId, List avoidMsList, + String lbAlgorithm, boolean lbAlgorithmChanged, long migrationStartTimeInMs, long timeoutDurationInMs) { + List orderedHostIdList = getOrderedHostIdList(dc.getId()); + if (!migrateNonRoutingHostAgentsInZone(fromMsUuid, fromMsId, dc, migrationStartTimeInMs, + timeoutDurationInMs, avoidMsList, lbAlgorithm, lbAlgorithmChanged, orderedHostIdList)) { + return false; + } + List clusterIds = clusterDao.listAllClusterIds(dc.getId()); + for (Long clusterId : clusterIds) { + if (!migrateRoutingHostAgentsInCluster(clusterId, fromMsUuid, fromMsId, dc, migrationStartTimeInMs, + timeoutDurationInMs, avoidMsList, lbAlgorithm, lbAlgorithmChanged, orderedHostIdList)) { + return false; + } + } + return true; + } + + private final class MigrateAgentConnectionTask extends ManagedContextRunnable { + private long fromMsId; + Long hostId; + Long dcId; + List orderedHostIdList; + List avoidMsList; + Long lbCheckInterval; + String lbAlgorithm; + boolean lbAlgorithmChanged; + + public MigrateAgentConnectionTask(long fromMsId, Long hostId, Long dcId, List orderedHostIdList, + List avoidMsList, Long lbCheckInterval, String lbAlgorithm, boolean lbAlgorithmChanged) { + this.fromMsId = fromMsId; + this.hostId = hostId; + this.orderedHostIdList = orderedHostIdList; + this.avoidMsList = avoidMsList; + this.lbCheckInterval = lbCheckInterval; + this.lbAlgorithm = lbAlgorithm; + this.lbAlgorithmChanged = lbAlgorithmChanged; + } + + @Override + protected void runInContext() { + try { List msList = null; - Long lbCheckInterval = 0L; if (lbAlgorithmChanged) { // send new MS list when there is change in lb algorithm - msList = getManagementServerList(host.getId(), dcId, orderedHostIdList, lbAlgorithm); - lbCheckInterval = getLBPreferredHostCheckInterval(host.getClusterId()); + msList = getManagementServerList(hostId, dcId, orderedHostIdList, lbAlgorithm); } final MigrateAgentConnectionCommand cmd = new MigrateAgentConnectionCommand(msList, avoidMsList, lbAlgorithm, lbCheckInterval); - agentManager.easySend(host.getId(), cmd); //answer not received as the agent disconnects and reconnects to other ms - updateLastManagementServer(host.getId(), fromMsId); + cmd.setWait(60); + final Answer answer = agentManager.easySend(hostId, cmd); //may not receive answer when the agent disconnects immediately and try reconnecting to other ms host + if (answer != null && !answer.getResult()) { + logger.warn(String.format("Error while initiating migration of agent connection for host agent ID: %d - %s", hostId, answer.getDetails())); + } + updateLastManagementServer(hostId, fromMsId); + } catch (final Exception e) { + logger.error(String.format("Error migrating agent connection for host %d", hostId), e); } } - - return true; } private void updateLastManagementServer(long hostId, long msId) { diff --git a/server/src/test/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImplTest.java b/server/src/test/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImplTest.java index 0c0097393cae..1b9923ad3ea1 100644 --- a/server/src/test/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImplTest.java +++ b/server/src/test/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImplTest.java @@ -106,7 +106,7 @@ private void configureMocks() throws NoSuchFieldException, IllegalAccessExceptio List hostIds = hosts.stream().map(HostVO::getId).collect(Collectors.toList()); doReturn(hostIds).when(hostDao).findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(Mockito.anyLong(), - Mockito.eq(null), Mockito.anyList(), Mockito.anyList(), Mockito.anyList()); + Mockito.eq(null), Mockito.eq(null), Mockito.anyList(), Mockito.anyList(), Mockito.anyList()); } @Before @@ -203,14 +203,14 @@ public void testExceptionOnEmptyHostSetting() throws NoSuchFieldException, Illeg @Test public void testGetOrderedRunningHostIdsEmptyList() { doReturn(Collections.emptyList()).when(hostDao).findHostIdsByZoneClusterResourceStateTypeAndHypervisorType( - Mockito.eq(DC_1_ID), Mockito.eq(null), Mockito.anyList(), Mockito.anyList(), Mockito.anyList()); + Mockito.eq(DC_1_ID), Mockito.eq(null), Mockito.eq(null), Mockito.anyList(), Mockito.anyList(), Mockito.anyList()); Assert.assertTrue(agentMSLB.getOrderedHostIdList(DC_1_ID).isEmpty()); } @Test public void testGetOrderedRunningHostIdsOrderList() { doReturn(Arrays.asList(host4.getId(), host2.getId(), host1.getId(), host3.getId())).when(hostDao) - .findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(Mockito.eq(DC_1_ID), Mockito.eq(null), + .findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(Mockito.eq(DC_1_ID), Mockito.eq(null), Mockito.eq(null), Mockito.anyList(), Mockito.anyList(), Mockito.anyList()); Assert.assertEquals(Arrays.asList(host1.getId(), host2.getId(), host3.getId(), host4.getId()), agentMSLB.getOrderedHostIdList(DC_1_ID)); diff --git a/utils/src/main/java/com/cloud/utils/nio/HandlerFactory.java b/utils/src/main/java/com/cloud/utils/nio/HandlerFactory.java index 6f0f1945e01f..9493f24b92b4 100644 --- a/utils/src/main/java/com/cloud/utils/nio/HandlerFactory.java +++ b/utils/src/main/java/com/cloud/utils/nio/HandlerFactory.java @@ -25,7 +25,7 @@ * WorkerFactory creates and selects workers. */ public interface HandlerFactory { - public Task create(Task.Type type, Link link, byte[] data); + Task create(Task.Type type, Link link, byte[] data); default int getMaxConcurrentNewConnectionsCount() { return 0; } diff --git a/utils/src/main/java/com/cloud/utils/nio/Link.java b/utils/src/main/java/com/cloud/utils/nio/Link.java index 5404cd153439..4e68554eb499 100644 --- a/utils/src/main/java/com/cloud/utils/nio/Link.java +++ b/utils/src/main/java/com/cloud/utils/nio/Link.java @@ -617,8 +617,8 @@ public static boolean doHandshake(final SocketChannel socketChannel, final SSLEn final long timeTaken = System.currentTimeMillis() - startTimeMills; if (timeTaken > timeoutMillis) { - LOGGER.warn("SSL Handshake has taken more than {}ms to connect to: {}" + - " while status: {}. Please investigate this connection.", socketChannel.getRemoteAddress(), + LOGGER.warn("SSL Handshake has taken more than {} ms to connect to: {}" + + " while status: {}. Please investigate this connection.", timeoutMillis, socketChannel.getRemoteAddress(), handshakeStatus); return false; } diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java index 46d67feaaf30..d274973a6584 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java @@ -115,4 +115,8 @@ public void cleanUp() throws IOException { } logger.info("NioClient connection closed"); } + + public String getHost() { + return host; + } } diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 98fa69716cd3..ed6b57482892 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@ -83,26 +83,19 @@ public abstract class NioConnection implements Callable { protected Set socketChannels = new HashSet<>(); protected Integer sslHandshakeTimeout = null; private final int factoryMaxNewConnectionsCount; + protected boolean blockNewConnections; public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) { _name = name; _isRunning = false; + blockNewConnections = false; _selector = null; _port = port; _workers = workers; _factory = factory; this.factoryMaxNewConnectionsCount = factory.getMaxConcurrentNewConnectionsCount(); - _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, - new LinkedBlockingQueue<>(5 * workers), new NamedThreadFactory(name + "-Handler"), - new ThreadPoolExecutor.AbortPolicy()); - String sslHandshakeHandlerName = name + "-SSLHandshakeHandler"; - if (factoryMaxNewConnectionsCount > 0) { - _sslHandshakeExecutor = new ThreadPoolExecutor(0, this.factoryMaxNewConnectionsCount, 30, - TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory(sslHandshakeHandlerName), - new ThreadPoolExecutor.AbortPolicy()); - } else { - _sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(sslHandshakeHandlerName)); - } + initWorkersExecutor(); + initSSLHandshakeExecutor(); } public void setCAService(final CAService caService) { @@ -127,10 +120,14 @@ public void start() throws NioConnectionException { _isStartup = true; if (_executor.isShutdown()) { - _executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<>(), new NamedThreadFactory(_name + "-Handler")); + initWorkersExecutor(); + } + if (_sslHandshakeExecutor.isShutdown()) { + initSSLHandshakeExecutor(); } _threadExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(this._name + "-NioConnectionHandler")); _isRunning = true; + blockNewConnections = false; _futureTask = _threadExecutor.submit(this); } @@ -138,12 +135,30 @@ public void stop() { _executor.shutdown(); _sslHandshakeExecutor.shutdown(); _isRunning = false; + blockNewConnections = true; if (_threadExecutor != null) { _futureTask.cancel(false); _threadExecutor.shutdown(); } } + private void initWorkersExecutor() { + _executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, TimeUnit.DAYS, + new LinkedBlockingQueue<>(5 * _workers), new NamedThreadFactory(_name + "-Handler"), + new ThreadPoolExecutor.AbortPolicy()); + } + + private void initSSLHandshakeExecutor() { + String sslHandshakeHandlerName = _name + "-SSLHandshakeHandler"; + if (factoryMaxNewConnectionsCount > 0) { + _sslHandshakeExecutor = new ThreadPoolExecutor(0, this.factoryMaxNewConnectionsCount, 30, + TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory(sslHandshakeHandlerName), + new ThreadPoolExecutor.AbortPolicy()); + } else { + _sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(sslHandshakeHandlerName)); + } + } + public boolean isRunning() { return !_futureTask.isDone(); } @@ -210,6 +225,16 @@ public Boolean call() throws NioConnectionException { abstract void unregisterLink(InetSocketAddress saddr); + protected boolean rejectConnectionIfBlocked(final SocketChannel socketChannel) throws IOException { + if (!blockNewConnections) { + return false; + } + logger.warn("Rejecting new connection as the server is blocked from accepting new connections"); + socketChannel.close(); + _selector.wakeup(); + return true; + } + protected boolean rejectConnectionIfBusy(final SocketChannel socketChannel) throws IOException { if (factoryMaxNewConnectionsCount <= 0 || _factory.getNewConnectionsCount() < factoryMaxNewConnectionsCount) { return false; @@ -226,7 +251,7 @@ protected boolean rejectConnectionIfBusy(final SocketChannel socketChannel) thro protected void accept(final SelectionKey key) throws IOException { final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); - if (rejectConnectionIfBusy(socketChannel)) { + if (rejectConnectionIfBlocked(socketChannel) || rejectConnectionIfBusy(socketChannel)) { return; } socketChannel.configureBlocking(false); @@ -520,6 +545,14 @@ public void cleanUp() throws IOException { } } + public void block() { + blockNewConnections = true; + } + + public void unblock() { + blockNewConnections = false; + } + public class ChangeRequest { public static final int REGISTER = 1; public static final int CHANGEOPS = 2;