2727import java .nio .channels .ClosedChannelException ;
2828import java .nio .charset .Charset ;
2929import java .util .ArrayList ;
30+ import java .util .Arrays ;
3031import java .util .HashMap ;
3132import java .util .List ;
3233import java .util .Map ;
4041
4142import javax .naming .ConfigurationException ;
4243
44+ import com .cloud .agent .api .MigrateAgentConnectionAnswer ;
45+ import com .cloud .agent .api .MigrateAgentConnectionCommand ;
4346import com .cloud .resource .AgentStatusUpdater ;
4447import com .cloud .resource .ResourceStatusUpdater ;
4548import com .cloud .agent .api .PingAnswer ;
@@ -313,7 +316,6 @@ public void start() {
313316 }
314317 _shell .updateConnectedHost ();
315318 scavengeOldAgentObjects ();
316-
317319 }
318320
319321 public void stop (final String reason , final String detail ) {
@@ -477,13 +479,18 @@ public synchronized void lockStartupTask(final Link link) {
477479 }
478480
479481 public void sendStartup (final Link link ) {
482+ sendStartup (link , false );
483+ }
484+
485+ public void sendStartup (final Link link , boolean transfer ) {
480486 final StartupCommand [] startup = _resource .initialize ();
481487 if (startup != null ) {
482488 final String msHostList = _shell .getPersistentProperty (null , "host" );
483489 final Command [] commands = new Command [startup .length ];
484490 for (int i = 0 ; i < startup .length ; i ++) {
485491 setupStartupCommand (startup [i ]);
486492 startup [i ].setMSHostList (msHostList );
493+ startup [i ].setConnectionTransferred (transfer );
487494 commands [i ] = startup [i ];
488495 }
489496 final Request request = new Request (_id != null ? _id : -1 , -1 , commands , false , false );
@@ -541,9 +548,14 @@ public Task create(final Task.Type type, final Link link, final byte[] data) {
541548 }
542549
543550 protected void reconnect (final Link link ) {
544- if (!_reconnectAllowed ) {
551+ reconnect (link , null , null , false );
552+ }
553+
554+ protected void reconnect (final Link link , String preferredHost , List <String > avoidHostList , boolean forTransfer ) {
555+ if (!(forTransfer || _reconnectAllowed )) {
545556 return ;
546557 }
558+
547559 synchronized (this ) {
548560 if (_startup != null ) {
549561 _startup .cancel ();
@@ -575,22 +587,29 @@ protected void reconnect(final Link link) {
575587 _shell .getBackoffAlgorithm ().waitBeforeRetry ();
576588 }
577589
590+ String host = preferredHost ;
591+ if (StringUtils .isEmpty (host )) {
592+ host = _shell .getNextHost ();
593+ }
594+
578595 do {
579- final String host = _shell .getNextHost ();
580- _connection = new NioClient ("Agent" , host , _shell .getPort (), _shell .getWorkers (), this );
581- logger .info ("Reconnecting to host:{}" , host );
582- try {
583- _connection .start ();
584- } catch (final NioConnectionException e ) {
585- logger .info ("Attempted to re-connect to the server, but received an unexpected exception, trying again..." , e );
586- _connection .stop ();
596+ if (CollectionUtils .isEmpty (avoidHostList ) || !avoidHostList .contains (host )) {
597+ _connection = new NioClient ("Agent" , host , _shell .getPort (), _shell .getWorkers (), this );
598+ logger .info ("Reconnecting to host:{}" , host );
587599 try {
588- _connection .cleanUp ();
589- } catch (final IOException ex ) {
590- logger .warn ("Fail to clean up old connection. {}" , ex );
600+ _connection .start ();
601+ } catch (final NioConnectionException e ) {
602+ logger .info ("Attempted to re-connect to the server, but received an unexpected exception, trying again..." , e );
603+ _connection .stop ();
604+ try {
605+ _connection .cleanUp ();
606+ } catch (final IOException ex ) {
607+ logger .warn ("Fail to clean up old connection. {}" , ex );
608+ }
591609 }
592610 }
593611 _shell .getBackoffAlgorithm ().waitBeforeRetry ();
612+ host = _shell .getNextHost ();
594613 } while (!_connection .isStartup ());
595614 _shell .updateConnectedHost ();
596615 logger .info ("Connected to the host: {}" , _shell .getConnectedHost ());
@@ -703,6 +722,8 @@ protected void processRequest(final Request request, final Link link) {
703722 }
704723 } else if (cmd instanceof SetupMSListCommand ) {
705724 answer = setupManagementServerList ((SetupMSListCommand ) cmd );
725+ } else if (cmd instanceof MigrateAgentConnectionCommand ) {
726+ answer = migrateAgentToOtherMS ((MigrateAgentConnectionCommand ) cmd );
706727 } else {
707728 if (cmd instanceof ReadyCommand ) {
708729 processReadyCommand (cmd );
@@ -858,6 +879,53 @@ private Answer setupManagementServerList(final SetupMSListCommand cmd) {
858879 return new SetupMSListAnswer (true );
859880 }
860881
882+ private Answer migrateAgentToOtherMS (final MigrateAgentConnectionCommand cmd ) {
883+ try {
884+ if (CollectionUtils .isNotEmpty (cmd .getMsList ())) {
885+ processManagementServerList (cmd .getMsList (), cmd .getLbAlgorithm (), cmd .getLbCheckInterval ());
886+ }
887+ migrateAgentConnection (cmd .getAvoidMsList ());
888+ } catch (Exception e ) {
889+ String errMsg = "Migrate agent connection failed, due to " + e .getMessage ();
890+ logger .debug (errMsg , e );
891+ return new MigrateAgentConnectionAnswer (errMsg );
892+ }
893+ return new MigrateAgentConnectionAnswer (true );
894+ }
895+
896+ private void migrateAgentConnection (List <String > avoidMsList ) {
897+ final String [] msHosts = _shell .getHosts ();
898+ if (msHosts == null || msHosts .length < 1 ) {
899+ throw new CloudRuntimeException ("Management Server hosts empty, not properly configured in agent" );
900+ }
901+
902+ List <String > msHostsList = new ArrayList <>(Arrays .asList (msHosts ));
903+ msHostsList .removeAll (avoidMsList );
904+ if (msHostsList .isEmpty () || StringUtils .isEmpty (msHostsList .get (0 ))) {
905+ throw new CloudRuntimeException ("No other Management Server hosts to migrate" );
906+ }
907+
908+ String preferredHost = null ;
909+ for (String msHost : msHostsList ) {
910+ try (final Socket socket = new Socket ()) {
911+ socket .connect (new InetSocketAddress (msHost , _shell .getPort ()), 5000 );
912+ preferredHost = msHost ;
913+ break ;
914+ } catch (final IOException e ) {
915+ throw new CloudRuntimeException ("Management server host: " + msHost + " is not reachable, to migrate connection" );
916+ }
917+ }
918+
919+ if (preferredHost == null ) {
920+ throw new CloudRuntimeException ("Management server host(s) are not reachable, to migrate connection" );
921+ }
922+
923+ logger .debug ("Management server host " + preferredHost + " is found to be reachable, trying to reconnect" );
924+ _shell .resetHostCounter ();
925+ _shell .setConnectionTransfer (true );
926+ reconnect (_link , preferredHost , avoidMsList , true );
927+ }
928+
861929 public void processResponse (final Response response , final Link link ) {
862930 final Answer answer = response .getAnswer ();
863931 logger .debug ("Received response: {}" , response .toString ());
@@ -1153,7 +1221,8 @@ public void doTask(final Task task) throws TaskExecutionException {
11531221 if (task .getType () == Task .Type .CONNECT ) {
11541222 _shell .getBackoffAlgorithm ().reset ();
11551223 setLink (task .getLink ());
1156- sendStartup (task .getLink ());
1224+ sendStartup (task .getLink (), _shell .isConnectionTransfer ());
1225+ _shell .setConnectionTransfer (false );
11571226 } else if (task .getType () == Task .Type .DATA ) {
11581227 Request request ;
11591228 try {
@@ -1178,6 +1247,7 @@ public void doTask(final Task task) throws TaskExecutionException {
11781247 Thread .sleep (5000 );
11791248 } catch (InterruptedException e ) {
11801249 }
1250+ _shell .setConnectionTransfer (false );
11811251 reconnect (task .getLink ());
11821252 return ;
11831253 } else if (task .getType () == Task .Type .OTHER ) {
0 commit comments