Skip to content

Commit d04f1ee

Browse files
committed
Merge branch 'main' into do_not_retry_cbe
# Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
2 parents 2283ca1 + 1b2f565 commit d04f1ee

File tree

6 files changed

+142
-74
lines changed

6 files changed

+142
-74
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,11 @@ public ClusterState executeTask(
268268

269269
final var rolloverIndexName = rolloverResult.rolloverIndexName();
270270
final var sourceIndexName = rolloverResult.sourceIndexName();
271+
logger.info(
272+
"rolling over data stream [{}] to index [{}] because it was marked for lazy rollover",
273+
dataStream.getName(),
274+
rolloverIndexName
275+
);
271276

272277
final var waitForActiveShardsTimeout = rolloverRequest.masterNodeTimeout().millis() < 0
273278
? null

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 63 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -221,45 +221,17 @@ protected void masterOperation(
221221
final String trialRolloverIndexName = trialRolloverNames.rolloverName();
222222
MetadataCreateIndexService.validateIndexName(trialRolloverIndexName, projectMetadata, projectState.routingTable());
223223

224-
boolean isDataStream = projectMetadata.dataStreams().containsKey(resolvedRolloverTarget.resource());
225224
if (rolloverRequest.isLazy()) {
226-
if (isDataStream == false || rolloverRequest.getConditions().hasConditions()) {
227-
String message;
228-
if (isDataStream) {
229-
message = "Lazy rollover can be used only without any conditions."
230-
+ " Please remove the conditions from the request body or the query parameter 'lazy'.";
231-
} else if (rolloverRequest.getConditions().hasConditions() == false) {
232-
message = "Lazy rollover can be applied only on a data stream." + " Please remove the query parameter 'lazy'.";
233-
} else {
234-
message = "Lazy rollover can be applied only on a data stream with no conditions."
235-
+ " Please remove the query parameter 'lazy'.";
236-
}
237-
listener.onFailure(new IllegalArgumentException(message));
238-
return;
239-
}
240-
if (rolloverRequest.isDryRun() == false) {
241-
metadataDataStreamsService.setRolloverOnWrite(
242-
projectState.projectId(),
243-
resolvedRolloverTarget.resource(),
244-
true,
245-
targetFailureStore,
246-
rolloverRequest.ackTimeout(),
247-
rolloverRequest.masterNodeTimeout(),
248-
listener.map(
249-
response -> new RolloverResponse(
250-
trialSourceIndexName,
251-
trialRolloverIndexName,
252-
Map.of(),
253-
false,
254-
false,
255-
response.isAcknowledged(),
256-
false,
257-
response.isAcknowledged()
258-
)
259-
)
260-
);
261-
return;
262-
}
225+
markForLazyRollover(
226+
rolloverRequest,
227+
listener,
228+
projectMetadata,
229+
resolvedRolloverTarget,
230+
targetFailureStore,
231+
trialSourceIndexName,
232+
trialRolloverIndexName
233+
);
234+
return;
263235
}
264236

265237
final IndexAbstraction rolloverTargetAbstraction = projectMetadata.getIndicesLookup().get(resolvedRolloverTarget.resource());
@@ -353,7 +325,7 @@ protected void masterOperation(
353325
false,
354326
false,
355327
false,
356-
rolloverRequest.isLazy()
328+
false
357329
);
358330

359331
// If this is a dry run, return with the results without invoking a cluster state update
@@ -382,6 +354,58 @@ protected void masterOperation(
382354
);
383355
}
384356

357+
private void markForLazyRollover(
358+
RolloverRequest rolloverRequest,
359+
ActionListener<RolloverResponse> listener,
360+
ProjectMetadata projectMetadata,
361+
ResolvedExpression resolvedRolloverTarget,
362+
boolean targetFailureStore,
363+
String trialSourceIndexName,
364+
String trialRolloverIndexName
365+
) {
366+
boolean isDataStream = projectMetadata.dataStreams().containsKey(resolvedRolloverTarget.resource());
367+
if (isDataStream == false || rolloverRequest.getConditions().hasConditions()) {
368+
String message;
369+
if (isDataStream) {
370+
message = "Lazy rollover can be used only without any conditions."
371+
+ " Please remove the conditions from the request body or the query parameter 'lazy'.";
372+
} else if (rolloverRequest.getConditions().hasConditions() == false) {
373+
message = "Lazy rollover can be applied only on a data stream. Please remove the query parameter 'lazy'.";
374+
} else {
375+
message = "Lazy rollover can be applied only on a data stream with no conditions."
376+
+ " Please remove the query parameter 'lazy'.";
377+
}
378+
listener.onFailure(new IllegalArgumentException(message));
379+
return;
380+
}
381+
if (rolloverRequest.isDryRun()) {
382+
listener.onResponse(
383+
new RolloverResponse(trialSourceIndexName, trialRolloverIndexName, Map.of(), true, false, false, false, true)
384+
);
385+
return;
386+
}
387+
metadataDataStreamsService.setRolloverOnWrite(
388+
projectMetadata.id(),
389+
resolvedRolloverTarget.resource(),
390+
true,
391+
targetFailureStore,
392+
rolloverRequest.ackTimeout(),
393+
rolloverRequest.masterNodeTimeout(),
394+
listener.map(
395+
response -> new RolloverResponse(
396+
trialSourceIndexName,
397+
trialRolloverIndexName,
398+
Map.of(),
399+
false,
400+
false,
401+
response.isAcknowledged(),
402+
false,
403+
true
404+
)
405+
)
406+
);
407+
}
408+
385409
private void initializeFailureStore(
386410
ProjectId projectId,
387411
RolloverRequest rolloverRequest,

server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -470,19 +470,39 @@ public void testLazyRollover() throws Exception {
470470
mockMetadataDataStreamService,
471471
dataStreamAutoShardingService
472472
);
473-
final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
474-
RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
475-
rolloverRequest.lazy(true);
476-
transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
477-
RolloverResponse rolloverResponse = future.actionGet();
478-
assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-ds-000001"));
479-
assertThat(rolloverResponse.getNewIndex(), Matchers.startsWith(".ds-logs-ds-"));
480-
assertThat(rolloverResponse.getNewIndex(), Matchers.endsWith("-000002"));
481-
assertThat(rolloverResponse.isLazy(), equalTo(true));
482-
assertThat(rolloverResponse.isDryRun(), equalTo(false));
483-
assertThat(rolloverResponse.isRolledOver(), equalTo(false));
484-
assertThat(rolloverResponse.getConditionStatus().size(), equalTo(0));
485-
assertThat(rolloverResponse.isAcknowledged(), is(true));
473+
{
474+
// Regular lazy rollover
475+
final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
476+
RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
477+
rolloverRequest.lazy(true);
478+
transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
479+
RolloverResponse rolloverResponse = future.actionGet();
480+
assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-ds-000001"));
481+
assertThat(rolloverResponse.getNewIndex(), Matchers.startsWith(".ds-logs-ds-"));
482+
assertThat(rolloverResponse.getNewIndex(), Matchers.endsWith("-000002"));
483+
assertThat(rolloverResponse.isLazy(), equalTo(true));
484+
assertThat(rolloverResponse.isDryRun(), equalTo(false));
485+
assertThat(rolloverResponse.isRolledOver(), equalTo(false));
486+
assertThat(rolloverResponse.getConditionStatus().size(), equalTo(0));
487+
assertThat(rolloverResponse.isAcknowledged(), is(true));
488+
}
489+
{
490+
// Dry-run lazy rollover
491+
final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
492+
RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
493+
rolloverRequest.lazy(true);
494+
rolloverRequest.dryRun(true);
495+
transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
496+
RolloverResponse rolloverResponse = future.actionGet();
497+
assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-ds-000001"));
498+
assertThat(rolloverResponse.getNewIndex(), Matchers.startsWith(".ds-logs-ds-"));
499+
assertThat(rolloverResponse.getNewIndex(), Matchers.endsWith("-000002"));
500+
assertThat(rolloverResponse.isLazy(), equalTo(true));
501+
assertThat(rolloverResponse.isDryRun(), equalTo(true));
502+
assertThat(rolloverResponse.isRolledOver(), equalTo(false));
503+
assertThat(rolloverResponse.getConditionStatus().size(), equalTo(0));
504+
assertThat(rolloverResponse.isAcknowledged(), is(false));
505+
}
486506
}
487507

488508
public void testLazyRolloverFails() throws Exception {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.cluster.metadata.Template;
3535
import org.elasticsearch.cluster.node.DiscoveryNode;
3636
import org.elasticsearch.cluster.service.ClusterService;
37+
import org.elasticsearch.common.Strings;
3738
import org.elasticsearch.common.regex.Regex;
3839
import org.elasticsearch.common.settings.Settings;
3940
import org.elasticsearch.core.Nullable;
@@ -842,9 +843,8 @@ public void onFailure(Exception e) {
842843

843844
void onRolloversBulkResponse(Collection<RolloverResponse> rolloverResponses) {
844845
for (RolloverResponse rolloverResponse : rolloverResponses) {
845-
if (rolloverResponse.isRolledOver() == false) {
846-
logger.warn("rollover of the [{}] index [{}] failed", getOrigin(), rolloverResponse.getOldIndex());
847-
}
846+
assert rolloverResponse.isLazy() && rolloverResponse.isRolledOver() == false
847+
: Strings.format("Expected rollover of the [%s] index [%s] to be lazy", getOrigin(), rolloverResponse.getOldIndex());
848848
}
849849
}
850850

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,13 @@ final void startComputeOnDataNodes(
8282
final long startTimeInNanos = System.nanoTime();
8383
searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> {
8484
try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
85-
TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
86-
final int failedShards = shardFailures.size();
8785
return new ComputeResponse(
8886
profiles,
89-
took,
87+
TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos),
9088
targetShards.totalShards(),
91-
targetShards.totalShards() - failedShards,
89+
targetShards.totalShards() - shardFailures.size(),
9290
targetShards.skippedShards(),
93-
failedShards
91+
shardFailures.size()
9492
);
9593
}))) {
9694
for (TargetShard shard : targetShards.shards.values()) {
@@ -129,8 +127,7 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
129127
reportedFailure = true;
130128
reportFailures(computeListener);
131129
} else {
132-
var nodeRequests = selectNodeRequests(targetShards);
133-
for (NodeRequest request : nodeRequests) {
130+
for (NodeRequest request : selectNodeRequests(targetShards)) {
134131
sendOneNodeRequest(targetShards, computeListener, request);
135132
}
136133
}
@@ -244,17 +241,11 @@ TargetShard getShard(ShardId shardId) {
244241
/**
245242
* (Remaining) allocated nodes of a given shard id and its alias filter
246243
*/
247-
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {
244+
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {}
248245

249-
}
250-
251-
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {
252-
253-
}
254-
255-
private record ShardFailure(boolean fatal, Exception failure) {
246+
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {}
256247

257-
}
248+
private record ShardFailure(boolean fatal, Exception failure) {}
258249

259250
/**
260251
* Selects the next nodes to send requests to. Limits to at most one outstanding request per node.

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.HashMap;
4343
import java.util.List;
4444
import java.util.Map;
45+
import java.util.Objects;
4546
import java.util.Queue;
4647
import java.util.Set;
4748
import java.util.concurrent.Executor;
@@ -87,7 +88,7 @@ public void setThreadPool() {
8788
}
8889

8990
@After
90-
public void shutdownThreadPool() throws Exception {
91+
public void shutdownThreadPool() {
9192
terminate(threadPool);
9293
}
9394

@@ -111,8 +112,7 @@ public void testOnePass() {
111112
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
112113
var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
113114
sent.add(new NodeRequest(node, shardIds, aliasFilters));
114-
var resp = new DataNodeComputeResponse(List.of(), Map.of());
115-
runWithDelay(() -> listener.onResponse(resp));
115+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
116116
});
117117
safeGet(future);
118118
assertThat(sent.size(), equalTo(2));
@@ -245,6 +245,34 @@ public void testAllowPartialResults() {
245245
assertThat(resp.successfulShards, equalTo(1));
246246
}
247247

248+
public void testNonFatalErrorIsRetriedOnAnotherShard() {
249+
var targetShards = List.of(targetShard(shard1, node1, node2));
250+
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
251+
var response = safeGet(sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
252+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
253+
if (Objects.equals(node1, node)) {
254+
runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false));
255+
} else {
256+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
257+
}
258+
}));
259+
assertThat(response.totalShards, equalTo(1));
260+
assertThat(response.successfulShards, equalTo(1));
261+
assertThat(response.failedShards, equalTo(0));
262+
assertThat(sent.size(), equalTo(2));
263+
}
264+
265+
public void testNonFatalFailedOnAllNodes() {
266+
var targetShards = List.of(targetShard(shard1, node1, node2));
267+
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
268+
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
269+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
270+
runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false));
271+
});
272+
expectThrows(RuntimeException.class, equalTo("test request level non fatal failure"), future::actionGet);
273+
assertThat(sent.size(), equalTo(2));
274+
}
275+
248276
public void testDoNotRetryCircuitBreakerException() {
249277
var targetShards = List.of(targetShard(shard1, node1, node2));
250278
var sent = ConcurrentCollections.newQueue();

0 commit comments

Comments
 (0)