Skip to content

Commit cf22284

Browse files
authored
Adding a setting for data stream reindexing concurrency (#119484) (#119801)
1 parent 99b6e3d commit cf22284

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1717
import org.elasticsearch.common.settings.ClusterSettings;
1818
import org.elasticsearch.common.settings.IndexScopedSettings;
19+
import org.elasticsearch.common.settings.Setting;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.settings.SettingsFilter;
2122
import org.elasticsearch.common.settings.SettingsModule;
@@ -58,6 +59,7 @@
5859
import java.util.function.Supplier;
5960

6061
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
62+
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;
6163

6264
public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {
6365

@@ -153,4 +155,11 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
153155
return List.of();
154156
}
155157
}
158+
159+
@Override
160+
public List<Setting<?>> getSettings() {
161+
List<Setting<?>> pluginSettings = new ArrayList<>();
162+
pluginSettings.add(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
163+
return pluginSettings;
164+
}
156165
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.migrate.task;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.ElasticsearchException;
1113
import org.elasticsearch.action.ActionListener;
1214
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
@@ -19,6 +21,7 @@
1921
import org.elasticsearch.cluster.metadata.DataStream;
2022
import org.elasticsearch.cluster.metadata.DataStreamAction;
2123
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.common.settings.Setting;
2225
import org.elasticsearch.core.Nullable;
2326
import org.elasticsearch.core.TimeValue;
2427
import org.elasticsearch.index.Index;
@@ -38,6 +41,19 @@
3841
import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;
3942

4043
public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
44+
/*
45+
* This setting controls how many indices we reindex concurrently for a single data stream. This is not an overall limit -- if five
46+
* data streams are being reindexed, then each of them can have this many indices being reindexed at once. This setting is dynamic,
47+
* but changing it does not have an impact if the task is already running (unless the task is restarted or moves to another node).
48+
*/
49+
public static final Setting<Integer> MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING = Setting.intSetting(
50+
"migrate.max_concurrent_indices_reindexed_per_data_stream",
51+
1,
52+
1,
53+
Setting.Property.Dynamic,
54+
Setting.Property.NodeScope
55+
);
56+
private static final Logger logger = LogManager.getLogger(ReindexDataStreamPersistentTaskExecutor.class);
4157
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
4258
private final Client client;
4359
private final ClusterService clusterService;
@@ -164,8 +180,9 @@ private void reindexIndices(
164180
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
165181
completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
166182
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
183+
final int maxConcurrentIndices = clusterService.getClusterSettings().get(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
167184
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
168-
final int maxConcurrentIndices = 1;
185+
logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices);
169186
for (int i = 0; i < maxConcurrentIndices; i++) {
170187
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
171188
}

0 commit comments

Comments
 (0)