@@ -210,7 +210,7 @@ private void runDirectAgentScanTimerTask() {
210210 scanDirectAgentToLoad ();
211211 }
212212
213- private void scanDirectAgentToLoad () {
213+ protected void scanDirectAgentToLoad () {
214214 logger .trace ("Begin scanning directly connected hosts" );
215215
216216 // for agents that are self-managed, threshold to be considered as disconnected after pingtimeout
@@ -231,11 +231,21 @@ private void scanDirectAgentToLoad() {
231231 logger .info ("{} is detected down, but we have a forward attache running, disconnect this one before launching the host" , host );
232232 removeAgent (agentattache , Status .Disconnected );
233233 } else {
234- continue ;
234+ logger .debug ("Host {} status is {} but has an AgentAttache which is not forForward, try to load directly" , host , host .getStatus ());
235+ Status hostStatus = investigate (agentattache );
236+ if (Status .Up == hostStatus ) {
237+ /* Got ping response from host, bring it back */
238+ logger .info ("After investigation, Agent for host {} is determined to be up and running" , host );
239+ agentStatusTransitTo (host , Event .Ping , _nodeId );
240+ } else {
241+ logger .debug ("After investigation, AgentAttache is not null but host status is {}, try to load directly {}" , hostStatus , host );
242+ loadDirectlyConnectedHost (host , false );
243+ }
235244 }
245+ } else {
246+ logger .debug ("AgentAttache is null, loading directly connected {}" , host );
247+ loadDirectlyConnectedHost (host , false );
236248 }
237- logger .debug ("Loading directly connected {}" , host );
238- loadDirectlyConnectedHost (host , false );
239249 } catch (final Throwable e ) {
240250 logger .warn (" can not load directly connected {} due to " , host , e );
241251 }
@@ -381,20 +391,20 @@ public void reconnect(final long hostId) throws CloudRuntimeException, AgentUnav
381391 return ;
382392 }
383393 if (!result ) {
384- throw new CloudRuntimeException ("Failed to propagate agent change request event:" + Event . ShutdownRequested + " to host:" + hostId );
394+ throw new CloudRuntimeException (String . format ( "Failed to propagate agent change request event: %s to host: %s" , Event . ShutdownRequested , hostId ) );
385395 }
386396 }
387397
388398 public void notifyNodesInCluster (final AgentAttache attache ) {
389399 logger .debug ("Notifying other nodes of to disconnect" );
390- final Command [] cmds = new Command [] {new ChangeAgentCommand (attache .getId (), Event .AgentDisconnected )};
400+ final Command [] cmds = new Command []{new ChangeAgentCommand (attache .getId (), Event .AgentDisconnected )};
391401 _clusterMgr .broadcast (attache .getId (), _gson .toJson (cmds ));
392402 }
393403
394404 // notifies MS peers to schedule a host scan task immediately, triggered during addHost operation
395405 public void notifyNodesInClusterToScheduleHostScanTask () {
396406 logger .debug ("Notifying other MS nodes to run host scan task" );
397- final Command [] cmds = new Command [] {new ScheduleHostScanTaskCommand ()};
407+ final Command [] cmds = new Command []{new ScheduleHostScanTaskCommand ()};
398408 _clusterMgr .broadcast (0 , _gson .toJson (cmds ));
399409 }
400410
@@ -435,7 +445,7 @@ public boolean routeToPeer(final String peer, final byte[] bytes) {
435445 }
436446 try {
437447 logD (bytes , "Routing to peer" );
438- Link .write (ch , new ByteBuffer [] {ByteBuffer .wrap (bytes )}, sslEngine );
448+ Link .write (ch , new ByteBuffer []{ByteBuffer .wrap (bytes )}, sslEngine );
439449 return true ;
440450 } catch (final IOException e ) {
441451 try {
@@ -644,7 +654,7 @@ protected void doTask(final Task task) throws TaskExecutionException {
644654 }
645655 final Request req = Request .parse (data );
646656 final Command [] cmds = req .getCommands ();
647- final CancelCommand cancel = (CancelCommand )cmds [0 ];
657+ final CancelCommand cancel = (CancelCommand ) cmds [0 ];
648658 logD (data , "Cancel request received" );
649659 agent .cancel (cancel .getSequence ());
650660 final Long current = agent ._currentSequence ;
@@ -671,7 +681,7 @@ protected void doTask(final Task task) throws TaskExecutionException {
671681 return ;
672682 } else {
673683 if (agent instanceof Routable ) {
674- final Routable cluster = (Routable )agent ;
684+ final Routable cluster = (Routable ) agent ;
675685 cluster .routeToAgent (data );
676686 } else {
677687 agent .send (Request .parse (data ));
@@ -688,7 +698,7 @@ protected void doTask(final Task task) throws TaskExecutionException {
688698 if (mgmtId != -1 && mgmtId != _nodeId ) {
689699 routeToPeer (Long .toString (mgmtId ), data );
690700 if (Request .requiresSequentialExecution (data )) {
691- final AgentAttache attache = (AgentAttache )link .attachment ();
701+ final AgentAttache attache = (AgentAttache ) link .attachment ();
692702 if (attache != null ) {
693703 attache .sendNext (Request .getSequence (data ));
694704 }
@@ -961,7 +971,7 @@ protected void runInContext() {
961971 if (_agentToTransferIds .size () > 0 ) {
962972 logger .debug ("Found {} agents to transfer" , _agentToTransferIds .size ());
963973 // for (Long hostId : _agentToTransferIds) {
964- for (final Iterator <Long > iterator = _agentToTransferIds .iterator (); iterator .hasNext ();) {
974+ for (final Iterator <Long > iterator = _agentToTransferIds .iterator (); iterator .hasNext (); ) {
965975 final Long hostId = iterator .next ();
966976 final AgentAttache attache = findAttache (hostId );
967977
@@ -1105,7 +1115,7 @@ protected void finishRebalance(final long hostId, final long futureOwnerId, fina
11051115 return ;
11061116 }
11071117
1108- final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache )attache ;
1118+ final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache ) attache ;
11091119
11101120 if (success ) {
11111121
@@ -1156,10 +1166,10 @@ protected boolean startRebalance(final long hostId) {
11561166 }
11571167
11581168 synchronized (_agents ) {
1159- final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache )_agents .get (hostId );
1169+ final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache ) _agents .get (hostId );
11601170 if (attache != null && attache .getQueueSize () == 0 && attache .getNonRecurringListenersSize () == 0 ) {
11611171 handleDisconnectWithoutInvestigation (attache , Event .StartAgentRebalance , true , true );
1162- final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache )createAttache (host );
1172+ final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache ) createAttache (host );
11631173 if (forwardAttache == null ) {
11641174 logger .warn ("Unable to create a forward attache for the host {} as a part of rebalance process" , host );
11651175 return false ;
@@ -1263,7 +1273,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12631273 }
12641274
12651275 if (cmds .length == 1 && cmds [0 ] instanceof ChangeAgentCommand ) { // intercepted
1266- final ChangeAgentCommand cmd = (ChangeAgentCommand )cmds [0 ];
1276+ final ChangeAgentCommand cmd = (ChangeAgentCommand ) cmds [0 ];
12671277
12681278 logger .debug ("Intercepting command for agent change: agent {} event: {}" , cmd .getAgentId (), cmd .getEvent ());
12691279 boolean result = false ;
@@ -1280,7 +1290,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12801290 answers [0 ] = new ChangeAgentAnswer (cmd , result );
12811291 return _gson .toJson (answers );
12821292 } else if (cmds .length == 1 && cmds [0 ] instanceof TransferAgentCommand ) {
1283- final TransferAgentCommand cmd = (TransferAgentCommand )cmds [0 ];
1293+ final TransferAgentCommand cmd = (TransferAgentCommand ) cmds [0 ];
12841294
12851295 logger .debug ("Intercepting command for agent rebalancing: agent: {}, event: {}, connection transfer: {}" , cmd .getAgentId (), cmd .getEvent (), cmd .isConnectionTransfer ());
12861296 boolean result = false ;
@@ -1299,7 +1309,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12991309 answers [0 ] = new Answer (cmd , result , null );
13001310 return _gson .toJson (answers );
13011311 } else if (cmds .length == 1 && cmds [0 ] instanceof PropagateResourceEventCommand ) {
1302- final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand )cmds [0 ];
1312+ final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand ) cmds [0 ];
13031313
13041314 logger .debug ("Intercepting command to propagate event {} for host {} ({})" , () -> cmd .getEvent ().name (), cmd ::getHostId , () -> _hostDao .findById (cmd .getHostId ()));
13051315
@@ -1316,10 +1326,10 @@ public String dispatch(final ClusterServicePdu pdu) {
13161326 answers [0 ] = new Answer (cmd , result , null );
13171327 return _gson .toJson (answers );
13181328 } else if (cmds .length == 1 && cmds [0 ] instanceof ScheduleHostScanTaskCommand ) {
1319- final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand )cmds [0 ];
1329+ final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand ) cmds [0 ];
13201330 return handleScheduleHostScanTaskCommand (cmd );
13211331 } else if (cmds .length == 1 && cmds [0 ] instanceof BaseShutdownManagementServerHostCommand ) {
1322- final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand )cmds [0 ];
1332+ final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand ) cmds [0 ];
13231333 return handleShutdownManagementServerHostCommand (cmd );
13241334 }
13251335
@@ -1372,7 +1382,7 @@ private String handleShutdownManagementServerHostCommand(BaseShutdownManagementS
13721382 try {
13731383 managementServerMaintenanceManager .prepareForShutdown ();
13741384 return "Successfully prepared for shutdown" ;
1375- } catch (CloudRuntimeException e ) {
1385+ } catch (CloudRuntimeException e ) {
13761386 return e .getMessage ();
13771387 }
13781388 }
@@ -1381,7 +1391,7 @@ private String handleShutdownManagementServerHostCommand(BaseShutdownManagementS
13811391 try {
13821392 managementServerMaintenanceManager .triggerShutdown ();
13831393 return "Successfully triggered shutdown" ;
1384- } catch (CloudRuntimeException e ) {
1394+ } catch (CloudRuntimeException e ) {
13851395 return e .getMessage ();
13861396 }
13871397 }
0 commit comments