Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
ff822ef
Add capability to stop async query on demand
smalyshev Dec 3, 2024
e2e761b
Add test skeleton
smalyshev Dec 5, 2024
932d0f9
Add security check
smalyshev Dec 6, 2024
beb7b02
Allow close exchange early
dnhatn Dec 5, 2024
f7e1d9c
Fix tests
dnhatn Dec 6, 2024
bada285
Add query interruption
smalyshev Dec 6, 2024
867a46c
spotless
smalyshev Dec 6, 2024
ccd6d34
code cleanup
smalyshev Dec 6, 2024
406d1a0
Add action to non-op list
smalyshev Dec 6, 2024
188dedb
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 6, 2024
124e070
Wait for the listener to complete
smalyshev Dec 6, 2024
0ce939b
Allow early termination in Driver
dnhatn Dec 7, 2024
516abbe
Capture partial status
smalyshev Dec 9, 2024
b53e1f1
Merge branch 'pr/118211' into partial-result-on-demand
smalyshev Dec 9, 2024
514e22d
Ensure remote pipeline early termination
smalyshev Dec 12, 2024
49c768e
fix tests
smalyshev Dec 12, 2024
85220a7
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 13, 2024
1a93f9f
more test fixes
smalyshev Dec 13, 2024
ab08912
test fix
smalyshev Dec 13, 2024
4e5b3de
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 17, 2024
b4729eb
More tests
smalyshev Dec 18, 2024
ddb8794
Plugin refactoring
smalyshev Dec 18, 2024
02a440c
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 18, 2024
ebb4cd1
More tests
smalyshev Dec 19, 2024
93c5543
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 19, 2024
bbdcdec
More tests
smalyshev Dec 19, 2024
cb3620a
Update docs/changelog/118122.yaml
smalyshev Dec 19, 2024
1b462b7
More tests
smalyshev Dec 19, 2024
a6185d7
test fix
smalyshev Dec 19, 2024
a6da813
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 21, 2024
10c0d02
Update 118122.yaml
smalyshev Dec 22, 2024
e928e10
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 22, 2024
ab37d10
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 26, 2024
4a94c15
Feedback
smalyshev Dec 26, 2024
605a5ee
[CI] Auto commit changes from spotless
Dec 26, 2024
8035e41
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 26, 2024
a488391
Improve handling of PARTIAL results
smalyshev Dec 27, 2024
6b949fe
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 27, 2024
6d41758
Fix tests - we do need to serialize isPartial in exec info
smalyshev Dec 27, 2024
d310ba4
Add checks for delay() usage
smalyshev Dec 27, 2024
8bd844f
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 2, 2025
a120670
add comments
smalyshev Jan 2, 2025
812322b
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 2, 2025
f781e7a
[CI] Auto commit changes from spotless
Jan 2, 2025
fc4d476
Add JSON defs for stop endpoint
smalyshev Jan 2, 2025
d65f173
Add docs
smalyshev Jan 2, 2025
9098c82
fix tests
smalyshev Jan 2, 2025
6f58544
fix API name
smalyshev Jan 2, 2025
a44da7b
add is_partial back, somehow it got dropped
smalyshev Jan 3, 2025
0db9d98
Add async stop telemetry test
smalyshev Jan 3, 2025
9712367
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 3, 2025
145a07e
Fix bad id test
smalyshev Jan 6, 2025
3cb2527
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 6, 2025
391253f
Delete async search at the end of the test
smalyshev Jan 6, 2025
080e1ba
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 6, 2025
d86acb4
Remove DriverEarlyTerminationException
smalyshev Jan 7, 2025
e9559fe
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 7, 2025
7ccc749
Cleanup partial status handling
smalyshev Jan 7, 2025
a67234c
Refine skipped checks
smalyshev Jan 7, 2025
9ab0207
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 8, 2025
65bf5a5
Cleanup & some docs
smalyshev Jan 8, 2025
d1160b6
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 9, 2025
e739670
Remove onFinishEarly and use EsqlQueryListener instead
smalyshev Jan 9, 2025
ebeeaf4
remove micro-optimization
smalyshev Jan 9, 2025
8900a9e
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 9, 2025
4e9a17a
Use new listener class
smalyshev Jan 10, 2025
c66b630
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 13, 2025
c7a3e3d
Add test to ensure we early terminate
smalyshev Jan 13, 2025
c5bf7c3
[CI] Auto commit changes from spotless
Jan 13, 2025
291394e
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 15, 2025
c49262f
Remove setSource as superceded by https://github.com/elastic/elastics…
smalyshev Jan 14, 2025
765b8ff
[CI] Auto commit changes from spotless
Jan 15, 2025
ba0c337
Pull feedback
smalyshev Jan 15, 2025
18bb7d5
fix test
smalyshev Jan 15, 2025
c24721a
Docs improvements
smalyshev Jan 15, 2025
3b2cf6a
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 15, 2025
7416227
Remove timeout
smalyshev Jan 15, 2025
6c98a2c
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 15, 2025
280edcd
[CI] Auto commit changes from spotless
Jan 16, 2025
1665d43
Restore the wait time
smalyshev Jan 16, 2025
2bfad31
Merge branch 'main' into partial-result-on-demand
Jan 21, 2025
8825fe5
Move is_partial to the top dir
smalyshev Jan 21, 2025
be1badf
Fix esql tests to allow is_partial in result
smalyshev Jan 21, 2025
d74b79e
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 21, 2025
c83f913
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 21, 2025
5d9056e
test fixes
smalyshev Jan 21, 2025
58d1b98
test fixes
smalyshev Jan 21, 2025
3814c2f
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 21, 2025
7fb5e63
test fix
smalyshev Jan 21, 2025
94e4e07
test fix
smalyshev Jan 22, 2025
bea45e3
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 22, 2025
ed26482
Fix doc tests
smalyshev Jan 22, 2025
10da662
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 22, 2025
dc31690
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 22, 2025
3d82200
pull feedback
smalyshev Jan 22, 2025
bcfca2b
Merge remote-tracking branch 'elastic/main' into fork/smalyshev/parti…
dnhatn Jan 22, 2025
d08313a
cluster status
dnhatn Jan 23, 2025
fb33318
better status
dnhatn Jan 23, 2025
058705f
wait for local cluster
dnhatn Jan 23, 2025
58ea6c7
Merge remote-tracking branch 'elastic/main' into fork/smalyshev/parti…
dnhatn Jan 23, 2025
8d27af8
NPE
dnhatn Jan 23, 2025
99d5eb4
fix tests
dnhatn Jan 23, 2025
80087f5
restore the stop query check
smalyshev Jan 23, 2025
cd5c4a7
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 23, 2025
9d97471
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.async;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Objects;

public class AsyncStopRequest extends ActionRequest {
private final String id;

/**
* Creates a new request
*
* @param id The id of the search progress request.
*/
public AsyncStopRequest(String id) {
this.id = id;
}

public AsyncStopRequest(StreamInput in) throws IOException {
super(in);
this.id = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

/**
* Returns the id of the async search.
*/
public String getId() {
return id;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AsyncStopRequest request = (AsyncStopRequest) o;
return Objects.equals(id, request.id);
}

@Override
public int hashCode() {
return Objects.hash(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
*/
public class EsqlAsyncActionNames {
public static final String ESQL_ASYNC_GET_RESULT_ACTION_NAME = "indices:data/read/esql/async/get";
public static final String ESQL_ASYNC_STOP_ACTION_NAME = "indices:data/read/esql/async/stop";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
Expand All @@ -40,10 +42,11 @@

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
Expand Down Expand Up @@ -293,7 +296,7 @@ static final class TransportRemoteSink implements RemoteSink {
final Executor responseExecutor;

final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
final AtomicBoolean finished = new AtomicBoolean(false);
final AtomicReference<SubscribableListener<Void>> completionListenerRef = new AtomicReference<>(null);

TransportRemoteSink(
TransportService transportService,
Expand All @@ -318,13 +321,14 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
return;
}
// already finished
if (finished.get()) {
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
SubscribableListener<Void> completionListener = completionListenerRef.get();
if (completionListener != null) {
completionListener.addListener(listener.map(unused -> new ExchangeResponse(blockFactory, null, true)));
return;
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
finished.set(true);
completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
}
listener.onResponse(r);
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
Expand Down Expand Up @@ -356,10 +360,19 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<Exchang

@Override
public void close(ActionListener<Void> listener) {
if (finished.compareAndSet(false, true)) {
doFetchPageAsync(true, listener.delegateFailure((l, unused) -> l.onResponse(null)));
} else {
listener.onResponse(null);
final SubscribableListener<Void> candidate = new SubscribableListener<>();
final SubscribableListener<Void> actual = completionListenerRef.updateAndGet(
curr -> Objects.requireNonNullElse(curr, candidate)
);
actual.addListener(listener);
if (candidate == actual) {
doFetchPageAsync(true, ActionListener.wrap(r -> {
final Page page = r.takePage();
if (page != null) {
page.releaseBlocks();
}
candidate.onResponse(null);
}, e -> candidate.onResponse(null)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.IsBlockedResult;
import org.elasticsearch.core.Releasable;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -41,6 +43,9 @@ public final class ExchangeSourceHandler {
// The final failure collected will be notified to callers via the {@code completionListener}.
private final FailureCollector failure = new FailureCollector();

private final AtomicInteger nextSinkId = new AtomicInteger();
private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();

/**
* Creates a new ExchangeSourceHandler.
*
Expand All @@ -53,7 +58,9 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
this.buffer = new ExchangeBuffer(maxBufferSize);
this.fetchExecutor = fetchExecutor;
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
final PendingInstances closingSinks = new PendingInstances(() -> {});
closingSinks.trackNewInstance();
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
buffer.addCompletionListener(ActionListener.running(() -> {
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
Expand Down Expand Up @@ -257,7 +264,11 @@ void onSinkComplete() {
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
*/
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(ActionListener.notifyOnce(listener));
final int sinkId = nextSinkId.incrementAndGet();
remoteSinks.put(sinkId, remoteSink);
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId)))
);
fetchExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
Expand Down Expand Up @@ -291,6 +302,22 @@ public Releasable addEmptySink() {
return outstandingSinks::finishInstance;
}

/**
* Gracefully terminates the exchange source early by instructing all remote exchange sinks to stop their computations.
* This can happen when the exchange source has accumulated enough data (e.g., reaching the LIMIT) or when users want to
* see the current result immediately.
*
* @param drainingPages whether to discard pages already fetched in the exchange
*/
public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
buffer.finish(drainingPages);
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(listener)) {
for (RemoteSink remoteSink : remoteSinks.values()) {
remoteSink.close(refs.acquire());
}
}
}

private static class PendingInstances {
private final AtomicInteger instances = new AtomicInteger();
private final SubscribableListener<Void> completion = new SubscribableListener<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@
package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.compute.data.Page;

public interface RemoteSink {

void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener);

default void close(ActionListener<Void> listener) {
fetchPageAsync(true, listener.delegateFailure((l, r) -> {
try {
r.close();
} finally {
l.onResponse(null);
final Page page = r.takePage();
if (page != null) {
page.releaseBlocks();
}
l.onResponse(null);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -421,7 +423,7 @@ public void testExchangeSourceContinueOnFailure() {
}
}

public void testEarlyTerminate() {
public void testClosingSinks() {
BlockFactory blockFactory = blockFactory();
IntBlock block1 = blockFactory.newConstantIntBlockWith(1, 2);
IntBlock block2 = blockFactory.newConstantIntBlockWith(1, 2);
Expand All @@ -441,6 +443,57 @@ public void testEarlyTerminate() {
assertTrue(sink.isFinished());
}

public void testFinishEarly() throws Exception {
ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(20, threadPool.generic(), ActionListener.noop());
Semaphore permits = new Semaphore(between(1, 5));
BlockFactory blockFactory = blockFactory();
Queue<Page> pages = ConcurrentCollections.newQueue();
ExchangeSource exchangeSource = sourceHandler.createExchangeSource();
AtomicBoolean sinkClosed = new AtomicBoolean();
PlainActionFuture<Void> sinkCompleted = new PlainActionFuture<>();
sourceHandler.addRemoteSink((allSourcesFinished, listener) -> {
if (allSourcesFinished) {
sinkClosed.set(true);
permits.release(10);
listener.onResponse(new ExchangeResponse(blockFactory, null, sinkClosed.get()));
} else {
try {
if (permits.tryAcquire(between(0, 100), TimeUnit.MICROSECONDS)) {
boolean closed = sinkClosed.get();
final Page page;
if (closed) {
page = new Page(blockFactory.newConstantIntBlockWith(1, 1));
pages.add(page);
} else {
page = null;
}
listener.onResponse(new ExchangeResponse(blockFactory, page, closed));
} else {
listener.onResponse(new ExchangeResponse(blockFactory, null, sinkClosed.get()));
}
} catch (Exception e) {
throw new AssertionError(e);
}
}
}, false, between(1, 3), sinkCompleted);
threadPool.schedule(
() -> sourceHandler.finishEarly(randomBoolean(), ActionListener.noop()),
TimeValue.timeValueMillis(between(0, 10)),
threadPool.generic()
);
sinkCompleted.actionGet();
Page p;
while ((p = exchangeSource.pollPage()) != null) {
assertSame(p, pages.poll());
p.releaseBlocks();
}
while ((p = pages.poll()) != null) {
p.releaseBlocks();
}
assertTrue(exchangeSource.isFinished());
exchangeSource.finish();
}

public void testConcurrentWithTransportActions() {
MockTransportService node0 = newTransportService();
ExchangeService exchange0 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
Expand Down Expand Up @@ -357,6 +358,63 @@ public void testAsyncQueriesWithLimit0() throws IOException {
}
}

public void testStopQuery() throws Exception {
Map<String, Object> testClusterInfo = setupClusters(3);
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards");
int remote2NumShards = (Integer) testClusterInfo.get("remote2.blocking_index.num_shards");

Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
Boolean requestIncludeMeta = includeCCSMetadata.v1();
boolean responseExpectMeta = includeCCSMetadata.v2();

AtomicReference<String> asyncExecutionId = new AtomicReference<>();

String q = "FROM logs-*,cluster-a:logs-*,remote-b:blocking | STATS total=sum(const) | LIMIT 10";
try (EsqlQueryResponse resp = runAsyncQuery(q, requestIncludeMeta, null, TimeValue.timeValueMillis(100))) {
assertTrue(resp.isRunning());
assertNotNull("async execution id is null", resp.asyncExecutionId());
asyncExecutionId.set(resp.asyncExecutionId().get());
// executionInfo may or may not be set on the initial response when there is a relatively low wait_for_completion_timeout
// so we do not check for it here
}

// wait until we know that the query against 'remote-b:blocking' has started
PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
assertBusy(() -> {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) {
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
EsqlExecutionInfo.Cluster clusterA = executionInfo.getCluster("cluster-a");
assertThat(clusterA.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)));
}
});

/* at this point:
* the query against cluster-a should be finished
* the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
* the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
*/

// run the stop query
var stopRequest = new AsyncStopRequest(asyncExecutionId.get());
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// allow remoteB query to proceed
PauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(1, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
} finally {
AcknowledgedResponse acknowledgedResponse = deleteAsyncId(asyncExecutionId.get());
assertThat(acknowledgedResponse.isAcknowledged(), is(true));
}
}

protected EsqlQueryResponse runAsyncQuery(String query, Boolean ccsMetadata, QueryBuilder filter, TimeValue waitCompletionTime) {
EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest();
request.query(query);
Expand Down
Loading