expected = new ArrayList<>(docCount);
+ populateIndices.populate(docCount, expected);
/*
* Find the data node hosting the only shard of the source index.
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index a64ca40470b4a..3c5c69c9c355d 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -566,7 +566,7 @@ public enum Cap {
/**
* LOOKUP JOIN
*/
- JOIN_LOOKUP_V10(Build.current().isSnapshot()),
+ JOIN_LOOKUP_V11(Build.current().isSnapshot()),
/**
* Fix for https://github.com/elastic/elasticsearch/issues/117054
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
index c65e143b42173..13f0325d48d6b 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
@@ -22,13 +22,11 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntVector;
@@ -41,6 +39,7 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OutputOperator;
+import org.elasticsearch.compute.operator.ProjectOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
@@ -87,19 +86,19 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
- * {@link AbstractLookupService} performs a single valued {@code LEFT JOIN} for a
- * given input page against another index. This is quite similar to a nested loop
- * join. It is restricted to indices with only a single shard.
+ * {@link AbstractLookupService} performs a {@code LEFT JOIN} for a given input
+ * page against another index that must have only a single
+ * shard.
*
* This registers a {@link TransportRequestHandler} so we can handle requests
* to join data that isn't local to the node, but it is much faster if the
@@ -107,7 +106,7 @@
*
*
* The join process spawns a {@link Driver} per incoming page which runs in
- * three stages:
+ * two or three stages:
*
*
* Stage 1: Finding matching document IDs for the input page. This stage is done
@@ -120,9 +119,9 @@
* {@code [DocVector, IntBlock: positions, Block: field1, Block: field2,...]}.
*
*
- * Stage 3: Combining the extracted values based on positions and filling nulls for
- * positions without matches. This is done by {@link MergePositionsOperator}. The output
- * page is represented as {@code [Block: field1, Block: field2,...]}.
+ * Stage 3: Optionally this combines the extracted values based on positions and filling
+ * nulls for positions without matches. This is done by {@link MergePositionsOperator}.
+ * The output page is represented as {@code [Block: field1, Block: field2,...]}.
*
*
* The {@link Page#getPositionCount()} of the output {@link Page} is equal to the
@@ -139,6 +138,15 @@ abstract class AbstractLookupService readRequest
) {
this.actionName = actionName;
@@ -157,6 +166,7 @@ abstract class AbstractLookupService resultPages, BlockFactory blockFactory) throws IOException;
+
+ /**
+ * Read the response from a {@link StreamInput}.
+ */
+ protected abstract LookupResponse readLookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException;
+
protected static QueryList termQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
@@ -199,9 +219,9 @@ protected static QueryList termQueryList(
/**
* Perform the actual lookup.
*/
- public final void lookupAsync(R request, CancellableTask parentTask, ActionListener outListener) {
+ public final void lookupAsync(R request, CancellableTask parentTask, ActionListener> outListener) {
ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
- ActionListener listener = ContextPreservingActionListener.wrapPreservingContext(outListener, threadContext);
+ ActionListener> listener = ContextPreservingActionListener.wrapPreservingContext(outListener, threadContext);
hasPrivilege(listener.delegateFailureAndWrap((delegate, ignored) -> {
ClusterState clusterState = clusterService.state();
GroupShardsIterator shardIterators = clusterService.operationRouting()
@@ -228,8 +248,8 @@ public final void lookupAsync(R request, CancellableTask parentTask, ActionListe
parentTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
- delegate.map(LookupResponse::takePage),
- in -> new LookupResponse(in, blockFactory),
+ delegate.map(LookupResponse::takePages),
+ in -> readLookupResponse(in, blockFactory),
executor
)
);
@@ -294,10 +314,13 @@ private void hasPrivilege(ActionListener outListener) {
);
}
- private void doLookup(T request, CancellableTask task, ActionListener listener) {
+ private void doLookup(T request, CancellableTask task, ActionListener> listener) {
Block inputBlock = request.inputPage.getBlock(0);
if (inputBlock.areAllValuesNull()) {
- listener.onResponse(createNullResponse(request.inputPage.getPositionCount(), request.extractFields));
+ List nullResponse = mergePages
+ ? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields))
+ : List.of();
+ listener.onResponse(nullResponse);
return;
}
final List releasables = new ArrayList<>(6);
@@ -318,31 +341,31 @@ private void doLookup(T request, CancellableTask task, ActionListener list
mergingTypes[i] = PlannerUtils.toElementType(request.extractFields.get(i).dataType());
}
final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray();
- final MergePositionsOperator mergePositionsOperator;
+ final Operator finishPages;
final OrdinalBytesRefBlock ordinalsBytesRefBlock;
- if (inputBlock instanceof BytesRefBlock bytesRefBlock && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) {
+ if (mergePages // TODO fix this optimization for Lookup.
+ && inputBlock instanceof BytesRefBlock bytesRefBlock
+ && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) {
+
inputBlock = ordinalsBytesRefBlock.getDictionaryVector().asBlock();
var selectedPositions = ordinalsBytesRefBlock.getOrdinalsBlock();
- mergePositionsOperator = new MergePositionsOperator(
- 1,
- mergingChannels,
- mergingTypes,
- selectedPositions,
- driverContext.blockFactory()
- );
-
+ finishPages = new MergePositionsOperator(1, mergingChannels, mergingTypes, selectedPositions, driverContext.blockFactory());
} else {
- try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) {
- mergePositionsOperator = new MergePositionsOperator(
- 1,
- mergingChannels,
- mergingTypes,
- selectedPositions,
- driverContext.blockFactory()
- );
+ if (mergePages) {
+ try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) {
+ finishPages = new MergePositionsOperator(
+ 1,
+ mergingChannels,
+ mergingTypes,
+ selectedPositions,
+ driverContext.blockFactory()
+ );
+ }
+ } else {
+ finishPages = dropDocBlockOperator(request.extractFields);
}
}
- releasables.add(mergePositionsOperator);
+ releasables.add(finishPages);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
@@ -362,8 +385,15 @@ private void doLookup(T request, CancellableTask task, ActionListener list
var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
releasables.add(extractFieldsOperator);
- AtomicReference result = new AtomicReference<>();
- OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), result::set);
+ /*
+ * Collect all result Pages in a synchronizedList mostly out of paranoia. We'll
+ * be collecting these results in the Driver thread and reading them in its
+ * completion listener which absolutely happens-after the insertions. So,
+ * technically, we don't need synchronization here. But we're doing it anyway
+ * because the list will never grow mega large.
+ */
+ List collectedPages = Collections.synchronizedList(new ArrayList<>());
+ OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), collectedPages::add);
releasables.add(outputOperator);
Driver driver = new Driver(
"enrich-lookup:" + request.sessionId,
@@ -372,7 +402,7 @@ private void doLookup(T request, CancellableTask task, ActionListener list
driverContext,
request::toString,
queryOperator,
- List.of(extractFieldsOperator, mergePositionsOperator),
+ List.of(extractFieldsOperator, finishPages),
outputOperator,
Driver.DEFAULT_STATUS_INTERVAL,
Releasables.wrap(searchContext, localBreaker)
@@ -383,9 +413,9 @@ private void doLookup(T request, CancellableTask task, ActionListener list
});
var threadContext = transportService.getThreadPool().getThreadContext();
Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, listener.map(ignored -> {
- Page out = result.get();
- if (out == null) {
- out = createNullResponse(request.inputPage.getPositionCount(), request.extractFields);
+ List out = collectedPages;
+ if (mergePages && out.isEmpty()) {
+ out = List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields));
}
return out;
}));
@@ -437,6 +467,18 @@ private static Operator extractFieldsOperator(
);
}
+ /**
+ * Drop just the first block, keeping the remaining.
+ */
+ private Operator dropDocBlockOperator(List extractFields) {
+ int end = extractFields.size() + 1;
+ List projection = new ArrayList<>(end);
+ for (int i = 1; i <= end; i++) {
+ projection.add(i);
+ }
+ return new ProjectOperator(projection);
+ }
+
private Page createNullResponse(int positionCount, List extractFields) {
final Block[] blocks = new Block[extractFields.size()];
try {
@@ -460,7 +502,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) {
request,
(CancellableTask) task,
listener.delegateFailureAndWrap(
- (l, outPage) -> ActionListener.respondAndRelease(l, new LookupResponse(outPage, blockFactory))
+ (l, resultPages) -> ActionListener.respondAndRelease(l, createLookupResponse(resultPages, blockFactory))
)
);
}
@@ -590,45 +632,24 @@ public final String toString() {
protected abstract String extraDescription();
}
- private static class LookupResponse extends TransportResponse {
- private final RefCounted refs = AbstractRefCounted.of(this::releasePage);
- private final BlockFactory blockFactory;
- private Page page;
- private long reservedBytes = 0;
+ abstract static class LookupResponse extends TransportResponse {
+ private final RefCounted refs = AbstractRefCounted.of(this::release);
+ protected final BlockFactory blockFactory;
+ protected long reservedBytes = 0;
- LookupResponse(Page page, BlockFactory blockFactory) {
- this.page = page;
- this.blockFactory = blockFactory;
- }
-
- LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
- try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
- this.page = new Page(bsi);
- }
+ LookupResponse(BlockFactory blockFactory) {
this.blockFactory = blockFactory;
}
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- long bytes = page.ramBytesUsedByBlocks();
- blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize enrich lookup response");
- reservedBytes += bytes;
- page.writeTo(out);
- }
-
- Page takePage() {
- var p = page;
- page = null;
- return p;
- }
+ protected abstract List takePages();
- private void releasePage() {
+ private void release() {
blockFactory.breaker().addWithoutBreaking(-reservedBytes);
- if (page != null) {
- Releasables.closeExpectNoException(page::releaseBlocks);
- }
+ innerRelease();
}
+ protected abstract void innerRelease();
+
@Override
public void incRef() {
refs.incRef();
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java
index df608a04632a2..d1664888187a9 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java
@@ -17,6 +17,7 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
+import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
@@ -27,7 +28,7 @@
import java.util.List;
import java.util.Objects;
-public final class EnrichLookupOperator extends AsyncOperator {
+public final class EnrichLookupOperator extends AsyncOperator {
private final EnrichLookupService enrichLookupService;
private final String sessionId;
private final CancellableTask parentTask;
@@ -128,13 +129,29 @@ protected void performAsync(Page inputPage, ActionListener listener) {
enrichFields,
source
);
+ CheckedFunction, Page, Exception> handleResponse = pages -> {
+ if (pages.size() != 1) {
+ throw new UnsupportedOperationException("ENRICH should only return a single page");
+ }
+ return inputPage.appendPage(pages.get(0));
+ };
enrichLookupService.lookupAsync(
request,
parentTask,
- ActionListener.runBefore(listener.map(inputPage::appendPage), responseHeadersCollector::collect)
+ ActionListener.runBefore(listener.map(handleResponse), responseHeadersCollector::collect)
);
}
+ @Override
+ public Page getOutput() {
+ return fetchFromBuffer();
+ }
+
+ @Override
+ protected void releaseFetchedOnAnyThread(Page page) {
+ releasePageOnAnyThread(page);
+ }
+
@Override
public String toString() {
return "EnrichOperator[index="
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
index 7057b586871eb..a343e368375cd 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
@@ -17,6 +17,7 @@
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.lookup.QueryList;
+import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.RangeFieldMapper;
import org.elasticsearch.index.mapper.RangeType;
@@ -52,7 +53,16 @@ public EnrichLookupService(
BigArrays bigArrays,
BlockFactory blockFactory
) {
- super(LOOKUP_ACTION_NAME, clusterService, searchService, transportService, bigArrays, blockFactory, TransportRequest::readFrom);
+ super(
+ LOOKUP_ACTION_NAME,
+ clusterService,
+ searchService,
+ transportService,
+ bigArrays,
+ blockFactory,
+ true,
+ TransportRequest::readFrom
+ );
}
@Override
@@ -86,6 +96,19 @@ protected String getRequiredPrivilege() {
return ClusterPrivilegeResolver.MONITOR_ENRICH.name();
}
+ @Override
+ protected LookupResponse createLookupResponse(List pages, BlockFactory blockFactory) throws IOException {
+ if (pages.size() != 1) {
+ throw new UnsupportedOperationException("ENRICH always makes a single page of output");
+ }
+ return new LookupResponse(pages.get(0), blockFactory);
+ }
+
+ @Override
+ protected LookupResponse readLookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
+ return new LookupResponse(in, blockFactory);
+ }
+
private static void validateTypes(DataType inputDataType, MappedFieldType fieldType) {
if (fieldType instanceof RangeFieldMapper.RangeFieldType rangeType) {
// For range policy types, the ENRICH index field type will be one of a list of supported range types,
@@ -210,4 +233,42 @@ protected String extraDescription() {
return " ,match_type=" + matchType + " ,match_field=" + matchField;
}
}
+
+ private static class LookupResponse extends AbstractLookupService.LookupResponse {
+ private Page page;
+
+ private LookupResponse(Page page, BlockFactory blockFactory) {
+ super(blockFactory);
+ this.page = page;
+ }
+
+ private LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
+ super(blockFactory);
+ try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
+ this.page = new Page(bsi);
+ }
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ long bytes = page.ramBytesUsedByBlocks();
+ blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize enrich lookup response");
+ reservedBytes += bytes;
+ page.writeTo(out);
+ }
+
+ @Override
+ protected List takePages() {
+ var p = List.of(page);
+ page = null;
+ return p;
+ }
+
+ @Override
+ protected void innerRelease() {
+ if (page != null) {
+ Releasables.closeExpectNoException(page::releaseBlocks);
+ }
+ }
+ }
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java
index f09f7d0e23e7b..73dfcf8d43620 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java
@@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.enrich;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -15,7 +16,11 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.AsyncOperator;
import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.IsBlockedResult;
import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
@@ -23,11 +28,13 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
// TODO rename package
-public final class LookupFromIndexOperator extends AsyncOperator {
+public final class LookupFromIndexOperator extends AsyncOperator {
public record Factory(
String sessionId,
CancellableTask parentTask,
@@ -81,6 +88,14 @@ public Operator get(DriverContext driverContext) {
private final List loadFields;
private final Source source;
private long totalTerms = 0L;
+ /**
+ * Total number of pages emitted by this {@link Operator}.
+ */
+ private long emittedPages = 0L;
+ /**
+ * The ongoing join or {@code null} none is ongoing at the moment.
+ */
+ private OngoingJoin ongoing = null;
public LookupFromIndexOperator(
String sessionId,
@@ -108,7 +123,7 @@ public LookupFromIndexOperator(
}
@Override
- protected void performAsync(Page inputPage, ActionListener listener) {
+ protected void performAsync(Page inputPage, ActionListener listener) {
final Block inputBlock = inputPage.getBlock(inputChannel);
totalTerms += inputBlock.getTotalValueCount();
LookupFromIndexService.Request request = new LookupFromIndexService.Request(
@@ -120,7 +135,47 @@ protected void performAsync(Page inputPage, ActionListener listener) {
loadFields,
source
);
- lookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage));
+ lookupService.lookupAsync(
+ request,
+ parentTask,
+ listener.map(pages -> new OngoingJoin(new RightChunkedLeftJoin(inputPage, loadFields.size()), pages.iterator()))
+ );
+ }
+
+ @Override
+ public Page getOutput() {
+ if (ongoing == null) {
+ // No ongoing join, start a new one if we can.
+ ongoing = fetchFromBuffer();
+ if (ongoing == null) {
+ // Buffer empty, wait for the next time we're called.
+ return null;
+ }
+ }
+ if (ongoing.itr.hasNext()) {
+ // There's more to do in the ongoing join.
+ Page right = ongoing.itr.next();
+ emittedPages++;
+ try {
+ return ongoing.join.join(right);
+ } finally {
+ right.releaseBlocks();
+ }
+ }
+ // Current join is all done. Emit any trailing unmatched rows.
+ Optional remaining = ongoing.join.noMoreRightHandPages();
+ ongoing.close();
+ ongoing = null;
+ if (remaining.isEmpty()) {
+ return null;
+ }
+ emittedPages++;
+ return remaining.get();
+ }
+
+ @Override
+ protected void releaseFetchedOnAnyThread(OngoingJoin ongoingJoin) {
+ ongoingJoin.releaseOnAnyThread();
}
@Override
@@ -138,15 +193,29 @@ public String toString() {
+ "]";
}
+ @Override
+ public boolean isFinished() {
+ return ongoing == null && super.isFinished();
+ }
+
+ @Override
+ public IsBlockedResult isBlocked() {
+ if (ongoing != null) {
+ return NOT_BLOCKED;
+ }
+ return super.isBlocked();
+ }
+
@Override
protected void doClose() {
// TODO: Maybe create a sub-task as the parent task of all the lookup tasks
// then cancel it when this operator terminates early (e.g., have enough result).
+ Releasables.close(ongoing);
}
@Override
protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
- return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms);
+ return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages);
}
public static class Status extends AsyncOperator.Status {
@@ -156,22 +225,29 @@ public static class Status extends AsyncOperator.Status {
Status::new
);
- final long totalTerms;
+ private final long totalTerms;
+ /**
+ * Total number of pages emitted by this {@link Operator}.
+ */
+ private final long emittedPages;
- Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms) {
+ Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms, long emittedPages) {
super(receivedPages, completedPages, totalTimeInMillis);
this.totalTerms = totalTerms;
+ this.emittedPages = emittedPages;
}
Status(StreamInput in) throws IOException {
super(in);
this.totalTerms = in.readVLong();
+ this.emittedPages = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(totalTerms);
+ out.writeVLong(emittedPages);
}
@Override
@@ -179,11 +255,20 @@ public String getWriteableName() {
return ENTRY.name;
}
+ public long emittedPages() {
+ return emittedPages;
+ }
+
+ public long totalTerms() {
+ return totalTerms;
+ }
+
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
- innerToXContent(builder);
- builder.field("total_terms", totalTerms);
+ super.innerToXContent(builder);
+ builder.field("emitted_pages", emittedPages());
+ builder.field("total_terms", totalTerms());
return builder.endObject();
}
@@ -196,12 +281,26 @@ public boolean equals(Object o) {
return false;
}
Status status = (Status) o;
- return totalTerms == status.totalTerms;
+ return totalTerms == status.totalTerms && emittedPages == status.emittedPages;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), totalTerms);
+ return Objects.hash(super.hashCode(), totalTerms, emittedPages);
+ }
+ }
+
+ protected record OngoingJoin(RightChunkedLeftJoin join, Iterator itr) implements Releasable {
+ @Override
+ public void close() {
+ Releasables.close(join, Releasables.wrap(() -> Iterators.map(itr, page -> page::releaseBlocks)));
+ }
+
+ public void releaseOnAnyThread() {
+ Releasables.close(
+ join::releaseOnAnyThread,
+ Releasables.wrap(() -> Iterators.map(itr, page -> () -> releasePageOnAnyThread(page)))
+ );
}
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java
index 0bbfc6dd0ce99..ad65394fdfbde 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java
@@ -9,6 +9,7 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
@@ -17,6 +18,7 @@
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.lookup.QueryList;
+import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.ShardId;
@@ -33,6 +35,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
/**
* {@link LookupFromIndexService} performs lookup against a Lookup index for
@@ -49,7 +52,16 @@ public LookupFromIndexService(
BigArrays bigArrays,
BlockFactory blockFactory
) {
- super(LOOKUP_ACTION_NAME, clusterService, searchService, transportService, bigArrays, blockFactory, TransportRequest::readFrom);
+ super(
+ LOOKUP_ACTION_NAME,
+ clusterService,
+ searchService,
+ transportService,
+ bigArrays,
+ blockFactory,
+ false,
+ TransportRequest::readFrom
+ );
}
@Override
@@ -73,6 +85,16 @@ protected QueryList queryList(TransportRequest request, SearchExecutionContext c
return termQueryList(fieldType, context, inputBlock, inputDataType);
}
+ @Override
+ protected LookupResponse createLookupResponse(List pages, BlockFactory blockFactory) throws IOException {
+ return new LookupResponse(pages, blockFactory);
+ }
+
+ @Override
+ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
+ return new LookupResponse(in, blockFactory);
+ }
+
@Override
protected String getRequiredPrivilege() {
return null;
@@ -171,4 +193,65 @@ protected String extraDescription() {
return " ,match_field=" + matchField;
}
}
+
+ protected static class LookupResponse extends AbstractLookupService.LookupResponse {
+ private List pages;
+
+ LookupResponse(List pages, BlockFactory blockFactory) {
+ super(blockFactory);
+ this.pages = pages;
+ }
+
+ LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
+ super(blockFactory);
+ try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
+ this.pages = bsi.readCollectionAsList(Page::new);
+ }
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ long bytes = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum();
+ blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize lookup join response");
+ reservedBytes += bytes;
+ out.writeCollection(pages);
+ }
+
+ @Override
+ protected List takePages() {
+ var p = pages;
+ pages = null;
+ return p;
+ }
+
+ List pages() {
+ return pages;
+ }
+
+ @Override
+ protected void innerRelease() {
+ if (pages != null) {
+ Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(pages.iterator(), page -> page::releaseBlocks)));
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LookupResponse that = (LookupResponse) o;
+ return Objects.equals(pages, that.pages);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(pages);
+ }
+
+ @Override
+ public String toString() {
+ return "LookupResponse{pages=" + pages + '}';
+ }
+ }
}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
index 5f4671aba2cd3..7589b5173640a 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
@@ -262,7 +262,7 @@ public final void test() throws Throwable {
);
assumeFalse(
"lookup join disabled for csv tests",
- testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V10.capabilityName())
+ testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V11.capabilityName())
);
assumeFalse(
"can't use TERM function in csv tests",
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
index eccad1255024f..df04ae89157fd 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
@@ -2147,7 +2147,7 @@ public void testLookupMatchTypeWrong() {
}
public void testLookupJoinUnknownIndex() {
- assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
String errorMessage = "Unknown index [foobar]";
IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage);
@@ -2176,7 +2176,7 @@ public void testLookupJoinUnknownIndex() {
}
public void testLookupJoinUnknownField() {
- assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name";
String errorMessage = "1:45: Unknown column [last_name] in right side of join";
@@ -2199,7 +2199,7 @@ public void testLookupJoinUnknownField() {
}
public void testMultipleLookupJoinsGiveDifferentAttributes() {
- assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
// The field attributes that get contributed by different LOOKUP JOIN commands must have different name ids,
// even if they have the same names. Otherwise, things like dependency analysis - like in PruneColumns - cannot work based on
@@ -2229,7 +2229,7 @@ public void testMultipleLookupJoinsGiveDifferentAttributes() {
}
public void testLookupJoinIndexMode() {
- assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
var indexResolution = AnalyzerTestUtils.expandedDefaultIndexResolution();
var lookupResolution = AnalyzerTestUtils.defaultLookupResolution();
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java
index 180e32fb7c15d..2ee6cf6136114 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java
@@ -113,7 +113,7 @@ public void testTooBigQuery() {
}
public void testJoinOnConstant() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertEquals(
"1:55: JOIN ON clause only supports fields at the moment, found [123]",
error("row languages = 1, gender = \"f\" | lookup join test on 123")
@@ -129,7 +129,7 @@ public void testJoinOnConstant() {
}
public void testJoinOnMultipleFields() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertEquals(
"1:35: JOIN ON clause only supports one field at the moment, found [2]",
error("row languages = 1, gender = \"f\" | lookup join test on gender, languages")
@@ -137,7 +137,7 @@ public void testJoinOnMultipleFields() {
}
public void testJoinTwiceOnTheSameField() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertEquals(
"1:35: JOIN ON clause only supports one field at the moment, found [2]",
error("row languages = 1, gender = \"f\" | lookup join test on languages, languages")
@@ -145,7 +145,7 @@ public void testJoinTwiceOnTheSameField() {
}
public void testJoinTwiceOnTheSameField_TwoLookups() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertEquals(
"1:80: JOIN ON clause only supports one field at the moment, found [2]",
error("row languages = 1, gender = \"f\" | lookup join test on languages | eval x = 1 | lookup join test on gender, gender")
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
index 19a30fb8eff49..124288a786ff9 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
@@ -1957,7 +1957,7 @@ public void testSortByAggregate() {
}
public void testLookupJoinDataTypeMismatch() {
- assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
query("FROM test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code");
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java
new file mode 100644
index 0000000000000..a204e93b0d16a
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.enrich;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class LookupFromIndexOperatorStatusTests extends AbstractWireSerializingTestCase {
+ @Override
+ protected Writeable.Reader instanceReader() {
+ return LookupFromIndexOperator.Status::new;
+ }
+
+ @Override
+ protected LookupFromIndexOperator.Status createTestInstance() {
+ return new LookupFromIndexOperator.Status(
+ randomNonNegativeLong(),
+ randomNonNegativeLong(),
+ randomLongBetween(0, TimeValue.timeValueHours(1).millis()),
+ randomNonNegativeLong(),
+ randomNonNegativeLong()
+ );
+ }
+
+ @Override
+ protected LookupFromIndexOperator.Status mutateInstance(LookupFromIndexOperator.Status in) throws IOException {
+ long receivedPages = in.receivedPages();
+ long completedPages = in.completedPages();
+ long totalTimeInMillis = in.totalTimeInMillis();
+ long totalTerms = in.totalTerms();
+ long emittedPages = in.emittedPages();
+ switch (randomIntBetween(0, 4)) {
+ case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong);
+ case 1 -> completedPages = randomValueOtherThan(completedPages, ESTestCase::randomNonNegativeLong);
+ case 2 -> totalTimeInMillis = randomValueOtherThan(totalTimeInMillis, ESTestCase::randomNonNegativeLong);
+ case 3 -> totalTerms = randomValueOtherThan(totalTerms, ESTestCase::randomNonNegativeLong);
+ case 4 -> emittedPages = randomValueOtherThan(emittedPages, ESTestCase::randomNonNegativeLong);
+ default -> throw new UnsupportedOperationException();
+ }
+ return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages);
+ }
+
+ public void testToXContent() {
+ var status = new LookupFromIndexOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis(), 120, 88);
+ String json = Strings.toString(status, true, true);
+ assertThat(json, equalTo("""
+ {
+ "received_pages" : 100,
+ "completed_pages" : 50,
+ "total_time_in_millis" : 10000,
+ "total_time" : "10s",
+ "emitted_pages" : 88,
+ "total_terms" : 120
+ }"""));
+ }
+}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexServiceResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexServiceResponseTests.java
new file mode 100644
index 0000000000000..098ea9eaa0c2d
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexServiceResponseTests.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.enrich;
+
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.PageCacheRecycler;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.esql.TestBlockFactory;
+import org.junit.After;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+
+public class LookupFromIndexServiceResponseTests extends AbstractWireSerializingTestCase {
+ private final List breakers = new ArrayList<>();
+
+ LookupFromIndexService.LookupResponse createTestInstance(BlockFactory blockFactory) {
+ return new LookupFromIndexService.LookupResponse(randomList(0, 10, () -> testPage(blockFactory)), blockFactory);
+ }
+
+ /**
+ * Build a {@link Page} to test serialization. If we had nice random
+ * {@linkplain Page} generation we'd use that happily, but it's off
+ * in the tests for compute, and we're in ESQL. And we don't
+ * really need a fully random one to verify serialization
+ * here.
+ */
+ Page testPage(BlockFactory blockFactory) {
+ try (IntVector.Builder builder = blockFactory.newIntVectorFixedBuilder(3)) {
+ builder.appendInt(1);
+ builder.appendInt(2);
+ builder.appendInt(3);
+ return new Page(builder.build().asBlock());
+ }
+ }
+
+ @Override
+ protected LookupFromIndexService.LookupResponse createTestInstance() {
+ // Can't use a real block factory for the basic serialization tests because they don't release.
+ return createTestInstance(TestBlockFactory.getNonBreakingInstance());
+ }
+
+ @Override
+ protected Writeable.Reader instanceReader() {
+ return in -> new LookupFromIndexService.LookupResponse(in, TestBlockFactory.getNonBreakingInstance());
+ }
+
+ @Override
+ protected LookupFromIndexService.LookupResponse mutateInstance(LookupFromIndexService.LookupResponse instance) throws IOException {
+ assertThat(instance.blockFactory, sameInstance(TestBlockFactory.getNonBreakingInstance()));
+ List pages = new ArrayList<>(instance.pages().size());
+ pages.addAll(instance.pages());
+ pages.add(testPage(TestBlockFactory.getNonBreakingInstance()));
+ return new LookupFromIndexService.LookupResponse(pages, instance.blockFactory);
+ }
+
+ @Override
+ protected NamedWriteableRegistry getNamedWriteableRegistry() {
+ return new NamedWriteableRegistry(List.of(IntBlock.ENTRY));
+ }
+
+ public void testWithBreaker() throws IOException {
+ BlockFactory origFactory = blockFactory();
+ BlockFactory copyFactory = blockFactory();
+ LookupFromIndexService.LookupResponse orig = createTestInstance(origFactory);
+ try {
+ LookupFromIndexService.LookupResponse copy = copyInstance(
+ orig,
+ getNamedWriteableRegistry(),
+ (out, v) -> v.writeTo(out),
+ in -> new LookupFromIndexService.LookupResponse(in, copyFactory),
+ TransportVersion.current()
+ );
+ try {
+ assertThat(copy, equalTo(orig));
+ } finally {
+ copy.decRef();
+ }
+ assertThat(copyFactory.breaker().getUsed(), equalTo(0L));
+ } finally {
+ orig.decRef();
+ }
+ assertThat(origFactory.breaker().getUsed(), equalTo(0L));
+ }
+
+ /**
+ * Tests that we don't reserve any memory other than that in the {@link Page}s we
+ * hold, and calling {@link LookupFromIndexService.LookupResponse#takePages}
+ * gives us those pages. If we then close those pages, we should have 0
+ * reserved memory.
+ */
+ public void testTakePages() {
+ BlockFactory factory = blockFactory();
+ LookupFromIndexService.LookupResponse orig = createTestInstance(factory);
+ try {
+ if (orig.pages().isEmpty()) {
+ assertThat(factory.breaker().getUsed(), equalTo(0L));
+ return;
+ }
+ List pages = orig.takePages();
+ Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(pages.iterator(), page -> page::releaseBlocks)));
+ assertThat(factory.breaker().getUsed(), equalTo(0L));
+ assertThat(orig.takePages(), nullValue());
+ } finally {
+ orig.decRef();
+ }
+ assertThat(factory.breaker().getUsed(), equalTo(0L));
+ }
+
+ private BlockFactory blockFactory() {
+ BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofMb(4 /* more than we need*/))
+ .withCircuitBreaking();
+ CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
+ breakers.add(breaker);
+ return new BlockFactory(breaker, bigArrays);
+ }
+
+ @After
+ public void allBreakersEmpty() throws Exception {
+ // first check that all big arrays are released, which can affect breakers
+ MockBigArrays.ensureAllArraysAreReleased();
+
+ for (CircuitBreaker breaker : breakers) {
+ assertThat("Unexpected used in breaker: " + breaker, breaker.getUsed(), equalTo(0L));
+ }
+ }
+}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
index 8b12267011f02..b0cd70a2d73c4 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
@@ -4928,7 +4928,7 @@ public void testPlanSanityCheck() throws Exception {
}
public void testPlanSanityCheckWithBinaryPlans() throws Exception {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
var plan = optimizedPlan("""
FROM test
@@ -6003,7 +6003,7 @@ public void testLookupStats() {
* \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19]
*/
public void testLookupJoinPushDownFilterOnJoinKeyWithRename() {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
String query = """
FROM test
@@ -6045,7 +6045,7 @@ public void testLookupJoinPushDownFilterOnJoinKeyWithRename() {
* \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19]
*/
public void testLookupJoinPushDownFilterOnLeftSideField() {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
String query = """
FROM test
@@ -6088,7 +6088,7 @@ public void testLookupJoinPushDownFilterOnLeftSideField() {
* \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19]
*/
public void testLookupJoinPushDownDisabledForLookupField() {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
String query = """
FROM test
@@ -6132,7 +6132,7 @@ public void testLookupJoinPushDownDisabledForLookupField() {
* \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20]
*/
public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightField() {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
String query = """
FROM test
@@ -6183,7 +6183,7 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel
* \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20]
*/
public void testLookupJoinPushDownDisabledForDisjunctionBetweenLeftAndRightField() {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
String query = """
FROM test
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
index 2e620256a41ef..37f25223701ad 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
@@ -2618,7 +2618,7 @@ public void testVerifierOnMissingReferences() {
}
public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
// Do not assert serialization:
// This will have a LookupJoinExec, which is not serializable because it doesn't leave the coordinator.
@@ -7204,7 +7204,7 @@ public void testLookupThenTopN() {
}
public void testLookupJoinFieldLoading() throws Exception {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
TestDataSource data = dataSetWithLookupIndices(Map.of("lookup_index", List.of("first_name", "foo", "bar", "baz")));
@@ -7281,7 +7281,7 @@ public void testLookupJoinFieldLoading() throws Exception {
}
public void testLookupJoinFieldLoadingTwoLookups() throws Exception {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
TestDataSource data = dataSetWithLookupIndices(
Map.of(
@@ -7335,7 +7335,7 @@ public void testLookupJoinFieldLoadingTwoLookups() throws Exception {
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/119082")
public void testLookupJoinFieldLoadingTwoLookupsProjectInBetween() throws Exception {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
TestDataSource data = dataSetWithLookupIndices(
Map.of(
@@ -7376,7 +7376,7 @@ public void testLookupJoinFieldLoadingTwoLookupsProjectInBetween() throws Except
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/118778")
public void testLookupJoinFieldLoadingDropAllFields() throws Exception {
- assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
TestDataSource data = dataSetWithLookupIndices(Map.of("lookup_index", List.of("first_name", "foo", "bar", "baz")));
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java
index 4db4f7925d4ff..b1c9030db7a43 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java
@@ -1365,7 +1365,7 @@ public void testMetrics() {
}
public void testLookupJoin() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"FROM employees | KEEP languages | RENAME languages AS language_code | LOOKUP JOIN languages_lookup ON language_code",
Set.of("languages", "languages.*", "language_code", "language_code.*"),
@@ -1374,7 +1374,7 @@ public void testLookupJoin() {
}
public void testLookupJoinKeep() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM employees
@@ -1388,7 +1388,7 @@ public void testLookupJoinKeep() {
}
public void testLookupJoinKeepWildcard() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM employees
@@ -1402,7 +1402,7 @@ public void testLookupJoinKeepWildcard() {
}
public void testMultiLookupJoin() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
@@ -1415,7 +1415,7 @@ public void testMultiLookupJoin() {
}
public void testMultiLookupJoinKeepBefore() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
@@ -1429,7 +1429,7 @@ public void testMultiLookupJoinKeepBefore() {
}
public void testMultiLookupJoinKeepBetween() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
@@ -1454,7 +1454,7 @@ public void testMultiLookupJoinKeepBetween() {
}
public void testMultiLookupJoinKeepAfter() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
@@ -1481,7 +1481,7 @@ public void testMultiLookupJoinKeepAfter() {
}
public void testMultiLookupJoinKeepAfterWildcard() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
@@ -1495,7 +1495,7 @@ public void testMultiLookupJoinKeepAfterWildcard() {
}
public void testMultiLookupJoinSameIndex() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
@@ -1509,7 +1509,7 @@ public void testMultiLookupJoinSameIndex() {
}
public void testMultiLookupJoinSameIndexKeepBefore() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
@@ -1524,7 +1524,7 @@ public void testMultiLookupJoinSameIndexKeepBefore() {
}
public void testMultiLookupJoinSameIndexKeepBetween() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
@@ -1550,7 +1550,7 @@ public void testMultiLookupJoinSameIndexKeepBetween() {
}
public void testMultiLookupJoinSameIndexKeepAfter() {
- assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled());
+ assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled());
assertFieldNames(
"""
FROM sample_data
diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml
index 1567b6b556bdd..e7cda33896149 100644
--- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml
+++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml
@@ -6,7 +6,7 @@ setup:
- method: POST
path: /_query
parameters: []
- capabilities: [join_lookup_v10]
+ capabilities: [join_lookup_v11]
reason: "uses LOOKUP JOIN"
- do:
indices.create: