Skip to content

Commit 60992fe

Browse files
authored
Avoid hoarding cluster state references during rollover (#124107) (#124257)
By keeping a list of all the rollover results in a rollover request batch, we were keeping references to all the intermediate cluster states that we built. We've seen this list take up ~1.4GB with 600 rollover requests in one batch. We only kept the list of results to compute the "reason" for the allocation reroute, so we can easily drop the cluster state reference from the list and only keep what we need. Fixes #123893
1 parent f393fbe commit 60992fe

File tree

3 files changed

+14
-24
lines changed

3 files changed

+14
-24
lines changed

docs/changelog/124107.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 124107
2+
summary: Avoid hoarding cluster state references during rollover
3+
area: Indices APIs
4+
type: bug
5+
issues:
6+
- 123893

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3333
import org.elasticsearch.common.Priority;
3434
import org.elasticsearch.common.Strings;
35-
import org.elasticsearch.common.collect.Iterators;
3635
import org.elasticsearch.injection.guice.Inject;
3736
import org.elasticsearch.tasks.CancellableTask;
3837
import org.elasticsearch.tasks.Task;
@@ -183,7 +182,7 @@ record LazyRolloverExecutor(
183182
@Override
184183
public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecutionContext) {
185184
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
186-
final var results = new ArrayList<MetadataRolloverService.RolloverResult>(batchExecutionContext.taskContexts().size());
185+
final var results = new ArrayList<String>(batchExecutionContext.taskContexts().size());
187186
var state = batchExecutionContext.initialState();
188187
Map<RolloverRequest, List<TaskContext<LazyRolloverTask>>> groupedRequests = new HashMap<>();
189188
for (final var taskContext : batchExecutionContext.taskContexts()) {
@@ -203,14 +202,7 @@ public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecuti
203202

204203
if (state != batchExecutionContext.initialState()) {
205204
var reason = new StringBuilder();
206-
Strings.collectionToDelimitedStringWithLimit(
207-
(Iterable<String>) () -> Iterators.map(results.iterator(), t -> t.sourceIndexName() + "->" + t.rolloverIndexName()),
208-
",",
209-
"lazy bulk rollover [",
210-
"]",
211-
1024,
212-
reason
213-
);
205+
Strings.collectionToDelimitedStringWithLimit(results, ",", "lazy bulk rollover [", "]", 1024, reason);
214206
try (var ignored = batchExecutionContext.dropHeadersContext()) {
215207
state = allocationService.reroute(state, reason.toString(), listener.reroute());
216208
}
@@ -223,7 +215,7 @@ public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecuti
223215
public ClusterState executeTask(
224216
ClusterState currentState,
225217
RolloverRequest rolloverRequest,
226-
List<MetadataRolloverService.RolloverResult> results,
218+
ArrayList<String> results,
227219
List<TaskContext<LazyRolloverTask>> rolloverTaskContexts,
228220
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
229221
) throws Exception {
@@ -260,7 +252,7 @@ public ClusterState executeTask(
260252
null,
261253
isFailureStoreRollover
262254
);
263-
results.add(rolloverResult);
255+
results.add(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
264256
logger.trace("lazy rollover result [{}]", rolloverResult);
265257

266258
final var rolloverIndexName = rolloverResult.rolloverIndexName();

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
4848
import org.elasticsearch.common.Priority;
4949
import org.elasticsearch.common.Strings;
50-
import org.elasticsearch.common.collect.Iterators;
5150
import org.elasticsearch.common.unit.ByteSizeValue;
5251
import org.elasticsearch.common.util.concurrent.EsExecutors;
5352
import org.elasticsearch.core.Nullable;
@@ -483,7 +482,7 @@ record RolloverExecutor(
483482
@Override
484483
public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionContext) {
485484
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
486-
final var results = new ArrayList<MetadataRolloverService.RolloverResult>(batchExecutionContext.taskContexts().size());
485+
final var results = new ArrayList<String>(batchExecutionContext.taskContexts().size());
487486
var state = batchExecutionContext.initialState();
488487
for (final var taskContext : batchExecutionContext.taskContexts()) {
489488
try (var ignored = taskContext.captureResponseHeaders()) {
@@ -495,14 +494,7 @@ public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionCo
495494

496495
if (state != batchExecutionContext.initialState()) {
497496
var reason = new StringBuilder();
498-
Strings.collectionToDelimitedStringWithLimit(
499-
(Iterable<String>) () -> Iterators.map(results.iterator(), t -> t.sourceIndexName() + "->" + t.rolloverIndexName()),
500-
",",
501-
"bulk rollover [",
502-
"]",
503-
1024,
504-
reason
505-
);
497+
Strings.collectionToDelimitedStringWithLimit(results, ",", "bulk rollover [", "]", 1024, reason);
506498
try (var ignored = batchExecutionContext.dropHeadersContext()) {
507499
state = allocationService.reroute(state, reason.toString(), listener.reroute());
508500
}
@@ -514,7 +506,7 @@ public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionCo
514506

515507
public ClusterState executeTask(
516508
ClusterState currentState,
517-
List<MetadataRolloverService.RolloverResult> results,
509+
ArrayList<String> results,
518510
TaskContext<RolloverTask> rolloverTaskContext,
519511
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
520512
) throws Exception {
@@ -586,7 +578,7 @@ public ClusterState executeTask(
586578
rolloverTask.autoShardingResult(),
587579
targetFailureStore
588580
);
589-
results.add(rolloverResult);
581+
results.add(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
590582
logger.trace("rollover result [{}]", rolloverResult);
591583

592584
final var rolloverIndexName = rolloverResult.rolloverIndexName();

0 commit comments

Comments
 (0)