99
1010package org .elasticsearch .repositories ;
1111
12- import org .elasticsearch .action .admin . indices . stats . IndexStats ;
13- import org .elasticsearch .action .admin .indices . stats . IndicesStatsResponse ;
14- import org .elasticsearch .action .admin .indices . stats . ShardStats ;
12+ import org .elasticsearch .action .ActionFuture ;
13+ import org .elasticsearch .action .admin .cluster . snapshots . create . CreateSnapshotResponse ;
14+ import org .elasticsearch .action .admin .cluster . snapshots . restore . RestoreSnapshotResponse ;
1515import org .elasticsearch .cluster .SnapshotsInProgress ;
1616import org .elasticsearch .cluster .metadata .IndexMetadata ;
17- import org .elasticsearch .cluster .metadata .ProjectId ;
1817import org .elasticsearch .cluster .service .ClusterService ;
1918import org .elasticsearch .common .settings .Settings ;
2019import org .elasticsearch .common .unit .ByteSizeValue ;
@@ -83,47 +82,38 @@ public void testSnapshotAPMMetrics() throws Exception {
8382 final int numReplicas = randomIntBetween (0 , 1 );
8483 createIndex (indexName , numShards , numReplicas );
8584
86- indexRandom (true , indexName , randomIntBetween (100 , 300 ));
87-
88- final IndicesStatsResponse indicesStats = indicesAdmin ().prepareStats (indexName ).get ();
89- final IndexStats indexStats = indicesStats .getIndex (indexName );
90- long totalSizeInBytes = 0 ;
91- for (ShardStats shard : indexStats .getShards ()) {
92- totalSizeInBytes += shard .getStats ().getStore ().sizeInBytes ();
93- }
94- logger .info ("--> total shards size: {} bytes" , totalSizeInBytes );
85+ indexRandom (true , indexName , randomIntBetween (3000 , 5000 ));
9586
9687 final String repositoryName = randomIdentifier ();
97-
98- // we want to ensure some throttling, but not so much that it makes the test excessively slow.
99- final int shardSizeMultipleToEnsureThrottling = 2 ;
10088 createRepository (
10189 repositoryName ,
10290 "mock" ,
103- randomRepositorySettings ().put (
104- BlobStoreRepository .MAX_SNAPSHOT_BYTES_PER_SEC .getKey (),
105- ByteSizeValue .ofBytes (totalSizeInBytes * shardSizeMultipleToEnsureThrottling )
106- )
107- .put (
108- BlobStoreRepository .MAX_RESTORE_BYTES_PER_SEC .getKey (),
109- ByteSizeValue .ofBytes (totalSizeInBytes * shardSizeMultipleToEnsureThrottling )
110- )
91+ Settings .builder ()
92+ .put (randomRepositorySettings ().build ())
93+ // Making chunk size small and adding throttling increases the likelihood of upload duration being non-zero
94+ .put ("chunk_size" , ByteSizeValue .ofKb (1 ))
95+ .put (BlobStoreRepository .MAX_SNAPSHOT_BYTES_PER_SEC .getKey (), ByteSizeValue .ofMb (1 ))
96+ .put (BlobStoreRepository .MAX_RESTORE_BYTES_PER_SEC .getKey (), ByteSizeValue .ofMb (1 ))
11197 );
11298
11399 // Block the snapshot to test "snapshot shards in progress"
114100 blockAllDataNodes (repositoryName );
115101 final String snapshotName = randomIdentifier ();
116102 final long beforeCreateSnapshotNanos = System .nanoTime ();
103+ final ActionFuture <CreateSnapshotResponse > snapshotFuture ;
117104 try {
118- clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , snapshotName )
105+ snapshotFuture = clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , snapshotName )
119106 .setIndices (indexName )
120- .setWaitForCompletion (false )
121- .get ();
122-
123- waitForBlockOnAnyDataNode (repositoryName );
124- collectMetrics ();
125- assertShardsInProgressMetricIs (hasItem (greaterThan (0L )));
126- assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOTS_STARTED ), equalTo (1L ));
107+ .setWaitForCompletion (true )
108+ .execute ();
109+
110+ // We are able to wait for either the creation to complete (`wait_for_completion=false`), or the snapshot to complete
111+ // (`wait_for_completion=true`), but not both. To know when the creation listeners complete, we must assertBusy
112+ assertBusy (() -> {
113+ collectMetrics ();
114+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOTS_STARTED ), equalTo (1L ));
115+ assertShardsInProgressMetricIs (hasItem (greaterThan (0L )));
116+ });
127117 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOTS_COMPLETED ), equalTo (0L ));
128118 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_SHARDS_STARTED ), greaterThan (0L ));
129119 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_SHARDS_COMPLETED ), equalTo (0L ));
@@ -132,15 +122,13 @@ public void testSnapshotAPMMetrics() throws Exception {
132122 }
133123
134124 // wait for snapshot to finish to test the other metrics
135- awaitNumberOfSnapshotsInProgress ( 0 );
125+ safeGet ( snapshotFuture );
136126 final TimeValue snapshotElapsedTime = TimeValue .timeValueNanos (System .nanoTime () - beforeCreateSnapshotNanos );
137127 collectMetrics ();
138128
139129 // sanity check blobs, bytes and throttling metrics
140130 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_BLOBS_UPLOADED ), greaterThan (0L ));
141131 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_BYTES_UPLOADED ), greaterThan (0L ));
142- assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_CREATE_THROTTLE_DURATION ), greaterThan (0L ));
143- assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), equalTo (0L ));
144132
145133 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOTS_STARTED ), equalTo (1L ));
146134 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOTS_COMPLETED ), equalTo (1L ));
@@ -165,37 +153,14 @@ public void testSnapshotAPMMetrics() throws Exception {
165153 getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_UPLOAD_DURATION ),
166154 allOf (greaterThan (0L ), lessThan (upperBoundTimeSpentOnSnapshotThingsMillis ))
167155 );
168- assertThat (
169- getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_UPLOAD_READ_DURATION ),
170- allOf (greaterThan (0L ), lessThan (upperBoundTimeSpentOnSnapshotThingsMillis ))
171- );
172156
173157 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_SHARDS_STARTED ), equalTo ((long ) numShards ));
174158 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_SHARDS_COMPLETED ), equalTo ((long ) numShards ));
175159
176160 assertShardsInProgressMetricIs (everyItem (equalTo (0L )));
177161
178- // Restore the snapshot
179- clusterAdmin ().prepareRestoreSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , snapshotName )
180- .setIndices (indexName )
181- .setWaitForCompletion (true )
182- .setRenamePattern ("(.+)" )
183- .setRenameReplacement ("restored-$1" )
184- .get ();
185- collectMetrics ();
186-
187- // assert we throttled on restore
188- assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), greaterThan (0L ));
189-
190162 // assert appropriate attributes are present
191- final Map <String , Object > expectedAttrs = Map .of (
192- "project_id" ,
193- ProjectId .DEFAULT .id (),
194- "repo_name" ,
195- repositoryName ,
196- "repo_type" ,
197- "mock"
198- );
163+ final Map <String , Object > expectedAttrs = Map .of ("repo_name" , repositoryName , "repo_type" , "mock" );
199164 final Map <String , Object > expectedAttrsWithShardStage = Maps .copyMapWithAddedEntry (
200165 expectedAttrs ,
201166 "stage" ,
@@ -215,14 +180,121 @@ public void testSnapshotAPMMetrics() throws Exception {
215180 assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_SHARDS_COMPLETED , expectedAttrsWithShardStage );
216181 assertMetricsHaveAttributes (InstrumentType .DOUBLE_HISTOGRAM , SnapshotMetrics .SNAPSHOT_SHARDS_DURATION , expectedAttrsWithShardStage );
217182
218- assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION , expectedAttrs );
219- assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_CREATE_THROTTLE_DURATION , expectedAttrs );
220- assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_UPLOAD_READ_DURATION , expectedAttrs );
221183 assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_UPLOAD_DURATION , expectedAttrs );
222184 assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_BYTES_UPLOADED , expectedAttrs );
223185 assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_BLOBS_UPLOADED , expectedAttrs );
224186 }
225187
188+ public void testThrottlingMetrics () throws Exception {
189+ final String indexName = randomIdentifier ();
190+ final int numShards = randomIntBetween (1 , 10 );
191+ final int numReplicas = randomIntBetween (0 , 1 );
192+ createIndex (indexName , numShards , numReplicas );
193+ indexRandom (true , indexName , randomIntBetween (100 , 120 ));
194+
195+ // Create a repository with restrictive throttling settings
196+ final String repositoryName = randomIdentifier ();
197+ final Settings .Builder repositorySettings = randomRepositorySettings ().put (
198+ BlobStoreRepository .MAX_SNAPSHOT_BYTES_PER_SEC .getKey (),
199+ ByteSizeValue .ofKb (2 )
200+ )
201+ .put (BlobStoreRepository .MAX_RESTORE_BYTES_PER_SEC .getKey (), ByteSizeValue .ofKb (2 ))
202+ // Small chunk size ensures we don't get stuck throttling for too long
203+ .put ("chunk_size" , ByteSizeValue .ofBytes (100 ));
204+ createRepository (repositoryName , "mock" , repositorySettings , false );
205+
206+ final String snapshotName = randomIdentifier ();
207+ final ActionFuture <CreateSnapshotResponse > snapshotFuture ;
208+
209+ // Kick off a snapshot
210+ final long snapshotStartTime = System .currentTimeMillis ();
211+ snapshotFuture = clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , snapshotName )
212+ .setIndices (indexName )
213+ .setWaitForCompletion (true )
214+ .execute ();
215+
216+ // Poll until we see some throttling occurring
217+ final long snap_ts0 = System .currentTimeMillis ();
218+ assertBusy (() -> {
219+ collectMetrics ();
220+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_CREATE_THROTTLE_DURATION ), greaterThan (0L ));
221+ });
222+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), equalTo (0L ));
223+
224+ // Remove create throttling
225+ final long snap_ts1 = System .currentTimeMillis ();
226+ createRepository (
227+ repositoryName ,
228+ "mock" ,
229+ repositorySettings .put (BlobStoreRepository .MAX_SNAPSHOT_BYTES_PER_SEC .getKey (), ByteSizeValue .ZERO ),
230+ false
231+ );
232+ final long snap_ts2 = System .currentTimeMillis ();
233+
234+ // wait for the snapshot to finish
235+ safeGet (snapshotFuture );
236+ final long snap_ts3 = System .currentTimeMillis ();
237+
238+ logger .info (
239+ "saw throttling in [{}] remove throttling took [{}], snapshot took [{}]" ,
240+ TimeValue .timeValueMillis (snap_ts1 - snap_ts0 ),
241+ TimeValue .timeValueMillis (snap_ts2 - snap_ts1 ),
242+ TimeValue .timeValueMillis (snap_ts3 - snap_ts2 )
243+ );
244+
245+ // Work out the maximum amount of concurrency per node
246+ final ThreadPool tp = internalCluster ().getDataNodeInstance (ThreadPool .class );
247+ final int snapshotThreadPoolSize = tp .info (ThreadPool .Names .SNAPSHOT ).getMax ();
248+ final int maximumPerNodeConcurrency = Math .max (snapshotThreadPoolSize , numShards );
249+
250+ // we should also have incurred some read duration due to the throttling
251+ final long upperBoundTimeSpentOnSnapshotThingsMillis = internalCluster ().numDataNodes () * maximumPerNodeConcurrency * (System
252+ .currentTimeMillis () - snapshotStartTime );
253+ assertThat (
254+ getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_UPLOAD_READ_DURATION ),
255+ allOf (greaterThan (0L ), lessThan (upperBoundTimeSpentOnSnapshotThingsMillis ))
256+ );
257+
258+ // Restore the snapshot
259+ final long restore_ts0 = System .currentTimeMillis ();
260+ ActionFuture <RestoreSnapshotResponse > restoreFuture = clusterAdmin ().prepareRestoreSnapshot (
261+ TEST_REQUEST_TIMEOUT ,
262+ repositoryName ,
263+ snapshotName
264+ ).setIndices (indexName ).setWaitForCompletion (true ).setRenamePattern ("(.+)" ).setRenameReplacement ("restored-$1" ).execute ();
265+
266+ // assert we throttled on restore
267+ assertBusy (() -> {
268+ collectMetrics ();
269+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), greaterThan (0L ));
270+ });
271+ final long restore_ts1 = System .currentTimeMillis (); // sampled once throttling has been observed, mirroring snap_ts1
272+
273+ // Remove restore throttling
274+ createRepository (
275+ repositoryName ,
276+ "mock" ,
277+ repositorySettings .put (BlobStoreRepository .MAX_RESTORE_BYTES_PER_SEC .getKey (), ByteSizeValue .ZERO ),
278+ false
279+ );
280+ final long restore_ts2 = System .currentTimeMillis (); // sampled once throttling has been removed, mirroring snap_ts2
281+ safeGet (restoreFuture );
282+ final long restore_ts3 = System .currentTimeMillis ();
283+
284+ logger .info (
285+ "saw throttling in [{}] remove throttling took [{}], restore took [{}]" ,
286+ TimeValue .timeValueMillis (restore_ts1 - restore_ts0 ),
287+ TimeValue .timeValueMillis (restore_ts2 - restore_ts1 ),
288+ TimeValue .timeValueMillis (restore_ts3 - restore_ts2 )
289+ );
290+
291+ // assert appropriate attributes are present
292+ final Map <String , Object > expectedAttrs = Map .of ("repo_name" , repositoryName , "repo_type" , "mock" );
293+ assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_UPLOAD_READ_DURATION , expectedAttrs );
294+ assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION , expectedAttrs );
295+ assertMetricsHaveAttributes (InstrumentType .LONG_COUNTER , SnapshotMetrics .SNAPSHOT_CREATE_THROTTLE_DURATION , expectedAttrs );
296+ }
297+
226298 public void testByStateCounts_InitAndQueuedShards () throws Exception {
227299 final String indexName = randomIdentifier ();
228300 final int numShards = randomIntBetween (2 , 10 );
@@ -237,11 +309,13 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception {
237309 blockAllDataNodes (repositoryName );
238310
239311 final String snapshotName = randomIdentifier ();
312+ final ActionFuture <CreateSnapshotResponse > firstSnapshotFuture ;
313+ final ActionFuture <CreateSnapshotResponse > secondSnapshotFuture ;
240314 try {
241- clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , snapshotName )
315+ firstSnapshotFuture = clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , snapshotName )
242316 .setIndices (indexName )
243- .setWaitForCompletion (false )
244- .get ();
317+ .setWaitForCompletion (true )
318+ .execute ();
245319
246320 waitForBlockOnAnyDataNode (repositoryName );
247321
@@ -252,10 +326,12 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception {
252326 assertThat (snapshotStates .get (SnapshotsInProgress .State .STARTED ), equalTo (1L ));
253327
254328 // Queue up another snapshot
255- clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , randomIdentifier ())
329+ secondSnapshotFuture = clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , randomIdentifier ())
256330 .setIndices (indexName )
257- .setWaitForCompletion (false )
258- .get ();
331+ .setWaitForCompletion (true )
332+ .execute ();
333+
334+ awaitNumberOfSnapshotsInProgress (2 );
259335
260336 // Should be {numShards} in QUEUED and INIT states, and 2 STARTED snapshots
261337 shardStates = getShardStates ();
@@ -268,7 +344,8 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception {
268344 }
269345
270346 // All statuses should return to zero when the snapshots complete
271- awaitNumberOfSnapshotsInProgress (0 );
347+ safeGet (firstSnapshotFuture );
348+ safeGet (secondSnapshotFuture );
272349 getShardStates ().forEach ((key , value ) -> assertThat (value , equalTo (0L )));
273350 getSnapshotStates ().forEach ((key , value ) -> assertThat (value , equalTo (0L )));
274351
@@ -309,12 +386,13 @@ public void testByStateCounts_PausedForRemovalShards() throws Exception {
309386 blockNodeOnAnyFiles (repositoryName , nodeForRemoval );
310387
311388 final ClusterService clusterService = internalCluster ().getCurrentMasterNodeInstance (ClusterService .class );
389+ final ActionFuture <CreateSnapshotResponse > snapshotFuture ;
312390 try {
313391 // Kick off a snapshot
314- clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , randomIdentifier ())
392+ snapshotFuture = clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , randomIdentifier ())
315393 .setIndices (indexName )
316- .setWaitForCompletion (false )
317- .get ();
394+ .setWaitForCompletion (true )
395+ .execute ();
318396
319397 // Wait till we're blocked
320398 waitForBlock (nodeForRemoval , repositoryName );
@@ -337,7 +415,7 @@ public void testByStateCounts_PausedForRemovalShards() throws Exception {
337415 clearShutdownMetadata (clusterService );
338416
339417 // All statuses should return to zero when the snapshot completes
340- awaitNumberOfSnapshotsInProgress ( 0 );
418+ safeGet ( snapshotFuture );
341419 getShardStates ().forEach ((key , value ) -> assertThat (value , equalTo (0L )));
342420 getSnapshotStates ().forEach ((key , value ) -> assertThat (value , equalTo (0L )));
343421
@@ -395,10 +473,14 @@ public void testByStateCounts_WaitingShards() throws Exception {
395473 safeAwait (handoffRequestBarrier );
396474
397475 // Kick off a snapshot
398- clusterAdmin ().prepareCreateSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , randomIdentifier ())
399- .setIndices (indexName )
400- .setWaitForCompletion (false )
401- .get ();
476+ final ActionFuture <CreateSnapshotResponse > snapshotFuture = clusterAdmin ().prepareCreateSnapshot (
477+ TEST_REQUEST_TIMEOUT ,
478+ repositoryName ,
479+ randomIdentifier ()
480+ ).setIndices (indexName ).setWaitForCompletion (true ).execute ();
481+
482+ // Wait for the snapshot to start
483+ awaitNumberOfSnapshotsInProgress (1 );
402484
403485 // Wait till we see a shard in WAITING state
404486 createSnapshotInStateListener (clusterService (), repositoryName , indexName , 1 , SnapshotsInProgress .ShardState .WAITING );
@@ -413,7 +495,7 @@ public void testByStateCounts_WaitingShards() throws Exception {
413495 safeAwait (handoffRequestBarrier );
414496
415497 // All statuses should return to zero when the snapshot completes
416- awaitNumberOfSnapshotsInProgress ( 0 );
498+ safeGet ( snapshotFuture );
417499 getShardStates ().forEach ((key , value ) -> assertThat (value , equalTo (0L )));
418500 getSnapshotStates ().forEach ((key , value ) -> assertThat (value , equalTo (0L )));
419501
0 commit comments