Skip to content

Commit 4462070

Browse files
nik9000elasticsearchmachine
andauthored
ESQL: Make LOOKUP more left-joiny (#119475)
* ESQL: Rewire multivalue handling of LOOKUP * Move thing * Rework async operator * Works now? Had to disable some stuff * Spotless * Fix release * [CI] Auto commit changes from spotless * Spotless * NOCOMMITS * I need a new version! * Change sort * Stop being silly * From review * Drop MV_SORTs in tests --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent f9a3721 commit 4462070

File tree

28 files changed

+865
-274
lines changed

28 files changed

+865
-274
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/node/capabilities/NodeCapability.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,9 @@ public void writeTo(StreamOutput out) throws IOException {
4141

4242
out.writeBoolean(supported);
4343
}
44+
45+
@Override
46+
public String toString() {
47+
return "NodeCapability{supported=" + supported + '}';
48+
}
4449
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,4 +246,9 @@ public OrdinalBytesRefBlock expand() {
246246
public long ramBytesUsed() {
247247
return ordinals.ramBytesUsed() + bytes.ramBytesUsed();
248248
}
249+
250+
@Override
251+
public String toString() {
252+
return getClass().getSimpleName() + "[ordinals=" + ordinals + ", bytes=" + bytes + "]";
253+
}
249254
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@
2929
import java.util.concurrent.atomic.LongAdder;
3030

3131
/**
32-
* {@link AsyncOperator} performs an external computation specified in {@link #performAsync(Page, ActionListener)}.
33-
* This operator acts as a client and operates on a per-page basis to reduce communication overhead.
32+
* {@link AsyncOperator} performs an external computation specified in
33+
* {@link #performAsync(Page, ActionListener)}. This operator acts as a client
34+
* to reduce communication overhead and fetches a {@code Fetched} at a time.
35+
* It's the responsibility of subclasses to transform that {@code Fetched} into
36+
* output.
3437
* @see #performAsync(Page, ActionListener)
3538
*/
36-
public abstract class AsyncOperator implements Operator {
39+
public abstract class AsyncOperator<Fetched> implements Operator {
3740

3841
private volatile SubscribableListener<Void> blockedFuture;
3942

40-
private final Map<Long, Page> buffers = ConcurrentCollections.newConcurrentMap();
43+
private final Map<Long, Fetched> buffers = ConcurrentCollections.newConcurrentMap();
4144
private final FailureCollector failureCollector = new FailureCollector();
4245
private final DriverContext driverContext;
4346

@@ -84,7 +87,7 @@ public void addInput(Page input) {
8487
driverContext.addAsyncAction();
8588
boolean success = false;
8689
try {
87-
final ActionListener<Page> listener = ActionListener.wrap(output -> {
90+
final ActionListener<Fetched> listener = ActionListener.wrap(output -> {
8891
buffers.put(seqNo, output);
8992
onSeqNoCompleted(seqNo);
9093
}, e -> {
@@ -105,18 +108,20 @@ public void addInput(Page input) {
105108
}
106109
}
107110

108-
private void releasePageOnAnyThread(Page page) {
111+
protected static void releasePageOnAnyThread(Page page) {
109112
page.allowPassingToDifferentDriver();
110113
page.releaseBlocks();
111114
}
112115

116+
protected abstract void releaseFetchedOnAnyThread(Fetched result);
117+
113118
/**
114119
* Performs an external computation and notify the listener when the result is ready.
115120
*
116121
* @param inputPage the input page
117122
* @param listener the listener
118123
*/
119-
protected abstract void performAsync(Page inputPage, ActionListener<Page> listener);
124+
protected abstract void performAsync(Page inputPage, ActionListener<Fetched> listener);
120125

121126
protected abstract void doClose();
122127

@@ -126,7 +131,7 @@ private void onSeqNoCompleted(long seqNo) {
126131
notifyIfBlocked();
127132
}
128133
if (closed || failureCollector.hasFailure()) {
129-
discardPages();
134+
discardResults();
130135
}
131136
}
132137

@@ -146,18 +151,18 @@ private void notifyIfBlocked() {
146151
private void checkFailure() {
147152
Exception e = failureCollector.getFailure();
148153
if (e != null) {
149-
discardPages();
154+
discardResults();
150155
throw ExceptionsHelper.convertToRuntime(e);
151156
}
152157
}
153158

154-
private void discardPages() {
159+
private void discardResults() {
155160
long nextCheckpoint;
156161
while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) {
157-
Page page = buffers.remove(nextCheckpoint);
162+
Fetched result = buffers.remove(nextCheckpoint);
158163
checkpoint.markSeqNoAsPersisted(nextCheckpoint);
159-
if (page != null) {
160-
releasePageOnAnyThread(page);
164+
if (result != null) {
165+
releaseFetchedOnAnyThread(result);
161166
}
162167
}
163168
}
@@ -166,7 +171,7 @@ private void discardPages() {
166171
public final void close() {
167172
finish();
168173
closed = true;
169-
discardPages();
174+
discardResults();
170175
doClose();
171176
}
172177

@@ -185,15 +190,18 @@ public boolean isFinished() {
185190
}
186191
}
187192

188-
@Override
189-
public Page getOutput() {
193+
/**
194+
* Get a {@link Fetched} from the buffer.
195+
* @return a result if one is ready or {@code null} if none are available.
196+
*/
197+
public final Fetched fetchFromBuffer() {
190198
checkFailure();
191199
long persistedCheckpoint = checkpoint.getPersistedCheckpoint();
192200
if (persistedCheckpoint < checkpoint.getProcessedCheckpoint()) {
193201
persistedCheckpoint++;
194-
Page page = buffers.remove(persistedCheckpoint);
202+
Fetched result = buffers.remove(persistedCheckpoint);
195203
checkpoint.markSeqNoAsPersisted(persistedCheckpoint);
196-
return page;
204+
return result;
197205
} else {
198206
return null;
199207
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@
128128
* | l99 | null | null |
129129
* }</pre>
130130
*/
131-
class RightChunkedLeftJoin implements Releasable {
131+
public class RightChunkedLeftJoin implements Releasable {
132132
private final Page leftHand;
133133
private final int mergedElementCount;
134134
/**
@@ -138,12 +138,12 @@ class RightChunkedLeftJoin implements Releasable {
138138
*/
139139
private int next = 0;
140140

141-
RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) {
141+
public RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) {
142142
this.leftHand = leftHand;
143143
this.mergedElementCount = mergedElementCounts;
144144
}
145145

146-
Page join(Page rightHand) {
146+
public Page join(Page rightHand) {
147147
IntVector positions = rightHand.<IntBlock>getBlock(0).asVector();
148148
if (positions.getInt(0) < next - 1) {
149149
throw new IllegalArgumentException("maximum overlap is one position");
@@ -209,7 +209,7 @@ Page join(Page rightHand) {
209209
}
210210
}
211211

212-
Optional<Page> noMoreRightHandPages() {
212+
public Optional<Page> noMoreRightHandPages() {
213213
if (next == leftHand.getPositionCount()) {
214214
return Optional.empty();
215215
}
@@ -237,6 +237,14 @@ Optional<Page> noMoreRightHandPages() {
237237
}
238238
}
239239

240+
/**
241+
* Release this on <strong>any</strong> thread, rather than just the thread that built it.
242+
*/
243+
public void releaseOnAnyThread() {
244+
leftHand.allowPassingToDifferentDriver();
245+
leftHand.releaseBlocks();
246+
}
247+
240248
@Override
241249
public void close() {
242250
Releasables.close(leftHand::releaseBlocks);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,24 @@ protected Page createPage(int positionOffset, int length) {
110110
}
111111
};
112112
int maxConcurrentRequests = randomIntBetween(1, 10);
113-
AsyncOperator asyncOperator = new AsyncOperator(driverContext, maxConcurrentRequests) {
113+
AsyncOperator<Page> asyncOperator = new AsyncOperator<Page>(driverContext, maxConcurrentRequests) {
114114
final LookupService lookupService = new LookupService(threadPool, globalBlockFactory, dict, maxConcurrentRequests);
115115

116116
@Override
117117
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
118118
lookupService.lookupAsync(inputPage, listener);
119119
}
120120

121+
@Override
122+
public Page getOutput() {
123+
return fetchFromBuffer();
124+
}
125+
126+
@Override
127+
protected void releaseFetchedOnAnyThread(Page page) {
128+
releasePageOnAnyThread(page);
129+
}
130+
121131
@Override
122132
public void doClose() {
123133

@@ -159,7 +169,7 @@ public void doClose() {
159169
Releasables.close(localBreaker);
160170
}
161171

162-
class TestOp extends AsyncOperator {
172+
class TestOp extends AsyncOperator<Page> {
163173
Map<Page, ActionListener<Page>> handlers = new HashMap<>();
164174

165175
TestOp(DriverContext driverContext, int maxOutstandingRequests) {
@@ -171,6 +181,16 @@ protected void performAsync(Page inputPage, ActionListener<Page> listener) {
171181
handlers.put(inputPage, listener);
172182
}
173183

184+
@Override
185+
public Page getOutput() {
186+
return fetchFromBuffer();
187+
}
188+
189+
@Override
190+
protected void releaseFetchedOnAnyThread(Page page) {
191+
releasePageOnAnyThread(page);
192+
}
193+
174194
@Override
175195
protected void doClose() {
176196

@@ -233,7 +253,7 @@ public void testFailure() throws Exception {
233253
);
234254
int maxConcurrentRequests = randomIntBetween(1, 10);
235255
AtomicBoolean failed = new AtomicBoolean();
236-
AsyncOperator asyncOperator = new AsyncOperator(driverContext, maxConcurrentRequests) {
256+
AsyncOperator<Page> asyncOperator = new AsyncOperator<Page>(driverContext, maxConcurrentRequests) {
237257
@Override
238258
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
239259
ActionRunnable<Page> command = new ActionRunnable<>(listener) {
@@ -256,6 +276,16 @@ protected void doRun() {
256276
}
257277
}
258278

279+
@Override
280+
public Page getOutput() {
281+
return fetchFromBuffer();
282+
}
283+
284+
@Override
285+
protected void releaseFetchedOnAnyThread(Page page) {
286+
releasePageOnAnyThread(page);
287+
}
288+
259289
@Override
260290
protected void doClose() {
261291

@@ -285,7 +315,7 @@ public void testIsFinished() {
285315
for (int i = 0; i < iters; i++) {
286316
DriverContext driverContext = new DriverContext(blockFactory.bigArrays(), blockFactory);
287317
CyclicBarrier barrier = new CyclicBarrier(2);
288-
AsyncOperator asyncOperator = new AsyncOperator(driverContext, between(1, 10)) {
318+
AsyncOperator<Page> asyncOperator = new AsyncOperator<Page>(driverContext, between(1, 10)) {
289319
@Override
290320
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
291321
ActionRunnable<Page> command = new ActionRunnable<>(listener) {
@@ -302,6 +332,16 @@ protected void doRun() {
302332
threadPool.executor(ESQL_TEST_EXECUTOR).execute(command);
303333
}
304334

335+
@Override
336+
public Page getOutput() {
337+
return fetchFromBuffer();
338+
}
339+
340+
@Override
341+
protected void releaseFetchedOnAnyThread(Page page) {
342+
releasePageOnAnyThread(page);
343+
}
344+
305345
@Override
306346
protected void doClose() {
307347

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ private static Page randomPage() {
325325
return new Page(block.block());
326326
}
327327

328-
static class SwitchContextOperator extends AsyncOperator {
328+
static class SwitchContextOperator extends AsyncOperator<Page> {
329329
private final ThreadPool threadPool;
330330

331331
SwitchContextOperator(DriverContext driverContext, ThreadPool threadPool) {
@@ -348,6 +348,16 @@ protected void performAsync(Page page, ActionListener<Page> listener) {
348348
}), TimeValue.timeValueNanos(between(1, 1_000_000)), threadPool.executor("esql"));
349349
}
350350

351+
@Override
352+
public Page getOutput() {
353+
return fetchFromBuffer();
354+
}
355+
356+
@Override
357+
protected void releaseFetchedOnAnyThread(Page page) {
358+
releasePageOnAnyThread(page);
359+
}
360+
351361
@Override
352362
protected void doClose() {
353363

x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ record Listen(long timestamp, String songId, double duration) {
548548
public void testLookupJoinIndexAllowed() throws Exception {
549549
assumeTrue(
550550
"Requires LOOKUP JOIN capability",
551-
EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V10.capabilityName()))
551+
EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V11.capabilityName()))
552552
);
553553

554554
Response resp = runESQLCommand(
@@ -587,7 +587,7 @@ public void testLookupJoinIndexAllowed() throws Exception {
587587
public void testLookupJoinIndexForbidden() throws Exception {
588588
assumeTrue(
589589
"Requires LOOKUP JOIN capability",
590-
EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V10.capabilityName()))
590+
EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V11.capabilityName()))
591591
);
592592

593593
var resp = expectThrows(

x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.List;
2121

2222
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
23-
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V10;
23+
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V11;
2424

2525
public class MixedClusterEsqlSpecIT extends EsqlSpecTestCase {
2626
@ClassRule
@@ -82,7 +82,7 @@ protected boolean supportsInferenceTestService() {
8282

8383
@Override
8484
protected boolean supportsIndexModeLookup() throws IOException {
85-
return hasCapabilities(List.of(JOIN_LOOKUP_V10.capabilityName()));
85+
return hasCapabilities(List.of(JOIN_LOOKUP_V11.capabilityName()));
8686
}
8787

8888
@Override

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
4949
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
5050
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
51-
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V10;
51+
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V11;
5252
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
5353
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
5454
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
@@ -124,7 +124,7 @@ protected void shouldSkipTest(String testName) throws IOException {
124124
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
125125
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
126126
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
127-
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V10.capabilityName()));
127+
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V11.capabilityName()));
128128
}
129129

130130
private TestFeatureService remoteFeaturesService() throws IOException {

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public void testIndicesDontExist() throws IOException {
227227
assertThat(e.getMessage(), containsString("index_not_found_exception"));
228228
assertThat(e.getMessage(), anyOf(containsString("no such index [foo]"), containsString("no such index [remote_cluster:foo]")));
229229

230-
if (EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()) {
230+
if (EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()) {
231231
e = expectThrows(
232232
ResponseException.class,
233233
() -> runEsql(timestampFilter("gte", "2020-01-01").query(from("test1") + " | LOOKUP JOIN foo ON id1"))

0 commit comments

Comments
 (0)