Skip to content

Commit 511ed2b

Browse files
Merge branch '9.1' into backport/9.1/pr-134673
2 parents 5faa974 + 2549a2c commit 511ed2b

File tree

5 files changed

+74
-20
lines changed

5 files changed

+74
-20
lines changed

docs/changelog/135051.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135051
2+
summary: Ban Limit + `MvExpand` before remote Enrich
3+
area: ES|QL
4+
type: bug
5+
issues: []

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,9 +452,6 @@ tests:
452452
- class: org.elasticsearch.simdvec.ESVectorUtilTests
453453
method: testSoarDistance
454454
issue: https://github.com/elastic/elasticsearch/issues/135139
455-
- class: org.elasticsearch.snapshots.SnapshotShutdownIT
456-
method: testSnapshotShutdownProgressTracker
457-
issue: https://github.com/elastic/elasticsearch/issues/134620
458455
- class: org.elasticsearch.upgrades.TextRollingUpgradeIT
459456
method: testIndexing {upgradedNodes=1}
460457
issue: https://github.com/elastic/elasticsearch/issues/135236

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
488488
final var otherNode = internalCluster().startDataOnlyNode();
489489
final var otherIndex = randomIdentifier();
490490
createIndexWithContent(otherIndex, indexSettings(numShards, 0).put(REQUIRE_NODE_NAME_SETTING, otherNode).build());
491+
indexAllShardsToAnEqualOrGreaterMinimumSize(otherIndex, ByteSizeValue.of(2, ByteSizeUnit.KB).getBytes());
491492
blockDataNode(repoName, otherNode);
492493

493494
final var nodeForRemoval = internalCluster().startDataOnlyNode(
@@ -498,6 +499,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
498499
final var indexName = randomIdentifier();
499500
createIndexWithContent(indexName, indexSettings(numShards, 0).put(REQUIRE_NODE_NAME_SETTING, nodeForRemoval).build());
500501
indexAllShardsToAnEqualOrGreaterMinimumSize(indexName, ByteSizeValue.of(2, ByteSizeUnit.KB).getBytes());
502+
logger.info("---> nodeForRemovalId: " + nodeForRemovalId + ", numShards: " + numShards);
501503

502504
// Start the snapshot with blocking in place on the data node not to allow shard snapshots to finish yet.
503505
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
@@ -507,7 +509,21 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
507509

508510
waitForBlock(otherNode, repoName);
509511

510-
logger.info("---> nodeForRemovalId: " + nodeForRemovalId + ", numShards: " + numShards);
512+
// Block on the master when a shard snapshot request comes in, until we can verify that the Tracker saw the outgoing request.
513+
final CountDownLatch snapshotStatusUpdateLatch = new CountDownLatch(1);
514+
final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
515+
masterTransportService.addRequestHandlingBehavior(
516+
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
517+
(handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
518+
safeAwait(snapshotStatusUpdateLatch);
519+
try {
520+
handler.messageReceived(request, channel, task);
521+
} catch (Exception e) {
522+
fail(e);
523+
}
524+
})
525+
);
526+
511527
mockLog.addExpectation(
512528
new MockLog.SeenEventExpectation(
513529
"SnapshotShutdownProgressTracker start log message",
@@ -555,21 +571,6 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
555571
mockLog.awaitAllExpectationsMatched();
556572
resetMockLog();
557573

558-
// Block on the master when a shard snapshot request comes in, until we can verify that the Tracker saw the outgoing request.
559-
final CountDownLatch snapshotStatusUpdateLatch = new CountDownLatch(1);
560-
final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
561-
masterTransportService.addRequestHandlingBehavior(
562-
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
563-
(handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
564-
safeAwait(snapshotStatusUpdateLatch);
565-
try {
566-
handler.messageReceived(request, channel, task);
567-
} catch (Exception e) {
568-
fail(e);
569-
}
570-
})
571-
);
572-
573574
mockLog.addExpectation(
574575
new MockLog.SeenEventExpectation(
575576
"SnapshotShutdownProgressTracker shard snapshot has paused log message",

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,20 @@ public void testEnrichCoordinatorThenEnrichRemote() {
423423
);
424424
}
425425

426+
public void testEnrichAfterMvExpandLimit() {
427+
String query = String.format(Locale.ROOT, """
428+
FROM *:events,events
429+
| SORT timestamp
430+
| LIMIT 2
431+
| eval ip= TO_STR(host)
432+
| MV_EXPAND host
433+
| WHERE ip != ""
434+
| %s
435+
""", enrichHosts(Enrich.Mode.REMOTE));
436+
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
437+
assertThat(error.getMessage(), containsString("MV_EXPAND after LIMIT is incompatible with remote ENRICH"));
438+
}
439+
426440
private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {
427441
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
428442
assertTrue(executionInfo.isCrossClusterSearch());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.transport.RemoteClusterAware;
1919
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2020
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
21+
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
2122
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
2223
import org.elasticsearch.xpack.esql.common.Failures;
2324
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
@@ -51,7 +52,13 @@
5152
import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
5253
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
5354

54-
public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic {
55+
public class Enrich extends UnaryPlan
56+
implements
57+
GeneratingPlan<Enrich>,
58+
PostAnalysisPlanVerificationAware,
59+
PostAnalysisVerificationAware,
60+
TelemetryAware,
61+
SortAgnostic {
5562
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
5663
LogicalPlan.class,
5764
"Enrich",
@@ -326,4 +333,34 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail
326333

327334
badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
328335
}
336+
337+
/**
338+
* Remote ENRICH (and any remote operation in fact) is not compatible with MV_EXPAND + LIMIT. Consider:
339+
* `FROM *:events | SORT @timestamp | LIMIT 2 | MV_EXPAND ip | ENRICH _remote:clientip_policy ON ip`
340+
* Semantically, this must take two top events and then expand them. However, this can not be executed remotely,
341+
* because this means that we have to take top 2 events on each node, then expand them, then apply Enrich,
342+
* then bring them to the coordinator - but then we can not select top 2 of them - because that would be pre-expand!
343+
* We do not know which expanded rows are coming from the true top rows and which are coming from "false" top rows
344+
* which should have been thrown out. This is only possible to execute if MV_EXPAND executes on the coordinator
345+
* - which contradicts remote Enrich.
346+
* This could be fixed by the optimizer by moving MV_EXPAND past ENRICH, at least in some cases, but currently we do not do that.
347+
*/
348+
private void checkMvExpandAfterLimit(Failures failures) {
349+
this.forEachDown(MvExpand.class, u -> {
350+
u.forEachDown(p -> {
351+
if (p instanceof Limit || p instanceof TopN) {
352+
failures.add(fail(this, "MV_EXPAND after LIMIT is incompatible with remote ENRICH"));
353+
}
354+
});
355+
});
356+
357+
}
358+
359+
@Override
360+
public void postAnalysisVerification(Failures failures) {
361+
if (this.mode == Mode.REMOTE) {
362+
checkMvExpandAfterLimit(failures);
363+
}
364+
365+
}
329366
}

0 commit comments

Comments
 (0)