Merged
Changes from 6 commits
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -495,9 +495,6 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.RevertModelSnapshotIT
method: testRevertModelSnapshot
issue: https://github.com/elastic/elasticsearch/issues/132733
- class: org.elasticsearch.repositories.SnapshotMetricsIT
method: testSnapshotAPMMetrics
issue: https://github.com/elastic/elasticsearch/issues/132731
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
method: test {csv-spec:lookup-join.MvJoinKeyFromRowExpanded}
issue: https://github.com/elastic/elasticsearch/issues/132778
@@ -9,9 +9,9 @@

package org.elasticsearch.repositories;

import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -77,48 +77,27 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.build();
}

public void testSnapshotAPMMetrics() throws Exception {
public void testSnapshotAPMMetrics() {
final String indexName = randomIdentifier();
final int numShards = randomIntBetween(1, 10);
final int numReplicas = randomIntBetween(0, 1);
createIndex(indexName, numShards, numReplicas);

indexRandom(true, indexName, randomIntBetween(100, 300));

final IndicesStatsResponse indicesStats = indicesAdmin().prepareStats(indexName).get();
final IndexStats indexStats = indicesStats.getIndex(indexName);
long totalSizeInBytes = 0;
for (ShardStats shard : indexStats.getShards()) {
totalSizeInBytes += shard.getStats().getStore().sizeInBytes();
}
logger.info("--> total shards size: {} bytes", totalSizeInBytes);

final String repositoryName = randomIdentifier();

// we want to ensure some throttling, but not so much that it makes the test excessively slow.
final int shardSizeMultipleToEnsureThrottling = 2;
createRepository(
repositoryName,
"mock",
randomRepositorySettings().put(
BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(),
ByteSizeValue.ofBytes(totalSizeInBytes * shardSizeMultipleToEnsureThrottling)
)
.put(
BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(),
ByteSizeValue.ofBytes(totalSizeInBytes * shardSizeMultipleToEnsureThrottling)
)
);
createRepository(repositoryName, "mock");

// Block the snapshot to test "snapshot shards in progress"
blockAllDataNodes(repositoryName);
final String snapshotName = randomIdentifier();
final long beforeCreateSnapshotNanos = System.nanoTime();
final ActionFuture<CreateSnapshotResponse> snapshotFuture;
try {
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
.setIndices(indexName)
.setWaitForCompletion(false)
.get();
.setWaitForCompletion(true)
.execute();

waitForBlockOnAnyDataNode(repositoryName);
collectMetrics();
@@ -132,15 +111,13 @@ public void testSnapshotAPMMetrics() throws Exception {
}

// wait for snapshot to finish to test the other metrics
awaitNumberOfSnapshotsInProgress(0);
safeGet(snapshotFuture);
@nicktindall (Contributor Author), Aug 13, 2025:

I think if we wait for the response to be received, that should mean the metrics have been written? (Note waitForCompletion=true above now.)

I'm running in a loop to try and confirm that now
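For reference, the pattern being described is roughly the following (a minimal sketch built from the helpers visible in this diff, not the exact change):

```java
// Sketch only: with waitForCompletion=true the create-snapshot future completes
// only once the snapshot has finished, so the completion-time metrics should be
// recorded by the time safeGet() returns.
final ActionFuture<CreateSnapshotResponse> snapshotFuture =
    clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
        .setIndices(indexName)
        .setWaitForCompletion(true)
        .execute();

// ... unblock the repository so the snapshot can proceed ...

safeGet(snapshotFuture);  // returns only after the snapshot has completed
collectMetrics();
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_COMPLETED), equalTo(1L));
```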

@nicktindall (Contributor Author), Aug 13, 2025:

This introduced a source of flakiness in the tension between the throttling and safeGet: if the setting was too high, we exceeded the safeGet timeout; if it was too low, we didn't see any throttling.

For this reason I've moved the throttling metrics checks out into a separate test. That test has more complexity to ensure we see throttling without running for too long.
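Condensed, the approach in the new test (see testThrottlingMetrics further down in this diff) looks roughly like this; a sketch, not the full test:

```java
// Sketch of the separated throttling test: force throttling with a very low
// max_snapshot_bytes_per_sec, poll until the throttle counter moves, then lift
// the throttle so safeGet() stays well inside its timeout.
assertBusy(() -> {
    collectMetrics();
    assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), greaterThan(0L));
});
// Remove the throttle so the in-flight snapshot can finish quickly
createRepository(
    repositoryName,
    "mock",
    repositorySettings.put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO),
    false
);
safeGet(snapshotFuture);
```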

final TimeValue snapshotElapsedTime = TimeValue.timeValueNanos(System.nanoTime() - beforeCreateSnapshotNanos);
collectMetrics();

// sanity check blobs, bytes and throttling metrics
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_BLOBS_UPLOADED), greaterThan(0L));
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_BYTES_UPLOADED), greaterThan(0L));
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), greaterThan(0L));
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L));

assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L));
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_COMPLETED), equalTo(1L));
@@ -165,28 +142,14 @@ public void testSnapshotAPMMetrics() throws Exception {
getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_DURATION),
allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis))
);
assertThat(
getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION),
allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis))
);
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), equalTo(0L));
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L));

assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_STARTED), equalTo((long) numShards));
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED), equalTo((long) numShards));

assertShardsInProgressMetricIs(everyItem(equalTo(0L)));

// Restore the snapshot
clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
.setIndices(indexName)
.setWaitForCompletion(true)
.setRenamePattern("(.+)")
.setRenameReplacement("restored-$1")
.get();
collectMetrics();

// assert we throttled on restore
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), greaterThan(0L));

// assert appropriate attributes are present
final Map<String, Object> expectedAttrs = Map.of(
"project_id",
@@ -215,14 +178,127 @@ public void testSnapshotAPMMetrics() throws Exception {
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED, expectedAttrsWithShardStage);
assertMetricsHaveAttributes(InstrumentType.DOUBLE_HISTOGRAM, SnapshotMetrics.SNAPSHOT_SHARDS_DURATION, expectedAttrsWithShardStage);

assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION, expectedAttrs);
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION, expectedAttrs);
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION, expectedAttrs);
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_DURATION, expectedAttrs);
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_BYTES_UPLOADED, expectedAttrs);
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_BLOBS_UPLOADED, expectedAttrs);
}

public void testThrottlingMetrics() throws Exception {
final String indexName = randomIdentifier();
final int numShards = randomIntBetween(1, 10);
final int numReplicas = randomIntBetween(0, 1);
createIndex(indexName, numShards, numReplicas);
indexRandom(true, indexName, randomIntBetween(100, 120));

// Create a repository with restrictive throttling settings
final String repositoryName = randomIdentifier();
final Settings.Builder repositorySettings = randomRepositorySettings().put(
BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(),
ByteSizeValue.ofBytes(2048)
)
.put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofBytes(2048))
// Small chunk size ensures we don't get stuck throttling for too long
.put("chunk_size", ByteSizeValue.ofBytes(100));
createRepository(repositoryName, "mock", repositorySettings, false);

final String snapshotName = randomIdentifier();
final ActionFuture<CreateSnapshotResponse> snapshotFuture;

// Kick off a snapshot
final long snapshotStartTime = System.currentTimeMillis();
snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
.setIndices(indexName)
.setWaitForCompletion(true)
.execute();

// Poll until we see some throttling occurring
final long snap_ts0 = System.currentTimeMillis();
assertBusy(() -> {
collectMetrics();
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), greaterThan(0L));
});
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L));

// Remove create throttling
final long snap_ts1 = System.currentTimeMillis();
createRepository(
repositoryName,
"mock",
repositorySettings.put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO),
false
);
final long snap_ts2 = System.currentTimeMillis();

// wait for the snapshot to finish
safeGet(snapshotFuture);
final long snap_ts3 = System.currentTimeMillis();

logger.info(
"saw throttling in [{}] remove throttling took [{}], snapshot took [{}]",
TimeValue.timeValueMillis(snap_ts1 - snap_ts0),
TimeValue.timeValueMillis(snap_ts2 - snap_ts1),
TimeValue.timeValueMillis(snap_ts3 - snap_ts2)
);

// Work out the maximum amount of concurrency per node
final ThreadPool tp = internalCluster().getDataNodeInstance(ThreadPool.class);
final int snapshotThreadPoolSize = tp.info(ThreadPool.Names.SNAPSHOT).getMax();
final int maximumPerNodeConcurrency = Math.max(snapshotThreadPoolSize, numShards);

// we should also have incurred some read duration due to the throttling
final long upperBoundTimeSpentOnSnapshotThingsMillis = internalCluster().numDataNodes() * maximumPerNodeConcurrency * (System
.currentTimeMillis() - snapshotStartTime);
assertThat(
getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION),
allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis))
);

// Restore the snapshot
final long restore_ts0 = System.currentTimeMillis();
ActionFuture<RestoreSnapshotResponse> restoreFuture = clusterAdmin().prepareRestoreSnapshot(
TEST_REQUEST_TIMEOUT,
repositoryName,
snapshotName
).setIndices(indexName).setWaitForCompletion(true).setRenamePattern("(.+)").setRenameReplacement("restored-$1").execute();

final long restore_ts1 = System.currentTimeMillis();
// assert we throttled on restore
assertBusy(() -> {
collectMetrics();
assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), greaterThan(0L));
});

final long restore_ts2 = System.currentTimeMillis();
// Remove restore throttling
createRepository(
repositoryName,
"mock",
repositorySettings.put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO)
);
Contributor:

Is it significant that verify is omitted here, which will default to true, while the previous createRepository() calls explicitly set it to false? Is this to save time and just verify at the end?

I see this is a PUT when I drill down, but createOrUpdateRepository() or putRepository() might help when reading. Just an observation, though; this method and its overrides have been around a long time.

@nicktindall (Contributor Author):

Yeah, I set it to false because verify makes it take longer, when all we're trying to do is turn off the throttling. Good pick-up; I've set it to false in 62f1935

Sadly, overnight the test failed on line 105 with another race condition, where the snapshot thread blocks in the repo before the SNAPSHOTS_STARTED counter is incremented. I'll fix that and run for another day again 🙄
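One way to tolerate that kind of race (purely illustrative; not necessarily the fix that was applied) is to poll the counter instead of asserting it immediately after the block is observed:

```java
// Hypothetical illustration only: the snapshot thread may block in the repository
// before SNAPSHOTS_STARTED is incremented, so poll rather than assert immediately.
waitForBlockOnAnyDataNode(repositoryName);
assertBusy(() -> {
    collectMetrics();
    assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L));
});
```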

@nicktindall (Contributor Author):

I see this is a PUT when I drill down, but createOrUpdateRepository() or putRepository() might help when reading. Just an observation, though; this method and its overrides have been around a long time.

Agree on the naming. I won't do that as part of this PR, but I might follow up with a second one (it will probably add lots of noise)

safeGet(restoreFuture);
final long restore_ts3 = System.currentTimeMillis();

logger.info(
"saw throttling in [{}] remove throttling took [{}], restore took [{}]",
TimeValue.timeValueMillis(restore_ts1 - restore_ts0),
TimeValue.timeValueMillis(restore_ts2 - restore_ts1),
TimeValue.timeValueMillis(restore_ts3 - restore_ts2)
);

// assert appropriate attributes are present
final Map<String, Object> expectedAttrs = Map.of(
"project_id",
ProjectId.DEFAULT.id(),
"repo_name",
repositoryName,
"repo_type",
"mock"
);
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION, expectedAttrs);
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION, expectedAttrs);
assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION, expectedAttrs);
}

public void testByStateCounts_InitAndQueuedShards() throws Exception {
final String indexName = randomIdentifier();
final int numShards = randomIntBetween(2, 10);
@@ -237,11 +313,13 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception {
blockAllDataNodes(repositoryName);

final String snapshotName = randomIdentifier();
final ActionFuture<CreateSnapshotResponse> firstSnapshotFuture;
final ActionFuture<CreateSnapshotResponse> secondSnapshotFuture;
try {
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
firstSnapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
.setIndices(indexName)
.setWaitForCompletion(false)
.get();
.setWaitForCompletion(true)
.execute();

waitForBlockOnAnyDataNode(repositoryName);

@@ -252,10 +330,12 @@
assertThat(snapshotStates.get(SnapshotsInProgress.State.STARTED), equalTo(1L));

// Queue up another snapshot
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier())
secondSnapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier())
.setIndices(indexName)
.setWaitForCompletion(false)
.get();
.setWaitForCompletion(true)
.execute();

awaitNumberOfSnapshotsInProgress(2);

// Should be {numShards} in QUEUED and INIT states, and 2 STARTED snapshots
shardStates = getShardStates();
@@ -268,7 +348,8 @@
}

// All statuses should return to zero when the snapshots complete
awaitNumberOfSnapshotsInProgress(0);
safeGet(firstSnapshotFuture);
safeGet(secondSnapshotFuture);
getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L)));
getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L)));

@@ -309,12 +390,13 @@ public void testByStateCounts_PausedForRemovalShards() throws Exception {
blockNodeOnAnyFiles(repositoryName, nodeForRemoval);

final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final ActionFuture<CreateSnapshotResponse> snapshotFuture;
try {
// Kick off a snapshot
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier())
snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier())
.setIndices(indexName)
.setWaitForCompletion(false)
.get();
.setWaitForCompletion(true)
.execute();

// Wait till we're blocked
waitForBlock(nodeForRemoval, repositoryName);
@@ -337,7 +419,7 @@ public void testByStateCounts_PausedForRemovalShards() throws Exception {
clearShutdownMetadata(clusterService);

// All statuses should return to zero when the snapshot completes
awaitNumberOfSnapshotsInProgress(0);
safeGet(snapshotFuture);
getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L)));
getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L)));

@@ -395,10 +477,14 @@ public void testByStateCounts_WaitingShards() throws Exception {
safeAwait(handoffRequestBarrier);

// Kick off a snapshot
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier())
.setIndices(indexName)
.setWaitForCompletion(false)
.get();
final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin().prepareCreateSnapshot(
TEST_REQUEST_TIMEOUT,
repositoryName,
randomIdentifier()
).setIndices(indexName).setWaitForCompletion(true).execute();

// Wait for the snapshot to start
awaitNumberOfSnapshotsInProgress(1);

// Wait till we see a shard in WAITING state
createSnapshotInStateListener(clusterService(), repositoryName, indexName, 1, SnapshotsInProgress.ShardState.WAITING);
@@ -413,7 +499,7 @@ public void testByStateCounts_WaitingShards() throws Exception {
safeAwait(handoffRequestBarrier);

// All statuses should return to zero when the snapshot completes
awaitNumberOfSnapshotsInProgress(0);
safeGet(snapshotFuture);
getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L)));
getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L)));
