diff --git a/muted-tests.yml b/muted-tests.yml index 70054a292efb2..86b293eac1d80 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -471,9 +471,6 @@ tests: - class: org.elasticsearch.xpack.ml.integration.RevertModelSnapshotIT method: testRevertModelSnapshot issue: https://github.com/elastic/elasticsearch/issues/132733 -- class: org.elasticsearch.repositories.SnapshotMetricsIT - method: testSnapshotAPMMetrics - issue: https://github.com/elastic/elasticsearch/issues/132731 - class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT method: test {csv-spec:lookup-join.MvJoinKeyFromRowExpanded} issue: https://github.com/elastic/elasticsearch/issues/132778 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java index b0b1680ce6fb0..b6a84782ce333 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/SnapshotMetricsIT.java @@ -9,12 +9,11 @@ package org.elasticsearch.repositories; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -83,47 +82,38 @@ public void testSnapshotAPMMetrics() throws Exception { final int numReplicas = randomIntBetween(0, 1); createIndex(indexName, numShards, numReplicas); - indexRandom(true, indexName, randomIntBetween(100, 300)); - - final IndicesStatsResponse indicesStats = indicesAdmin().prepareStats(indexName).get(); - final IndexStats indexStats = indicesStats.getIndex(indexName); - long totalSizeInBytes = 0; - for (ShardStats shard : indexStats.getShards()) { - totalSizeInBytes += shard.getStats().getStore().sizeInBytes(); - } - logger.info("--> total shards size: {} bytes", totalSizeInBytes); + indexRandom(true, indexName, randomIntBetween(3000, 5000)); final String repositoryName = randomIdentifier(); - - // we want to ensure some throttling, but not so much that it makes the test excessively slow. - final int shardSizeMultipleToEnsureThrottling = 2; createRepository( repositoryName, "mock", - randomRepositorySettings().put( - BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), - ByteSizeValue.ofBytes(totalSizeInBytes * shardSizeMultipleToEnsureThrottling) - ) - .put( - BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), - ByteSizeValue.ofBytes(totalSizeInBytes * shardSizeMultipleToEnsureThrottling) - ) + Settings.builder() + .put(randomRepositorySettings().build()) + // Making chunk size small and adding throttling increases the likelihood of upload duration being non-zero + .put("chunk_size", ByteSizeValue.ofKb(1)) + .put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ofMb(1)) + .put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofMb(1)) ); // Block the snapshot to test "snapshot shards in progress" blockAllDataNodes(repositoryName); final String snapshotName = randomIdentifier(); final long beforeCreateSnapshotNanos = System.nanoTime(); + final ActionFuture snapshotFuture; try { - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) + snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) .setIndices(indexName) - .setWaitForCompletion(false) - .get(); - - waitForBlockOnAnyDataNode(repositoryName); - collectMetrics(); - assertShardsInProgressMetricIs(hasItem(greaterThan(0L))); - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L)); + .setWaitForCompletion(true) + .execute(); + + // We are able to wait for either the creation to complete (`wait_for_completion=false`), or the snapshot to complete + // (`wait_for_completion=true`), but not both. To know when the creation listeners complete, we must assertBusy + assertBusy(() -> { + collectMetrics(); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L)); + assertShardsInProgressMetricIs(hasItem(greaterThan(0L))); + }); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_COMPLETED), equalTo(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_STARTED), greaterThan(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED), equalTo(0L)); @@ -132,15 +122,13 @@ public void testSnapshotAPMMetrics() throws Exception { } // wait for snapshot to finish to test the other metrics - awaitNumberOfSnapshotsInProgress(0); + safeGet(snapshotFuture); final TimeValue snapshotElapsedTime = TimeValue.timeValueNanos(System.nanoTime() - beforeCreateSnapshotNanos); collectMetrics(); // sanity check blobs, bytes and throttling metrics assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_BLOBS_UPLOADED), greaterThan(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_BYTES_UPLOADED), greaterThan(0L)); - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), greaterThan(0L)); - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_STARTED), equalTo(1L)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOTS_COMPLETED), equalTo(1L)); @@ -165,37 +153,14 @@ public void testSnapshotAPMMetrics() throws Exception { getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_DURATION), allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis)) ); - assertThat( - getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION), - allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis)) - ); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_STARTED), equalTo((long) numShards)); assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED), equalTo((long) numShards)); assertShardsInProgressMetricIs(everyItem(equalTo(0L))); - // Restore the snapshot - clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) - .setIndices(indexName) - .setWaitForCompletion(true) - .setRenamePattern("(.+)") - .setRenameReplacement("restored-$1") - .get(); - collectMetrics(); - - // assert we throttled on restore - assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), greaterThan(0L)); - // assert appropriate attributes are present - final Map expectedAttrs = Map.of( - "project_id", - ProjectId.DEFAULT.id(), - "repo_name", - repositoryName, - "repo_type", - "mock" - ); + final Map expectedAttrs = Map.of("repo_name", repositoryName, "repo_type", "mock"); final Map expectedAttrsWithShardStage = Maps.copyMapWithAddedEntry( expectedAttrs, "stage", @@ -215,14 +180,121 @@ public void testSnapshotAPMMetrics() throws Exception { assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_SHARDS_COMPLETED, expectedAttrsWithShardStage); assertMetricsHaveAttributes(InstrumentType.DOUBLE_HISTOGRAM, SnapshotMetrics.SNAPSHOT_SHARDS_DURATION, expectedAttrsWithShardStage); - assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION, expectedAttrs); - assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION, expectedAttrs); - assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION, expectedAttrs); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_DURATION, expectedAttrs); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_BYTES_UPLOADED, expectedAttrs); assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_BLOBS_UPLOADED, expectedAttrs); } + public void testThrottlingMetrics() throws Exception { + final String indexName = randomIdentifier(); + final int numShards = randomIntBetween(1, 10); + final int numReplicas = randomIntBetween(0, 1); + createIndex(indexName, numShards, numReplicas); + indexRandom(true, indexName, randomIntBetween(100, 120)); + + // Create a repository with restrictive throttling settings + final String repositoryName = randomIdentifier(); + final Settings.Builder repositorySettings = randomRepositorySettings().put( + BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), + ByteSizeValue.ofKb(2) + ) + .put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ofKb(2)) + // Small chunk size ensures we don't get stuck throttling for too long + .put("chunk_size", ByteSizeValue.ofBytes(100)); + createRepository(repositoryName, "mock", repositorySettings, false); + + final String snapshotName = randomIdentifier(); + final ActionFuture snapshotFuture; + + // Kick off a snapshot + final long snapshotStartTime = System.currentTimeMillis(); + snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) + .setIndices(indexName) + .setWaitForCompletion(true) + .execute(); + + // Poll until we see some throttling occurring + final long snap_ts0 = System.currentTimeMillis(); + assertBusy(() -> { + collectMetrics(); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION), greaterThan(0L)); + }); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), equalTo(0L)); + + // Remove create throttling + final long snap_ts1 = System.currentTimeMillis(); + createRepository( + repositoryName, + "mock", + repositorySettings.put(BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO), + false + ); + final long snap_ts2 = System.currentTimeMillis(); + + // wait for the snapshot to finish + safeGet(snapshotFuture); + final long snap_ts3 = System.currentTimeMillis(); + + logger.info( + "saw throttling in [{}] remove throttling took [{}], snapshot took [{}]", + TimeValue.timeValueMillis(snap_ts1 - snap_ts0), + TimeValue.timeValueMillis(snap_ts2 - snap_ts1), + TimeValue.timeValueMillis(snap_ts3 - snap_ts2) + ); + + // Work out the maximum amount of concurrency per node + final ThreadPool tp = internalCluster().getDataNodeInstance(ThreadPool.class); + final int snapshotThreadPoolSize = tp.info(ThreadPool.Names.SNAPSHOT).getMax(); + final int maximumPerNodeConcurrency = Math.max(snapshotThreadPoolSize, numShards); + + // we should also have incurred some read duration due to the throttling + final long upperBoundTimeSpentOnSnapshotThingsMillis = internalCluster().numDataNodes() * maximumPerNodeConcurrency * (System + .currentTimeMillis() - snapshotStartTime); + assertThat( + getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION), + allOf(greaterThan(0L), lessThan(upperBoundTimeSpentOnSnapshotThingsMillis)) + ); + + // Restore the snapshot + final long restore_ts0 = System.currentTimeMillis(); + ActionFuture restoreFuture = clusterAdmin().prepareRestoreSnapshot( + TEST_REQUEST_TIMEOUT, + repositoryName, + snapshotName + ).setIndices(indexName).setWaitForCompletion(true).setRenamePattern("(.+)").setRenameReplacement("restored-$1").execute(); + + final long restore_ts1 = System.currentTimeMillis(); + // assert we throttled on restore + assertBusy(() -> { + collectMetrics(); + assertThat(getTotalClusterLongCounterValue(SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION), greaterThan(0L)); + }); + + final long restore_ts2 = System.currentTimeMillis(); + // Remove restore throttling + createRepository( + repositoryName, + "mock", + repositorySettings.put(BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO), + false + ); + safeGet(restoreFuture); + final long restore_ts3 = System.currentTimeMillis(); + + logger.info( + "saw throttling in [{}] remove throttling took [{}], restore took [{}]", + TimeValue.timeValueMillis(restore_ts1 - restore_ts0), + TimeValue.timeValueMillis(restore_ts2 - restore_ts1), + TimeValue.timeValueMillis(restore_ts3 - restore_ts2) + ); + + // assert appropriate attributes are present + final Map expectedAttrs = Map.of("repo_name", repositoryName, "repo_type", "mock"); + assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_UPLOAD_READ_DURATION, expectedAttrs); + assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_RESTORE_THROTTLE_DURATION, expectedAttrs); + assertMetricsHaveAttributes(InstrumentType.LONG_COUNTER, SnapshotMetrics.SNAPSHOT_CREATE_THROTTLE_DURATION, expectedAttrs); + } + public void testByStateCounts_InitAndQueuedShards() throws Exception { final String indexName = randomIdentifier(); final int numShards = randomIntBetween(2, 10); @@ -237,11 +309,13 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception { blockAllDataNodes(repositoryName); final String snapshotName = randomIdentifier(); + final ActionFuture firstSnapshotFuture; + final ActionFuture secondSnapshotFuture; try { - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) + firstSnapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName) .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + .setWaitForCompletion(true) + .execute(); waitForBlockOnAnyDataNode(repositoryName); @@ -252,10 +326,12 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception { assertThat(snapshotStates.get(SnapshotsInProgress.State.STARTED), equalTo(1L)); // Queue up another snapshot - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) + secondSnapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + .setWaitForCompletion(true) + .execute(); + + awaitNumberOfSnapshotsInProgress(2); // Should be {numShards} in QUEUED and INIT states, and 2 STARTED snapshots shardStates = getShardStates(); @@ -268,7 +344,8 @@ public void testByStateCounts_InitAndQueuedShards() throws Exception { } // All statuses should return to zero when the snapshots complete - awaitNumberOfSnapshotsInProgress(0); + safeGet(firstSnapshotFuture); + safeGet(secondSnapshotFuture); getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L))); getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L))); @@ -309,12 +386,13 @@ public void testByStateCounts_PausedForRemovalShards() throws Exception { blockNodeOnAnyFiles(repositoryName, nodeForRemoval); final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final ActionFuture snapshotFuture; try { // Kick off a snapshot - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) + snapshotFuture = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + .setWaitForCompletion(true) + .execute(); // Wait till we're blocked waitForBlock(nodeForRemoval, repositoryName); @@ -337,7 +415,7 @@ public void testByStateCounts_PausedForRemovalShards() throws Exception { clearShutdownMetadata(clusterService); // All statuses should return to zero when the snapshot completes - awaitNumberOfSnapshotsInProgress(0); + safeGet(snapshotFuture); getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L))); getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L))); @@ -395,10 +473,14 @@ public void testByStateCounts_WaitingShards() throws Exception { safeAwait(handoffRequestBarrier); // Kick off a snapshot - clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier()) - .setIndices(indexName) - .setWaitForCompletion(false) - .get(); + final ActionFuture snapshotFuture = clusterAdmin().prepareCreateSnapshot( + TEST_REQUEST_TIMEOUT, + repositoryName, + randomIdentifier() + ).setIndices(indexName).setWaitForCompletion(true).execute(); + + // Wait for the snapshot to start + awaitNumberOfSnapshotsInProgress(1); // Wait till we see a shard in WAITING state createSnapshotInStateListener(clusterService(), repositoryName, indexName, 1, SnapshotsInProgress.ShardState.WAITING); @@ -413,7 +495,7 @@ public void testByStateCounts_WaitingShards() throws Exception { safeAwait(handoffRequestBarrier); // All statuses should return to zero when the snapshot completes - awaitNumberOfSnapshotsInProgress(0); + safeGet(snapshotFuture); getShardStates().forEach((key, value) -> assertThat(value, equalTo(0L))); getSnapshotStates().forEach((key, value) -> assertThat(value, equalTo(0L)));