Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@
scanDirectAgentToLoad();
}

private void scanDirectAgentToLoad() {
protected void scanDirectAgentToLoad() {
logger.trace("Begin scanning directly connected hosts");

// for agents that are self-managed, threshold to be considered as disconnected after pingtimeout
Expand All @@ -212,11 +212,21 @@
logger.info("{} is detected down, but we have a forward attache running, disconnect this one before launching the host", host);
removeAgent(agentattache, Status.Disconnected);
} else {
continue;
logger.debug("Host {} status is {} but has an AgentAttache which is not forForward, try to load directly", host, host.getStatus());
Status hostStatus = investigate(agentattache);
if (Status.Up == hostStatus) {
/* Got ping response from host, bring it back */
logger.info("After investigation, Agent for host {} is determined to be up and running", host);
agentStatusTransitTo(host, Event.Ping, _nodeId);
} else {
logger.debug("After investigation, AgentAttache is not null but host status is {}, try to load directly {}", hostStatus, host);
loadDirectlyConnectedHost(host, false);
}
}
} else {
logger.debug("AgentAttache is null, loading directly connected {}", host);
loadDirectlyConnectedHost(host, false);
}
logger.debug("Loading directly connected {}", host);
loadDirectlyConnectedHost(host, false);
} catch (final Throwable e) {
logger.warn(" can not load directly connected {} due to ", host, e);
}
Expand Down Expand Up @@ -362,20 +372,20 @@
return;
}
if (!result) {
throw new CloudRuntimeException("Failed to propagate agent change request event:" + Event.ShutdownRequested + " to host:" + hostId);
throw new CloudRuntimeException(String.format("Failed to propagate agent change request event: %s to host: %s", Event.ShutdownRequested, hostId));

Check warning on line 375 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L375

Added line #L375 was not covered by tests
}
}

public void notifyNodesInCluster(final AgentAttache attache) {
logger.debug("Notifying other nodes of to disconnect");
final Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)};
final Command[] cmds = new Command[]{new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)};

Check warning on line 381 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L381

Added line #L381 was not covered by tests
_clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds));
}

// notifies MS peers to schedule a host scan task immediately, triggered during addHost operation
public void notifyNodesInClusterToScheduleHostScanTask() {
logger.debug("Notifying other MS nodes to run host scan task");
final Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()};
final Command[] cmds = new Command[]{new ScheduleHostScanTaskCommand()};

Check warning on line 388 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L388

Added line #L388 was not covered by tests
_clusterMgr.broadcast(0, _gson.toJson(cmds));
}

Expand Down Expand Up @@ -416,7 +426,7 @@
}
try {
logD(bytes, "Routing to peer");
Link.write(ch, new ByteBuffer[] {ByteBuffer.wrap(bytes)}, sslEngine);
Link.write(ch, new ByteBuffer[]{ByteBuffer.wrap(bytes)}, sslEngine);

Check warning on line 429 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L429

Added line #L429 was not covered by tests
return true;
} catch (final IOException e) {
try {
Expand Down Expand Up @@ -625,7 +635,7 @@
}
final Request req = Request.parse(data);
final Command[] cmds = req.getCommands();
final CancelCommand cancel = (CancelCommand)cmds[0];
final CancelCommand cancel = (CancelCommand) cmds[0];

Check warning on line 638 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L638

Added line #L638 was not covered by tests
logD(data, "Cancel request received");
agent.cancel(cancel.getSequence());
final Long current = agent._currentSequence;
Expand All @@ -652,7 +662,7 @@
return;
} else {
if (agent instanceof Routable) {
final Routable cluster = (Routable)agent;
final Routable cluster = (Routable) agent;

Check warning on line 665 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L665

Added line #L665 was not covered by tests
cluster.routeToAgent(data);
} else {
agent.send(Request.parse(data));
Expand All @@ -669,7 +679,7 @@
if (mgmtId != -1 && mgmtId != _nodeId) {
routeToPeer(Long.toString(mgmtId), data);
if (Request.requiresSequentialExecution(data)) {
final AgentAttache attache = (AgentAttache)link.attachment();
final AgentAttache attache = (AgentAttache) link.attachment();

Check warning on line 682 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L682

Added line #L682 was not covered by tests
if (attache != null) {
attache.sendNext(Request.getSequence(data));
}
Expand Down Expand Up @@ -933,7 +943,7 @@
if (_agentToTransferIds.size() > 0) {
logger.debug("Found {} agents to transfer", _agentToTransferIds.size());
// for (Long hostId : _agentToTransferIds) {
for (final Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
for (final Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext(); ) {
final Long hostId = iterator.next();
final AgentAttache attache = findAttache(hostId);

Expand Down Expand Up @@ -1074,7 +1084,7 @@
return;
}

final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) attache;

Check warning on line 1087 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1087

Added line #L1087 was not covered by tests

if (success) {

Expand Down Expand Up @@ -1125,10 +1135,10 @@
}

synchronized (_agents) {
final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache) _agents.get(hostId);

Check warning on line 1138 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1138

Added line #L1138 was not covered by tests
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true);
final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(host);
final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) createAttache(host);

Check warning on line 1141 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1141

Added line #L1141 was not covered by tests
if (forwardAttache == null) {
logger.warn("Unable to create a forward attache for the host {} as a part of rebalance process", host);
return false;
Expand Down Expand Up @@ -1232,7 +1242,7 @@
}

if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { // intercepted
final ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
final ChangeAgentCommand cmd = (ChangeAgentCommand) cmds[0];

Check warning on line 1245 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1245

Added line #L1245 was not covered by tests

logger.debug("Intercepting command for agent change: agent {} event: {}", cmd.getAgentId(), cmd.getEvent());
boolean result = false;
Expand All @@ -1249,7 +1259,7 @@
answers[0] = new ChangeAgentAnswer(cmd, result);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
final TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
final TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];

Check warning on line 1262 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1262

Added line #L1262 was not covered by tests

logger.debug("Intercepting command for agent rebalancing: agent {} event: {}", cmd.getAgentId(), cmd.getEvent());
boolean result = false;
Expand All @@ -1268,7 +1278,7 @@
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0];

Check warning on line 1281 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1281

Added line #L1281 was not covered by tests

logger.debug("Intercepting command to propagate event {} for host {} ({})", () -> cmd.getEvent().name(), cmd::getHostId, () -> _hostDao.findById(cmd.getHostId()));

Expand All @@ -1285,10 +1295,10 @@
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0];

Check warning on line 1298 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1298

Added line #L1298 was not covered by tests
return handleScheduleHostScanTaskCommand(cmd);
} else if (cmds.length == 1 && cmds[0] instanceof BaseShutdownManagementServerHostCommand) {
final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand)cmds[0];
final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand) cmds[0];

Check warning on line 1301 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1301

Added line #L1301 was not covered by tests
return handleShutdownManagementServerHostCommand(cmd);
}

Expand Down Expand Up @@ -1323,7 +1333,7 @@
try {
shutdownManager.prepareForShutdown();
return "Successfully prepared for shutdown";
} catch(CloudRuntimeException e) {
} catch (CloudRuntimeException e) {

Check warning on line 1336 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1336

Added line #L1336 was not covered by tests
return e.getMessage();
}
}
Expand All @@ -1332,7 +1342,7 @@
try {
shutdownManager.triggerShutdown();
return "Successfully triggered shutdown";
} catch(CloudRuntimeException e) {
} catch (CloudRuntimeException e) {

Check warning on line 1345 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1345

Added line #L1345 was not covered by tests
return e.getMessage();
}
}
Expand All @@ -1341,7 +1351,7 @@
try {
shutdownManager.cancelShutdown();
return "Successfully prepared for shutdown";
} catch(CloudRuntimeException e) {
} catch (CloudRuntimeException e) {

Check warning on line 1354 in engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java#L1354

Added line #L1354 was not covered by tests
return e.getMessage();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.cloud.agent.manager;

import com.cloud.configuration.ManagementServiceConfiguration;
import com.cloud.ha.HighAvailabilityManagerImpl;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.resource.ResourceManagerImpl;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.ArrayList;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class ClusteredAgentManagerImplTest {

private HostDao _hostDao;
@Mock
ManagementServiceConfiguration _mgmtServiceConf;

@Before
public void setUp() throws Exception {
_hostDao = mock(HostDao.class);
}

@Test
public void scanDirectAgentToLoadNoHostsTest() {
ClusteredAgentManagerImpl clusteredAgentManagerImpl = mock(ClusteredAgentManagerImpl.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.scanDirectAgentToLoad();
verify(clusteredAgentManagerImpl, never()).findAttache(anyLong());
verify(clusteredAgentManagerImpl, never()).loadDirectlyConnectedHost(any(), anyBoolean());
}

@Test
public void scanDirectAgentToLoadHostWithoutAttacheTest() {
// Arrange
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
HostVO hostVO = mock(HostVO.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
clusteredAgentManagerImpl._resourceMgr = mock(ResourceManagerImpl.class);
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(hostVO.getId()).thenReturn(1L);
List hosts = new ArrayList<>();
hosts.add(hostVO);
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
AgentAttache agentAttache = mock(AgentAttache.class);
doReturn(Boolean.TRUE).when(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
clusteredAgentManagerImpl.scanDirectAgentToLoad();
verify(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
}

@Test
public void scanDirectAgentToLoadHostWithForwardAttacheTest() {
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
HostVO hostVO = mock(HostVO.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(hostVO.getId()).thenReturn(1L);
List hosts = new ArrayList<>();
hosts.add(hostVO);
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
AgentAttache agentAttache = mock(AgentAttache.class);
when(agentAttache.forForward()).thenReturn(Boolean.TRUE);
when(clusteredAgentManagerImpl.findAttache(1L)).thenReturn(agentAttache);

clusteredAgentManagerImpl.scanDirectAgentToLoad();
verify(clusteredAgentManagerImpl).removeAgent(agentAttache, Status.Disconnected);
}

@Test
public void scanDirectAgentToLoadHostWithNonForwardAttacheTest() {
// Arrange
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(new ClusteredAgentManagerImpl());
HostVO hostVO = mock(HostVO.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
clusteredAgentManagerImpl._haMgr = mock(HighAvailabilityManagerImpl.class);
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(hostVO.getId()).thenReturn(0L);
List hosts = new ArrayList<>();
hosts.add(hostVO);
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);

AgentAttache agentAttache = mock(AgentAttache.class);
when(agentAttache.forForward()).thenReturn(Boolean.FALSE);
when(clusteredAgentManagerImpl.findAttache(0L)).thenReturn(agentAttache);
doReturn(Boolean.TRUE).when(clusteredAgentManagerImpl).agentStatusTransitTo(hostVO, Status.Event.Ping, clusteredAgentManagerImpl._nodeId);
doReturn(Status.Up).when(clusteredAgentManagerImpl).investigate(agentAttache);

clusteredAgentManagerImpl.scanDirectAgentToLoad();
verify(clusteredAgentManagerImpl).investigate(agentAttache);
verify(clusteredAgentManagerImpl).agentStatusTransitTo(hostVO, Status.Event.Ping, clusteredAgentManagerImpl._nodeId);
}

@Test
public void scanDirectAgentToLoadHostWithNonForwardAttacheAndDisconnectedTest() {
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
HostVO hostVO = mock(HostVO.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
clusteredAgentManagerImpl._haMgr = mock(HighAvailabilityManagerImpl.class);
clusteredAgentManagerImpl._resourceMgr = mock(ResourceManagerImpl.class);
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(hostVO.getId()).thenReturn(0L);
List hosts = new ArrayList<>();
hosts.add(hostVO);
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
AgentAttache agentAttache = mock(AgentAttache.class);
when(agentAttache.forForward()).thenReturn(Boolean.FALSE);
when(clusteredAgentManagerImpl.findAttache(0L)).thenReturn(agentAttache);
doReturn(Boolean.TRUE).when(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
clusteredAgentManagerImpl.scanDirectAgentToLoad();
verify(clusteredAgentManagerImpl).investigate(agentAttache);
verify(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
}
}
Loading