Skip to content

Commit b7f5450

Browse files
committed
Avoid hoarding cluster state references during rollover (elastic#124107)
By keeping a list of all the rollover results in a rollover request batch, we were keeping references to all the intermediate cluster states that we built. We've seen this list take up ~1.4GB with 600 rollover requests in one batch. We only kept the list of results to compute the "reason" for the allocation reroute, so we can easily drop the cluster state reference from the list and only keep what we need.

Fixes elastic#123893

(cherry picked from commit ff6465b)

# Conflicts:
#   server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java
1 parent 4dfc977 commit b7f5450

File tree

3 files changed

+14
-24
lines changed

3 files changed

+14
-24
lines changed

docs/changelog/124107.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 124107
2+
summary: Avoid hoarding cluster state references during rollover
3+
area: Indices APIs
4+
type: bug
5+
issues:
6+
- 123893

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3131
import org.elasticsearch.common.Priority;
3232
import org.elasticsearch.common.Strings;
33-
import org.elasticsearch.common.collect.Iterators;
3433
import org.elasticsearch.features.NodeFeature;
3534
import org.elasticsearch.injection.guice.Inject;
3635
import org.elasticsearch.tasks.CancellableTask;
@@ -178,7 +177,7 @@ record LazyRolloverExecutor(
178177
@Override
179178
public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecutionContext) {
180179
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
181-
final var results = new ArrayList<MetadataRolloverService.RolloverResult>(batchExecutionContext.taskContexts().size());
180+
final var results = new ArrayList<String>(batchExecutionContext.taskContexts().size());
182181
var state = batchExecutionContext.initialState();
183182
Map<RolloverRequest, List<TaskContext<LazyRolloverTask>>> groupedRequests = new HashMap<>();
184183
for (final var taskContext : batchExecutionContext.taskContexts()) {
@@ -198,14 +197,7 @@ public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecuti
198197

199198
if (state != batchExecutionContext.initialState()) {
200199
var reason = new StringBuilder();
201-
Strings.collectionToDelimitedStringWithLimit(
202-
(Iterable<String>) () -> Iterators.map(results.iterator(), t -> t.sourceIndexName() + "->" + t.rolloverIndexName()),
203-
",",
204-
"lazy bulk rollover [",
205-
"]",
206-
1024,
207-
reason
208-
);
200+
Strings.collectionToDelimitedStringWithLimit(results, ",", "lazy bulk rollover [", "]", 1024, reason);
209201
try (var ignored = batchExecutionContext.dropHeadersContext()) {
210202
state = allocationService.reroute(state, reason.toString(), listener.reroute());
211203
}
@@ -218,7 +210,7 @@ public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecuti
218210
public ClusterState executeTask(
219211
ClusterState currentState,
220212
RolloverRequest rolloverRequest,
221-
List<MetadataRolloverService.RolloverResult> results,
213+
ArrayList<String> results,
222214
List<TaskContext<LazyRolloverTask>> rolloverTaskContexts,
223215
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
224216
) throws Exception {
@@ -248,7 +240,7 @@ public ClusterState executeTask(
248240
null,
249241
rolloverRequest.targetsFailureStore()
250242
);
251-
results.add(rolloverResult);
243+
results.add(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
252244
logger.trace("lazy rollover result [{}]", rolloverResult);
253245

254246
final var rolloverIndexName = rolloverResult.rolloverIndexName();

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
4646
import org.elasticsearch.common.Priority;
4747
import org.elasticsearch.common.Strings;
48-
import org.elasticsearch.common.collect.Iterators;
4948
import org.elasticsearch.common.unit.ByteSizeValue;
5049
import org.elasticsearch.common.util.concurrent.EsExecutors;
5150
import org.elasticsearch.core.Nullable;
@@ -455,7 +454,7 @@ record RolloverExecutor(
455454
@Override
456455
public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionContext) {
457456
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
458-
final var results = new ArrayList<MetadataRolloverService.RolloverResult>(batchExecutionContext.taskContexts().size());
457+
final var results = new ArrayList<String>(batchExecutionContext.taskContexts().size());
459458
var state = batchExecutionContext.initialState();
460459
for (final var taskContext : batchExecutionContext.taskContexts()) {
461460
try (var ignored = taskContext.captureResponseHeaders()) {
@@ -467,14 +466,7 @@ public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionCo
467466

468467
if (state != batchExecutionContext.initialState()) {
469468
var reason = new StringBuilder();
470-
Strings.collectionToDelimitedStringWithLimit(
471-
(Iterable<String>) () -> Iterators.map(results.iterator(), t -> t.sourceIndexName() + "->" + t.rolloverIndexName()),
472-
",",
473-
"bulk rollover [",
474-
"]",
475-
1024,
476-
reason
477-
);
469+
Strings.collectionToDelimitedStringWithLimit(results, ",", "bulk rollover [", "]", 1024, reason);
478470
try (var ignored = batchExecutionContext.dropHeadersContext()) {
479471
state = allocationService.reroute(state, reason.toString(), listener.reroute());
480472
}
@@ -486,7 +478,7 @@ public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionCo
486478

487479
public ClusterState executeTask(
488480
ClusterState currentState,
489-
List<MetadataRolloverService.RolloverResult> results,
481+
ArrayList<String> results,
490482
TaskContext<RolloverTask> rolloverTaskContext,
491483
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
492484
) throws Exception {
@@ -552,7 +544,7 @@ public ClusterState executeTask(
552544
rolloverTask.autoShardingResult(),
553545
rolloverRequest.targetsFailureStore()
554546
);
555-
results.add(rolloverResult);
547+
results.add(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
556548
logger.trace("rollover result [{}]", rolloverResult);
557549

558550
final var rolloverIndexName = rolloverResult.rolloverIndexName();

0 commit comments

Comments
 (0)