Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 46 additions & 36 deletions agent/src/main/java/com/cloud/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public void start() {
logger.info("Attempted to connect to the server, but received an unexpected exception, trying again...", e);
}
}
shell.updateConnectedHost();
shell.updateConnectedHost(((NioClient)connection).getHost());
scavengeOldAgentObjects();
}

Expand Down Expand Up @@ -617,15 +617,11 @@ public Task create(final Task.Type type, final Link link, final byte[] data) {
}

protected void reconnect(final Link link) {
reconnect(link, null, null, false);
reconnect(link, null, false);
}

protected void reconnect(final Link link, String preferredHost, List<String> avoidHostList, boolean forTransfer) {
protected void reconnect(final Link link, String preferredMSHost, boolean forTransfer) {
if (!(forTransfer || reconnectAllowed)) {
return;
}

if (!reconnectAllowed) {
logger.debug("Reconnect requested but it is not allowed {}", () -> getLinkLog(link));
return;
}
Expand All @@ -637,19 +633,26 @@ protected void reconnect(final Link link, String preferredHost, List<String> avo
serverResource.disconnected();
logger.info("Lost connection to host: {}. Attempting reconnection while we still have {} commands in progress.", shell.getConnectedHost(), commandsInProgress.get());
stopAndCleanupConnection(true);
String host = preferredMSHost;
if (org.apache.commons.lang3.StringUtils.isBlank(host)) {
host = shell.getNextHost();
}
List<String> avoidMSHostList = shell.getAvoidHosts();
do {
final String host = shell.getNextHost();
connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(), shell.getSslHandshakeTimeout(), this);
logger.info("Reconnecting to host: {}", host);
try {
connection.start();
} catch (final NioConnectionException e) {
logger.info("Attempted to re-connect to the server, but received an unexpected exception, trying again...", e);
stopAndCleanupConnection(false);
if (CollectionUtils.isEmpty(avoidMSHostList) || !avoidMSHostList.contains(host)) {
connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(), shell.getSslHandshakeTimeout(), this);
logger.info("Reconnecting to host: {}", host);
try {
connection.start();
} catch (final NioConnectionException e) {
logger.info("Attempted to re-connect to the server, but received an unexpected exception, trying again...", e);
stopAndCleanupConnection(false);
}
}
shell.getBackoffAlgorithm().waitBeforeRetry();
host = shell.getNextHost();
} while (!connection.isStartup());
shell.updateConnectedHost();
shell.updateConnectedHost(((NioClient)connection).getHost());
logger.info("Connected to the host: {}", shell.getConnectedHost());
}

Expand Down Expand Up @@ -922,7 +925,7 @@ private Answer setupAgentCertificate(final SetupCertificateCommand cmd) {
return new SetupCertificateAnswer(true);
}

private void processManagementServerList(final List<String> msList, final String lbAlgorithm, final Long lbCheckInterval) {
private void processManagementServerList(final List<String> msList, final List<String> avoidMsList, final String lbAlgorithm, final Long lbCheckInterval) {
if (CollectionUtils.isNotEmpty(msList) && StringUtils.isNotEmpty(lbAlgorithm)) {
try {
final String newMSHosts = String.format("%s%s%s", com.cloud.utils.StringUtils.toCSVList(msList), IAgentShell.hostLbAlgorithmSeparator, lbAlgorithm);
Expand All @@ -934,6 +937,7 @@ private void processManagementServerList(final List<String> msList, final String
throw new CloudRuntimeException("Could not persist received management servers list", e);
}
}
shell.setAvoidHosts(avoidMsList);
if ("shuffle".equals(lbAlgorithm)) {
scheduleHostLBCheckerTask(0);
} else {
Expand All @@ -942,16 +946,18 @@ private void processManagementServerList(final List<String> msList, final String
}

private Answer setupManagementServerList(final SetupMSListCommand cmd) {
processManagementServerList(cmd.getMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval());
processManagementServerList(cmd.getMsList(), cmd.getAvoidMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval());
return new SetupMSListAnswer(true);
}

private Answer migrateAgentToOtherMS(final MigrateAgentConnectionCommand cmd) {
try {
if (CollectionUtils.isNotEmpty(cmd.getMsList())) {
processManagementServerList(cmd.getMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval());
processManagementServerList(cmd.getMsList(), cmd.getAvoidMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval());
}
migrateAgentConnection(cmd.getAvoidMsList());
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("MigrateAgentConnection-Job")).schedule(() -> {
migrateAgentConnection(cmd.getAvoidMsList());
}, 3, TimeUnit.SECONDS);
} catch (Exception e) {
String errMsg = "Migrate agent connection failed, due to " + e.getMessage();
logger.debug(errMsg, e);
Expand All @@ -972,25 +978,26 @@ private void migrateAgentConnection(List<String> avoidMsList) {
throw new CloudRuntimeException("No other Management Server hosts to migrate");
}

String preferredHost = null;
String preferredMSHost = null;
for (String msHost : msHostsList) {
try (final Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(msHost, shell.getPort()), 5000);
preferredHost = msHost;
preferredMSHost = msHost;
break;
} catch (final IOException e) {
throw new CloudRuntimeException("Management server host: " + msHost + " is not reachable, to migrate connection");
}
}

if (preferredHost == null) {
if (preferredMSHost == null) {
throw new CloudRuntimeException("Management server host(s) are not reachable, to migrate connection");
}

logger.debug("Management server host " + preferredHost + " is found to be reachable, trying to reconnect");
logger.debug("Management server host " + preferredMSHost + " is found to be reachable, trying to reconnect");
shell.resetHostCounter();
shell.setAvoidHosts(avoidMsList);
shell.setConnectionTransfer(true);
reconnect(link, preferredHost, avoidMsList, true);
reconnect(link, preferredMSHost, true);
}

public void processResponse(final Response response, final Link link) {
Expand All @@ -1003,9 +1010,12 @@ public void processResponse(final Response response, final Link link) {
for (final IAgentControlListener listener : controlListeners) {
listener.processControlResponse(response, (AgentControlAnswer)answer);
}
} else if (answer instanceof PingAnswer && (((PingAnswer) answer).isSendStartup()) && reconnectAllowed) {
logger.info("Management server requested startup command to reinitialize the agent");
sendStartup(link);
} else if (answer instanceof PingAnswer) {
if ((((PingAnswer) answer).isSendStartup()) && reconnectAllowed) {
logger.info("Management server requested startup command to reinitialize the agent");
sendStartup(link);
}
shell.setAvoidHosts(((PingAnswer) answer).getAvoidMsList());
} else {
updateLastPingResponseTime();
}
Expand All @@ -1027,7 +1037,7 @@ public void processReadyCommand(final Command cmd) {
}

verifyAgentArch(ready.getArch());
processManagementServerList(ready.getMsHostList(), ready.getLbAlgorithm(), ready.getLbCheckInterval());
processManagementServerList(ready.getMsHostList(), ready.getAvoidMsHostList(), ready.getLbAlgorithm(), ready.getLbCheckInterval());

logger.info("Ready command is processed for agent [id: {}, uuid: {}, name: {}]", getId(), getUuid(), getName());
}
Expand Down Expand Up @@ -1374,26 +1384,26 @@ protected void runInContext() {
if (msList == null || msList.length < 1) {
return;
}
final String preferredHost = msList[0];
final String preferredMSHost = msList[0];
final String connectedHost = shell.getConnectedHost();
logger.debug("Running preferred host checker task, connected host={}, preferred host={}",
connectedHost, preferredHost);
if (preferredHost == null || preferredHost.equals(connectedHost) || link == null) {
connectedHost, preferredMSHost);
if (preferredMSHost == null || preferredMSHost.equals(connectedHost) || link == null) {
return;
}
boolean isHostUp = false;
try (final Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(preferredHost, shell.getPort()), 5000);
socket.connect(new InetSocketAddress(preferredMSHost, shell.getPort()), 5000);
isHostUp = true;
} catch (final IOException e) {
logger.debug("Host: {} is not reachable", preferredHost);
logger.debug("Host: {} is not reachable", preferredMSHost);
}
if (isHostUp && link != null && commandsInProgress.get() == 0) {
if (logger.isDebugEnabled()) {
logger.debug("Preferred host {} is found to be reachable, trying to reconnect", preferredHost);
logger.debug("Preferred host {} is found to be reachable, trying to reconnect", preferredMSHost);
}
shell.resetHostCounter();
reconnect(link);
reconnect(link, preferredMSHost, false);
}
} catch (Throwable t) {
logger.error("Error caught while attempting to connect to preferred host", t);
Expand Down
19 changes: 14 additions & 5 deletions agent/src/main/java/com/cloud/agent/AgentShell.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class AgentShell implements IAgentShell, Daemon {
private String _zone;
private String _pod;
private String _host;
private List<String> _avoidHosts;
private String _privateIp;
private int _port;
private int _proxyPort;
Expand All @@ -76,7 +77,6 @@ public class AgentShell implements IAgentShell, Daemon {
private volatile boolean _exit = false;
private int _pingRetries;
private final List<Agent> _agents = new ArrayList<Agent>();
private String hostToConnect;
private String connectedHost;
private Long preferredHostCheckInterval;
private boolean connectionTransfer = false;
Expand Down Expand Up @@ -121,7 +121,7 @@ public String getNextHost() {
if (_hostCounter >= hosts.length) {
_hostCounter = 0;
}
hostToConnect = hosts[_hostCounter % hosts.length];
String hostToConnect = hosts[_hostCounter % hosts.length];
_hostCounter++;
return hostToConnect;
}
Expand All @@ -143,11 +143,10 @@ public long getLbCheckerInterval(final Long receivedLbInterval) {
}

@Override
public void updateConnectedHost() {
connectedHost = hostToConnect;
public void updateConnectedHost(String connectedHost) {
this.connectedHost = connectedHost;
}


@Override
public void resetHostCounter() {
_hostCounter = 0;
Expand All @@ -166,6 +165,16 @@ public void setHosts(final String host) {
}
}

/**
 * Records the list of hosts that should be avoided.
 * The reference is stored as-is: no copy is made and no validation is performed.
 *
 * @param avoidHosts hosts to avoid
 */
@Override
public void setAvoidHosts(List<String> avoidHosts) {
    this._avoidHosts = avoidHosts;
}

/**
 * Returns the list of hosts to avoid exactly as it was last stored.
 *
 * @return the stored avoid-hosts list (the same reference that was set, not a copy)
 */
@Override
public List<String> getAvoidHosts() {
    return this._avoidHosts;
}

@Override
public String getPrivateIp() {
return _privateIp;
Expand Down
7 changes: 6 additions & 1 deletion agent/src/main/java/com/cloud/agent/IAgentShell.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.
package com.cloud.agent;

import java.util.List;
import java.util.Map;
import java.util.Properties;

Expand Down Expand Up @@ -63,9 +64,13 @@ public interface IAgentShell {

String[] getHosts();

void setAvoidHosts(List<String> hosts);

List<String> getAvoidHosts();

long getLbCheckerInterval(Long receivedLbInterval);

void updateConnectedHost();
void updateConnectedHost(String connectedHost);

String getConnectedHost();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public Property<Integer> getWorkers() {
* Data type: Integer.<br>
* Default value: <code>30</code>
*/
public static final Property<Integer> SSL_HANDSHAKE_TIMEOUT = new Property<>("ssl.handshake.timeout", null, Integer.class);
public static final Property<Integer> SSL_HANDSHAKE_TIMEOUT = new Property<>("ssl.handshake.timeout", 30, Integer.class);

public static class Property <T>{
private String name;
Expand Down
2 changes: 1 addition & 1 deletion agent/src/test/java/com/cloud/agent/AgentShellTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void updateAndGetConnectedHost() {
AgentShell shell = new AgentShell();
shell.setHosts("test");
shell.getNextHost();
shell.updateConnectedHost();
shell.updateConnectedHost("test");

Assert.assertEquals(expected, shell.getConnectedHost());
}
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/com/cloud/event/EventTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,13 @@ public class EventTypes {
//Purge resources
public static final String EVENT_PURGE_EXPUNGED_RESOURCES = "PURGE.EXPUNGED.RESOURCES";

// Management Server
public static final String EVENT_MS_MAINTENANCE_PREPARE = "MS.MAINTENANCE.PREPARE";
public static final String EVENT_MS_MAINTENANCE_CANCEL = "MS.MAINTENANCE.CANCEL";
public static final String EVENT_MS_SHUTDOWN_PREPARE = "MS.SHUTDOWN.PREPARE";
public static final String EVENT_MS_SHUTDOWN_CANCEL = "MS.SHUTDOWN.CANCEL";
public static final String EVENT_MS_SHUTDOWN = "MS.SHUTDOWN";

// OBJECT STORE
public static final String EVENT_OBJECT_STORE_CREATE = "OBJECT.STORE.CREATE";
public static final String EVENT_OBJECT_STORE_DELETE = "OBJECT.STORE.DELETE";
Expand Down Expand Up @@ -1233,6 +1240,12 @@ public class EventTypes {
entityEventDetails.put(EVENT_UPDATE_IMAGE_STORE_ACCESS_STATE, ImageStore.class);
entityEventDetails.put(EVENT_LIVE_PATCH_SYSTEMVM, "SystemVMs");

entityEventDetails.put(EVENT_MS_MAINTENANCE_PREPARE, "ManagementServer");
entityEventDetails.put(EVENT_MS_MAINTENANCE_CANCEL, "ManagementServer");
entityEventDetails.put(EVENT_MS_SHUTDOWN_PREPARE, "ManagementServer");
entityEventDetails.put(EVENT_MS_SHUTDOWN_CANCEL, "ManagementServer");
entityEventDetails.put(EVENT_MS_SHUTDOWN, "ManagementServer");

//Object Store
entityEventDetails.put(EVENT_OBJECT_STORE_CREATE, ObjectStore.class);
entityEventDetails.put(EVENT_OBJECT_STORE_UPDATE, ObjectStore.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,7 @@ public class ApiConstants {
public static final String PENDING_JOBS_COUNT = "pendingjobscount";
public static final String AGENTS_COUNT = "agentscount";
public static final String AGENTS = "agents";
public static final String LAST_AGENTS = "lastagents";

public static final String PUBLIC_MTU = "publicmtu";
public static final String PRIVATE_MTU = "privatemtu";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum ApiErrorCode {
UNSUPPORTED_ACTION_ERROR(432),
API_LIMIT_EXCEED(429),

SERVICE_UNAVAILABLE(503),
INTERNAL_ERROR(530),
ACCOUNT_ERROR(531),
ACCOUNT_RESOURCE_LIMIT_ERROR(532),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ public class ManagementServerResponse extends BaseResponse {
@Param(description = "the Management Server Peers")
private List<PeerManagementServerNodeResponse> peers;

@SerializedName(ApiConstants.LAST_AGENTS)
@Param(description = "the last agents this Management Server is responsible for, before shutdown or preparing for maintenance", since = "4.21.0.0")
private List<String> lastAgents;

@SerializedName(ApiConstants.AGENTS)
@Param(description = "the agents this Management Server is responsible for", since = "4.21.0.0")
private List<String> agents;

@SerializedName(ApiConstants.AGENTS_COUNT)
@Param(description = "the number of host agents this Management Server is responsible for", since = "4.21.0.0")
private Long agentsCount;
Expand Down Expand Up @@ -134,6 +142,14 @@ public String getIpAddress() {
return ipAddress;
}

/**
 * Returns the last agents this Management Server was responsible for,
 * before shutdown or preparing for maintenance.
 *
 * @return the stored list of last agents
 */
public List<String> getLastAgents() {
    return this.lastAgents;
}

/**
 * Returns the agents this Management Server is responsible for.
 *
 * @return the stored list of agents
 */
public List<String> getAgents() {
    return this.agents;
}

/**
 * Returns the number of host agents this Management Server is responsible for.
 *
 * @return the stored agents count
 */
public Long getAgentsCount() {
    return agentsCount;
}
Expand Down Expand Up @@ -190,6 +206,14 @@ public void setIpAddress(String ipAddress) {
this.ipAddress = ipAddress;
}

/**
 * Sets the last agents this Management Server was responsible for,
 * before shutdown or preparing for maintenance.
 *
 * @param lastAgents the list to store (kept by reference, not copied)
 */
public void setLastAgents(List<String> lastAgents) {
this.lastAgents = lastAgents;
}

/**
 * Sets the agents this Management Server is responsible for.
 *
 * @param agents the list to store (kept by reference, not copied)
 */
public void setAgents(List<String> agents) {
this.agents = agents;
}

/**
 * Sets the number of host agents this Management Server is responsible for.
 *
 * @param agentsCount the count to store
 */
public void setAgentsCount(Long agentsCount) {
this.agentsCount = agentsCount;
}
Expand Down
Loading
Loading