Skip to content

Commit 978557a

Browse files
authored
[Bug] During heavy indexing load it's possible for lazy rollover to trigger multiple rollovers (#109636) (#110031)
Let's say we have a `my-metrics` data stream that is receiving a lot of indexing requests. The following scenario can result in multiple unnecessary rollovers:
1. We update the mapping and mark the data stream to be lazily rolled over.
2. We receive 5 bulk index requests that all contain a write request for this data stream.
3. Each of these requests is picked up "at the same time"; they all see that the data stream needs to be rolled over, and each one issues a lazy rollover request.
4. The data stream `my-metrics` now has 5 tasks, each executing an unconditional rollover.
5. The data stream gets rolled over 5 times instead of once.

This scenario is captured in `LazyRolloverDuringDisruptionIT`. We have also witnessed this in the wild, where a data stream was rolled over 281 extra times, resulting in 281 empty indices.

This PR proposes:
- Creating a new task queue with a more efficient executor that further batches/deduplicates the requests.
- Adding two safeguards. The first ensures that we do not enqueue the rollover task if we see that a rollover has already occurred. The second applies during task execution: if the data stream does not have the `rolloverOnWrite` flag set to `true`, we skip the rollover.
- Returning the following response when we skip the rollover:
```
{
  "acknowledged": true,
  "shards_acknowledged": true,
  "old_index": ".ds-my-data-stream-2099.05.07-000002",
  "new_index": ".ds-my-data-stream-2099.05.07-000002",
  "rolled_over": false,
  "dry_run": false,
  "lazy": false
}
```
1 parent 863f0ce commit 978557a

File tree

6 files changed

+330
-26
lines changed

6 files changed

+330
-26
lines changed

docs/changelog/109636.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 109636
2+
summary: "Ensure a lazy rollover request will rollover the target data stream once."
3+
area: Data streams
4+
type: bug
5+
issues: []
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.datastreams;
10+
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.DocWriteRequest;
13+
import org.elasticsearch.action.DocWriteResponse;
14+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder;
15+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
16+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
17+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
18+
import org.elasticsearch.action.index.IndexRequest;
19+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
20+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
21+
import org.elasticsearch.cluster.metadata.DataStream;
22+
import org.elasticsearch.plugins.Plugin;
23+
import org.elasticsearch.test.ESIntegTestCase;
24+
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
25+
import org.elasticsearch.test.disruption.SingleNodeDisruption;
26+
import org.elasticsearch.xcontent.XContentType;
27+
28+
import java.util.Collection;
29+
import java.util.List;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.ExecutionException;
32+
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.is;
35+
36+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
37+
public class LazyRolloverDuringDisruptionIT extends ESIntegTestCase {
38+
39+
@Override
40+
protected Collection<Class<? extends Plugin>> nodePlugins() {
41+
return List.of(DataStreamsPlugin.class);
42+
}
43+
44+
public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedException {
45+
String masterNode = internalCluster().startMasterOnlyNode();
46+
internalCluster().startDataOnlyNodes(3);
47+
ensureStableCluster(4);
48+
49+
String dataStreamName = "my-data-stream";
50+
createDataStream(dataStreamName);
51+
52+
// Mark it to lazy rollover
53+
new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute().get();
54+
55+
// Verify that the data stream is marked for rollover and that it has currently one index
56+
DataStream dataStream = getDataStream(dataStreamName);
57+
assertThat(dataStream.rolloverOnWrite(), equalTo(true));
58+
assertThat(dataStream.getIndices().size(), equalTo(1));
59+
60+
// Introduce a disruption to the master node that should delay the rollover execution
61+
SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), masterNode, 100, 200, 30000, 60000);
62+
internalCluster().setDisruptionScheme(masterNodeDisruption);
63+
masterNodeDisruption.startDisrupting();
64+
65+
// Start indexing operations
66+
int docs = randomIntBetween(5, 10);
67+
CountDownLatch countDownLatch = new CountDownLatch(docs);
68+
for (int i = 0; i < docs; i++) {
69+
var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
70+
final String doc = "{ \"@timestamp\": \"2099-05-06T16:21:15.000Z\", \"message\": \"something cool happened\" }";
71+
indexRequest.source(doc, XContentType.JSON);
72+
client().index(indexRequest, new ActionListener<>() {
73+
@Override
74+
public void onResponse(DocWriteResponse docWriteResponse) {
75+
countDownLatch.countDown();
76+
}
77+
78+
@Override
79+
public void onFailure(Exception e) {
80+
fail("Indexing request should have succeeded eventually, failed with " + e.getMessage());
81+
}
82+
});
83+
}
84+
85+
// End the disruption so that all pending tasks will complete
86+
masterNodeDisruption.stopDisrupting();
87+
88+
// Wait for all the indexing requests to be processed successfully
89+
countDownLatch.await();
90+
91+
// Verify that the rollover has happened once
92+
dataStream = getDataStream(dataStreamName);
93+
assertThat(dataStream.rolloverOnWrite(), equalTo(false));
94+
assertThat(dataStream.getIndices().size(), equalTo(2));
95+
}
96+
97+
private DataStream getDataStream(String dataStreamName) {
98+
return client().execute(GetDataStreamAction.INSTANCE, new GetDataStreamAction.Request(new String[] { dataStreamName }))
99+
.actionGet()
100+
.getDataStreams()
101+
.get(0)
102+
.getDataStream();
103+
}
104+
105+
private void createDataStream(String dataStreamName) throws InterruptedException, ExecutionException {
106+
final TransportPutComposableIndexTemplateAction.Request putComposableTemplateRequest =
107+
new TransportPutComposableIndexTemplateAction.Request("my-template");
108+
putComposableTemplateRequest.indexTemplate(
109+
ComposableIndexTemplate.builder()
110+
.indexPatterns(List.of(dataStreamName))
111+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
112+
.build()
113+
);
114+
final AcknowledgedResponse putComposableTemplateResponse = client().execute(
115+
TransportPutComposableIndexTemplateAction.TYPE,
116+
putComposableTemplateRequest
117+
).actionGet();
118+
assertThat(putComposableTemplateResponse.isAcknowledged(), is(true));
119+
120+
final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
121+
final AcknowledgedResponse createDataStreamResponse = client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
122+
.get();
123+
assertThat(createDataStreamResponse.isAcknowledged(), is(true));
124+
}
125+
}

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java

Lines changed: 182 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,50 @@
77
*/
88
package org.elasticsearch.action.admin.indices.rollover;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.action.ActionType;
1214
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
1315
import org.elasticsearch.action.support.ActionFilters;
16+
import org.elasticsearch.action.support.ActiveShardsObserver;
1417
import org.elasticsearch.client.internal.Client;
1518
import org.elasticsearch.cluster.ClusterState;
19+
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
20+
import org.elasticsearch.cluster.ClusterStateTaskListener;
1621
import org.elasticsearch.cluster.metadata.DataStream;
1722
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1823
import org.elasticsearch.cluster.metadata.Metadata;
1924
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
2025
import org.elasticsearch.cluster.routing.allocation.AllocationService;
26+
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
2127
import org.elasticsearch.cluster.service.ClusterService;
28+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
29+
import org.elasticsearch.common.Priority;
30+
import org.elasticsearch.common.Strings;
31+
import org.elasticsearch.common.collect.Iterators;
2232
import org.elasticsearch.common.inject.Inject;
2333
import org.elasticsearch.features.NodeFeature;
2434
import org.elasticsearch.tasks.CancellableTask;
2535
import org.elasticsearch.tasks.Task;
2636
import org.elasticsearch.threadpool.ThreadPool;
2737
import org.elasticsearch.transport.TransportService;
2838

39+
import java.time.Instant;
40+
import java.util.ArrayList;
41+
import java.util.HashMap;
42+
import java.util.List;
2943
import java.util.Map;
44+
import java.util.function.Consumer;
3045

3146
/**
3247
* API that lazily rolls over a data stream that has the flag {@link DataStream#rolloverOnWrite()} enabled. These requests always
3348
* originate from requests that write into the data stream.
3449
*/
3550
public final class LazyRolloverAction extends ActionType<RolloverResponse> {
3651

52+
private static final Logger logger = LogManager.getLogger(LazyRolloverAction.class);
53+
3754
public static final NodeFeature DATA_STREAM_LAZY_ROLLOVER = new NodeFeature("data_stream.rollover.lazy");
3855

3956
public static final LazyRolloverAction INSTANCE = new LazyRolloverAction();
@@ -50,6 +67,8 @@ public String name() {
5067

5168
public static final class TransportLazyRolloverAction extends TransportRolloverAction {
5269

70+
private final MasterServiceTaskQueue<LazyRolloverTask> lazyRolloverTaskQueue;
71+
5372
@Inject
5473
public TransportLazyRolloverAction(
5574
TransportService transportService,
@@ -76,6 +95,11 @@ public TransportLazyRolloverAction(
7695
metadataDataStreamsService,
7796
dataStreamAutoShardingService
7897
);
98+
this.lazyRolloverTaskQueue = clusterService.createTaskQueue(
99+
"lazy-rollover",
100+
Priority.NORMAL,
101+
new LazyRolloverExecutor(clusterService, allocationService, rolloverService, threadPool)
102+
);
79103
}
80104

81105
@Override
@@ -93,6 +117,12 @@ protected void masterOperation(
93117
: "The auto rollover action does not expect any other parameters in the request apart from the data stream name";
94118

95119
Metadata metadata = clusterState.metadata();
120+
DataStream dataStream = metadata.dataStreams().get(rolloverRequest.getRolloverTarget());
121+
// Skip submitting the task if we detect that the lazy rollover has been already executed.
122+
if (dataStream.rolloverOnWrite() == false) {
123+
listener.onResponse(noopLazyRolloverResponse(dataStream));
124+
return;
125+
}
96126
// We evaluate the names of the source index as well as what our newly created index would be.
97127
final MetadataRolloverService.NameResolution trialRolloverNames = MetadataRolloverService.resolveRolloverNames(
98128
clusterState,
@@ -107,28 +137,164 @@ protected void masterOperation(
107137

108138
assert metadata.dataStreams().containsKey(rolloverRequest.getRolloverTarget()) : "Auto-rollover applies only to data streams";
109139

110-
final RolloverResponse trialRolloverResponse = new RolloverResponse(
111-
trialSourceIndexName,
112-
trialRolloverIndexName,
113-
Map.of(),
114-
false,
115-
false,
116-
false,
117-
false,
118-
false
119-
);
120-
121140
String source = "lazy_rollover source [" + trialSourceIndexName + "] to target [" + trialRolloverIndexName + "]";
122141
// We create a new rollover request to ensure that it doesn't contain any other parameters apart from the data stream name
123142
// This will provide a more resilient user experience
124-
RolloverTask rolloverTask = new RolloverTask(
125-
new RolloverRequest(rolloverRequest.getRolloverTarget(), null),
143+
var newRolloverRequest = new RolloverRequest(rolloverRequest.getRolloverTarget(), null);
144+
newRolloverRequest.setIndicesOptions(rolloverRequest.indicesOptions());
145+
LazyRolloverTask rolloverTask = new LazyRolloverTask(newRolloverRequest, listener);
146+
lazyRolloverTaskQueue.submitTask(source, rolloverTask, rolloverRequest.masterNodeTimeout());
147+
}
148+
}
149+
150+
/**
151+
* A lazy rollover task holds the rollover request and the listener.
152+
*/
153+
record LazyRolloverTask(RolloverRequest rolloverRequest, ActionListener<RolloverResponse> listener)
154+
implements
155+
ClusterStateTaskListener {
156+
157+
@Override
158+
public void onFailure(Exception e) {
159+
listener.onFailure(e);
160+
}
161+
}
162+
163+
/**
164+
* Performs a lazy rollover when required and notifies the listener. Due to the nature of the lazy rollover we are able
165+
* to perform certain optimisations like identifying duplicate requests and executing them once. This is an optimisation
166+
* that can work since we do not take into consideration any stats or auto-sharding conditions here.
167+
*/
168+
record LazyRolloverExecutor(
169+
ClusterService clusterService,
170+
AllocationService allocationService,
171+
MetadataRolloverService rolloverService,
172+
ThreadPool threadPool
173+
) implements ClusterStateTaskExecutor<LazyRolloverTask> {
174+
175+
@Override
176+
public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecutionContext) {
177+
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
178+
final var results = new ArrayList<MetadataRolloverService.RolloverResult>(batchExecutionContext.taskContexts().size());
179+
var state = batchExecutionContext.initialState();
180+
Map<RolloverRequest, List<TaskContext<LazyRolloverTask>>> groupedRequests = new HashMap<>();
181+
for (final var taskContext : batchExecutionContext.taskContexts()) {
182+
groupedRequests.computeIfAbsent(taskContext.getTask().rolloverRequest(), ignored -> new ArrayList<>()).add(taskContext);
183+
}
184+
for (final var entry : groupedRequests.entrySet()) {
185+
List<TaskContext<LazyRolloverTask>> rolloverTaskContexts = entry.getValue();
186+
try {
187+
RolloverRequest rolloverRequest = entry.getKey();
188+
state = executeTask(state, rolloverRequest, results, rolloverTaskContexts, listener);
189+
} catch (Exception e) {
190+
rolloverTaskContexts.forEach(taskContext -> taskContext.onFailure(e));
191+
} finally {
192+
rolloverTaskContexts.forEach(taskContext -> taskContext.captureResponseHeaders().close());
193+
}
194+
}
195+
196+
if (state != batchExecutionContext.initialState()) {
197+
var reason = new StringBuilder();
198+
Strings.collectionToDelimitedStringWithLimit(
199+
(Iterable<String>) () -> Iterators.map(results.iterator(), t -> t.sourceIndexName() + "->" + t.rolloverIndexName()),
200+
",",
201+
"lazy bulk rollover [",
202+
"]",
203+
1024,
204+
reason
205+
);
206+
try (var ignored = batchExecutionContext.dropHeadersContext()) {
207+
state = allocationService.reroute(state, reason.toString(), listener.reroute());
208+
}
209+
} else {
210+
listener.noRerouteNeeded();
211+
}
212+
return state;
213+
}
214+
215+
public ClusterState executeTask(
216+
ClusterState currentState,
217+
RolloverRequest rolloverRequest,
218+
List<MetadataRolloverService.RolloverResult> results,
219+
List<TaskContext<LazyRolloverTask>> rolloverTaskContexts,
220+
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
221+
) throws Exception {
222+
223+
// If the data stream has been rolled over since it was marked for lazy rollover, this operation is a noop
224+
final DataStream dataStream = currentState.metadata().dataStreams().get(rolloverRequest.getRolloverTarget());
225+
assert dataStream != null;
226+
227+
if (dataStream.rolloverOnWrite() == false) {
228+
var noopResponse = noopLazyRolloverResponse(dataStream);
229+
notifyAllListeners(rolloverTaskContexts, context -> context.getTask().listener.onResponse(noopResponse));
230+
return currentState;
231+
}
232+
233+
// Perform the actual rollover
234+
final var rolloverResult = rolloverService.rolloverClusterState(
235+
currentState,
236+
rolloverRequest.getRolloverTarget(),
237+
rolloverRequest.getNewIndexName(),
238+
rolloverRequest.getCreateIndexRequest(),
239+
List.of(),
240+
Instant.now(),
241+
false,
242+
false,
126243
null,
127-
trialRolloverResponse,
128244
null,
129-
listener
245+
rolloverRequest.targetsFailureStore()
130246
);
131-
submitRolloverTask(rolloverRequest, source, rolloverTask);
247+
results.add(rolloverResult);
248+
logger.trace("lazy rollover result [{}]", rolloverResult);
249+
250+
final var rolloverIndexName = rolloverResult.rolloverIndexName();
251+
final var sourceIndexName = rolloverResult.sourceIndexName();
252+
253+
final var waitForActiveShardsTimeout = rolloverRequest.masterNodeTimeout().millis() < 0
254+
? null
255+
: rolloverRequest.masterNodeTimeout();
256+
257+
notifyAllListeners(rolloverTaskContexts, context -> {
258+
// Now assuming we have a new state and the name of the rolled over index, we need to wait for the configured number of
259+
// active shards, as well as return the names of the indices that were rolled/created
260+
ActiveShardsObserver.waitForActiveShards(
261+
clusterService,
262+
new String[] { rolloverIndexName },
263+
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
264+
waitForActiveShardsTimeout,
265+
allocationActionMultiListener.delay(context.getTask().listener())
266+
.map(
267+
isShardsAcknowledged -> new RolloverResponse(
268+
// Note that we use the actual rollover result for these, because even though we're single threaded,
269+
// it's possible for the rollover names generated before the actual rollover to be different due to
270+
// things like date resolution
271+
sourceIndexName,
272+
rolloverIndexName,
273+
Map.of(),
274+
false,
275+
true,
276+
true,
277+
isShardsAcknowledged,
278+
false
279+
)
280+
)
281+
);
282+
});
283+
284+
// Return the new rollover cluster state, which includes the changes that create the new index
285+
return rolloverResult.clusterState();
132286
}
133287
}
288+
289+
private static void notifyAllListeners(
290+
List<ClusterStateTaskExecutor.TaskContext<LazyRolloverTask>> taskContexts,
291+
Consumer<ClusterStateTaskExecutor.TaskContext<LazyRolloverTask>> onPublicationSuccess
292+
) {
293+
taskContexts.forEach(context -> context.success(() -> onPublicationSuccess.accept(context)));
294+
}
295+
296+
private static RolloverResponse noopLazyRolloverResponse(DataStream dataStream) {
297+
String latestWriteIndex = dataStream.getWriteIndex().getName();
298+
return new RolloverResponse(latestWriteIndex, latestWriteIndex, Map.of(), false, false, true, true, false);
299+
}
134300
}

0 commit comments

Comments
 (0)