99
1010package org .elasticsearch .repositories ;
1111
12+ import org .elasticsearch .action .admin .indices .stats .IndexStats ;
13+ import org .elasticsearch .action .admin .indices .stats .IndicesStatsResponse ;
14+ import org .elasticsearch .action .admin .indices .stats .ShardStats ;
15+ import org .elasticsearch .common .unit .ByteSizeValue ;
1216import org .elasticsearch .common .util .CollectionUtils ;
1317import org .elasticsearch .plugins .Plugin ;
1418import org .elasticsearch .plugins .PluginsService ;
@@ -37,17 +41,34 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
3741 }
3842
3943 public void testSnapshotAPMMetrics () throws Exception {
40- final String repositoryName = randomIdentifier ();
41-
42- createRepository (repositoryName , "mock" );
43-
4444 final String indexName = randomIdentifier ();
4545 final int numShards = randomIntBetween (1 , 10 );
4646 final int numReplicas = randomIntBetween (0 , 1 );
4747 createIndex (indexName , numShards , numReplicas );
4848
4949 indexRandom (true , indexName , randomIntBetween (100 , 300 ));
5050
51+ IndicesStatsResponse indicesStats = indicesAdmin ().prepareStats (indexName ).get ();
52+ IndexStats indexStats = indicesStats .getIndex (indexName );
53+ long totalSizeInBytes = 0 ;
54+ for (ShardStats shard : indexStats .getShards ()) {
55+ totalSizeInBytes += shard .getStats ().getStore ().sizeInBytes ();
56+ }
57+ logger .info ("--> total shards size: {} bytes" , totalSizeInBytes );
58+
59+ final String repositoryName = randomIdentifier ();
60+
61+ // we want to ensure some throttling, but not too much that it slows the test down. 5 seemed a reasonable multiple to ensure that.
62+ int shardSizeMultipleToEnsureThrottling = 5 ;
63+ createRepository (
64+ repositoryName ,
65+ "mock" ,
66+ randomRepositorySettings ().put (
67+ "max_snapshot_bytes_per_sec" ,
68+ ByteSizeValue .ofBytes (totalSizeInBytes * shardSizeMultipleToEnsureThrottling )
69+ ).put ("max_restore_bytes_per_sec" , ByteSizeValue .ofBytes (totalSizeInBytes * shardSizeMultipleToEnsureThrottling ))
70+ );
71+
5172 // Block the snapshot to test "snapshot shards in progress"
5273 blockAllDataNodes (repositoryName );
5374 final String snapshotName = randomIdentifier ();
@@ -70,9 +91,11 @@ public void testSnapshotAPMMetrics() throws Exception {
7091 final long snapshotElapsedTimeNanos = System .nanoTime () - beforeCreateSnapshotNanos ;
7192 collectMetrics ();
7293
73- // sanity check blobs and bytes metrics
94+ // sanity check blobs, bytes and throttling metrics
7495 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_BLOBS_UPLOADED ), greaterThan (0L ));
7596 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_BYTES_UPLOADED ), greaterThan (0L ));
97+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_CREATE_THROTTLE_DURATION ), greaterThan (0L ));
98+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), equalTo (0L ));
7699
77100 // sanity check duration values
78101 final long upperBoundTimeSpentOnSnapshotThingsNanos = internalCluster ().numDataNodes () * snapshotElapsedTimeNanos ;
@@ -89,6 +112,18 @@ public void testSnapshotAPMMetrics() throws Exception {
89112 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_SHARDS_COMPLETED ), equalTo ((long ) numShards ));
90113
91114 assertShardsInProgressMetricIs (everyItem (equalTo (0L )));
115+
116+ // Restore the snapshot
117+ clusterAdmin ().prepareRestoreSnapshot (TEST_REQUEST_TIMEOUT , repositoryName , snapshotName )
118+ .setIndices (indexName )
119+ .setWaitForCompletion (true )
120+ .setRenamePattern ("(.+)" )
121+ .setRenameReplacement ("restored-$1" )
122+ .get ();
123+ collectMetrics ();
124+
125+ // assert we throttled on restore
126+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), greaterThan (0L ));
92127 }
93128
94129 private static void assertShardsInProgressMetricIs (Matcher <? super List <Long >> matcher ) {
0 commit comments