 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterInfoServiceUtils;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.hamcrest.Matcher;

 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;

 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
@@ -163,20 +167,10 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
-    @TestIssueLogging(
-        value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
-        issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
-    )
-    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
+    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() {
         internalCluster().startMasterOnlyNode();
-        internalCluster().startDataOnlyNode();
         final String dataNodeName = internalCluster().startDataOnlyNode();
+        internalCluster().startDataOnlyNode();
         ensureStableCluster(3);

         assertAcked(
@@ -185,26 +179,16 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception
                 .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
         );

-        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
         final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
-            ClusterInfoServiceUtils.refresh(clusterInfoService);
-            if (allowRelocations.get() == false) {
-                assertThat(
-                    "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
-                    numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
-                    equalTo(0)
-                );
-            }
-        });
-
-        final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
+            .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));

         final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var shardSizes = createReasonableSizedShards(indexName);
+        final var shardSizes = createReasonableSizedShards(indexName);

         final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+            .setIndices(indexName)
             .setWaitForCompletion(true)
             .get();
         final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
@@ -213,21 +197,82 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception

         assertAcked(indicesAdmin().prepareDelete(indexName).get());
         updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
-        allowRelocations.set(false);

-        // reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated
-        var usableSpace = shardSizes.sizes().get(1).size();
+        // Verify that from this point on we do not do any rebalancing
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            assertThat(
+                "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
+                numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
+                equalTo(0)
+            );
+        });
+
+        // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
+        // other data node
+        final var usableSpace = randomLongBetween(shardSizes.getSmallestShardSize(), shardSizes.getSmallestShardSize() * 2 - 1L);
         getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES);
         refreshDiskUsage();

+        // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
+        // chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we
+        // trigger the whole chain by starting the first restore.
+        final var copyIndexName = indexName + "-copy";
+
+        // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
+        final var dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
+            var seenCopy = false;
+            for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
+                if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
+                    seenCopy = true;
+                }
+                if (indexRoutingTable.allShardsActive() == false) {
+                    return false;
+                }
+            }
+            return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
+        });
+
+        // set up a listener which waits for the shards from the first restore to start initializing and then kicks off another restore
+        final var secondRestoreCompleteLatch = new CountDownLatch(1);
+        final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            final var indexRoutingTable = cs.routingTable().index(indexName);
+            if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
+                clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+                    .setWaitForCompletion(true)
+                    .setRenamePattern(indexName)
+                    .setRenameReplacement(indexName + "-copy")
+                    .execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
+                        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+                        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+                        assertThat(restoreInfo.failedShards(), is(0));
+                        secondRestoreCompleteLatch.countDown();
+                    }));
+                return true;
+            }
+            return false;
+        });
+
+        // now set the ball rolling by doing the first restore, waiting for it to complete
         final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
             .setWaitForCompletion(true)
             .get();
         final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
         assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
         assertThat(restoreInfo.failedShards(), is(0));

-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
+        // wait for the second restore to complete too
+        safeAwait(secondRestoreStartedListener);
+        safeAwait(secondRestoreCompleteLatch);
+
+        // wait for all the shards to finish moving
+        safeAwait(allShardsActiveListener);
+        ensureGreen(indexName, indexName + "-copy");
+
+        final var tinyNodeShardIds = getShardIds(dataNodeId, indexName);
+        assertThat(tinyNodeShardIds, hasSize(1));
+        assertThat(tinyNodeShardIds.iterator().next(), in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)));
     }

     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {