@@ -13,6 +13,7 @@
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterInfoServiceUtils;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
@@ -34,15 +35,16 @@
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.hamcrest.Matcher;

 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

@@ -54,8 +56,10 @@
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;

 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
@@ -163,20 +167,10 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
-    @TestIssueLogging(
-        value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
-        issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
-    )
-    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
+    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() {
         internalCluster().startMasterOnlyNode();
-        internalCluster().startDataOnlyNode();
         final String dataNodeName = internalCluster().startDataOnlyNode();
+        internalCluster().startDataOnlyNode();
         ensureStableCluster(3);

         assertAcked(
@@ -185,26 +179,16 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards
                 .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
         );

-        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
         final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
-            ClusterInfoServiceUtils.refresh(clusterInfoService);
-            if (allowRelocations.get() == false) {
-                assertThat(
-                    "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
-                    numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
-                    equalTo(0)
-                );
-            }
-        });
-
-        final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
+            .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));

         final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var shardSizes = createReasonableSizedShards(indexName);
+        final var shardSizes = createReasonableSizedShards(indexName);

         final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+            .setIndices(indexName)
             .setWaitForCompletion(true)
             .get();
         final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
@@ -213,21 +197,82 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards

         assertAcked(indicesAdmin().prepareDelete(indexName).get());
         updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
-        allowRelocations.set(false);

-        // reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated
-        var usableSpace = shardSizes.sizes().get(1).size();
+        // Verify that from this point on we do not do any rebalancing
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            assertThat(
+                "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
+                numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
+                equalTo(0)
+            );
+        });
+
+        // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
+        // other data node
+        final var usableSpace = randomLongBetween(shardSizes.getSmallestShardSize(), shardSizes.getSmallestShardSize() * 2 - 1L);
         getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES);
         refreshDiskUsage();

+        // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
+        // chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we
+        // trigger the whole chain by starting the first restore.
+        final var copyIndexName = indexName + "-copy";
+
+        // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
+        final var dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
+            var seenCopy = false;
+            for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
+                if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
+                    seenCopy = true;
+                }
+                if (indexRoutingTable.allShardsActive() == false) {
+                    return false;
+                }
+            }
+            return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
+        });
+
+        // set up a listener which waits for the shards from the first restore to start initializing and then kick off another restore
+        final var secondRestoreCompleteLatch = new CountDownLatch(1);
+        final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            final var indexRoutingTable = cs.routingTable().index(indexName);
+            if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
+                clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+                    .setWaitForCompletion(true)
+                    .setRenamePattern(indexName)
+                    .setRenameReplacement(indexName + "-copy")
+                    .execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
+                        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+                        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+                        assertThat(restoreInfo.failedShards(), is(0));
+                        secondRestoreCompleteLatch.countDown();
+                    }));
+                return true;
+            }
+            return false;
+        });
+
+        // now set the ball rolling by doing the first restore, waiting for it to complete
         final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
             .setWaitForCompletion(true)
             .get();
         final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
         assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
         assertThat(restoreInfo.failedShards(), is(0));

-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
+        // wait for the second restore to complete too
+        safeAwait(secondRestoreStartedListener);
+        safeAwait(secondRestoreCompleteLatch);
+
+        // wait for all the shards to finish moving
+        safeAwait(allShardsActiveListener);
+        ensureGreen(indexName, indexName + "-copy");
+
+        final var tinyNodeShardIds = getShardIds(dataNodeId, indexName);
+        assertThat(tinyNodeShardIds, hasSize(1));
+        assertThat(tinyNodeShardIds.iterator().next(), in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)));
     }

     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {