@@ -5206,161 +5206,105 @@ private void onBecomeCoordinator(InitNewCoordinatorFuture newCrdFut) {
52065206
52075207 cctx .exchange ().onCoordinatorInitialized ();
52085208
5209- if (newCrdFut .restoreState ()) {
5210- GridDhtPartitionsFullMessage fullMsg = newCrdFut .fullMessage ();
5209+ GridDhtPartitionsFullMessage fullMsg = newCrdFut .fullMessage ();
52115210
5212- assert msgs .isEmpty () : msgs ;
5211+ assert msgs .isEmpty () : msgs ;
52135212
5214- if (fullMsg != null ) {
5215- if (log .isInfoEnabled ()) {
5216- log .info ("New coordinator restored state [ver=" + initialVersion () +
5217- ", resVer=" + fullMsg .resultTopologyVersion () + ']' );
5218- }
5219-
5220- synchronized (mux ) {
5221- state = ExchangeLocalState .DONE ;
5213+ if (fullMsg != null ) {
5214+ if (log .isInfoEnabled ()) {
5215+ log .info ("New coordinator restored state [ver=" + initialVersion () +
5216+ ", resVer=" + fullMsg .resultTopologyVersion () + ']' );
5217+ }
52225218
5223- finishState = new FinishState ( crd . id (), fullMsg . resultTopologyVersion (), fullMsg );
5224- }
5219+ synchronized ( mux ) {
5220+ state = ExchangeLocalState . DONE ;
52255221
5226- fullMsg .exchangeId (exchId );
5222+ finishState = new FinishState (crd .id (), fullMsg .resultTopologyVersion (), fullMsg );
5223+ }
52275224
5228- processFullMessage ( false , null , fullMsg );
5225+ fullMsg . exchangeId ( exchId );
52295226
5230- Map < ClusterNode , GridDhtPartitionsSingleMessage > msgs = newCrdFut . messages ( );
5227+ processFullMessage ( false , null , fullMsg );
52315228
5232- if (!F .isEmpty (msgs )) {
5233- Map <Integer , CacheGroupAffinityMessage > joinedNodeAff = new ConcurrentHashMap <>();
5229+ Map <ClusterNode , GridDhtPartitionsSingleMessage > msgs = newCrdFut .messages ();
52345230
5235- // Reserve at least 2 threads for system operations.
5236- int parallelismLvl = U . availableThreadCount ( cctx . kernalContext (), GridIoPolicy . SYSTEM_POOL , 2 );
5231+ if (! F . isEmpty ( msgs )) {
5232+ Map < Integer , CacheGroupAffinityMessage > joinedNodeAff = new ConcurrentHashMap <>( );
52375233
5238- try {
5239- U .doInParallel (
5240- parallelismLvl ,
5241- cctx .kernalContext ().pools ().getSystemExecutorService (),
5242- msgs .entrySet (),
5243- entry -> {
5244- this .msgs .put (entry .getKey ().id (), entry .getValue ());
5245-
5246- GridDhtPartitionsSingleMessage msg = entry .getValue ();
5247-
5248- Collection <Integer > affReq = msg .cacheGroupsAffinityRequest ();
5249-
5250- if (!F .isEmpty (affReq )) {
5251- CacheGroupAffinityMessage .createAffinityMessages (
5252- cctx ,
5253- fullMsg .resultTopologyVersion (),
5254- affReq ,
5255- joinedNodeAff
5256- );
5257- }
5234+ // Reserve at least 2 threads for system operations.
5235+ int parallelismLvl = U .availableThreadCount (cctx .kernalContext (), GridIoPolicy .SYSTEM_POOL , 2 );
52585236
5259- return null ;
5237+ try {
5238+ U .doInParallel (
5239+ parallelismLvl ,
5240+ cctx .kernalContext ().pools ().getSystemExecutorService (),
5241+ msgs .entrySet (),
5242+ entry -> {
5243+ this .msgs .put (entry .getKey ().id (), entry .getValue ());
5244+
5245+ GridDhtPartitionsSingleMessage msg = entry .getValue ();
5246+
5247+ Collection <Integer > affReq = msg .cacheGroupsAffinityRequest ();
5248+
5249+ if (!F .isEmpty (affReq )) {
5250+ CacheGroupAffinityMessage .createAffinityMessages (
5251+ cctx ,
5252+ fullMsg .resultTopologyVersion (),
5253+ affReq ,
5254+ joinedNodeAff
5255+ );
52605256 }
5261- );
5262- }
5263- catch (IgniteCheckedException e ) {
5264- throw new IgniteException (e );
5265- }
5266-
5267- Map <UUID , GridDhtPartitionsSingleMessage > mergedJoins = newCrdFut .mergedJoinExchangeMessages ();
5268-
5269- if (log .isInfoEnabled ()) {
5270- log .info ("New coordinator sends full message [ver=" + initialVersion () +
5271- ", resVer=" + fullMsg .resultTopologyVersion () +
5272- ", nodes=" + F .nodeIds (msgs .keySet ()) +
5273- ", mergedJoins=" + (mergedJoins != null ? mergedJoins .keySet () : null ) + ']' );
5274- }
52755257
5276- sendAllPartitions (fullMsg , msgs .keySet (), mergedJoins , joinedNodeAff );
5258+ return null ;
5259+ }
5260+ );
52775261 }
5278-
5279- return ;
5280- }
5281- else {
5282- if (log .isInfoEnabled ())
5283- log .info ("New coordinator restore state finished [ver=" + initialVersion () + ']' );
5284-
5285- for (Map .Entry <ClusterNode , GridDhtPartitionsSingleMessage > e : newCrdFut .messages ().entrySet ()) {
5286- GridDhtPartitionsSingleMessage msg = e .getValue ();
5287-
5288- if (!msg .client ()) {
5289- msgs .put (e .getKey ().id (), e .getValue ());
5290-
5291- if (dynamicCacheStartExchange () && msg .getError () != null )
5292- exchangeGlobalExceptions .put (e .getKey ().id (), msg .getError ());
5293-
5294- updatePartitionSingleMap (e .getKey ().id (), msg );
5295- }
5262+ catch (IgniteCheckedException e ) {
5263+ throw new IgniteException (e );
52965264 }
5297- }
5298-
5299- allRcvd = true ;
53005265
5301- synchronized (mux ) {
5302- remaining .clear (); // Do not process messages.
5303-
5304- assert crd != null && crd .isLocal ();
5266+ Map <UUID , GridDhtPartitionsSingleMessage > mergedJoins = newCrdFut .mergedJoinExchangeMessages ();
53055267
5306- state = ExchangeLocalState .CRD ;
5268+ if (log .isInfoEnabled ()) {
5269+ log .info ("New coordinator sends full message [ver=" + initialVersion () +
5270+ ", resVer=" + fullMsg .resultTopologyVersion () +
5271+ ", nodes=" + F .nodeIds (msgs .keySet ()) +
5272+ ", mergedJoins=" + (mergedJoins != null ? mergedJoins .keySet () : null ) + ']' );
5273+ }
53075274
5308- assert mergedJoinExchMsgs == null ;
5275+ sendAllPartitions ( fullMsg , msgs . keySet (), mergedJoins , joinedNodeAff ) ;
53095276 }
5277+
5278+ return ;
53105279 }
53115280 else {
5312- Set <UUID > remaining0 = null ;
5281+ if (log .isInfoEnabled ())
5282+ log .info ("New coordinator restore state finished [ver=" + initialVersion () + ']' );
53135283
5314- synchronized ( mux ) {
5315- assert crd != null && crd . isLocal ();
5284+ for ( Map . Entry < ClusterNode , GridDhtPartitionsSingleMessage > e : newCrdFut . messages (). entrySet () ) {
5285+ GridDhtPartitionsSingleMessage msg = e . getValue ();
53165286
5317- state = ExchangeLocalState .CRD ;
5287+ if (!msg .client ()) {
5288+ msgs .put (e .getKey ().id (), e .getValue ());
53185289
5319- assert mergedJoinExchMsgs == null ;
5290+ if (dynamicCacheStartExchange () && msg .getError () != null )
5291+ exchangeGlobalExceptions .put (e .getKey ().id (), msg .getError ());
53205292
5321- if (log .isInfoEnabled ()) {
5322- log .info ("New coordinator initialization finished [ver=" + initialVersion () +
5323- ", remaining=" + remaining + ']' );
5293+ updatePartitionSingleMap (e .getKey ().id (), msg );
53245294 }
5325-
5326- if (!remaining .isEmpty ())
5327- remaining0 = new HashSet <>(remaining );
53285295 }
5296+ }
53295297
5330- if (remaining0 != null ) {
5331- // It is possible that some nodes finished exchange with previous coordinator.
5332- GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest (exchId );
5298+ allRcvd = true ;
53335299
5334- for (UUID nodeId : remaining0 ) {
5335- try {
5336- if (!pendingSingleMsgs .containsKey (nodeId )) {
5337- if (log .isInfoEnabled ()) {
5338- log .info ("New coordinator sends request [ver=" + initialVersion () +
5339- ", node=" + nodeId + ']' );
5340- }
5300+ synchronized (mux ) {
5301+ remaining .clear (); // Do not process messages.
53415302
5342- cctx .io ().send (nodeId , req , SYSTEM_POOL );
5343- }
5344- }
5345- catch (ClusterTopologyCheckedException ignored ) {
5346- if (log .isDebugEnabled ())
5347- log .debug ("Node left during partition exchange [nodeId=" + nodeId +
5348- ", exchId=" + exchId + ']' );
5349- }
5350- catch (IgniteCheckedException e ) {
5351- U .error (log , "Failed to request partitions from node: " + nodeId , e );
5352- }
5353- }
5303+ assert crd != null && crd .isLocal ();
53545304
5355- for (Map .Entry <UUID , GridDhtPartitionsSingleMessage > m : pendingSingleMsgs .entrySet ()) {
5356- if (log .isInfoEnabled ()) {
5357- log .info ("New coordinator process pending message [ver=" + initialVersion () +
5358- ", node=" + m .getKey () + ']' );
5359- }
5305+ state = ExchangeLocalState .CRD ;
53605306
5361- processSingleMessage (m .getKey (), m .getValue ());
5362- }
5363- }
5307+ assert mergedJoinExchMsgs == null ;
53645308 }
53655309
53665310 if (allRcvd ) {
0 commit comments