Skip to content

Commit 82da6e9

Browse files
authored
Adding a setting for data stream reindexing concurrency (#119484)
1 parent 385040d commit 82da6e9

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1717
import org.elasticsearch.common.settings.ClusterSettings;
1818
import org.elasticsearch.common.settings.IndexScopedSettings;
19+
import org.elasticsearch.common.settings.Setting;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.settings.SettingsFilter;
2122
import org.elasticsearch.common.settings.SettingsModule;
@@ -58,6 +59,7 @@
5859
import java.util.function.Supplier;
5960

6061
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
62+
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;
6163

6264
public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {
6365

@@ -153,4 +155,11 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
153155
return List.of();
154156
}
155157
}
158+
159+
@Override
160+
public List<Setting<?>> getSettings() {
161+
List<Setting<?>> pluginSettings = new ArrayList<>();
162+
pluginSettings.add(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
163+
return pluginSettings;
164+
}
156165
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.migrate.task;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.ElasticsearchException;
1113
import org.elasticsearch.action.ActionListener;
1214
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
@@ -19,6 +21,7 @@
1921
import org.elasticsearch.cluster.metadata.DataStream;
2022
import org.elasticsearch.cluster.metadata.DataStreamAction;
2123
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.common.settings.Setting;
2225
import org.elasticsearch.core.Nullable;
2326
import org.elasticsearch.core.TimeValue;
2427
import org.elasticsearch.index.Index;
@@ -39,6 +42,19 @@
3942
import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;
4043

4144
public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
45+
/*
46+
* This setting controls how many indices we reindex concurrently for a single data stream. This is not an overall limit -- if five
47+
* data streams are being reindexed, then each of them can have this many indices being reindexed at once. This setting is dynamic,
48+
* but changing it does not have an impact if the task is already running (unless the task is restarted or moves to another node).
49+
*/
50+
public static final Setting<Integer> MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING = Setting.intSetting(
51+
"migrate.max_concurrent_indices_reindexed_per_data_stream",
52+
1,
53+
1,
54+
Setting.Property.Dynamic,
55+
Setting.Property.NodeScope
56+
);
57+
private static final Logger logger = LogManager.getLogger(ReindexDataStreamPersistentTaskExecutor.class);
4258
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
4359
private final Client client;
4460
private final ClusterService clusterService;
@@ -165,8 +181,9 @@ private void reindexIndices(
165181
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
166182
completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
167183
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
184+
final int maxConcurrentIndices = clusterService.getClusterSettings().get(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
168185
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
169-
final int maxConcurrentIndices = 1;
186+
logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices);
170187
for (int i = 0; i < maxConcurrentIndices; i++) {
171188
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
172189
}

0 commit comments

Comments
 (0)