Skip to content

Commit d59df8a

Browse files
authored
Async search: Add ID and "is running" http headers (#112431)
Add the async execution ID and "is running" flag in the response as HTTP headers. This allows users to know the request status without having to parse the response body. It was also implemented in the `/_async_search/status/<id>` endpoint for consistency. Continuation of #111840, which implemented this same thing for ESQL. Fixes #109576
1 parent 6d161e3 commit d59df8a

File tree

6 files changed

+118
-13
lines changed

6 files changed

+118
-13
lines changed

docs/changelog/112431.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 112431
2+
summary: "Async search: Add ID and \"is running\" http headers"
3+
area: Search
4+
type: feature
5+
issues:
6+
- 109576

x-pack/plugin/async-search/qa/rest/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import org.elasticsearch.gradle.internal.info.BuildParams
22

33
apply plugin: 'elasticsearch.base-internal-es-plugin'
4+
apply plugin: 'elasticsearch.internal-java-rest-test'
45
apply plugin: 'elasticsearch.legacy-yaml-rest-test'
56
apply plugin: 'elasticsearch.legacy-yaml-rest-compat-test'
67

@@ -10,6 +11,10 @@ esplugin {
1011
classname 'org.elasticsearch.query.DeprecatedQueryPlugin'
1112
}
1213

14+
dependencies {
15+
clusterPlugins project(xpackModule('async-search'))
16+
}
17+
1318
restResources {
1419
restApi {
1520
include '_common', 'indices', 'index', 'async_search'
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.qa;
9+
10+
import org.elasticsearch.client.Request;
11+
import org.elasticsearch.client.Response;
12+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
13+
import org.elasticsearch.test.rest.ESRestTestCase;
14+
import org.junit.Before;
15+
import org.junit.ClassRule;
16+
17+
import java.io.IOException;
18+
19+
import static org.hamcrest.Matchers.equalTo;
20+
21+
public class AsyncSearchHeadersIT extends ESRestTestCase {
22+
@ClassRule
23+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local().plugin("x-pack-async-search").build();
24+
25+
@Override
26+
protected String getTestRestCluster() {
27+
return cluster.getHttpAddresses();
28+
}
29+
30+
@Before
31+
public void createIndex() throws IOException {
32+
client().performRequest(new Request("PUT", "/test_index"));
33+
}
34+
35+
public void testAsyncHeaders() throws IOException {
36+
Response submitResponse = client().performRequest(new Request("POST", "/test_index/_async_search?keep_on_completion=true"));
37+
var asyncExecutionId = assertAsyncHeaders(submitResponse);
38+
39+
Response statusResponse = client().performRequest(new Request("GET", "/_async_search/status/" + asyncExecutionId));
40+
assertAsyncHeaders(statusResponse);
41+
42+
Response resultResponse = client().performRequest(new Request("GET", "/_async_search/" + asyncExecutionId));
43+
assertAsyncHeaders(resultResponse);
44+
}
45+
46+
private String assertAsyncHeaders(Response response) throws IOException {
47+
var json = entityAsMap(response);
48+
49+
var asyncExecutionId = (String) json.get("id");
50+
var isRunning = (boolean) json.get("is_running");
51+
52+
if (asyncExecutionId != null) {
53+
assertThat(response.getHeader("X-ElasticSearch-Async-Id"), equalTo(asyncExecutionId));
54+
}
55+
assertThat(response.getHeader("X-ElasticSearch-Async-Is-Running"), equalTo(isRunning ? "?1" : "?0"));
56+
57+
return asyncExecutionId;
58+
}
59+
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1717
import org.elasticsearch.common.util.BigArrays;
1818
import org.elasticsearch.common.util.concurrent.EsExecutors;
19+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1920
import org.elasticsearch.injection.guice.Inject;
2021
import org.elasticsearch.tasks.Task;
2122
import org.elasticsearch.threadpool.ThreadPool;
2223
import org.elasticsearch.transport.TransportService;
2324
import org.elasticsearch.xpack.core.XPackPlugin;
25+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
2426
import org.elasticsearch.xpack.core.async.AsyncResultsService;
2527
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
2628
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
@@ -32,6 +34,7 @@
3234
public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsyncResultRequest, AsyncSearchResponse> {
3335
private final AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> resultsService;
3436
private final TransportService transportService;
37+
private final ThreadContext threadContext;
3538

3639
@Inject
3740
public TransportGetAsyncSearchAction(
@@ -45,6 +48,7 @@ public TransportGetAsyncSearchAction(
4548
) {
4649
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
4750
this.transportService = transportService;
51+
this.threadContext = threadPool.getThreadContext();
4852
this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool, bigArrays);
4953
}
5054

@@ -78,15 +82,23 @@ static AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> createResultsSe
7882

7983
@Override
8084
protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<AsyncSearchResponse> listener) {
85+
ActionListener<AsyncSearchResponse> listenerWithHeaders = listener.map(response -> {
86+
threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, response.isRunning() ? "?1" : "?0");
87+
if (response.getId() != null) {
88+
threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, response.getId());
89+
}
90+
return response;
91+
});
92+
8193
DiscoveryNode node = resultsService.getNode(request.getId());
8294
if (node == null || resultsService.isLocalNode(node)) {
83-
resultsService.retrieveResult(request, listener);
95+
resultsService.retrieveResult(request, listenerWithHeaders);
8496
} else {
8597
transportService.sendRequest(
8698
node,
8799
GetAsyncSearchAction.NAME,
88100
request,
89-
new ActionListenerResponseHandler<>(listener, AsyncSearchResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
101+
new ActionListenerResponseHandler<>(listenerWithHeaders, AsyncSearchResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
90102
);
91103
}
92104
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1919
import org.elasticsearch.common.util.BigArrays;
2020
import org.elasticsearch.common.util.concurrent.EsExecutors;
21+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2122
import org.elasticsearch.injection.guice.Inject;
2223
import org.elasticsearch.rest.RestStatus;
2324
import org.elasticsearch.tasks.Task;
@@ -40,6 +41,7 @@
4041
public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsyncStatusRequest, AsyncStatusResponse> {
4142
private final TransportService transportService;
4243
private final ClusterService clusterService;
44+
private final ThreadContext threadContext;
4345
private final AsyncTaskIndexService<AsyncSearchResponse> store;
4446

4547
@Inject
@@ -55,6 +57,7 @@ public TransportGetAsyncStatusAction(
5557
super(GetAsyncStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
5658
this.transportService = transportService;
5759
this.clusterService = clusterService;
60+
this.threadContext = threadPool.getThreadContext();
5861
this.store = new AsyncTaskIndexService<>(
5962
XPackPlugin.ASYNC_RESULTS_INDEX,
6063
clusterService,
@@ -73,6 +76,12 @@ protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListene
7376
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
7477
DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode();
7578

79+
ActionListener<AsyncStatusResponse> listenerWithHeaders = listener.map(response -> {
80+
threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, response.isRunning() ? "?1" : "?0");
81+
threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, response.getId());
82+
return response;
83+
});
84+
7685
if (node == null || Objects.equals(node, localNode)) {
7786
if (request.getKeepAlive() != null && request.getKeepAlive().getMillis() > 0) {
7887
long expirationTime = System.currentTimeMillis() + request.getKeepAlive().getMillis();
@@ -87,17 +96,17 @@ protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListene
8796
AsyncSearchTask.class,
8897
AsyncSearchTask::getStatusResponse,
8998
AsyncStatusResponse::getStatusFromStoredSearch,
90-
listener
99+
listenerWithHeaders
91100
);
92101
}, exc -> {
93102
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
94103
if (status != RestStatus.NOT_FOUND) {
95104
logger.error(() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()), exc);
96-
listener.onFailure(exc);
105+
listenerWithHeaders.onFailure(exc);
97106
} else {
98107
// the async search document or its index is not found.
99108
// That can happen if an invalid/deleted search id is provided.
100-
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
109+
listenerWithHeaders.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
101110
}
102111
}));
103112
} else {
@@ -107,15 +116,15 @@ protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListene
107116
AsyncSearchTask.class,
108117
AsyncSearchTask::getStatusResponse,
109118
AsyncStatusResponse::getStatusFromStoredSearch,
110-
listener
119+
listenerWithHeaders
111120
);
112121
}
113122
} else {
114123
transportService.sendRequest(
115124
node,
116125
GetAsyncStatusAction.NAME,
117126
request,
118-
new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
127+
new ActionListenerResponseHandler<>(listenerWithHeaders, AsyncStatusResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
119128
);
120129
}
121130
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ protected void doExecute(Task submitTask, SubmitAsyncSearchRequest request, Acti
9292
searchRequest
9393
);
9494
searchAction.execute(searchTask, searchRequest, searchTask.getSearchProgressActionListener());
95+
96+
ActionListener<AsyncSearchResponse> submitListenerWithHeaders = submitListener.map(response -> {
97+
threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, response.isRunning() ? "?1" : "?0");
98+
if (response.getId() != null) {
99+
threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, response.getId());
100+
}
101+
return response;
102+
});
95103
searchTask.addCompletionListener(new ActionListener<>() {
96104
@Override
97105
public void onResponse(AsyncSearchResponse searchResponse) {
@@ -119,14 +127,14 @@ public void onResponse(DocWriteResponse r) {
119127
finalResponse -> onFinalResponse(searchTask, finalResponse, () -> {})
120128
);
121129
} finally {
122-
submitListener.onResponse(searchResponse);
130+
submitListenerWithHeaders.onResponse(searchResponse);
123131
}
124132
} else {
125133
searchResponse.mustIncRef();
126134
onFinalResponse(
127135
searchTask,
128136
searchResponse,
129-
() -> ActionListener.respondAndRelease(submitListener, searchResponse)
137+
() -> ActionListener.respondAndRelease(submitListenerWithHeaders, searchResponse)
130138
);
131139
}
132140
}
@@ -138,7 +146,7 @@ public void onFailure(Exception exc) {
138146
exc,
139147
searchResponse.isRunning(),
140148
"fatal failure: unable to store initial response",
141-
submitListener
149+
submitListenerWithHeaders
142150
);
143151
}
144152
}, searchResponse::decRef)
@@ -147,14 +155,20 @@ public void onFailure(Exception exc) {
147155
initialResp.decRef();
148156
}
149157
} catch (Exception exc) {
150-
onFatalFailure(searchTask, exc, searchResponse.isRunning(), "fatal failure: generic error", submitListener);
158+
onFatalFailure(
159+
searchTask,
160+
exc,
161+
searchResponse.isRunning(),
162+
"fatal failure: generic error",
163+
submitListenerWithHeaders
164+
);
151165
}
152166
} else {
153167
try (searchTask) {
154168
// the task completed within the timeout so the response is sent back to the user
155169
// with a null id since nothing was stored on the cluster.
156170
taskManager.unregister(searchTask);
157-
ActionListener.respondAndRelease(submitListener, searchResponse.clone(null));
171+
ActionListener.respondAndRelease(submitListenerWithHeaders, searchResponse.clone(null));
158172
}
159173
}
160174
}
@@ -163,7 +177,7 @@ public void onFailure(Exception exc) {
163177
public void onFailure(Exception exc) {
164178
// this will only ever be called if there is an issue scheduling the thread that executes
165179
// the completion listener once the wait for completion timeout expires.
166-
onFatalFailure(searchTask, exc, true, "fatal failure: addCompletionListener", submitListener);
180+
onFatalFailure(searchTask, exc, true, "fatal failure: addCompletionListener", submitListenerWithHeaders);
167181
}
168182
}, request.getWaitForCompletionTimeout());
169183
}

0 commit comments

Comments
 (0)