diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/capabilities/NodeCapability.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/capabilities/NodeCapability.java index 7d70e83f6558c..6ff5d347ab058 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/capabilities/NodeCapability.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/capabilities/NodeCapability.java @@ -41,4 +41,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(supported); } + + @Override + public String toString() { + return "NodeCapability{supported=" + supported + '}'; + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java index 863f89827207e..87b33b3b0893d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java @@ -246,4 +246,9 @@ public OrdinalBytesRefBlock expand() { public long ramBytesUsed() { return ordinals.ramBytesUsed() + bytes.ramBytesUsed(); } + + @Override + public String toString() { + return getClass().getSimpleName() + "[ordinals=" + ordinals + ", bytes=" + bytes + "]"; + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java index 06b890603e489..df522e931ca07 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java @@ -29,15 +29,18 @@ import java.util.concurrent.atomic.LongAdder; /** - * {@link AsyncOperator} performs an external computation specified in {@link #performAsync(Page, ActionListener)}. - * This operator acts as a client and operates on a per-page basis to reduce communication overhead. + * {@link AsyncOperator} performs an external computation specified in + * {@link #performAsync(Page, ActionListener)}. This operator acts as a client + * to reduce communication overhead and fetches a {@code Fetched} at a time. + * It's the responsibility of subclasses to transform that {@code Fetched} into + * output. * @see #performAsync(Page, ActionListener) */ -public abstract class AsyncOperator implements Operator { +public abstract class AsyncOperator implements Operator { private volatile SubscribableListener blockedFuture; - private final Map buffers = ConcurrentCollections.newConcurrentMap(); + private final Map buffers = ConcurrentCollections.newConcurrentMap(); private final FailureCollector failureCollector = new FailureCollector(); private final DriverContext driverContext; @@ -84,7 +87,7 @@ public void addInput(Page input) { driverContext.addAsyncAction(); boolean success = false; try { - final ActionListener listener = ActionListener.wrap(output -> { + final ActionListener listener = ActionListener.wrap(output -> { buffers.put(seqNo, output); onSeqNoCompleted(seqNo); }, e -> { @@ -105,18 +108,20 @@ public void addInput(Page input) { } } - private void releasePageOnAnyThread(Page page) { + protected static void releasePageOnAnyThread(Page page) { page.allowPassingToDifferentDriver(); page.releaseBlocks(); } + protected abstract void releaseFetchedOnAnyThread(Fetched result); + /** * Performs an external computation and notify the listener when the result is ready. * * @param inputPage the input page * @param listener the listener */ - protected abstract void performAsync(Page inputPage, ActionListener listener); + protected abstract void performAsync(Page inputPage, ActionListener listener); protected abstract void doClose(); @@ -126,7 +131,7 @@ private void onSeqNoCompleted(long seqNo) { notifyIfBlocked(); } if (closed || failureCollector.hasFailure()) { - discardPages(); + discardResults(); } } @@ -146,18 +151,18 @@ private void notifyIfBlocked() { private void checkFailure() { Exception e = failureCollector.getFailure(); if (e != null) { - discardPages(); + discardResults(); throw ExceptionsHelper.convertToRuntime(e); } } - private void discardPages() { + private void discardResults() { long nextCheckpoint; while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) { - Page page = buffers.remove(nextCheckpoint); + Fetched result = buffers.remove(nextCheckpoint); checkpoint.markSeqNoAsPersisted(nextCheckpoint); - if (page != null) { - releasePageOnAnyThread(page); + if (result != null) { + releaseFetchedOnAnyThread(result); } } } @@ -166,7 +171,7 @@ private void discardPages() { public final void close() { finish(); closed = true; - discardPages(); + discardResults(); doClose(); } @@ -185,15 +190,18 @@ public boolean isFinished() { } } - @Override - public Page getOutput() { + /** + * Get a {@link Fetched} from the buffer. + * @return a result if one is ready or {@code null} if none are available. + */ + public final Fetched fetchFromBuffer() { checkFailure(); long persistedCheckpoint = checkpoint.getPersistedCheckpoint(); if (persistedCheckpoint < checkpoint.getProcessedCheckpoint()) { persistedCheckpoint++; - Page page = buffers.remove(persistedCheckpoint); + Fetched result = buffers.remove(persistedCheckpoint); checkpoint.markSeqNoAsPersisted(persistedCheckpoint); - return page; + return result; } else { return null; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java index f9895ff346b5c..2e2a0d383e6b4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java @@ -128,7 +128,7 @@ * | l99 | null | null | * } */ -class RightChunkedLeftJoin implements Releasable { +public class RightChunkedLeftJoin implements Releasable { private final Page leftHand; private final int mergedElementCount; /** @@ -138,12 +138,12 @@ class RightChunkedLeftJoin implements Releasable { */ private int next = 0; - RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) { + public RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) { this.leftHand = leftHand; this.mergedElementCount = mergedElementCounts; } - Page join(Page rightHand) { + public Page join(Page rightHand) { IntVector positions = rightHand.getBlock(0).asVector(); if (positions.getInt(0) < next - 1) { throw new IllegalArgumentException("maximum overlap is one position"); @@ -209,7 +209,7 @@ Page join(Page rightHand) { } } - Optional noMoreRightHandPages() { + public Optional noMoreRightHandPages() { if (next == leftHand.getPositionCount()) { return Optional.empty(); } @@ -237,6 +237,14 @@ Optional noMoreRightHandPages() { } } + /** + * Release this on any thread, rather than just the thread that built it. + */ + public void releaseOnAnyThread() { + leftHand.allowPassingToDifferentDriver(); + leftHand.releaseBlocks(); + } + @Override public void close() { Releasables.close(leftHand::releaseBlocks); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java index fbcf11cd948c0..38f25244cd917 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java @@ -110,7 +110,7 @@ protected Page createPage(int positionOffset, int length) { } }; int maxConcurrentRequests = randomIntBetween(1, 10); - AsyncOperator asyncOperator = new AsyncOperator(driverContext, maxConcurrentRequests) { + AsyncOperator asyncOperator = new AsyncOperator(driverContext, maxConcurrentRequests) { final LookupService lookupService = new LookupService(threadPool, globalBlockFactory, dict, maxConcurrentRequests); @Override @@ -118,6 +118,16 @@ protected void performAsync(Page inputPage, ActionListener listener) { lookupService.lookupAsync(inputPage, listener); } + @Override + public Page getOutput() { + return fetchFromBuffer(); + } + + @Override + protected void releaseFetchedOnAnyThread(Page page) { + releasePageOnAnyThread(page); + } + @Override public void doClose() { @@ -159,7 +169,7 @@ public void doClose() { Releasables.close(localBreaker); } - class TestOp extends AsyncOperator { + class TestOp extends AsyncOperator { Map> handlers = new HashMap<>(); TestOp(DriverContext driverContext, int maxOutstandingRequests) { @@ -171,6 +181,16 @@ protected void performAsync(Page inputPage, ActionListener listener) { handlers.put(inputPage, listener); } + @Override + public Page getOutput() { + return fetchFromBuffer(); + } + + @Override + protected void releaseFetchedOnAnyThread(Page page) { + releasePageOnAnyThread(page); + } + @Override protected void doClose() { @@ -233,7 +253,7 @@ public void testFailure() throws Exception { ); int maxConcurrentRequests = randomIntBetween(1, 10); AtomicBoolean failed = new AtomicBoolean(); - AsyncOperator asyncOperator = new AsyncOperator(driverContext, maxConcurrentRequests) { + AsyncOperator asyncOperator = new AsyncOperator(driverContext, maxConcurrentRequests) { @Override protected void performAsync(Page inputPage, ActionListener listener) { ActionRunnable command = new ActionRunnable<>(listener) { @@ -256,6 +276,16 @@ protected void doRun() { } } + @Override + public Page getOutput() { + return fetchFromBuffer(); + } + + @Override + protected void releaseFetchedOnAnyThread(Page page) { + releasePageOnAnyThread(page); + } + @Override protected void doClose() { @@ -285,7 +315,7 @@ public void testIsFinished() { for (int i = 0; i < iters; i++) { DriverContext driverContext = new DriverContext(blockFactory.bigArrays(), blockFactory); CyclicBarrier barrier = new CyclicBarrier(2); - AsyncOperator asyncOperator = new AsyncOperator(driverContext, between(1, 10)) { + AsyncOperator asyncOperator = new AsyncOperator(driverContext, between(1, 10)) { @Override protected void performAsync(Page inputPage, ActionListener listener) { ActionRunnable command = new ActionRunnable<>(listener) { @@ -302,6 +332,16 @@ protected void doRun() { threadPool.executor(ESQL_TEST_EXECUTOR).execute(command); } + @Override + public Page getOutput() { + return fetchFromBuffer(); + } + + @Override + protected void releaseFetchedOnAnyThread(Page page) { + releasePageOnAnyThread(page); + } + @Override protected void doClose() { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index 4cc08c22184b4..88028bae368ec 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -325,7 +325,7 @@ private static Page randomPage() { return new Page(block.block()); } - static class SwitchContextOperator extends AsyncOperator { + static class SwitchContextOperator extends AsyncOperator { private final ThreadPool threadPool; SwitchContextOperator(DriverContext driverContext, ThreadPool threadPool) { @@ -348,6 +348,16 @@ protected void performAsync(Page page, ActionListener listener) { }), TimeValue.timeValueNanos(between(1, 1_000_000)), threadPool.executor("esql")); } + @Override + public Page getOutput() { + return fetchFromBuffer(); + } + + @Override + protected void releaseFetchedOnAnyThread(Page page) { + releasePageOnAnyThread(page); + } + @Override protected void doClose() { diff --git a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java index ce4aa8582929b..98e5799c8d3f2 100644 --- a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java +++ b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java @@ -548,7 +548,7 @@ record Listen(long timestamp, String songId, double duration) { public void testLookupJoinIndexAllowed() throws Exception { assumeTrue( "Requires LOOKUP JOIN capability", - EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V10.capabilityName())) + EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V11.capabilityName())) ); Response resp = runESQLCommand( @@ -587,7 +587,7 @@ public void testLookupJoinIndexAllowed() throws Exception { public void testLookupJoinIndexForbidden() throws Exception { assumeTrue( "Requires LOOKUP JOIN capability", - EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V10.capabilityName())) + EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V11.capabilityName())) ); var resp = expectThrows( diff --git a/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java b/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java index f5d8f472fd59f..3b5377c2768fb 100644 --- a/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java +++ b/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java @@ -20,7 +20,7 @@ import java.util.List; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V10; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V11; public class MixedClusterEsqlSpecIT extends EsqlSpecTestCase { @ClassRule @@ -82,7 +82,7 @@ protected boolean supportsInferenceTestService() { @Override protected boolean supportsIndexModeLookup() throws IOException { - return hasCapabilities(List.of(JOIN_LOOKUP_V10.capabilityName())); + return hasCapabilities(List.of(JOIN_LOOKUP_V11.capabilityName())); } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 987a5334f903c..f8b921f239923 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -48,7 +48,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V10; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V11; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST; import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC; @@ -124,7 +124,7 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())); - assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V10.capabilityName())); + assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V11.capabilityName())); } private TestFeatureService remoteFeaturesService() throws IOException { diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java index ac5a3d4be27f3..5e0aeb5b3535d 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -227,7 +227,7 @@ public void testIndicesDontExist() throws IOException { assertThat(e.getMessage(), containsString("index_not_found_exception")); assertThat(e.getMessage(), anyOf(containsString("no such index [foo]"), containsString("no such index [remote_cluster:foo]"))); - if (EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()) { + if (EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()) { e = expectThrows( ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("test1") + " | LOOKUP JOIN foo ON id1")) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 95119cae95590..0bad68cd9b287 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -8,7 +8,7 @@ ############################################### basicOnTheDataNode -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | EVAL language_code = languages @@ -25,7 +25,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; basicRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW language_code = 1 | LOOKUP JOIN languages_lookup ON language_code @@ -36,7 +36,7 @@ language_code:integer | language_name:keyword ; basicOnTheCoordinator -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | SORT emp_no @@ -53,7 +53,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; subsequentEvalOnTheDataNode -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | EVAL language_code = languages @@ -71,7 +71,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x ; subsequentEvalOnTheCoordinator -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | SORT emp_no @@ -89,7 +89,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x ; sortEvalBeforeLookup -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | SORT emp_no @@ -106,7 +106,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; nonUniqueLeftKeyOnTheDataNode -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | WHERE emp_no <= 10030 @@ -130,60 +130,69 @@ emp_no:integer | language_code:integer | language_name:keyword ; nonUniqueRightKeyOnTheDataNode -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | EVAL language_code = emp_no % 10 | LOOKUP JOIN languages_lookup_non_unique_key ON language_code | WHERE emp_no > 10090 AND emp_no < 10096 -| SORT emp_no -| EVAL country = MV_SORT(country) +| SORT emp_no, country | KEEP emp_no, language_code, language_name, country ; -emp_no:integer | language_code:integer | language_name:keyword | country:keyword -10091 | 1 | [English, English, English] | [Canada, United Kingdom, United States of America] -10092 | 2 | [German, German, German] | [Austria, Germany, Switzerland] -10093 | 3 | null | null -10094 | 4 | Quenya | null -10095 | 5 | null | Atlantis +emp_no:integer | language_code:integer | language_name:keyword | country:text + 10091 | 1 | English | Canada + 10091 | 1 | null | United Kingdom + 10091 | 1 | English | United States of America + 10091 | 1 | English | null + 10092 | 2 | German | [Germany, Austria] + 10092 | 2 | German | Switzerland + 10092 | 2 | German | null + 10093 | 3 | null | null + 10094 | 4 | Quenya | null + 10095 | 5 | null | Atlantis ; nonUniqueRightKeyOnTheCoordinator -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | SORT emp_no | LIMIT 5 | EVAL language_code = emp_no % 10 | LOOKUP JOIN languages_lookup_non_unique_key ON language_code -| EVAL country = MV_SORT(country) | KEEP emp_no, language_code, language_name, country ; -emp_no:integer | language_code:integer | language_name:keyword | country:keyword -10001 | 1 | [English, English, English] | [Canada, United Kingdom, United States of America] -10002 | 2 | [German, German, German] | [Austria, Germany, Switzerland] -10003 | 3 | null | null -10004 | 4 | Quenya | null -10005 | 5 | null | Atlantis +emp_no:integer | language_code:integer | language_name:keyword | country:text +10001 | 1 | English | Canada +10001 | 1 | English | null +10001 | 1 | null | United Kingdom +10001 | 1 | English | United States of America +10002 | 2 | German | [Germany, Austria] +10002 | 2 | German | Switzerland +10002 | 2 | German | null +10003 | 3 | null | null +10004 | 4 | Quenya | null +10005 | 5 | null | Atlantis ; nonUniqueRightKeyFromRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW language_code = 2 | LOOKUP JOIN languages_lookup_non_unique_key ON language_code | DROP country.keyword -| EVAL country = MV_SORT(country) ; -language_code:integer | language_name:keyword | country:keyword -2 | [German, German, German] | [Austria, Germany, Switzerland] +language_code:integer | country:text | language_name:keyword +2 | [Germany, Austria] | German +2 | Switzerland | German +2 | null | German ; repeatedIndexOnFrom -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM languages_lookup | LOOKUP JOIN languages_lookup ON language_code @@ -201,7 +210,7 @@ dropAllLookedUpFieldsOnTheDataNode-Ignore // Depends on // https://github.com/elastic/elasticsearch/issues/118778 // https://github.com/elastic/elasticsearch/issues/118781 -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | EVAL language_code = emp_no % 10 @@ -222,7 +231,7 @@ dropAllLookedUpFieldsOnTheCoordinator-Ignore // Depends on // https://github.com/elastic/elasticsearch/issues/118778 // https://github.com/elastic/elasticsearch/issues/118781 -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | SORT emp_no @@ -247,7 +256,7 @@ emp_no:integer ############################################### filterOnLeftSide -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | EVAL language_code = languages @@ -264,7 +273,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnRightSide -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -280,7 +289,7 @@ FROM sample_data ; filterOnRightSideAfterStats -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -293,7 +302,7 @@ count:long | type:keyword ; filterOnJoinKey -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | EVAL language_code = languages @@ -308,7 +317,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnJoinKeyAndRightSide -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | WHERE emp_no < 10006 @@ -325,7 +334,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnRightSideOnTheCoordinator -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | SORT emp_no @@ -341,7 +350,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnJoinKeyOnTheCoordinator -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | SORT emp_no @@ -357,7 +366,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnJoinKeyAndRightSideOnTheCoordinator -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | SORT emp_no @@ -374,7 +383,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnTheDataNodeThenFilterOnTheCoordinator -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | EVAL language_code = languages @@ -395,31 +404,35 @@ emp_no:integer | language_code:integer | language_name:keyword ########################################################################### nullJoinKeyOnTheDataNode -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | WHERE emp_no < 10004 | EVAL language_code = emp_no % 10, language_code = CASE(language_code == 3, null, language_code) | LOOKUP JOIN languages_lookup_non_unique_key ON language_code -| SORT emp_no +| SORT emp_no, language_code, language_name | KEEP emp_no, language_code, language_name ; emp_no:integer | language_code:integer | language_name:keyword -10001 | 1 | [English, English, English] -10002 | 2 | [German, German, German] +10001 | 1 | English +10001 | 1 | English +10001 | 1 | English +10001 | 1 | null +10002 | 2 | German +10002 | 2 | German +10002 | 2 | German 10003 | null | null ; mvJoinKeyOnTheDataNode -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | WHERE 10003 < emp_no AND emp_no < 10008 | EVAL language_code = emp_no % 10 | LOOKUP JOIN languages_lookup_non_unique_key ON language_code -| SORT emp_no -| EVAL language_name = MV_SORT(language_name) +| SORT emp_no, language_name | KEEP emp_no, language_code, language_name ; @@ -427,38 +440,42 @@ emp_no:integer | language_code:integer | language_name:keyword 10004 | 4 | Quenya 10005 | 5 | null 10006 | 6 | Mv-Lang -10007 | 7 | [Mv-Lang, Mv-Lang2] +10007 | 7 | Mv-Lang +10007 | 7 | Mv-Lang2 ; mvJoinKeyFromRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW language_code = [4, 5, 6, 7] | LOOKUP JOIN languages_lookup_non_unique_key ON language_code -| EVAL language_name = MV_SORT(language_name), country = MV_SORT(country) | KEEP language_code, language_name, country ; -language_code:integer | language_name:keyword | country:keyword -[4, 5, 6, 7] | [Mv-Lang, Mv-Lang2, Quenya] | [Atlantis, Mv-Land, Mv-Land2] +language_code:integer | language_name:keyword | country:text +[4, 5, 6, 7] | Quenya | null +[4, 5, 6, 7] | null | Atlantis +[4, 5, 6, 7] | Mv-Lang | Mv-Land +[4, 5, 6, 7] | Mv-Lang2 | Mv-Land2 ; mvJoinKeyFromRowExpanded -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW language_code = [4, 5, 6, 7, 8] | MV_EXPAND language_code | LOOKUP JOIN languages_lookup_non_unique_key ON language_code -| EVAL language_name = MV_SORT(language_name), country = MV_SORT(country) | KEEP language_code, language_name, country +| SORT language_code, language_name, country ; -language_code:integer | language_name:keyword | country:keyword -4 | Quenya | null -5 | null | Atlantis -6 | Mv-Lang | Mv-Land -7 | [Mv-Lang, Mv-Lang2] | [Mv-Land, Mv-Land2] -8 | Mv-Lang2 | Mv-Land2 +language_code:integer | language_name:keyword | country:text +4 | Quenya | null +5 | null | Atlantis +6 | Mv-Lang | Mv-Land +7 | Mv-Lang | Mv-Land +7 | Mv-Lang2 | Mv-Land2 +8 | Mv-Lang2 | Mv-Land2 ; ########################################################################### @@ -466,7 +483,7 @@ language_code:integer | language_name:keyword | country:keyword ########################################################################### joinOnNestedField -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM employees | WHERE 10000 < emp_no AND emp_no < 10006 @@ -486,7 +503,7 @@ emp_no:integer | language.id:integer | language.name:text joinOnNestedFieldRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW language.code = "EN" | LOOKUP JOIN languages_nested_fields ON language.code @@ -499,7 +516,7 @@ language.id:integer | language.code:keyword | language.name.keyword:keyword joinOnNestedNestedFieldRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW language.name.keyword = "English" | LOOKUP JOIN languages_nested_fields ON language.name.keyword @@ -515,7 +532,7 @@ language.id:integer | language.name:text | language.name.keyword:keyword ############################################### lookupIPFromRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -526,7 +543,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromKeepRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", right = "right" | KEEP left, client_ip, right @@ -538,7 +555,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowing -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -549,7 +566,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowingKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -562,7 +579,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowingKeepReordered -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -575,7 +592,7 @@ right | Development | 172.21.0.5 ; lookupIPFromIndex -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -594,7 +611,7 @@ ignoreOrder:true ; lookupIPFromIndexKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -614,7 +631,7 @@ ignoreOrder:true ; lookupIPFromIndexKeepKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | KEEP client_ip, event_duration, @timestamp, message @@ -636,7 +653,7 @@ timestamp:date | client_ip:keyword | event_duration:long | msg:keyword ; lookupIPFromIndexStats -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -652,7 +669,7 @@ count:long | env:keyword ; lookupIPFromIndexStatsKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -669,7 +686,7 @@ count:long | env:keyword ; statsAndLookupIPFromIndex -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -690,7 +707,7 @@ count:long | client_ip:keyword | env:keyword ############################################### lookupMessageFromRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", message = "Connected to 10.1.0.1", right = "right" | LOOKUP JOIN message_types_lookup ON message @@ -701,7 +718,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromKeepRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", message = "Connected to 10.1.0.1", right = "right" | KEEP left, message, right @@ -713,7 +730,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromRowWithShadowing -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right" | LOOKUP JOIN message_types_lookup ON message @@ -724,7 +741,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromRowWithShadowingKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right" | LOOKUP JOIN message_types_lookup ON message @@ -736,7 +753,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromIndex -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -754,7 +771,7 @@ ignoreOrder:true ; lookupMessageFromIndexKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -773,7 +790,7 @@ ignoreOrder:true ; lookupMessageFromIndexKeepKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | KEEP client_ip, event_duration, @timestamp, message @@ -793,7 +810,7 @@ ignoreOrder:true ; lookupMessageFromIndexKeepReordered -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -812,7 +829,7 @@ Success | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 ; lookupMessageFromIndexStats -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -827,7 +844,7 @@ count:long | type:keyword ; lookupMessageFromIndexStatsKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -843,7 +860,7 @@ count:long | type:keyword ; statsAndLookupMessageFromIndex -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | STATS count = count(message) BY message @@ -861,7 +878,7 @@ count:long | type:keyword | message:keyword ; lookupMessageFromIndexTwice -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -883,7 +900,7 @@ ignoreOrder:true ; lookupMessageFromIndexTwiceKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -906,7 +923,7 @@ ignoreOrder:true ; lookupMessageFromIndexTwiceFullyShadowing -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -930,7 +947,7 @@ ignoreOrder:true ############################################### lookupIPAndMessageFromRow -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -942,7 +959,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowKeepBefore -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" | KEEP left, client_ip, message, right @@ -955,7 +972,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowKeepBetween -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -968,7 +985,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowKeepAfter -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -981,7 +998,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowing -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", type = "type", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -993,7 +1010,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowingKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -1007,7 +1024,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowingKeepKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -1022,7 +1039,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowingKeepKeepKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -1038,7 +1055,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowingKeepReordered -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -1052,7 +1069,7 @@ right | Development | Success | 172.21.0.5 ; lookupIPAndMessageFromIndex -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1072,7 +1089,7 @@ ignoreOrder:true ; lookupIPAndMessageFromIndexKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1093,7 +1110,7 @@ ignoreOrder:true ; lookupIPAndMessageFromIndexStats -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1111,7 +1128,7 @@ count:long | env:keyword | type:keyword ; lookupIPAndMessageFromIndexStatsKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1130,7 +1147,7 @@ count:long | env:keyword | type:keyword ; statsAndLookupIPAndMessageFromIndex -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1149,7 +1166,7 @@ count:long | client_ip:keyword | message:keyword | env:keyword | type:keyw ; lookupIPAndMessageFromIndexChainedEvalKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1171,7 +1188,7 @@ ignoreOrder:true ; lookupIPAndMessageFromIndexChainedRenameKeep -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1193,7 +1210,7 @@ ignoreOrder:true ; lookupIndexInFromRepeatedRowBug -required_capability: join_lookup_v10 +required_capability: join_lookup_v11 FROM languages_lookup_non_unique_key | WHERE language_code == 1 | LOOKUP JOIN languages_lookup ON language_code diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index f31eabea9d616..060a50684b39f 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -60,6 +60,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -80,9 +81,8 @@ public void testLookupIndex() throws IOException { /** * Tests when multiple results match. */ - @AwaitsFix(bugUrl = "fixing real soon now") public void testLookupIndexMultiResults() throws IOException { - runLookup(new UsingSingleLookupTable(new Object[] { "aa", new String[] { "bb", "ff" }, "cc", "dd" })); + runLookup(new UsingSingleLookupTable(new String[] { "aa", "bb", "bb", "dd" })); } interface PopulateIndices { @@ -90,24 +90,24 @@ interface PopulateIndices { } class UsingSingleLookupTable implements PopulateIndices { - private final Object[] lookupData; + private final Map> matches = new HashMap<>(); + private final String[] lookupData; - UsingSingleLookupTable(Object[] lookupData) { + UsingSingleLookupTable(String[] lookupData) { this.lookupData = lookupData; + for (int i = 0; i < lookupData.length; i++) { + matches.computeIfAbsent(lookupData[i], k -> new ArrayList<>()).add(i); + } } @Override - public void populate(int docCount, List expected) throws IOException { + public void populate(int docCount, List expected) { List docs = new ArrayList<>(); for (int i = 0; i < docCount; i++) { - docs.add(client().prepareIndex("source").setSource(Map.of("data", lookupData[i % lookupData.length]))); - Object d = lookupData[i % lookupData.length]; - if (d instanceof String s) { - expected.add(s + ":" + (i % lookupData.length)); - } else if (d instanceof String[] ss) { - for (String s : ss) { - expected.add(s + ":" + (i % lookupData.length)); - } + String data = lookupData[i % lookupData.length]; + docs.add(client().prepareIndex("source").setSource(Map.of("data", data))); + for (Integer match : matches.get(data)) { + expected.add(data + ":" + match); } } for (int i = 0; i < lookupData.length; i++) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 1acc72d4ea33e..669b4fb1d10b2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -676,7 +676,7 @@ public enum Cap { /** * LOOKUP JOIN */ - JOIN_LOOKUP_V10(Build.current().isSnapshot()), + JOIN_LOOKUP_V11(Build.current().isSnapshot()), /** * Fix for https://github.com/elastic/elasticsearch/issues/117054 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 74c66c0d1b338..a486d574ddd84 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -22,13 +22,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntVector; @@ -41,6 +39,7 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OutputOperator; +import org.elasticsearch.compute.operator.ProjectOperator; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator; import org.elasticsearch.compute.operator.lookup.MergePositionsOperator; @@ -87,19 +86,19 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; /** - * {@link AbstractLookupService} performs a single valued {@code LEFT JOIN} for a - * given input page against another index. This is quite similar to a nested loop - * join. It is restricted to indices with only a single shard. + * {@link AbstractLookupService} performs a {@code LEFT JOIN} for a given input + * page against another index that must have only a single + * shard. *

* This registers a {@link TransportRequestHandler} so we can handle requests * to join data that isn't local to the node, but it is much faster if the @@ -107,7 +106,7 @@ *

*

* The join process spawns a {@link Driver} per incoming page which runs in - * three stages: + * two or three stages: *

*

* Stage 1: Finding matching document IDs for the input page. This stage is done @@ -120,9 +119,9 @@ * {@code [DocVector, IntBlock: positions, Block: field1, Block: field2,...]}. *

*

- * Stage 3: Combining the extracted values based on positions and filling nulls for - * positions without matches. This is done by {@link MergePositionsOperator}. The output - * page is represented as {@code [Block: field1, Block: field2,...]}. + * Stage 3: Optionally this combines the extracted values based on positions and filling + * nulls for positions without matches. This is done by {@link MergePositionsOperator}. + * The output page is represented as {@code [Block: field1, Block: field2,...]}. *

*

* The {@link Page#getPositionCount()} of the output {@link Page} is equal to the @@ -139,6 +138,15 @@ abstract class AbstractLookupService readRequest ) { this.actionName = actionName; @@ -157,6 +166,7 @@ abstract class AbstractLookupService resultPages, BlockFactory blockFactory) throws IOException; + + /** + * Read the response from a {@link StreamInput}. + */ + protected abstract LookupResponse readLookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException; + protected static QueryList termQueryList( MappedFieldType field, SearchExecutionContext searchExecutionContext, @@ -196,9 +216,9 @@ protected static QueryList termQueryList( /** * Perform the actual lookup. */ - public final void lookupAsync(R request, CancellableTask parentTask, ActionListener outListener) { + public final void lookupAsync(R request, CancellableTask parentTask, ActionListener> outListener) { ThreadContext threadContext = transportService.getThreadPool().getThreadContext(); - ActionListener listener = ContextPreservingActionListener.wrapPreservingContext(outListener, threadContext); + ActionListener> listener = ContextPreservingActionListener.wrapPreservingContext(outListener, threadContext); hasPrivilege(listener.delegateFailureAndWrap((delegate, ignored) -> { ClusterState clusterState = clusterService.state(); GroupShardsIterator shardIterators = clusterService.operationRouting() @@ -225,8 +245,8 @@ public final void lookupAsync(R request, CancellableTask parentTask, ActionListe parentTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( - delegate.map(LookupResponse::takePage), - in -> new LookupResponse(in, blockFactory), + delegate.map(LookupResponse::takePages), + in -> readLookupResponse(in, blockFactory), executor ) ); @@ -291,10 +311,13 @@ private void hasPrivilege(ActionListener outListener) { ); } - private void doLookup(T request, CancellableTask task, ActionListener listener) { + private void doLookup(T request, CancellableTask task, ActionListener> listener) { Block inputBlock = request.inputPage.getBlock(0); if (inputBlock.areAllValuesNull()) { - listener.onResponse(createNullResponse(request.inputPage.getPositionCount(), request.extractFields)); + List nullResponse = mergePages + ? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields)) + : List.of(); + listener.onResponse(nullResponse); return; } final List releasables = new ArrayList<>(6); @@ -315,31 +338,31 @@ private void doLookup(T request, CancellableTask task, ActionListener list mergingTypes[i] = PlannerUtils.toElementType(request.extractFields.get(i).dataType()); } final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray(); - final MergePositionsOperator mergePositionsOperator; + final Operator finishPages; final OrdinalBytesRefBlock ordinalsBytesRefBlock; - if (inputBlock instanceof BytesRefBlock bytesRefBlock && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) { + if (mergePages // TODO fix this optimization for Lookup. + && inputBlock instanceof BytesRefBlock bytesRefBlock + && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) { + inputBlock = ordinalsBytesRefBlock.getDictionaryVector().asBlock(); var selectedPositions = ordinalsBytesRefBlock.getOrdinalsBlock(); - mergePositionsOperator = new MergePositionsOperator( - 1, - mergingChannels, - mergingTypes, - selectedPositions, - driverContext.blockFactory() - ); - + finishPages = new MergePositionsOperator(1, mergingChannels, mergingTypes, selectedPositions, driverContext.blockFactory()); } else { - try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) { - mergePositionsOperator = new MergePositionsOperator( - 1, - mergingChannels, - mergingTypes, - selectedPositions, - driverContext.blockFactory() - ); + if (mergePages) { + try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) { + finishPages = new MergePositionsOperator( + 1, + mergingChannels, + mergingTypes, + selectedPositions, + driverContext.blockFactory() + ); + } + } else { + finishPages = dropDocBlockOperator(request.extractFields); } } - releasables.add(mergePositionsOperator); + releasables.add(finishPages); SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext(); QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType); var warnings = Warnings.createWarnings( @@ -359,8 +382,15 @@ private void doLookup(T request, CancellableTask task, ActionListener list var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields); releasables.add(extractFieldsOperator); - AtomicReference result = new AtomicReference<>(); - OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), result::set); + /* + * Collect all result Pages in a synchronizedList mostly out of paranoia. We'll + * be collecting these results in the Driver thread and reading them in its + * completion listener which absolutely happens-after the insertions. So, + * technically, we don't need synchronization here. But we're doing it anyway + * because the list will never grow mega large. + */ + List collectedPages = Collections.synchronizedList(new ArrayList<>()); + OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), collectedPages::add); releasables.add(outputOperator); Driver driver = new Driver( "enrich-lookup:" + request.sessionId, @@ -369,7 +399,7 @@ private void doLookup(T request, CancellableTask task, ActionListener list driverContext, request::toString, queryOperator, - List.of(extractFieldsOperator, mergePositionsOperator), + List.of(extractFieldsOperator, finishPages), outputOperator, Driver.DEFAULT_STATUS_INTERVAL, Releasables.wrap(searchContext, localBreaker) @@ -380,9 +410,9 @@ private void doLookup(T request, CancellableTask task, ActionListener list }); var threadContext = transportService.getThreadPool().getThreadContext(); Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, listener.map(ignored -> { - Page out = result.get(); - if (out == null) { - out = createNullResponse(request.inputPage.getPositionCount(), request.extractFields); + List out = collectedPages; + if (mergePages && out.isEmpty()) { + out = List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields)); } return out; })); @@ -434,6 +464,18 @@ private static Operator extractFieldsOperator( ); } + /** + * Drop just the first block, keeping the remaining. + */ + private Operator dropDocBlockOperator(List extractFields) { + int end = extractFields.size() + 1; + List projection = new ArrayList<>(end); + for (int i = 1; i <= end; i++) { + projection.add(i); + } + return new ProjectOperator(projection); + } + private Page createNullResponse(int positionCount, List extractFields) { final Block[] blocks = new Block[extractFields.size()]; try { @@ -457,7 +499,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) { request, (CancellableTask) task, listener.delegateFailureAndWrap( - (l, outPage) -> ActionListener.respondAndRelease(l, new LookupResponse(outPage, blockFactory)) + (l, resultPages) -> ActionListener.respondAndRelease(l, createLookupResponse(resultPages, blockFactory)) ) ); } @@ -587,45 +629,24 @@ public final String toString() { protected abstract String extraDescription(); } - private static class LookupResponse extends TransportResponse { - private final RefCounted refs = AbstractRefCounted.of(this::releasePage); - private final BlockFactory blockFactory; - private Page page; - private long reservedBytes = 0; + abstract static class LookupResponse extends TransportResponse { + private final RefCounted refs = AbstractRefCounted.of(this::release); + protected final BlockFactory blockFactory; + protected long reservedBytes = 0; - LookupResponse(Page page, BlockFactory blockFactory) { - this.page = page; - this.blockFactory = blockFactory; - } - - LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException { - try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) { - this.page = new Page(bsi); - } + LookupResponse(BlockFactory blockFactory) { this.blockFactory = blockFactory; } - @Override - public void writeTo(StreamOutput out) throws IOException { - long bytes = page.ramBytesUsedByBlocks(); - blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize enrich lookup response"); - reservedBytes += bytes; - page.writeTo(out); - } - - Page takePage() { - var p = page; - page = null; - return p; - } + protected abstract List takePages(); - private void releasePage() { + private void release() { blockFactory.breaker().addWithoutBreaking(-reservedBytes); - if (page != null) { - Releasables.closeExpectNoException(page::releaseBlocks); - } + innerRelease(); } + protected abstract void innerRelease(); + @Override public void incRef() { refs.incRef(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java index df608a04632a2..8083d67e5a19d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java @@ -17,6 +17,7 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.ResponseHeadersCollector; +import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; @@ -27,7 +28,7 @@ import java.util.List; import java.util.Objects; -public final class EnrichLookupOperator extends AsyncOperator { +public final class EnrichLookupOperator extends AsyncOperator { private final EnrichLookupService enrichLookupService; private final String sessionId; private final CancellableTask parentTask; @@ -128,13 +129,29 @@ protected void performAsync(Page inputPage, ActionListener listener) { enrichFields, source ); + CheckedFunction, Page, Exception> handleResponse = pages -> { + if (pages.size() != 1) { + throw new UnsupportedOperationException("ENRICH should only return a single page"); + } + return inputPage.appendPage(pages.getFirst()); + }; enrichLookupService.lookupAsync( request, parentTask, - ActionListener.runBefore(listener.map(inputPage::appendPage), responseHeadersCollector::collect) + ActionListener.runBefore(listener.map(handleResponse), responseHeadersCollector::collect) ); } + @Override + public Page getOutput() { + return fetchFromBuffer(); + } + + @Override + protected void releaseFetchedOnAnyThread(Page page) { + releasePageOnAnyThread(page); + } + @Override public String toString() { return "EnrichOperator[index=" diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index 7057b586871eb..e3d962fa9231b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -17,6 +17,7 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.RangeFieldMapper; import org.elasticsearch.index.mapper.RangeType; @@ -52,7 +53,16 @@ public EnrichLookupService( BigArrays bigArrays, BlockFactory blockFactory ) { - super(LOOKUP_ACTION_NAME, clusterService, searchService, transportService, bigArrays, blockFactory, TransportRequest::readFrom); + super( + LOOKUP_ACTION_NAME, + clusterService, + searchService, + transportService, + bigArrays, + blockFactory, + true, + TransportRequest::readFrom + ); } @Override @@ -86,6 +96,19 @@ protected String getRequiredPrivilege() { return ClusterPrivilegeResolver.MONITOR_ENRICH.name(); } + @Override + protected LookupResponse createLookupResponse(List pages, BlockFactory blockFactory) throws IOException { + if (pages.size() != 1) { + throw new UnsupportedOperationException("ENRICH always makes a single page of output"); + } + return new LookupResponse(pages.getFirst(), blockFactory); + } + + @Override + protected LookupResponse readLookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException { + return new LookupResponse(in, blockFactory); + } + private static void validateTypes(DataType inputDataType, MappedFieldType fieldType) { if (fieldType instanceof RangeFieldMapper.RangeFieldType rangeType) { // For range policy types, the ENRICH index field type will be one of a list of supported range types, @@ -210,4 +233,42 @@ protected String extraDescription() { return " ,match_type=" + matchType + " ,match_field=" + matchField; } } + + private static class LookupResponse extends AbstractLookupService.LookupResponse { + private Page page; + + private LookupResponse(Page page, BlockFactory blockFactory) { + super(blockFactory); + this.page = page; + } + + private LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException { + super(blockFactory); + try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) { + this.page = new Page(bsi); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + long bytes = page.ramBytesUsedByBlocks(); + blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize enrich lookup response"); + reservedBytes += bytes; + page.writeTo(out); + } + + @Override + protected List takePages() { + var p = List.of(page); + page = null; + return p; + } + + @Override + protected void innerRelease() { + if (page != null) { + Releasables.closeExpectNoException(page::releaseBlocks); + } + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index f09f7d0e23e7b..73dfcf8d43620 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.enrich; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -15,7 +16,11 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.AsyncOperator; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.IsBlockedResult; import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; @@ -23,11 +28,13 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Optional; // TODO rename package -public final class LookupFromIndexOperator extends AsyncOperator { +public final class LookupFromIndexOperator extends AsyncOperator { public record Factory( String sessionId, CancellableTask parentTask, @@ -81,6 +88,14 @@ public Operator get(DriverContext driverContext) { private final List loadFields; private final Source source; private long totalTerms = 0L; + /** + * Total number of pages emitted by this {@link Operator}. + */ + private long emittedPages = 0L; + /** + * The ongoing join or {@code null} none is ongoing at the moment. + */ + private OngoingJoin ongoing = null; public LookupFromIndexOperator( String sessionId, @@ -108,7 +123,7 @@ public LookupFromIndexOperator( } @Override - protected void performAsync(Page inputPage, ActionListener listener) { + protected void performAsync(Page inputPage, ActionListener listener) { final Block inputBlock = inputPage.getBlock(inputChannel); totalTerms += inputBlock.getTotalValueCount(); LookupFromIndexService.Request request = new LookupFromIndexService.Request( @@ -120,7 +135,47 @@ protected void performAsync(Page inputPage, ActionListener listener) { loadFields, source ); - lookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage)); + lookupService.lookupAsync( + request, + parentTask, + listener.map(pages -> new OngoingJoin(new RightChunkedLeftJoin(inputPage, loadFields.size()), pages.iterator())) + ); + } + + @Override + public Page getOutput() { + if (ongoing == null) { + // No ongoing join, start a new one if we can. + ongoing = fetchFromBuffer(); + if (ongoing == null) { + // Buffer empty, wait for the next time we're called. + return null; + } + } + if (ongoing.itr.hasNext()) { + // There's more to do in the ongoing join. + Page right = ongoing.itr.next(); + emittedPages++; + try { + return ongoing.join.join(right); + } finally { + right.releaseBlocks(); + } + } + // Current join is all done. Emit any trailing unmatched rows. + Optional remaining = ongoing.join.noMoreRightHandPages(); + ongoing.close(); + ongoing = null; + if (remaining.isEmpty()) { + return null; + } + emittedPages++; + return remaining.get(); + } + + @Override + protected void releaseFetchedOnAnyThread(OngoingJoin ongoingJoin) { + ongoingJoin.releaseOnAnyThread(); } @Override @@ -138,15 +193,29 @@ public String toString() { + "]"; } + @Override + public boolean isFinished() { + return ongoing == null && super.isFinished(); + } + + @Override + public IsBlockedResult isBlocked() { + if (ongoing != null) { + return NOT_BLOCKED; + } + return super.isBlocked(); + } + @Override protected void doClose() { // TODO: Maybe create a sub-task as the parent task of all the lookup tasks // then cancel it when this operator terminates early (e.g., have enough result). + Releasables.close(ongoing); } @Override protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) { - return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms); + return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages); } public static class Status extends AsyncOperator.Status { @@ -156,22 +225,29 @@ public static class Status extends AsyncOperator.Status { Status::new ); - final long totalTerms; + private final long totalTerms; + /** + * Total number of pages emitted by this {@link Operator}. + */ + private final long emittedPages; - Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms) { + Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms, long emittedPages) { super(receivedPages, completedPages, totalTimeInMillis); this.totalTerms = totalTerms; + this.emittedPages = emittedPages; } Status(StreamInput in) throws IOException { super(in); this.totalTerms = in.readVLong(); + this.emittedPages = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(totalTerms); + out.writeVLong(emittedPages); } @Override @@ -179,11 +255,20 @@ public String getWriteableName() { return ENTRY.name; } + public long emittedPages() { + return emittedPages; + } + + public long totalTerms() { + return totalTerms; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - innerToXContent(builder); - builder.field("total_terms", totalTerms); + super.innerToXContent(builder); + builder.field("emitted_pages", emittedPages()); + builder.field("total_terms", totalTerms()); return builder.endObject(); } @@ -196,12 +281,26 @@ public boolean equals(Object o) { return false; } Status status = (Status) o; - return totalTerms == status.totalTerms; + return totalTerms == status.totalTerms && emittedPages == status.emittedPages; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), totalTerms); + return Objects.hash(super.hashCode(), totalTerms, emittedPages); + } + } + + protected record OngoingJoin(RightChunkedLeftJoin join, Iterator itr) implements Releasable { + @Override + public void close() { + Releasables.close(join, Releasables.wrap(() -> Iterators.map(itr, page -> page::releaseBlocks))); + } + + public void releaseOnAnyThread() { + Releasables.close( + join::releaseOnAnyThread, + Releasables.wrap(() -> Iterators.map(itr, page -> () -> releasePageOnAnyThread(page))) + ); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 0bbfc6dd0ce99..ad65394fdfbde 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -9,6 +9,7 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; @@ -17,6 +18,7 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.ShardId; @@ -33,6 +35,7 @@ import java.io.IOException; import java.util.List; +import java.util.Objects; /** * {@link LookupFromIndexService} performs lookup against a Lookup index for @@ -49,7 +52,16 @@ public LookupFromIndexService( BigArrays bigArrays, BlockFactory blockFactory ) { - super(LOOKUP_ACTION_NAME, clusterService, searchService, transportService, bigArrays, blockFactory, TransportRequest::readFrom); + super( + LOOKUP_ACTION_NAME, + clusterService, + searchService, + transportService, + bigArrays, + blockFactory, + false, + TransportRequest::readFrom + ); } @Override @@ -73,6 +85,16 @@ protected QueryList queryList(TransportRequest request, SearchExecutionContext c return termQueryList(fieldType, context, inputBlock, inputDataType); } + @Override + protected LookupResponse createLookupResponse(List pages, BlockFactory blockFactory) throws IOException { + return new LookupResponse(pages, blockFactory); + } + + @Override + protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException { + return new LookupResponse(in, blockFactory); + } + @Override protected String getRequiredPrivilege() { return null; @@ -171,4 +193,65 @@ protected String extraDescription() { return " ,match_field=" + matchField; } } + + protected static class LookupResponse extends AbstractLookupService.LookupResponse { + private List pages; + + LookupResponse(List pages, BlockFactory blockFactory) { + super(blockFactory); + this.pages = pages; + } + + LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException { + super(blockFactory); + try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) { + this.pages = bsi.readCollectionAsList(Page::new); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + long bytes = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum(); + blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize lookup join response"); + reservedBytes += bytes; + out.writeCollection(pages); + } + + @Override + protected List takePages() { + var p = pages; + pages = null; + return p; + } + + List pages() { + return pages; + } + + @Override + protected void innerRelease() { + if (pages != null) { + Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(pages.iterator(), page -> page::releaseBlocks))); + } + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + LookupResponse that = (LookupResponse) o; + return Objects.equals(pages, that.pages); + } + + @Override + public int hashCode() { + return Objects.hashCode(pages); + } + + @Override + public String toString() { + return "LookupResponse{pages=" + pages + '}'; + } + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 6cbf910906aaf..ed1ee71ff1968 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -268,7 +268,7 @@ public final void test() throws Throwable { ); assumeFalse( "lookup join disabled for csv tests", - testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V10.capabilityName()) + testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V11.capabilityName()) ); assumeFalse( "can't use TERM function in csv tests", diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index 2df6e30e96081..c1b2b4fefc1a9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -2140,7 +2140,7 @@ public void testLookupMatchTypeWrong() { } public void testLookupJoinUnknownIndex() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); String errorMessage = "Unknown index [foobar]"; IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage); @@ -2169,7 +2169,7 @@ public void testLookupJoinUnknownIndex() { } public void testLookupJoinUnknownField() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name"; String errorMessage = "1:45: Unknown column [last_name] in right side of join"; @@ -2192,7 +2192,7 @@ public void testLookupJoinUnknownField() { } public void testMultipleLookupJoinsGiveDifferentAttributes() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); // The field attributes that get contributed by different LOOKUP JOIN commands must have different name ids, // even if they have the same names. Otherwise, things like dependency analysis - like in PruneColumns - cannot work based on @@ -2222,7 +2222,7 @@ public void testMultipleLookupJoinsGiveDifferentAttributes() { } public void testLookupJoinIndexMode() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); var indexResolution = AnalyzerTestUtils.expandedDefaultIndexResolution(); var lookupResolution = AnalyzerTestUtils.defaultLookupResolution(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java index 180e32fb7c15d..2ee6cf6136114 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java @@ -113,7 +113,7 @@ public void testTooBigQuery() { } public void testJoinOnConstant() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertEquals( "1:55: JOIN ON clause only supports fields at the moment, found [123]", error("row languages = 1, gender = \"f\" | lookup join test on 123") @@ -129,7 +129,7 @@ public void testJoinOnConstant() { } public void testJoinOnMultipleFields() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertEquals( "1:35: JOIN ON clause only supports one field at the moment, found [2]", error("row languages = 1, gender = \"f\" | lookup join test on gender, languages") @@ -137,7 +137,7 @@ public void testJoinOnMultipleFields() { } public void testJoinTwiceOnTheSameField() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertEquals( "1:35: JOIN ON clause only supports one field at the moment, found [2]", error("row languages = 1, gender = \"f\" | lookup join test on languages, languages") @@ -145,7 +145,7 @@ public void testJoinTwiceOnTheSameField() { } public void testJoinTwiceOnTheSameField_TwoLookups() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertEquals( "1:80: JOIN ON clause only supports one field at the moment, found [2]", error("row languages = 1, gender = \"f\" | lookup join test on languages | eval x = 1 | lookup join test on gender, gender") diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index d78c4bfa21ced..f932992e81557 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -1984,7 +1984,7 @@ public void testSortByAggregate() { } public void testLookupJoinDataTypeMismatch() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); query("FROM test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java new file mode 100644 index 0000000000000..a204e93b0d16a --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; + +public class LookupFromIndexOperatorStatusTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return LookupFromIndexOperator.Status::new; + } + + @Override + protected LookupFromIndexOperator.Status createTestInstance() { + return new LookupFromIndexOperator.Status( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomLongBetween(0, TimeValue.timeValueHours(1).millis()), + randomNonNegativeLong(), + randomNonNegativeLong() + ); + } + + @Override + protected LookupFromIndexOperator.Status mutateInstance(LookupFromIndexOperator.Status in) throws IOException { + long receivedPages = in.receivedPages(); + long completedPages = in.completedPages(); + long totalTimeInMillis = in.totalTimeInMillis(); + long totalTerms = in.totalTerms(); + long emittedPages = in.emittedPages(); + switch (randomIntBetween(0, 4)) { + case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong); + case 1 -> completedPages = randomValueOtherThan(completedPages, ESTestCase::randomNonNegativeLong); + case 2 -> totalTimeInMillis = randomValueOtherThan(totalTimeInMillis, ESTestCase::randomNonNegativeLong); + case 3 -> totalTerms = randomValueOtherThan(totalTerms, ESTestCase::randomNonNegativeLong); + case 4 -> emittedPages = randomValueOtherThan(emittedPages, ESTestCase::randomNonNegativeLong); + default -> throw new UnsupportedOperationException(); + } + return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages); + } + + public void testToXContent() { + var status = new LookupFromIndexOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis(), 120, 88); + String json = Strings.toString(status, true, true); + assertThat(json, equalTo(""" + { + "received_pages" : 100, + "completed_pages" : 50, + "total_time_in_millis" : 10000, + "total_time" : "10s", + "emitted_pages" : 88, + "total_terms" : 120 + }""")); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexServiceResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexServiceResponseTests.java new file mode 100644 index 0000000000000..098ea9eaa0c2d --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexServiceResponseTests.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.esql.TestBlockFactory; +import org.junit.After; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class LookupFromIndexServiceResponseTests extends AbstractWireSerializingTestCase { + private final List breakers = new ArrayList<>(); + + LookupFromIndexService.LookupResponse createTestInstance(BlockFactory blockFactory) { + return new LookupFromIndexService.LookupResponse(randomList(0, 10, () -> testPage(blockFactory)), blockFactory); + } + + /** + * Build a {@link Page} to test serialization. If we had nice random + * {@linkplain Page} generation we'd use that happily, but it's off + * in the tests for compute, and we're in ESQL. And we don't + * really need a fully random one to verify serialization + * here. + */ + Page testPage(BlockFactory blockFactory) { + try (IntVector.Builder builder = blockFactory.newIntVectorFixedBuilder(3)) { + builder.appendInt(1); + builder.appendInt(2); + builder.appendInt(3); + return new Page(builder.build().asBlock()); + } + } + + @Override + protected LookupFromIndexService.LookupResponse createTestInstance() { + // Can't use a real block factory for the basic serialization tests because they don't release. + return createTestInstance(TestBlockFactory.getNonBreakingInstance()); + } + + @Override + protected Writeable.Reader instanceReader() { + return in -> new LookupFromIndexService.LookupResponse(in, TestBlockFactory.getNonBreakingInstance()); + } + + @Override + protected LookupFromIndexService.LookupResponse mutateInstance(LookupFromIndexService.LookupResponse instance) throws IOException { + assertThat(instance.blockFactory, sameInstance(TestBlockFactory.getNonBreakingInstance())); + List pages = new ArrayList<>(instance.pages().size()); + pages.addAll(instance.pages()); + pages.add(testPage(TestBlockFactory.getNonBreakingInstance())); + return new LookupFromIndexService.LookupResponse(pages, instance.blockFactory); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(List.of(IntBlock.ENTRY)); + } + + public void testWithBreaker() throws IOException { + BlockFactory origFactory = blockFactory(); + BlockFactory copyFactory = blockFactory(); + LookupFromIndexService.LookupResponse orig = createTestInstance(origFactory); + try { + LookupFromIndexService.LookupResponse copy = copyInstance( + orig, + getNamedWriteableRegistry(), + (out, v) -> v.writeTo(out), + in -> new LookupFromIndexService.LookupResponse(in, copyFactory), + TransportVersion.current() + ); + try { + assertThat(copy, equalTo(orig)); + } finally { + copy.decRef(); + } + assertThat(copyFactory.breaker().getUsed(), equalTo(0L)); + } finally { + orig.decRef(); + } + assertThat(origFactory.breaker().getUsed(), equalTo(0L)); + } + + /** + * Tests that we don't reserve any memory other than that in the {@link Page}s we + * hold, and calling {@link LookupFromIndexService.LookupResponse#takePages} + * gives us those pages. If we then close those pages, we should have 0 + * reserved memory. + */ + public void testTakePages() { + BlockFactory factory = blockFactory(); + LookupFromIndexService.LookupResponse orig = createTestInstance(factory); + try { + if (orig.pages().isEmpty()) { + assertThat(factory.breaker().getUsed(), equalTo(0L)); + return; + } + List pages = orig.takePages(); + Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(pages.iterator(), page -> page::releaseBlocks))); + assertThat(factory.breaker().getUsed(), equalTo(0L)); + assertThat(orig.takePages(), nullValue()); + } finally { + orig.decRef(); + } + assertThat(factory.breaker().getUsed(), equalTo(0L)); + } + + private BlockFactory blockFactory() { + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofMb(4 /* more than we need*/)) + .withCircuitBreaking(); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + breakers.add(breaker); + return new BlockFactory(breaker, bigArrays); + } + + @After + public void allBreakersEmpty() throws Exception { + // first check that all big arrays are released, which can affect breakers + MockBigArrays.ensureAllArraysAreReleased(); + + for (CircuitBreaker breaker : breakers) { + assertThat("Unexpected used in breaker: " + breaker, breaker.getUsed(), equalTo(0L)); + } + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 2aed259e7ad0b..a8f8054fbc6b1 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -4928,7 +4928,7 @@ public void testPlanSanityCheck() throws Exception { } public void testPlanSanityCheckWithBinaryPlans() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); var plan = optimizedPlan(""" FROM test @@ -6003,7 +6003,7 @@ public void testLookupStats() { * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownFilterOnJoinKeyWithRename() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); String query = """ FROM test @@ -6045,7 +6045,7 @@ public void testLookupJoinPushDownFilterOnJoinKeyWithRename() { * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownFilterOnLeftSideField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); String query = """ FROM test @@ -6088,7 +6088,7 @@ public void testLookupJoinPushDownFilterOnLeftSideField() { * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownDisabledForLookupField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); String query = """ FROM test @@ -6132,7 +6132,7 @@ public void testLookupJoinPushDownDisabledForLookupField() { * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20] */ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); String query = """ FROM test @@ -6183,7 +6183,7 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20] */ public void testLookupJoinPushDownDisabledForDisjunctionBetweenLeftAndRightField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); String query = """ FROM test diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 66891210a1e47..75825f4e8f480 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -2618,7 +2618,7 @@ public void testVerifierOnMissingReferences() { } public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); // Do not assert serialization: // This will have a LookupJoinExec, which is not serializable because it doesn't leave the coordinator. @@ -7301,7 +7301,7 @@ public void testLookupThenTopN() { } public void testLookupJoinFieldLoading() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); TestDataSource data = dataSetWithLookupIndices(Map.of("lookup_index", List.of("first_name", "foo", "bar", "baz"))); @@ -7378,7 +7378,7 @@ public void testLookupJoinFieldLoading() throws Exception { } public void testLookupJoinFieldLoadingTwoLookups() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); TestDataSource data = dataSetWithLookupIndices( Map.of( @@ -7432,7 +7432,7 @@ public void testLookupJoinFieldLoadingTwoLookups() throws Exception { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/119082") public void testLookupJoinFieldLoadingTwoLookupsProjectInBetween() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); TestDataSource data = dataSetWithLookupIndices( Map.of( @@ -7473,7 +7473,7 @@ public void testLookupJoinFieldLoadingTwoLookupsProjectInBetween() throws Except @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/118778") public void testLookupJoinFieldLoadingDropAllFields() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); TestDataSource data = dataSetWithLookupIndices(Map.of("lookup_index", List.of("first_name", "foo", "bar", "baz"))); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java index 4db4f7925d4ff..b1c9030db7a43 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java @@ -1365,7 +1365,7 @@ public void testMetrics() { } public void testLookupJoin() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( "FROM employees | KEEP languages | RENAME languages AS language_code | LOOKUP JOIN languages_lookup ON language_code", Set.of("languages", "languages.*", "language_code", "language_code.*"), @@ -1374,7 +1374,7 @@ public void testLookupJoin() { } public void testLookupJoinKeep() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM employees @@ -1388,7 +1388,7 @@ public void testLookupJoinKeep() { } public void testLookupJoinKeepWildcard() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM employees @@ -1402,7 +1402,7 @@ public void testLookupJoinKeepWildcard() { } public void testMultiLookupJoin() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1415,7 +1415,7 @@ public void testMultiLookupJoin() { } public void testMultiLookupJoinKeepBefore() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1429,7 +1429,7 @@ public void testMultiLookupJoinKeepBefore() { } public void testMultiLookupJoinKeepBetween() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1454,7 +1454,7 @@ public void testMultiLookupJoinKeepBetween() { } public void testMultiLookupJoinKeepAfter() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1481,7 +1481,7 @@ public void testMultiLookupJoinKeepAfter() { } public void testMultiLookupJoinKeepAfterWildcard() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1495,7 +1495,7 @@ public void testMultiLookupJoinKeepAfterWildcard() { } public void testMultiLookupJoinSameIndex() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1509,7 +1509,7 @@ public void testMultiLookupJoinSameIndex() { } public void testMultiLookupJoinSameIndexKeepBefore() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1524,7 +1524,7 @@ public void testMultiLookupJoinSameIndexKeepBefore() { } public void testMultiLookupJoinSameIndexKeepBetween() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1550,7 +1550,7 @@ public void testMultiLookupJoinSameIndexKeepBetween() { } public void testMultiLookupJoinSameIndexKeepAfter() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()); assertFieldNames( """ FROM sample_data diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index 1567b6b556bdd..e7cda33896149 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -6,7 +6,7 @@ setup: - method: POST path: /_query parameters: [] - capabilities: [join_lookup_v10] + capabilities: [join_lookup_v11] reason: "uses LOOKUP JOIN" - do: indices.create: