diff --git a/CHANGELOG.md b/CHANGELOG.md index f0d35c921ccd5..18866530e448f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233)) - Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568)) - Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005)) - - Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929)) +- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629)) ### Changed - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/ClusterMergeSchedulerConfigsIT.java b/server/src/internalClusterTest/java/org/opensearch/index/ClusterMergeSchedulerConfigsIT.java new file mode 100644 index 0000000000000..b12e294ec8e06 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/index/ClusterMergeSchedulerConfigsIT.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.index; + +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.indices.IndicesService; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.common.util.concurrent.OpenSearchExecutors.NODE_PROCESSORS_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_AUTO_THROTTLE_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2) +public class ClusterMergeSchedulerConfigsIT extends OpenSearchIntegTestCase { + + @Override + public Settings indexSettings() { + Settings.Builder s = Settings.builder().put(super.indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1); + return s.build(); + } + + @Override + protected Settings.Builder setRandomIndexMergeSettings(Random random, Settings.Builder builder) { + if (random.nextBoolean()) { + builder.put( + TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), + (random.nextBoolean() ? random.nextDouble() : random.nextBoolean()).toString() + ); + } + + return builder; + } + + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(NODE_PROCESSORS_SETTING.getKey(), 6).build(); + + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + internalCluster().startClusterManagerOnlyNode(); + } + + public void testMergeSchedulerSettings() throws ExecutionException, InterruptedException { + String clusterManagerName = internalCluster().getClusterManagerName(); + List dataNodes = new ArrayList<>(internalCluster().getDataNodeNames()); + String indexName = "log-myindex-1"; + createIndex(indexName); + ensureYellowAndNoInitializingShards(indexName); + ensureGreen(indexName); + GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes)); + String uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService indexService = indicesService.indexService(new Index(indexName, uuid)); + assertEquals(8, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(3, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(true, indexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put(CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING.getKey(), 40) + .put(CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING.getKey(), 20) + ) + .get(); + + indexName = "log-myindex-2"; + createIndex(indexName); + ensureYellowAndNoInitializingShards(indexName); + ensureGreen(indexName); + getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes)); + uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService secondIndexService = indicesService.indexService(new Index(indexName, uuid)); + assertEquals(40, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(40, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(20, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(true, indexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + assertEquals(true, secondIndexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + ; + + int replicas = randomIntBetween(1, Math.max(1, internalCluster().numDataNodes() - 1)); + // Create index with index level override in settings + indexName = "log-myindex-3"; + createIndex( + indexName, + Settings.builder() + .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), 150) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicas) + .build() + ); + ensureYellowAndNoInitializingShards(indexName); + ensureGreen(indexName); + getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes)); + uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService thirdIndexService = indicesService.indexService(new Index(indexName, uuid)); + assertEquals(150, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(40, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(40, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(20, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(true, indexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + ; + assertEquals(true, secondIndexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + ; + assertEquals(true, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + ; + + // changing cluster level default should only affect indices without index level override + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING.getKey(), 35)) + .get(); + + assertEquals(35, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(35, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(150, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(20, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + + // updating cluster level auto_throttle to false + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_DEFAULT_AUTO_THROTTLE_SETTING.getKey(), false)) + .get(); + + // removing index level override should pick up the cluster level default + UpdateSettingsRequestBuilder builder = client().admin().indices().prepareUpdateSettings(indexName); + builder.setSettings(Settings.builder().putNull(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey()).build()); + builder.execute().actionGet(); + + assertEquals(35, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(35, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(35, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(20, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(false, indexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + assertEquals(false, secondIndexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + assertEquals(false, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + + // try to update with an invalid setting + builder = client().admin().indices().prepareUpdateSettings(indexName); + builder.setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), 17).build()); + UpdateSettingsRequestBuilder finalBuilder = builder; + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> finalBuilder.execute().actionGet()); + assertTrue(exception.getMessage().contains("maxThreadCount (= 20) should be <= maxMergeCount (= 17)")); + + // verify no change in settings post failure + assertEquals(35, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(35, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(35, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(20, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + + // update index level setting to override cluster level default + builder = client().admin().indices().prepareUpdateSettings(indexName); + builder.setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), 27).build()); + builder.execute().actionGet(); + + assertEquals(35, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(35, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(27, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxMergeCount()); + assertEquals(20, indexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, secondIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + assertEquals(20, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()); + + // setting auto throttle back to true for one index + builder = client().admin().indices().prepareUpdateSettings("log-myindex-2"); + builder.setSettings(Settings.builder().put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true").build()); + builder.execute().actionGet(); + + assertEquals(false, indexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + assertEquals(true, secondIndexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + assertEquals(false, thirdIndexService.getIndexSettings().getMergeSchedulerConfig().isAutoThrottle()); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java index 9d58f4fe5d136..a5f9d67238e22 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java @@ -50,7 +50,7 @@ public class MergedSegmentWarmerIT extends SegmentReplicationIT { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true) + .put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true) .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java index 4895ee589f88b..54b1c02a96f36 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java @@ -43,7 +43,7 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath)) - .put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true) + .put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true) .build(); } @@ -187,7 +187,7 @@ public void testMergeSegmentWarmerWithWarmingDisabled() throws Exception { .cluster() .prepareUpdateSettings() .setPersistentSettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), false).build() + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), false).build() ) .get(); diff --git a/server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java index 5346b1c6fc1b8..5e0b077ac3a35 100644 --- a/server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java @@ -50,7 +50,7 @@ public class MergeStatsIT extends RemoteStoreBaseIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true) + .put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true) .build(); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 572d8ab2914c7..16818d9f5e4b9 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -307,6 +307,9 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesQueryCache.INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR, IndicesQueryCache.INDICES_QUERY_CACHE_MIN_FREQUENCY, IndicesQueryCache.INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY, + IndicesService.CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING, + IndicesService.CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING, + IndicesService.CLUSTER_DEFAULT_AUTO_THROTTLE_SETTING, IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING, @@ -324,9 +327,10 @@ public void apply(Settings value, Settings current, Settings previous) { ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, - RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING, - RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING, - RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING, + RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING, + RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING, + RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MAX_BYTES_PER_SEC_SETTING, + RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 7a8eee076fa37..bfa3d76a4cd88 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -668,7 +668,10 @@ public IndexService newIndexService( Supplier shardLevelRefreshEnabled, RecoverySettings recoverySettings, RemoteStoreSettings remoteStoreSettings, - Supplier clusterDefaultMaxMergeAtOnceSupplier + Supplier clusterDefaultMaxMergeAtOnceSupplier, + Supplier clusterDefaultMaxMergeCountSupplier, + Supplier clusterDefaultMaxMergeThreadCountSupplier, + Supplier clusterDefaultMergeAutoThrottleEnabledSupplier ) throws IOException { return newIndexService( indexCreationContext, @@ -696,7 +699,10 @@ public IndexService newIndexService( remoteStoreSettings, (s) -> {}, shardId -> ReplicationStats.empty(), - clusterDefaultMaxMergeAtOnceSupplier + clusterDefaultMaxMergeAtOnceSupplier, + clusterDefaultMaxMergeCountSupplier, + clusterDefaultMaxMergeThreadCountSupplier, + clusterDefaultMergeAutoThrottleEnabledSupplier ); } @@ -726,7 +732,10 @@ public IndexService newIndexService( RemoteStoreSettings remoteStoreSettings, Consumer replicator, Function segmentReplicationStatsProvider, - Supplier clusterDefaultMaxMergeAtOnceSupplier + Supplier clusterDefaultMaxMergeAtOnceSupplier, + Supplier clusterDefaultMaxMergeCountSupplier, + Supplier clusterDefaultMaxMergeThreadCountSupplier, + Supplier clusterDefaultAutoThrottleEnabledSupplier ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -798,7 +807,10 @@ public IndexService newIndexService( compositeIndexSettings, replicator, segmentReplicationStatsProvider, - clusterDefaultMaxMergeAtOnceSupplier + clusterDefaultMaxMergeAtOnceSupplier, + clusterDefaultMaxMergeCountSupplier, + clusterDefaultMaxMergeThreadCountSupplier, + clusterDefaultAutoThrottleEnabledSupplier ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 6612ea732942d..19c5c71bd7582 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -252,7 +252,10 @@ public IndexService( CompositeIndexSettings compositeIndexSettings, Consumer replicator, Function segmentReplicationStatsProvider, - Supplier clusterDefaultMaxMergeAtOnceSupplier + Supplier clusterDefaultMaxMergeAtOnceSupplier, + Supplier clusterDefaultMaxMergeCountSupplier, + Supplier clusterDefaultMaxThreadCountSupplier, + Supplier clusterDefaultAutoThrottleEnabledSupplier ) { super(indexSettings); this.storeFactory = storeFactory; @@ -352,6 +355,11 @@ public IndexService( this.replicator = replicator; this.segmentReplicationStatsProvider = segmentReplicationStatsProvider; indexSettings.setDefaultMaxMergesAtOnce(clusterDefaultMaxMergeAtOnceSupplier.get()); + indexSettings.setDefaultMaxThreadAndMergeCount( + clusterDefaultMaxThreadCountSupplier.get(), + clusterDefaultMaxMergeCountSupplier.get() + ); + indexSettings.setDefaultAutoThrottleEnabled(clusterDefaultAutoThrottleEnabledSupplier.get()); updateFsyncTaskIfNecessary(); synchronized (refreshMutex) { if (shardLevelRefreshEnabled == false) { @@ -400,7 +408,10 @@ public IndexService( boolean shardLevelRefreshEnabled, RecoverySettings recoverySettings, RemoteStoreSettings remoteStoreSettings, - Supplier clusterDefaultMaxMergeAtOnce + Supplier clusterDefaultMaxMergeAtOnce, + Supplier clusterDefaultMaxMergeCountSupplier, + Supplier clusterDefaultMaxThreadCountSupplier, + Supplier clusterDefaultAutoThrottleEnabledSupplier ) { this( indexSettings, @@ -445,7 +456,10 @@ public IndexService( null, s -> {}, (shardId) -> ReplicationStats.empty(), - clusterDefaultMaxMergeAtOnce + clusterDefaultMaxMergeAtOnce, + clusterDefaultMaxMergeCountSupplier, + clusterDefaultMaxThreadCountSupplier, + clusterDefaultAutoThrottleEnabledSupplier ); } @@ -1235,6 +1249,21 @@ public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) { indexSettings.setDefaultMaxMergesAtOnce(newDefaultMaxMergeAtOnce); } + /** + * Called when the cluster level settings: {@code cluster.default.index.merge.scheduler.max_merge_count} OR + * {@code cluster.default.index.merge.scheduler.max_thread_count} change. + */ + public void onDefaultMaxMergeOrThreadCountUpdate(int maxThreadCount, int maxMergeCount) { + indexSettings.setDefaultMaxThreadAndMergeCount(maxThreadCount, maxMergeCount); + } + + /** + * Called whenever the cluster level {@code cluster.default.index.merge.scheduler.auto_throttle} changes. + */ + public void onDefaultAutoThrottleEnabledUpdate(boolean enabled) { + indexSettings.setDefaultAutoThrottleEnabled(enabled); + } + /** * Called whenever the cluster level {@code cluster.merge.scheduler.max_force_merge_mb_per_sec} changes. * The change is only applied if the index doesn't have its own explicit force merge MB per sec setting. diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index bf3fef3d83caf..f3fdfbad74747 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -615,7 +615,7 @@ public static IndexMergePolicy fromString(String text) { /** * Expert: sets the amount of time to wait for merges (during {@link org.apache.lucene.index.IndexWriter#commit} - * or {@link org.apache.lucene.index.IndexWriter#getReader(boolean, boolean)}) returned by MergePolicy.findFullFlushMerges(...). + * or {@link org.apache.lucene.index.IndexWriter # getReader(boolean, boolean)}) returned by MergePolicy.findFullFlushMerges(...). * If this time is reached, we proceed with the commit based on segments merged up to that point. The merges are not * aborted, and will still run to completion independent of the commit or getReader call, like natural segment merges. */ @@ -1314,6 +1314,27 @@ void setDefaultMaxMergesAtOnce(int newDefaultMaxMergesAtOnce) { } } + /** + * Update the cached defaults for {@code MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING} and + * {@code MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING} + */ + void setDefaultMaxThreadAndMergeCount(int maxThreadCount, int maxMergeCount) { + // Upon updates to the cluster default settings, we always update the cached default values in + // the MergeSchedulerConfig, but we only update the actual values when an index level setting is not present + boolean updateActuals = MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.exists(getSettings()) == false + && MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.exists(getSettings()) == false; + mergeSchedulerConfig.setDefaultMaxThreadAndMergeCount(maxThreadCount, maxMergeCount, updateActuals); + } + + void setDefaultAutoThrottleEnabled(boolean enabled) { + // Upon updates to the cluster default settings, we always update the cached default values in + // the MergeSchedulerConfig, but we only update the actual values when an index level setting is not present + mergeSchedulerConfig.setDefaultAutoThrottleEnabled( + enabled, + MergeSchedulerConfig.AUTO_THROTTLE_SETTING.exists(getSettings()) == false + ); + } + /** * Updates the maxMergesAtOnce for actual TieredMergePolicy used by the engine. * Sets it to default maxMergesAtOnce if index level settings is being removed diff --git a/server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java b/server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java index bbd375dfd4375..2075b5c9b5b6c 100644 --- a/server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java +++ b/server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java @@ -32,6 +32,9 @@ package org.opensearch.index; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.ConcurrentMergeScheduler; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.Setting; @@ -39,6 +42,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import java.util.Objects; + /** * The merge scheduler (ConcurrentMergeScheduler) controls the execution of * merge operations once they are needed (according to the merge policy). Merges @@ -91,24 +96,26 @@ */ @PublicApi(since = "1.0.0") public final class MergeSchedulerConfig { + static Logger logger = LogManager.getLogger(MergeSchedulerConfig.class); + public static final boolean DEFAULT_AUTO_THROTTLE = true; public static final Setting MAX_THREAD_COUNT_SETTING = new Setting<>( "index.merge.scheduler.max_thread_count", - (s) -> Integer.toString(Math.max(1, Math.min(4, OpenSearchExecutors.allocatedProcessors(s) / 2))), + MergeSchedulerConfig::getDefaultMaxThreadCount, (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_thread_count"), Property.Dynamic, Property.IndexScope ); public static final Setting MAX_MERGE_COUNT_SETTING = new Setting<>( "index.merge.scheduler.max_merge_count", - (s) -> Integer.toString(MAX_THREAD_COUNT_SETTING.get(s) + 5), + MergeSchedulerConfig::getDefaultMergeCount, (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_merge_count"), Property.Dynamic, Property.IndexScope ); public static final Setting AUTO_THROTTLE_SETTING = Setting.boolSetting( "index.merge.scheduler.auto_throttle", - true, + MergeSchedulerConfig::getDefaultAutoThrottle, Property.Dynamic, Property.IndexScope ); @@ -130,19 +137,61 @@ public final class MergeSchedulerConfig { Property.NodeScope ); + private final String indexName; private volatile boolean autoThrottle; private volatile int maxThreadCount; private volatile int maxMergeCount; private volatile double maxForceMergeMBPerSec; + private static volatile Boolean cachedAutoThrottleEnabledDefault; + private static volatile Integer cachedMaxThreadCountDefault; + private static volatile Integer cachedMaxMergeCountDefault; MergeSchedulerConfig(IndexSettings indexSettings) { - int maxThread = indexSettings.getValue(MAX_THREAD_COUNT_SETTING); - int maxMerge = indexSettings.getValue(MAX_MERGE_COUNT_SETTING); - setMaxThreadAndMergeCount(maxThread, maxMerge); - this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING); + indexName = indexSettings.getIndex().getName(); + initMergeConfigs(indexSettings); updateMaxForceMergeMBPerSec(indexSettings); } + public synchronized void setDefaultMaxThreadAndMergeCount(int threadCount, int mergeCount, boolean updateActuals) { + cachedMaxThreadCountDefault = threadCount; + cachedMaxMergeCountDefault = mergeCount; + if (updateActuals == true) { + setMaxThreadAndMergeCount(threadCount, mergeCount); + } + } + + public synchronized void setDefaultAutoThrottleEnabled(boolean enabled, boolean updateActuals) { + cachedAutoThrottleEnabledDefault = enabled; + if (updateActuals == true) { + setAutoThrottle(enabled); + } + } + + private void initMergeConfigs(IndexSettings indexSettings) { + Settings settings = indexSettings.getSettings(); + int maxThread = (MAX_THREAD_COUNT_SETTING.exists(settings) == false && cachedMaxThreadCountDefault != null) + ? cachedMaxThreadCountDefault + : MAX_THREAD_COUNT_SETTING.get(settings); + int maxMerge = (MAX_MERGE_COUNT_SETTING.exists(settings) == false && cachedAutoThrottleEnabledDefault != null) + ? cachedMaxMergeCountDefault + : MAX_MERGE_COUNT_SETTING.get(settings); + boolean autoThrottleEnabled = (AUTO_THROTTLE_SETTING.exists(settings) == false && cachedAutoThrottleEnabledDefault != null) + ? cachedAutoThrottleEnabledDefault + : AUTO_THROTTLE_SETTING.get(settings); + + setAutoThrottle(autoThrottleEnabled); + setMaxThreadAndMergeCount(maxThread, maxMerge); + logger.info( + new ParameterizedMessage( + "Initialized index {} with maxMergeCount={}, maxThreadCount={}, autoThrottleEnabled={}", + this.indexName, + this.maxMergeCount, + this.maxThreadCount, + this.autoThrottle + ) + ); + } + /** * Returns true iff auto throttle is enabled. * @@ -155,8 +204,11 @@ public boolean isAutoThrottle() { /** * Enables / disables auto throttling on the {@link ConcurrentMergeScheduler} */ - void setAutoThrottle(boolean autoThrottle) { - this.autoThrottle = autoThrottle; + public void setAutoThrottle(boolean enabled) { + logger.info( + new ParameterizedMessage("Updating autoThrottle for index {} from [{}] to [{}]", this.indexName, this.autoThrottle, enabled) + ); + this.autoThrottle = enabled; } /** @@ -170,20 +222,40 @@ public int getMaxThreadCount() { * Expert: directly set the maximum number of merge threads and * simultaneous merges allowed. */ - void setMaxThreadAndMergeCount(int maxThreadCount, int maxMergeCount) { + public void setMaxThreadAndMergeCount(int newMaxThreadCount, int newMaxMergeCount) { + if (newMaxThreadCount == this.maxThreadCount && newMaxMergeCount == this.maxMergeCount) { + return; + } + validateMaxThreadAndMergeCount(newMaxThreadCount, newMaxMergeCount); + logger.info( + new ParameterizedMessage( + "Updating maxThreadCount from [{}] to [{}] and maxMergeCount from [{}] to [{}] for index {}.", + this.maxThreadCount, + newMaxThreadCount, + this.maxMergeCount, + newMaxMergeCount, + this.indexName + ) + ); + this.maxThreadCount = newMaxThreadCount; + this.maxMergeCount = newMaxMergeCount; + } + + /** + * Validate the values of {@code maxMergeCount} and {@code maxThreadCount} + */ + public static void validateMaxThreadAndMergeCount(int maxThreadCount, int maxMergeCount) { if (maxThreadCount < 1) { - throw new IllegalArgumentException("maxThreadCount should be at least 1"); + throw new IllegalArgumentException("maxThreadCount (= " + maxThreadCount + ") should be at least 1"); } if (maxMergeCount < 1) { - throw new IllegalArgumentException("maxMergeCount should be at least 1"); + throw new IllegalArgumentException("maxMergeCount (= " + maxMergeCount + ") should be at least 1"); } if (maxThreadCount > maxMergeCount) { throw new IllegalArgumentException( "maxThreadCount (= " + maxThreadCount + ") should be <= maxMergeCount (= " + maxMergeCount + ")" ); } - this.maxThreadCount = maxThreadCount; - this.maxMergeCount = maxMergeCount; } /** @@ -222,4 +294,24 @@ public void updateMaxForceMergeMBPerSec(IndexSettings indexSettings) { this.maxForceMergeMBPerSec = CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.get(indexSettings.getNodeSettings()); } } + + public static String getDefaultMaxThreadCount(Settings settings) { + return Integer.toString( + Objects.requireNonNullElseGet( + cachedMaxThreadCountDefault, + () -> Math.max(1, Math.min(4, OpenSearchExecutors.allocatedProcessors(settings) / 2)) + ) + ); + + } + + private static String getDefaultMergeCount(Settings settings) { + return Integer.toString( + Objects.requireNonNullElseGet(cachedMaxMergeCountDefault, () -> MAX_THREAD_COUNT_SETTING.get(settings) + 5) + ); + } + + private static String getDefaultAutoThrottle(Settings settings) { + return Boolean.toString(Objects.requireNonNullElseGet(cachedAutoThrottleEnabledDefault, () -> DEFAULT_AUTO_THROTTLE)); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java index e1246582b02f7..dc656942efdd8 100644 --- a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java +++ b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java @@ -10,7 +10,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentReader; @@ -24,12 +24,12 @@ import java.io.IOException; /** - * Implementation of a {@link IndexWriter.IndexReaderWarmer} for merged segment replication in + * Implementation of a {@link IndexReaderWarmer} for merged segment replication in * local on-disk and remote store enabled domains. * * @opensearch.internal */ -public class MergedSegmentWarmer implements IndexWriter.IndexReaderWarmer { +public class MergedSegmentWarmer implements IndexReaderWarmer { private final TransportService transportService; private final RecoverySettings recoverySettings; private final ClusterService clusterService; @@ -53,19 +53,20 @@ public MergedSegmentWarmer( @Override public void warm(LeafReader leafReader) throws IOException { - if (shouldWarm() == false) { - return; - } - mergedSegmentTransferTracker.incrementTotalWarmInvocationsCount(); - mergedSegmentTransferTracker.incrementOngoingWarms(); - // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader. - assert leafReader instanceof SegmentReader; - assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled(); + SegmentCommitInfo segmentCommitInfo = segmentCommitInfo(leafReader); long startTime = System.currentTimeMillis(); - long elapsedTime = 0; + long elapsedTime = -1; + boolean shouldWarm = false; try { - SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo(); + // If shouldWarm fails, we increment the warmFailureCount + // However, the time taken by shouldWarm is not accounted for in the totalWarmTime + shouldWarm = shouldWarm(segmentCommitInfo); + if (shouldWarm == false) { + return; + } + mergedSegmentTransferTracker.incrementTotalWarmInvocationsCount(); + mergedSegmentTransferTracker.incrementOngoingWarms(); logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo)); indexShard.publishMergedSegment(segmentCommitInfo); elapsedTime = System.currentTimeMillis() - startTime; @@ -83,16 +84,53 @@ public void warm(LeafReader leafReader) throws IOException { ); }); } catch (Throwable t) { - logger.warn(() -> new ParameterizedMessage("Failed to warm segment. Continuing. {}", leafReader), t); + logger.warn(() -> new ParameterizedMessage("Failed to warm segment. Continuing. {}", segmentCommitInfo), t); mergedSegmentTransferTracker.incrementTotalWarmFailureCount(); } finally { - mergedSegmentTransferTracker.addTotalWarmTimeMillis(elapsedTime); - mergedSegmentTransferTracker.decrementOngoingWarms(); + if (shouldWarm == true) { + if (elapsedTime == -1) { + elapsedTime = System.currentTimeMillis() - startTime; + } + mergedSegmentTransferTracker.addTotalWarmTimeMillis(elapsedTime); + mergedSegmentTransferTracker.decrementOngoingWarms(); + } } } + SegmentCommitInfo segmentCommitInfo(LeafReader leafReader) { + + // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader. + assert leafReader instanceof SegmentReader; + assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled(); + return ((SegmentReader) leafReader).getSegmentInfo(); + } + // package-private for tests - boolean shouldWarm() { - return indexShard.getRecoverySettings().isMergedSegmentReplicationWarmerEnabled() == true; + boolean shouldWarm(SegmentCommitInfo segmentCommitInfo) throws IOException { + if (indexShard.getRecoverySettings().isMergedSegmentReplicationWarmerEnabled() == false) { + return false; + } + + // in case we are unable to gauge the size of the merged segment segmentCommitInfo.sizeInBytes throws IOException + // we would not warm the segment + if (segmentCommitInfo.info == null || segmentCommitInfo.info.dir == null) { + return false; + } + + long segmentSize = segmentCommitInfo.sizeInBytes(); + double threshold = indexShard.getRecoverySettings().getMergedSegmentWarmerMinSegmentSizeThreshold().getBytes(); + if (segmentSize < threshold) { + logger.trace( + () -> new ParameterizedMessage( + "Skipping warm for segment {}. SegmentSize {}B is less than the configured threshold {}B.", + segmentCommitInfo.info.name, + segmentSize, + threshold + ) + ); + return false; + } + + return true; } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 667eb9edb65f7..aff7996f51113 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -214,6 +214,7 @@ import static org.opensearch.core.common.util.CollectionUtils.arrayAsArrayList; import static org.opensearch.index.IndexService.IndexCreationContext.CREATE_INDEX; import static org.opensearch.index.IndexService.IndexCreationContext.METADATA_VERIFICATION; +import static org.opensearch.index.MergeSchedulerConfig.DEFAULT_AUTO_THROTTLE; import static org.opensearch.index.TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE; import static org.opensearch.index.TieredMergePolicyProvider.MIN_DEFAULT_MAX_MERGE_AT_ONCE; import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; @@ -296,6 +297,29 @@ public class IndicesService extends AbstractLifecycleComponent Property.Dynamic ); + public static final Setting CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING = new Setting<>( + "cluster.default.index.merge.scheduler.max_thread_count", + (s) -> Integer.toString(Math.max(1, Math.min(4, OpenSearchExecutors.allocatedProcessors(s) / 2))), + (s) -> Setting.parseInt(s, 1, "cluster.default.index.merge.scheduler.max_thread_count"), + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING = new Setting<>( + "cluster.default.index.merge.scheduler.max_merge_count", + (s) -> Integer.toString(CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING.get(s) + 5), + (s) -> Setting.parseInt(s, 1, "cluster.default.index.merge.scheduler.max_merge_count"), + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting CLUSTER_DEFAULT_AUTO_THROTTLE_SETTING = Setting.boolSetting( + "cluster.default.index.merge.scheduler.auto_throttle", + DEFAULT_AUTO_THROTTLE, + Property.Dynamic, + Property.NodeScope + ); + /** * This setting is used to set the minimum refresh interval applicable for all indexes in a cluster. The * {@code cluster.default.index.refresh_interval} setting value needs to be higher than this setting's value. Index @@ -417,6 +441,9 @@ public class IndicesService extends AbstractLifecycleComponent private final Function segmentReplicationStatsProvider; private volatile int maxSizeInRequestCache; private volatile int defaultMaxMergeAtOnce; + private volatile int clusterDefaultMaxThreadCount; + private volatile int clusterDefaultMaxMergeCount; + private volatile boolean clusterDefaultAutoThrottleEnabled; @Override protected void doStart() { @@ -590,6 +617,11 @@ protected void closeInternal() { .addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING, this::setMaxSizeInRequestCache); this.defaultMaxMergeAtOnce = CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.get(clusterService.getSettings()); + + clusterDefaultMaxThreadCount = CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING.get(clusterService.getSettings()); + clusterDefaultMaxMergeCount = CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING.get(clusterService.getSettings()); + clusterDefaultAutoThrottleEnabled = CLUSTER_DEFAULT_AUTO_THROTTLE_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, this::onDefaultMaxMergeAtOnceUpdate); clusterService.getClusterSettings() @@ -597,6 +629,22 @@ protected void closeInternal() { MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING, this::onClusterLevelForceMergeMBPerSecUpdate ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + this::setClusterDefaultMaxThreadAndMergeCount, + List.of(CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING, CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING), + this::validateMaxThreadAndMergeCount + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(CLUSTER_DEFAULT_AUTO_THROTTLE_SETTING, this::onClusterDefaultMergeAutoThrottleUpdate); + + } + + private void validateMaxThreadAndMergeCount(Settings settings) { + MergeSchedulerConfig.validateMaxThreadAndMergeCount( + CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING.get(settings), + CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING.get(settings) + ); } @InternalApi @@ -714,6 +762,26 @@ private void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec } } + private void setClusterDefaultMaxThreadAndMergeCount(Settings settings) { + // CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING and CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING + // settings received here have already been validated using + // IndicesService.validateMaxThreadAndMergeCount + clusterDefaultMaxThreadCount = CLUSTER_DEFAULT_MAX_THREAD_COUNT_SETTING.get(settings); + clusterDefaultMaxMergeCount = CLUSTER_DEFAULT_MAX_MERGE_COUNT_SETTING.get(settings); + for (Map.Entry entry : indices.entrySet()) { + IndexService indexService = entry.getValue(); + indexService.onDefaultMaxMergeOrThreadCountUpdate(clusterDefaultMaxThreadCount, clusterDefaultMaxMergeCount); + } + } + + private void onClusterDefaultMergeAutoThrottleUpdate(Boolean value) { + clusterDefaultAutoThrottleEnabled = value; + for (Map.Entry entry : indices.entrySet()) { + IndexService indexService = entry.getValue(); + indexService.onDefaultAutoThrottleEnabledUpdate(clusterDefaultAutoThrottleEnabled); + } + } + private static BiFunction getTranslogFactorySupplier( Supplier repositoriesServiceSupplier, ThreadPool threadPool, @@ -1121,7 +1189,10 @@ private synchronized IndexService createIndexService( this.remoteStoreSettings, replicator, segmentReplicationStatsProvider, - this::getClusterDefaultMaxMergeAtOnce + this::getClusterDefaultMaxMergeAtOnce, + this::getClusterDefaultMaxMergeCount, + this::getClusterDefaultMaxThreadCount, + this::getClusterDefaultMergeAutoThrottleEnabled ); } @@ -2330,6 +2401,18 @@ private Integer getClusterDefaultMaxMergeAtOnce() { return this.defaultMaxMergeAtOnce; } + private Integer getClusterDefaultMaxMergeCount() { + return this.clusterDefaultMaxMergeCount; + } + + private Integer getClusterDefaultMaxThreadCount() { + return this.clusterDefaultMaxThreadCount; + } + + private Boolean getClusterDefaultMergeAutoThrottleEnabled() { + return this.clusterDefaultAutoThrottleEnabled; + } + public RemoteStoreSettings getRemoteStoreSettings() { return this.remoteStoreSettings; } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 35cdfe278f4f0..5a70508075e08 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -40,7 +40,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; -import org.opensearch.common.settings.Setting.Validator; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; @@ -77,22 +76,28 @@ public class RecoverySettings { Property.NodeScope ); + /** + * Dynamic setting to set a threshold for minimum size of a merged segment to be warmed. + */ + public static final Setting INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING = Setting + .byteSizeSetting( + "indices.replication.merges.warmer.min_segment_size_threshold", + new ByteSizeValue(500, ByteSizeUnit.MB), + Property.Dynamic, + Property.NodeScope + ); + /** * Dynamic setting to enable the merged segment warming(pre-copy) feature, default: false */ - public static final Setting INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING = Setting.boolSetting( - "indices.replication.merged_segment_warmer_enabled", + public static final Setting INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING = Setting.boolSetting( + "indices.replication.merges.warmer.enabled", false, - new Validator() { - @Override - public void validate(Boolean value) { - if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false && value == true) { - throw new IllegalArgumentException( - "FeatureFlag " - + FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG - + " must be enabled to set this property to true." - ); - } + value -> { + if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false && value == true) { + throw new IllegalArgumentException( + "FeatureFlag " + FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG + " must be enabled to set this property to true." + ); } }, Property.Dynamic, @@ -102,8 +107,8 @@ public void validate(Boolean value) { /** * Individual speed setting for merged segment replication, default -1B to reuse the setting of recovery. */ - public static final Setting INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( - "indices.merged_segment_replication.max_bytes_per_sec", + public static final Setting INDICES_REPLICATION_MERGES_WARMER_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + "indices.replication.merges.warmer.max_bytes_per_sec", new ByteSizeValue(-1), Property.Dynamic, Property.NodeScope @@ -112,8 +117,8 @@ public void validate(Boolean value) { /** * Control the maximum waiting time for replicate merged segment to the replica */ - public static final Setting INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING = Setting.timeSetting( - "indices.merged_segment_replication_timeout", + public static final Setting INDICES_REPLICATION_MERGES_WARMER_TIMEOUT_SETTING = Setting.timeSetting( + "indices.replication.merges.warmer.timeout", TimeValue.timeValueMinutes(15), TimeValue.timeValueMinutes(0), Property.Dynamic, @@ -233,6 +238,7 @@ public void validate(Boolean value) { Property.NodeScope ); + private volatile ByteSizeValue mergedSegmentWarmerMinSegmentSizeThreshold; private volatile ByteSizeValue recoveryMaxBytesPerSec; private volatile ByteSizeValue replicationMaxBytesPerSec; private volatile boolean mergedSegmentReplicationWarmerEnabled; @@ -275,9 +281,12 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); } this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); - this.mergedSegmentReplicationWarmerEnabled = INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.get(settings); - this.mergedSegmentReplicationMaxBytesPerSec = INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); - this.mergedSegmentReplicationTimeout = INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING.get(settings); + this.mergedSegmentReplicationWarmerEnabled = INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.get(settings); + this.mergedSegmentReplicationMaxBytesPerSec = INDICES_REPLICATION_MERGES_WARMER_MAX_BYTES_PER_SEC_SETTING.get(settings); + this.mergedSegmentReplicationTimeout = INDICES_REPLICATION_MERGES_WARMER_TIMEOUT_SETTING.get(settings); + this.mergedSegmentWarmerMinSegmentSizeThreshold = INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING.get( + settings + ); replicationRateLimiter = getReplicationRateLimiter(replicationMaxBytesPerSec); mergedSegmentReplicationRateLimiter = getReplicationRateLimiter(mergedSegmentReplicationMaxBytesPerSec); @@ -288,17 +297,21 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer( - INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING, + RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING, this::setIndicesMergedSegmentReplicationWarmerEnabled ); clusterSettings.addSettingsUpdateConsumer( - INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING, + INDICES_REPLICATION_MERGES_WARMER_MAX_BYTES_PER_SEC_SETTING, this::setMergedSegmentReplicationMaxBytesPerSec ); clusterSettings.addSettingsUpdateConsumer( - INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING, + INDICES_REPLICATION_MERGES_WARMER_TIMEOUT_SETTING, this::setMergedSegmentReplicationTimeout ); + clusterSettings.addSettingsUpdateConsumer( + INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING, + this::setMergedSegmentWarmerMinSegmentSizeThreshold + ); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations); clusterSettings.addSettingsUpdateConsumer( @@ -321,6 +334,14 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { ); } + private void setMergedSegmentWarmerMinSegmentSizeThreshold(ByteSizeValue value) { + this.mergedSegmentWarmerMinSegmentSizeThreshold = value; + } + + public ByteSizeValue getMergedSegmentWarmerMinSegmentSizeThreshold() { + return this.mergedSegmentWarmerMinSegmentSizeThreshold; + } + public RateLimiter recoveryRateLimiter() { return recoveryRateLimiter; } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 29dd60c3e638f..5121dacc86e68 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -270,7 +270,10 @@ private IndexService newIndexService(IndexModule module) throws IOException { DefaultRemoteStoreSettings.INSTANCE, s -> {}, null, - () -> TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE + () -> TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE, + () -> 6, + () -> 1, + () -> true ); } diff --git a/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java b/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java index 6c2a4fd254d16..8318f62e40d6f 100644 --- a/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java +++ b/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java @@ -8,47 +8,147 @@ package org.opensearch.index.engine; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.search.Sort; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.store.MockDirectoryWrapper; +import org.apache.lucene.util.StringHelper; +import org.opensearch.Version; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class MergedSegmentWarmerTests extends OpenSearchTestCase { IndexShard mockIndexShard; MergedSegmentWarmer mergedSegmentWarmer; + @Mock + SegmentCommitInfo segmentCommitInfo; + @Mock + MergedSegmentTransferTracker mergedSegmentTransferTracker; @Override public void setUp() throws Exception { super.setUp(); mockIndexShard = mock(IndexShard.class); + mergedSegmentTransferTracker = mock(MergedSegmentTransferTracker.class); + when(mockIndexShard.mergedSegmentTransferTracker()).thenReturn(mergedSegmentTransferTracker); when(mockIndexShard.shardId()).thenReturn(new ShardId(new Index("test-index", "_na_"), 0)); mergedSegmentWarmer = new MergedSegmentWarmer(null, null, null, mockIndexShard); + segmentCommitInfo = new SegmentCommitInfo(segmentInfo(), 0, 0, 0, 0, 0, null); } - public void testShouldWarm_warmerEnabled() { + public void testShouldWarm_warmerEnabled() throws IOException { RecoverySettings mockRecoverySettings = mock(RecoverySettings.class); when(mockRecoverySettings.isMergedSegmentReplicationWarmerEnabled()).thenReturn(true); + when(mockRecoverySettings.getMergedSegmentWarmerMinSegmentSizeThreshold()).thenReturn(new ByteSizeValue(500, ByteSizeUnit.MB)); when(mockIndexShard.getRecoverySettings()).thenReturn(mockRecoverySettings); + when(segmentCommitInfo.info.dir.fileLength(any())).thenReturn(600 * 1_000_000L); assertTrue( "MergedSegmentWarmer#shouldWarm is expected to return true when merged segment warmer is enabled", - mergedSegmentWarmer.shouldWarm() + mergedSegmentWarmer.shouldWarm(segmentCommitInfo) ); } - public void testShouldWarm_warmerDisabled() { + public void testShouldWarm_warmerDisabled() throws IOException { RecoverySettings mockRecoverySettings = mock(RecoverySettings.class); when(mockRecoverySettings.isMergedSegmentReplicationWarmerEnabled()).thenReturn(false); + when(mockRecoverySettings.getMergedSegmentWarmerMinSegmentSizeThreshold()).thenReturn(new ByteSizeValue(500, ByteSizeUnit.MB)); when(mockIndexShard.getRecoverySettings()).thenReturn(mockRecoverySettings); + when(segmentCommitInfo.info.dir.fileLength(any())).thenReturn(600 * 1_000_000L); assertFalse( "MergedSegmentWarmer#shouldWarm is expected to return false when merged segment warmer is disabled", - mergedSegmentWarmer.shouldWarm() + mergedSegmentWarmer.shouldWarm(segmentCommitInfo) + ); + } + + public void testShouldWarm_segmentSizeLessThanThreshold() throws IOException { + RecoverySettings mockRecoverySettings = mock(RecoverySettings.class); + when(mockRecoverySettings.isMergedSegmentReplicationWarmerEnabled()).thenReturn(true); + when(mockRecoverySettings.getMergedSegmentWarmerMinSegmentSizeThreshold()).thenReturn(new ByteSizeValue(500, ByteSizeUnit.MB)); + when(mockIndexShard.getRecoverySettings()).thenReturn(mockRecoverySettings); + when(segmentCommitInfo.info.dir.fileLength(any())).thenReturn(150 * 1_000_000L); + + assertFalse( + "MergedSegmentWarmer#shouldWarm is expected to return true when merged segment warmer is enabled", + mergedSegmentWarmer.shouldWarm(segmentCommitInfo) + ); + } + + public void testShouldWarm_segmentSizeGreaterThanThreshold() throws IOException { + RecoverySettings mockRecoverySettings = mock(RecoverySettings.class); + when(mockRecoverySettings.isMergedSegmentReplicationWarmerEnabled()).thenReturn(true); + when(mockRecoverySettings.getMergedSegmentWarmerMinSegmentSizeThreshold()).thenReturn(new ByteSizeValue(500, ByteSizeUnit.MB)); + when(mockIndexShard.getRecoverySettings()).thenReturn(mockRecoverySettings); + when(segmentCommitInfo.info.dir.fileLength(any())).thenReturn(600 * 1_000_000L); + + assertTrue( + "MergedSegmentWarmer#shouldWarm is expected to return false when merged segment warmer is disabled", + mergedSegmentWarmer.shouldWarm(segmentCommitInfo) + ); + } + + public void testShouldWarm_failure() throws IOException { + ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false); + + MergedSegmentWarmer warmer = spy(new MergedSegmentWarmer(null, null, null, mockIndexShard)); + doThrow(new RuntimeException("test exception")).when(warmer).shouldWarm(any()); + doReturn(mock(SegmentCommitInfo.class)).when(warmer).segmentCommitInfo(any()); + warmer.warm(mock(LeafReader.class)); + verify(mergedSegmentTransferTracker, times(0)).incrementOngoingWarms(); + verify(mergedSegmentTransferTracker, times(0)).incrementTotalWarmInvocationsCount(); + verify(mergedSegmentTransferTracker, times(0)).addTotalWarmTimeMillis(ArgumentMatchers.anyLong()); + verify(mergedSegmentTransferTracker, times(0)).decrementOngoingWarms(); + verify(mergedSegmentTransferTracker, times(1)).incrementTotalWarmFailureCount(); + } + + private SegmentInfo segmentInfo() throws IOException { + Directory dir = mock(MockDirectoryWrapper.class); + Codec codec = Codec.getDefault(); + + byte[] id = StringHelper.randomId(); + SegmentInfo info = new SegmentInfo( + dir, + Version.CURRENT.luceneVersion, + Version.V_3_4_0.luceneVersion, + "_123", + 1, + true, + false, + codec, + Collections.emptyMap(), + id, + Collections.emptyMap(), + new Sort() ); + info.setFiles(List.of("_foo.si")); + when(dir.fileLength(any())).thenReturn(100L); + return info; } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 5898d54e90124..082bd2c35c48f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -162,7 +162,7 @@ public void testReplication() throws Exception { public void testMergedSegmentReplication() throws Exception { // Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 1 replica shard. final RecoverySettings recoverySettings = new RecoverySettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(), + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true).build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); try ( @@ -208,7 +208,7 @@ public void testMergedSegmentReplicationWithException() throws Exception { throw new RuntimeException("mock exception"); }); final RecoverySettings recoverySettings = new RecoverySettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(), + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true).build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); try ( @@ -251,7 +251,7 @@ public void testMergedSegmentReplicationWithException() throws Exception { public void testMergedSegmentReplicationWithZeroReplica() throws Exception { // Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 0 replica shard. final RecoverySettings recoverySettings = new RecoverySettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(), + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true).build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); try ( @@ -286,7 +286,7 @@ public void testMergedSegmentReplicationWithZeroReplica() throws Exception { @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) public void testCleanupRedundantPendingMergeSegment() throws Exception { final RecoverySettings recoverySettings = new RecoverySettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(), + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true).build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); try ( diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index a3f8a51ec2369..432477c053e4b 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -62,7 +62,7 @@ public void testZeroBytesPerSecondIsNoRateLimit() { ); assertNull(recoverySettings.replicationRateLimiter()); clusterSettings.applySettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() ); assertNull(recoverySettings.mergedSegmentReplicationRateLimiter()); } @@ -88,7 +88,7 @@ public void testSetMergedSegmentReplicationMaxBytesPerSec() { clusterSettings.applySettings( Settings.builder() .put( - RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), + RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB) ) .build() @@ -97,7 +97,7 @@ public void testSetMergedSegmentReplicationMaxBytesPerSec() { clusterSettings.applySettings( Settings.builder() .put( - RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), + RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB) ) .build() @@ -109,13 +109,13 @@ public void testMergedSegmentReplicationTimeout() { assertEquals(15, (int) recoverySettings.getMergedSegmentReplicationTimeout().minutes()); clusterSettings.applySettings( Settings.builder() - .put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMinutes(5)) + .put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMinutes(5)) .build() ); assertEquals(5, (int) recoverySettings.getMergedSegmentReplicationTimeout().minutes()); clusterSettings.applySettings( Settings.builder() - .put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMinutes(25)) + .put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMinutes(25)) .build() ); assertEquals(25, (int) recoverySettings.getMergedSegmentReplicationTimeout().minutes()); @@ -191,12 +191,12 @@ public void testMergedSegmentReplicationWarmerEnabledSetting() { FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build()); clusterSettings.applySettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build() + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true).build() ); assertTrue(recoverySettings.isMergedSegmentReplicationWarmerEnabled()); clusterSettings.applySettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), false).build() + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), false).build() ); assertFalse(recoverySettings.isMergedSegmentReplicationWarmerEnabled()); } @@ -205,11 +205,13 @@ public void testMergedSegmentReplicationWarmerEnabledSettingInvalidUpdate() { Exception e = assertThrows( IllegalArgumentException.class, () -> clusterSettings.applySettings( - Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build() + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey(), true).build() ) ); assertEquals( - "illegal value can't update [indices.replication.merged_segment_warmer_enabled] from [false] to [true]", + "illegal value can't update [" + + RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING.getKey() + + "] from [false] to [true]", e.getMessage() ); assertEquals(IllegalArgumentException.class, e.getCause().getClass()); @@ -218,4 +220,31 @@ public void testMergedSegmentReplicationWarmerEnabledSettingInvalidUpdate() { e.getCause().getMessage() ); } + + public void testMergedSegmentWarmerSegmentSizeThresholdSetting() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build()); + + assertEquals(500L, recoverySettings.getMergedSegmentWarmerMinSegmentSizeThreshold().getMb()); + + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING.getKey(), "100gb") + .build() + ); + assertEquals(100L, recoverySettings.getMergedSegmentWarmerMinSegmentSizeThreshold().getGb()); + + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING.getKey(), "4KB") + .build() + ); + assertEquals(4L, recoverySettings.getMergedSegmentWarmerMinSegmentSizeThreshold().getKb()); + + clusterSettings.applySettings( + Settings.builder() + .putNull(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING.getKey()) + .build() + ); + assertEquals(500L, recoverySettings.getMergedSegmentWarmerMinSegmentSizeThreshold().getMb()); + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java index bf6d2d9e65882..88ab8a248078c 100644 --- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -42,6 +42,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -74,7 +75,7 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { - closeShards(primaryShard, replicaShard); + closeShardsWithRetry(primaryShard, replicaShard); super.tearDown(); } @@ -229,7 +230,6 @@ public void testGetMergedSegmentFiles() throws IOException, ExecutionException, GetSegmentFilesResponse response = res.get(); assertEquals(response.files.size(), filesToFetch.size()); assertTrue(response.files.containsAll(filesToFetch)); - closeShardWithRetry(replicaShard); } public void testGetMergedSegmentFilesDownloadTimeout() throws IOException, ExecutionException, InterruptedException { @@ -293,7 +293,6 @@ public void onFailure(Exception e) { observedException.getMessage() != null && observedException.getMessage().equals("Timed out waiting for merged segments download from remote store") ); - closeShardWithRetry(replicaShard); } public void testGetMergedSegmentFilesFailure() throws IOException, ExecutionException, InterruptedException { @@ -345,7 +344,6 @@ public void onFailure(Exception e) { Exception observedException = failureRef.get(); assertTrue(observedException instanceof NoSuchFileException); assertTrue(observedException.getMessage() != null && observedException.getMessage().startsWith("invalid.")); - closeShards(replicaShard); } private void buildIndexShardBehavior(IndexShard mockShard, IndexShard indexShard) { @@ -357,17 +355,22 @@ private void buildIndexShardBehavior(IndexShard mockShard, IndexShard indexShard when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); } - private void closeShardWithRetry(IndexShard shard) { - try { - assertBusy(() -> { - try { - closeShards(shard); - } catch (RuntimeException e) { - throw new AssertionError("Failed to close shard", e); - } - }); - } catch (Exception e) { - logger.warn("Unable to close shard " + shard.shardId() + ". Exception: " + e); + private void closeShardsWithRetry(IndexShard... shards) { + for (IndexShard shard : shards) { + AtomicInteger retry = new AtomicInteger(1); + String shardId = shard.shardId().toString() + (shard.isPrimaryMode() ? "[p]" : "[r]"); + try { + assertBusy(() -> { + try { + logger.info("Trying to close " + shardId + " try: " + retry.getAndIncrement()); + closeShard(shard, true); + } catch (RuntimeException e) { + throw new AssertionError("Failed to close shard", e); + } + }); + } catch (Exception e) { + logger.warn("Unable to close shard " + shardId + ". Exception: " + e); + } } } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 4d3d00f65a1f6..51effc46981e7 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -478,10 +478,6 @@ protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builde setRandomIndexMergeSettings(random, builder); setRandomIndexTranslogSettings(random, builder); - if (random.nextBoolean()) { - builder.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), false); - } - if (random.nextBoolean()) { builder.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), random.nextBoolean()); } @@ -505,26 +501,28 @@ protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builde return builder; } - private static Settings.Builder setRandomIndexMergeSettings(Random random, Settings.Builder builder) { + protected Settings.Builder setRandomIndexMergeSettings(Random random, Settings.Builder builder) { if (random.nextBoolean()) { builder.put( TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), (random.nextBoolean() ? random.nextDouble() : random.nextBoolean()).toString() ); } - switch (random.nextInt(4)) { - case 3: - final int maxThreadCount = RandomNumbers.randomIntBetween(random, 1, 4); - final int maxMergeCount = RandomNumbers.randomIntBetween(random, maxThreadCount, maxThreadCount + 4); - builder.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount); - builder.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount); - break; + int value = random.nextInt(4); + if (value == 3) { + final int maxThreadCount = RandomNumbers.randomIntBetween(random, 1, 4); + final int maxMergeCount = RandomNumbers.randomIntBetween(random, maxThreadCount, maxThreadCount + 4); + builder.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount); + builder.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount); + } + if (value % 2 == 1) { + builder.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), false); } return builder; } - private static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) { + protected static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) { if (random.nextBoolean()) { builder.put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),