From d73d2994cfd286c94334c96b9f1fad4fa36d3408 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 13 Aug 2025 12:02:18 +1000 Subject: [PATCH 01/11] Fix SnapshotMetricsIT race conditions Co-authored-by: Jeremy Dahlgren --- muted-tests.yml | 3 -- .../repositories/SnapshotMetricsIT.java | 53 ++++++++++++------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 8023d2cff90fa..41516c22eac19 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -504,9 +504,6 @@ tests: - class: org.elasticsearch.xpack.ml.integration.RevertModelSnapshotIT method: testRevertModelSnapshot issue: https://github.com/elastic/elasticsearch/issues/132733 -- class: org.elasticsearch.repositories.SnapshotMetricsIT - method: testSnapshotAPMMetrics - issue: https://github.com/elastic/elasticsearch/issues/132731 - class: org.elasticsearch.index.codec.vectors.cluster.HierarchicalKMeansTests method: testHKmeans issue: https://github.com/elastic/elasticsearch/issues/132771 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 91cf76c6f07e4..5eb23a79d732f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -9,6 +9,8 @@ package org.elasticsearch.repositories; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; @@ -114,11 +116,12 @@ public void testSnapshotAPMMetrics() throws Exception { blockAllDataNodes(repositoryName); final String snapshotName = randomIdentifier(); final long beforeCreateSnapshotNanos = System.nanoTime(); + final ActionFuture snapshotFuture; try { - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) + snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + .setWaitForCompletion(true) + .execute(); waitForBlockOnAnyDataNode(repositoryName); collectMetrics(); @@ -132,7 +135,7 @@ public void testSnapshotAPMMetrics() throws Exception { } // wait for snapshot to finish to test the other metrics - awaitNumberOfSnapshotsInProgress(0); + safeGet(snapshotFuture); final TimeValue snapshotElapsedTime = TimeValue.timeValueNanos(System.nanoTime() - beforeCreateSnapshotNanos); collectMetrics(); @@ -237,11 +240,13 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception { blockAllDataNodes(repositoryName); final String snapshotName = randomIdentifier(); + final ActionFuture firstSnapshotFuture; + final ActionFuture secondSnapshotFuture; try { - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) + firstSnapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + .setWaitForCompletion(true) + .execute(); waitForBlockOnAnyDataNode(repositoryName); @@ -252,10 +257,12 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception { assertThat(snapshotStates.get(SnapshotsInProgress.State.STARTED), equalTo(1L)); // Queue up another snapshot - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) + secondSnapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + .setWaitForCompletion(true) + .execute(); + + awaitNumberOfSnapshotsInProgress(2); // Should be {numShards} in QUEUED and INIT states, and 2 STARTED snapshots shardStates = getShardStates(); @@ -268,7 +275,8 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception { } // All statuses should return to zero when the snapshots complete - awaitNumberOfSnapshotsInProgress(0); + safeGet(firstSnapshotFuture); + safeGet(secondSnapshotFuture); getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L))); getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L))); @@ -309,12 +317,13 @@ public void testByStateCounts_PausedForRemovalShards() throws Exception { blockNodeOnAnyFiles(repositoryName, nodeForRemoval); final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final ActionFuture snapshotFuture; try { // Kick off a snapshot - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) + snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + .setWaitForCompletion(true) + .execute(); // Wait till we're blocked waitForBlock(nodeForRemoval, repositoryName); @@ -337,7 +346,7 @@ public void testByStateCounts_PausedForRemovalShards() throws Exception { clearShutdownMetadata(clusterService); // All statuses should return to zero when the snapshot completes - awaitNumberOfSnapshotsInProgress(0); + safeGet(snapshotFuture); getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L))); getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L))); @@ -395,10 +404,14 @@ public void testByStateCounts_WaitingShards() throws Exception { safeAwait(handoffRequestBarrier); // Kick off a snapshot - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) - .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + ActionFuture snapshotFuture = clusterAdmin().prepareCreateSnapshot( + TEST_REQUEST_TIMEOUT, + repositoryName, + randomIdentifier() + ).setIndices(indexName).setWaitForCompletion(true).execute(); + + // Wait for the snapshot to start + awaitNumberOfSnapshotsInProgress(1); // Wait till we see a shard in WAITING state createSnapshotInStateListener(clusterService(), repositoryName, indexName, 1, SnapshotsInProgress.ShardState.WAITING); @@ -413,7 +426,7 @@ public void testByStateCounts_WaitingShards() throws Exception { safeAwait(handoffRequestBarrier); // All statuses should return to zero when the snapshot completes - awaitNumberOfSnapshotsInProgress(0); + safeGet(snapshotFuture); getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L))); getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L))); From 2c66600fd41a82edf90f8134a0952be71da0ea6a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 13 Aug 2025 12:26:32 +1000 Subject: [PATCH 02/11] Increase throttling --- .../org/elasticsearch/repositories/SnapshotMetricsIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 5eb23a79d732f..176a5a677efc5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -98,7 +98,7 @@ public void testSnapshotAPMMetrics() throws Exception { final String repositoryName = randomIdentifier(); // we want to ensure some throttling, but not so much that it makes the test excessively slow. - final int shardSizeMultipleToEnsureThrottling = 2; + final int shardSizeMultipleToEnsureThrottling = 1; createRepository( repositoryName, "mock", @@ -404,7 +404,7 @@ public void testByStateCounts_WaitingShards() throws Exception { safeAwait(handoffRequestBarrier); // Kick off a snapshot - ActionFuture snapshotFuture = clusterAdmin().prepareCreateSnapshot( + final ActionFuture snapshotFuture = clusterAdmin().prepareCreateSnapshot( TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier() From f034e1d3a4b4923c521e8a8a2448370f97acf82e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 13 Aug 2025 16:08:24 +1000 Subject: [PATCH 03/11] Test throttle metrics separately --- .../repositories/SnapshotMetricsIT.java | 146 ++++++++++++------ 1 file changed, 98 insertions(+), 48 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 176a5a677efc5..8859b2671b54d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -11,9 +11,7 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectId; @@ -79,7 +77,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .build(); } - public void testSnapshotAPMMetrics() throws Exception { + public void testSnapshotAPMMetrics() { final String indexName = randomIdentifier(); final int numShards = randomIntBetween(1, 10); final int numReplicas = randomIntBetween(0, 1); @@ -87,30 +85,8 @@ public void testSnapshotAPMMetrics() throws Exception { indexRandom(true, indexName, randomIntBetween(100, 300)); - final IndicesStatsResponse indicesStats = indicesAdmin().prepareStats(indexName).get(); - final IndexStats indexStats = indicesStats.getIndex(indexName); - long totalSizeInBytes = 0; - for (ShardStats shard : indexStats.getShards()) { - totalSizeInBytes += shard.getStats().getStore().sizeInBytes(); - } - logger.info("--> total shards size: {} bytes", totalSizeInBytes); - final String repositoryName = randomIdentifier(); - - // we want to ensure some throttling, but not so much that it makes the test excessively slow. - final int shardSizeMultipleToEnsureThrottling = 1; - createRepository( - repositoryName, - "mock", - randomRepositorySettings().put( - BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), - ByteSizeValue.ofBytes(totalSizeInBytes * shardSizeMultipleToEnsureThrottling) - ) - .put( - BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), - ByteSizeValue.ofBytes(totalSizeInBytes * shardSizeMultipleToEnsureThrottling) - ) - ); + createRepository(repositoryName, "mock"); // Block the snapshot to test "snapshot shards in progress" blockAllDataNodes(repositoryName); @@ -142,8 +118,6 @@ public void testSnapshotAPMMetrics() throws Exception { // sanity check blobs, bytes and throttling metrics assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_BLOBS_UPLOADED), greaterThan(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_BYTES_UPLOADED), greaterThan(0L)); - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), greaterThan(0L)); - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_COMPLETED), equalTo(1L)); @@ -168,28 +142,14 @@ public void testSnapshotAPMMetrics() throws Exception { getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_DURATION), allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis)) ); - assertThat( - getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION), - allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis)) - ); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), equalTo(0L)); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_STARTED), equalTo((long) numShards)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED), equalTo((long) numShards)); assertShardsInProgressMetricIs(everyItem(equalTo(0L))); - // Restore the snapshot - clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) - .setIndices(indexName) - .setWaitForCompletion(true) - .setRenamePattern("(.+)") - .setRenameReplacement("restored-$1") - .get(); - collectMetrics(); - - // assert we throttled on restore - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), greaterThan(0L)); - // assert appropriate attributes are present final Map expectedAttrs = Map.of( "project_id", @@ -218,14 +178,104 @@ public void testSnapshotAPMMetrics() throws Exception { assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED, expectedAttrsWithShardStage); assertMetricsHaveAttributes(InstrumentType.DOUBLE_HISTOGRAM, SnapshotMetrics.SNAPSHOT_SHARDS_DURATION, expectedAttrsWithShardStage); - assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION, expectedAttrs); - assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION, expectedAttrs); - assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION, expectedAttrs); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_DURATION, expectedAttrs); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_BYTES_UPLOADED, expectedAttrs); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_BLOBS_UPLOADED, expectedAttrs); } + public void testThrottlingMetrics() throws Exception { + final String indexName = randomIdentifier(); + final int numShards = randomIntBetween(1, 10); + final int numReplicas = randomIntBetween(0, 1); + createIndex(indexName, numShards, numReplicas); + indexRandom(true, indexName, randomIntBetween(100, 120)); + + // Create a repository with restrictive throttling settings + final String repositoryName = randomIdentifier(); + final Settings.Builder repositorySettings = randomRepositorySettings().put( + BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), + ByteSizeValue.ofBytes(1024) + ) + .put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofBytes(1024)) + // Small chunk size ensures we don't get stuck throttling for too long + .put("chunk_size", ByteSizeValue.ofBytes(100)); + createRepository(repositoryName, "mock", repositorySettings); + + final String snapshotName = randomIdentifier(); + final ActionFuture snapshotFuture; + + // Kick off a snapshot + final long snapshotStartTime = System.currentTimeMillis(); + snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) + .setIndices(indexName) + .setWaitForCompletion(true) + .execute(); + + // Poll until we see some throttling occurring + assertBusy(() -> { + collectMetrics(); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), greaterThan(0L)); + }); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L)); + + // Remove create throttling + createRepository( + repositoryName, + "mock", + repositorySettings.put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO) + ); + + // wait for the snapshot to finish + safeGet(snapshotFuture); + + // Work out the maximum amount of concurrency per node + final ThreadPool tp = internalCluster().getDataNodeInstance(ThreadPool.class); + final int snapshotThreadPoolSize = tp.info(ThreadPool.Names.SNAPSHOT).getMax(); + final int maximumPerNodeConcurrency = Math.max(snapshotThreadPoolSize, numShards); + + // we should also have incurred some read duration due to the throttling + final long upperBoundTimeSpentOnSnapshotThingsMillis = internalCluster().numDataNodes() * maximumPerNodeConcurrency * (System + .currentTimeMillis() - snapshotStartTime); + assertThat( + getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION), + allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis)) + ); + + // Restore the snapshot + ActionFuture restoreFuture = clusterAdmin().prepareRestoreSnapshot( + TEST_REQUEST_TIMEOUT, + repositoryName, + snapshotName + ).setIndices(indexName).setWaitForCompletion(true).setRenamePattern("(.+)").setRenameReplacement("restored-$1").execute(); + + // assert we throttled on restore + assertBusy(() -> { + collectMetrics(); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), greaterThan(0L)); + }); + + // Remove restore throttling + createRepository( + repositoryName, + "mock", + repositorySettings.put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO) + ); + safeGet(restoreFuture); + + // assert appropriate attributes are present + final Map expectedAttrs = Map.of( + "project_id", + ProjectId.DEFAULT.id(), + "repo_name", + repositoryName, + "repo_type", + "mock" + ); + assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION, expectedAttrs); + assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION, expectedAttrs); + assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION, expectedAttrs); + } + public void testByStateCounts_InitAndQueuedShards() throws Exception { final String indexName = randomIdentifier(); final int numShards = randomIntBetween(2, 10); From 23497d462defcc7c3812b4c134046be0b7a098bc Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 14 Aug 2025 10:50:04 +1000 Subject: [PATCH 04/11] Increase byte_per_sec, don't verify on create/update repo, add logging for stages --- .../repositories/SnapshotMetricsIT.java | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 8859b2671b54d..8d8b885e6e393 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -194,12 +194,12 @@ public void testThrottlingMetrics() throws Exception { final String repositoryName = randomIdentifier(); final Settings.Builder repositorySettings = randomRepositorySettings().put( BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), - ByteSizeValue.ofBytes(1024) + ByteSizeValue.ofBytes(2048) ) - .put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofBytes(1024)) + .put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofBytes(2048)) // Small chunk size ensures we don't get stuck throttling for too long .put("chunk_size", ByteSizeValue.ofBytes(100)); - createRepository(repositoryName, "mock", repositorySettings); + createRepository(repositoryName, "mock", repositorySettings, false); final String snapshotName = randomIdentifier(); final ActionFuture snapshotFuture; @@ -212,6 +212,7 @@ public void testThrottlingMetrics() throws Exception { .execute(); // Poll until we see some throttling occurring + final long snap_ts0 = System.currentTimeMillis(); assertBusy(() -> { collectMetrics(); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), greaterThan(0L)); @@ -219,14 +220,25 @@ public void testThrottlingMetrics() throws Exception { assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L)); // Remove create throttling + final long snap_ts1 = System.currentTimeMillis(); createRepository( repositoryName, "mock", - repositorySettings.put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO) + repositorySettings.put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO), + false ); + final long snap_ts2 = System.currentTimeMillis(); // wait for the snapshot to finish safeGet(snapshotFuture); + final long snap_ts3 = System.currentTimeMillis(); + + logger.info( + "saw throttling in [{}] remove throttling took [{}], snapshot took [{}]", + TimeValue.timeValueMillis(snap_ts1 - snap_ts0), + TimeValue.timeValueMillis(snap_ts2 - snap_ts1), + TimeValue.timeValueMillis(snap_ts3 - snap_ts2) + ); // Work out the maximum amount of concurrency per node final ThreadPool tp = internalCluster().getDataNodeInstance(ThreadPool.class); @@ -242,18 +254,21 @@ public void testThrottlingMetrics() throws Exception { ); // Restore the snapshot + final long restore_ts0 = System.currentTimeMillis(); ActionFuture restoreFuture = clusterAdmin().prepareRestoreSnapshot( TEST_REQUEST_TIMEOUT, repositoryName, snapshotName ).setIndices(indexName).setWaitForCompletion(true).setRenamePattern("(.+)").setRenameReplacement("restored-$1").execute(); + final long restore_ts1 = System.currentTimeMillis(); // assert we throttled on restore assertBusy(() -> { collectMetrics(); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), greaterThan(0L)); }); + final long restore_ts2 = System.currentTimeMillis(); // Remove restore throttling createRepository( repositoryName, @@ -261,6 +276,14 @@ public void testThrottlingMetrics() throws Exception { repositorySettings.put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO) ); safeGet(restoreFuture); + final long restore_ts3 = System.currentTimeMillis(); + + logger.info( + "saw throttling in [{}] remove throttling took [{}], restore took [{}]", + TimeValue.timeValueMillis(restore_ts1 - restore_ts0), + TimeValue.timeValueMillis(restore_ts2 - restore_ts1), + TimeValue.timeValueMillis(restore_ts3 - restore_ts2) + ); // assert appropriate attributes are present final Map expectedAttrs = Map.of( From 62f193515471240a8eeaf15b220fed51ac51b61b Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 15 Aug 2025 09:30:31 +1000 Subject: [PATCH 05/11] Don't verify repository when turning off restore throttling --- .../java/org/elasticsearch/repositories/SnapshotMetricsIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 8d8b885e6e393..5618f6ddcf162 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -273,7 +273,8 @@ public void testThrottlingMetrics() throws Exception { createRepository( repositoryName, "mock", - repositorySettings.put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO) + repositorySettings.put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO), + false ); safeGet(restoreFuture); final long restore_ts3 = System.currentTimeMillis(); From ca4317883fe33d3b13cfab189463cc2cfe4ec4d8 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 15 Aug 2025 14:27:05 +1000 Subject: [PATCH 06/11] Fix the last(?) race condition --- .../elasticsearch/repositories/SnapshotMetricsIT.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 5618f6ddcf162..3bbbae49b2e12 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -77,7 +77,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .build(); } - public void testSnapshotAPMMetrics() { + public void testSnapshotAPMMetrics() throws Exception { final String indexName = randomIdentifier(); final int numShards = randomIntBetween(1, 10); final int numReplicas = randomIntBetween(0, 1); @@ -99,10 +99,15 @@ public void testSnapshotAPMMetrics() { .setWaitForCompletion(true) .execute(); + // We are able to wait for either the creation to complete (`wait_for_completion=false`), or the snapshot to complete + // (`wait_for_completion=true`), but not both. To know when the creation listeners complete, we must assertBusy + assertBusy(() -> { + collectMetrics(); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L)); + }); + waitForBlockOnAnyDataNode(repositoryName); - collectMetrics(); assertShardsInProgressMetricIs(hasItem(greaterThan(0L))); - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_COMPLETED), equalTo(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_STARTED), greaterThan(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED), equalTo(0L)); From db4d5c640d3b1db03e3809aed1e4fae44a3fee82 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 15 Aug 2025 15:10:34 +1000 Subject: [PATCH 07/11] Uuurgh --- .../org/elasticsearch/repositories/SnapshotMetricsIT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 3bbbae49b2e12..9c929b9fda140 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -104,10 +104,8 @@ public void testSnapshotAPMMetrics() throws Exception { assertBusy(() -> { collectMetrics(); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L)); + assertShardsInProgressMetricIs(hasItem(greaterThan(0L))); }); - - waitForBlockOnAnyDataNode(repositoryName); - assertShardsInProgressMetricIs(hasItem(greaterThan(0L))); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_COMPLETED), equalTo(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_STARTED), greaterThan(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED), equalTo(0L)); From 0a0b09a3d4173446ed67ea6b962088cbba08bbfb Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 19 Aug 2025 12:52:55 +1000 Subject: [PATCH 08/11] Remove remnants of project_id label --- .../repositories/SnapshotMetricsIT.java | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 87ef9d7972b02..1691e6fefa0ad 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -154,14 +153,7 @@ public void testSnapshotAPMMetrics() throws Exception { assertShardsInProgressMetricIs(everyItem(equalTo(0L))); // assert appropriate attributes are present - final Map expectedAttrs = Map.of( - "project_id", - ProjectId.DEFAULT.id(), - "repo_name", - repositoryName, - "repo_type", - "mock" - ); + final Map expectedAttrs = Map.of("repo_name", repositoryName, "repo_type", "mock"); final Map expectedAttrsWithShardStage = Maps.copyMapWithAddedEntry( expectedAttrs, "stage", @@ -290,14 +282,7 @@ public void testThrottlingMetrics() throws Exception { ); // assert appropriate attributes are present - final Map expectedAttrs = Map.of( - "project_id", - ProjectId.DEFAULT.id(), - "repo_name", - repositoryName, - "repo_type", - "mock" - ); + final Map expectedAttrs = Map.of("repo_name", repositoryName, "repo_type", "mock"); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION, expectedAttrs); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION, expectedAttrs); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION, expectedAttrs); From a39714154bea7930ece0abdae27bacb2913dde0e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 19 Aug 2025 12:53:55 +1000 Subject: [PATCH 09/11] Increase size of index to ensure upload takes time --- .../java/org/elasticsearch/repositories/SnapshotMetricsIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 1691e6fefa0ad..2b43937d443b6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -82,7 +82,7 @@ public void testSnapshotAPMMetrics() throws Exception { final int numReplicas = randomIntBetween(0, 1); createIndex(indexName, numShards, numReplicas); - indexRandom(true, indexName, randomIntBetween(100, 300)); + indexRandom(true, indexName, randomIntBetween(1000, 3000)); final String repositoryName = randomIdentifier(); createRepository(repositoryName, "mock"); From 48ed1e508d1f01c2bd48b02f8f9565e740f1dd90 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 19 Aug 2025 14:48:29 +1000 Subject: [PATCH 10/11] Try and fix upload duration flappiness --- .../repositories/SnapshotMetricsIT.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 2b43937d443b6..5aa88bcaa52d2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -82,10 +82,19 @@ public void testSnapshotAPMMetrics() throws Exception { final int numReplicas = randomIntBetween(0, 1); createIndex(indexName, numShards, numReplicas); - indexRandom(true, indexName, randomIntBetween(1000, 3000)); + indexRandom(true, indexName, randomIntBetween(3000, 5000)); final String repositoryName = randomIdentifier(); - createRepository(repositoryName, "mock"); + createRepository( + repositoryName, + "mock", + Settings.builder() + .put(randomRepositorySettings().build()) + // Making chunk size small and adding throttling increases the likelihood of upload duration being non-zero + .put("chunk_size", ByteSizeValue.ofKb(1)) + .put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ofMb(1)) + .put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofMb(1)) + ); // Block the snapshot to test "snapshot shards in progress" blockAllDataNodes(repositoryName); @@ -144,8 +153,6 @@ public void testSnapshotAPMMetrics() throws Exception { getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_DURATION), allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis)) ); - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), equalTo(0L)); - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_STARTED), equalTo((long) numShards)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED), equalTo((long) numShards)); From 4dbe2ce82c62f2cbf6b4548a66df75cde0c4e441 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 19 Aug 2025 16:18:33 +1000 Subject: [PATCH 11/11] Use ofKb instead of ofBytes --- .../org/elasticsearch/repositories/SnapshotMetricsIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index 5aa88bcaa52d2..b6a84782ce333 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -196,9 +196,9 @@ public void testThrottlingMetrics() throws Exception { final String repositoryName = randomIdentifier(); final Settings.Builder repositorySettings = randomRepositorySettings().put( BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), - ByteSizeValue.ofBytes(2048) + ByteSizeValue.ofKb(2) ) - .put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofBytes(2048)) + .put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofKb(2)) // Small chunk size ensures we don't get stuck throttling for too long .put("chunk_size", ByteSizeValue.ofBytes(100)); createRepository(repositoryName, "mock", repositorySettings, false);