Skip to content

Commit 883f1fc

Browse files
pmpailiscbuescher
authored andcommitted
Adding support for allow_partial_search_results in PIT (elastic#111516)
1 parent 61d824f commit 883f1fc

File tree

24 files changed

+530
-93
lines changed

24 files changed

+530
-93
lines changed

docs/changelog/111516.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 111516
2+
summary: Adding support for `allow_partial_search_results` in PIT
3+
area: Search
4+
type: enhancement
5+
issues: []

docs/reference/search/point-in-time-api.asciidoc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,44 @@ IMPORTANT: The open point in time request and each subsequent search request can
7878
return different `id`; thus always use the most recently received `id` for the
7979
next search request.
8080

81+
In addition to the `keep_alive` parameter, the `allow_partial_search_results` parameter
82+
can also be defined.
83+
This parameter determines whether the <<point-in-time-api, point in time (PIT)>>
84+
should tolerate unavailable shards or <<shard-failures, shard failures>> when
85+
initially creating the PIT.
86+
If set to true, the PIT will be created with the available shards, along with a
87+
reference to any missing ones.
88+
If set to false, the operation will fail if any shard is unavailable.
89+
The default value is false.
90+
91+
The PIT response includes a summary of the total number of shards, as well as the number
92+
of successful shards when creating the PIT.
93+
94+
[source,console]
95+
--------------------------------------------------
96+
POST /my-index-000001/_pit?keep_alive=1m&allow_partial_search_results=true
97+
--------------------------------------------------
98+
// TEST[setup:my_index]
99+
100+
[source,js]
101+
--------------------------------------------------
102+
{
103+
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=",
104+
"_shards": {
105+
"total": 10,
106+
"successful": 10,
107+
"skipped": 0,
108+
"failed": 0
109+
}
110+
}
111+
--------------------------------------------------
112+
// NOTCONSOLE
113+
114+
When a PIT that contains shard failures is used in a search request, the missing are
115+
always reported in the search response as a NoShardAvailableActionException exception.
116+
To get rid of these exceptions, a new PIT needs to be created so that shards missing
117+
from the previous PIT can be handled, assuming they become available in the meantime.
118+
81119
[[point-in-time-keep-alive]]
82120
==== Keeping point in time alive
83121
The `keep_alive` parameter, which is passed to a open point in time request and

docs/reference/search/search-your-data/paginate-search-results.asciidoc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,9 @@ The search response includes an array of `sort` values for each hit:
106106
"_id" : "654322",
107107
"_score" : null,
108108
"_source" : ...,
109-
"sort" : [
109+
"sort" : [
110110
1463538855,
111-
"654322"
111+
"654322"
112112
]
113113
},
114114
{
@@ -118,7 +118,7 @@ The search response includes an array of `sort` values for each hit:
118118
"_source" : ...,
119119
"sort" : [ <1>
120120
1463538857,
121-
"654323"
121+
"654323"
122122
]
123123
}
124124
]
@@ -150,7 +150,7 @@ GET twitter/_search
150150
--------------------------------------------------
151151
//TEST[continued]
152152

153-
Repeat this process by updating the `search_after` array every time you retrieve a
153+
Repeat this process by updating the `search_after` array every time you retrieve a
154154
new page of results. If a <<near-real-time,refresh>> occurs between these requests,
155155
the order of your results may change, causing inconsistent results across pages. To
156156
prevent this, you can create a <<point-in-time-api,point in time (PIT)>> to
@@ -167,10 +167,12 @@ The API returns a PIT ID.
167167
[source,console-result]
168168
----
169169
{
170-
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
170+
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==",
171+
"_shards": ...
171172
}
172173
----
173174
// TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]
175+
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": "$body._shards"/]
174176

175177
To get the first page of results, submit a search request with a `sort`
176178
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit

server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java

Lines changed: 193 additions & 12 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ static TransportVersion def(int id) {
195195
public static final TransportVersion ESQL_PROFILE_SLEEPS = def(8_725_00_0);
196196
public static final TransportVersion ZDT_NANOS_SUPPORT = def(8_726_00_0);
197197
public static final TransportVersion LTR_SERVERLESS_RELEASE = def(8_727_00_0);
198+
public static final TransportVersion ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_728_00_0);
198199
/*
199200
* STOP! READ THIS FIRST! No, really,
200201
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
707707
final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null;
708708
final BytesReference searchContextId;
709709
if (buildPointInTimeFromSearchResults()) {
710-
searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minTransportVersion);
710+
searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minTransportVersion, failures);
711711
} else {
712712
if (request.source() != null
713713
&& request.source().pointInTimeBuilder() != null

server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ public static void closeContexts(
166166
final var successes = new AtomicInteger();
167167
try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) {
168168
for (SearchContextIdForNode contextId : contextIds) {
169+
if (contextId.getNode() == null) {
170+
// the shard was missing when creating the PIT, ignore.
171+
continue;
172+
}
169173
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
170174
if (node != null) {
171175
try {

server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public final class OpenPointInTimeRequest extends ActionRequest implements Indic
4141

4242
private QueryBuilder indexFilter;
4343

44+
private boolean allowPartialSearchResults = false;
45+
4446
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = SearchRequest.DEFAULT_INDICES_OPTIONS;
4547

4648
public OpenPointInTimeRequest(String... indices) {
@@ -60,6 +62,9 @@ public OpenPointInTimeRequest(StreamInput in) throws IOException {
6062
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
6163
this.indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class);
6264
}
65+
if (in.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) {
66+
this.allowPartialSearchResults = in.readBoolean();
67+
}
6368
}
6469

6570
@Override
@@ -76,6 +81,11 @@ public void writeTo(StreamOutput out) throws IOException {
7681
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
7782
out.writeOptionalWriteable(indexFilter);
7883
}
84+
if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) {
85+
out.writeBoolean(allowPartialSearchResults);
86+
} else if (allowPartialSearchResults) {
87+
throw new IOException("[allow_partial_search_results] is not supported on nodes with version " + out.getTransportVersion());
88+
}
7989
}
8090

8191
@Override
@@ -180,6 +190,15 @@ public boolean includeDataStreams() {
180190
return true;
181191
}
182192

193+
public boolean allowPartialSearchResults() {
194+
return allowPartialSearchResults;
195+
}
196+
197+
public OpenPointInTimeRequest allowPartialSearchResults(boolean allowPartialSearchResults) {
198+
this.allowPartialSearchResults = allowPartialSearchResults;
199+
return this;
200+
}
201+
183202
@Override
184203
public String getDescription() {
185204
return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]";
@@ -200,6 +219,8 @@ public String toString() {
200219
+ ", preference='"
201220
+ preference
202221
+ '\''
222+
+ ", allowPartialSearchResults="
223+
+ allowPartialSearchResults
203224
+ '}';
204225
}
205226

@@ -218,12 +239,13 @@ public boolean equals(Object o) {
218239
&& indicesOptions.equals(that.indicesOptions)
219240
&& keepAlive.equals(that.keepAlive)
220241
&& Objects.equals(routing, that.routing)
221-
&& Objects.equals(preference, that.preference);
242+
&& Objects.equals(preference, that.preference)
243+
&& Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults);
222244
}
223245

224246
@Override
225247
public int hashCode() {
226-
int result = Objects.hash(indicesOptions, keepAlive, maxConcurrentShardRequests, routing, preference);
248+
int result = Objects.hash(indicesOptions, keepAlive, maxConcurrentShardRequests, routing, preference, allowPartialSearchResults);
227249
result = 31 * result + Arrays.hashCode(indices);
228250
return result;
229251
}

server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeResponse.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.action.search;
1010

11+
import org.elasticsearch.TransportVersions;
1112
import org.elasticsearch.action.ActionResponse;
1213
import org.elasticsearch.common.bytes.BytesReference;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -18,22 +19,46 @@
1819
import java.util.Base64;
1920
import java.util.Objects;
2021

22+
import static org.elasticsearch.rest.action.RestActions.buildBroadcastShardsHeader;
23+
2124
public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject {
2225
private final BytesReference pointInTimeId;
2326

24-
public OpenPointInTimeResponse(BytesReference pointInTimeId) {
27+
private final int totalShards;
28+
private final int successfulShards;
29+
private final int failedShards;
30+
private final int skippedShards;
31+
32+
public OpenPointInTimeResponse(
33+
BytesReference pointInTimeId,
34+
int totalShards,
35+
int successfulShards,
36+
int failedShards,
37+
int skippedShards
38+
) {
2539
this.pointInTimeId = Objects.requireNonNull(pointInTimeId, "Point in time parameter must be not null");
40+
this.totalShards = totalShards;
41+
this.successfulShards = successfulShards;
42+
this.failedShards = failedShards;
43+
this.skippedShards = skippedShards;
2644
}
2745

2846
@Override
2947
public void writeTo(StreamOutput out) throws IOException {
3048
out.writeBytesReference(pointInTimeId);
49+
if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) {
50+
out.writeVInt(totalShards);
51+
out.writeVInt(successfulShards);
52+
out.writeVInt(failedShards);
53+
out.writeVInt(skippedShards);
54+
}
3155
}
3256

3357
@Override
3458
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
3559
builder.startObject();
3660
builder.field("id", Base64.getUrlEncoder().encodeToString(BytesReference.toBytes(pointInTimeId)));
61+
buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, failedShards, skippedShards, null);
3762
builder.endObject();
3863
return builder;
3964
}
@@ -42,4 +67,19 @@ public BytesReference getPointInTimeId() {
4267
return pointInTimeId;
4368
}
4469

70+
public int getTotalShards() {
71+
return totalShards;
72+
}
73+
74+
public int getSuccessfulShards() {
75+
return successfulShards;
76+
}
77+
78+
public int getFailedShards() {
79+
return failedShards;
80+
}
81+
82+
public int getSkippedShards() {
83+
return skippedShards;
84+
}
4585
}

server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
4747
openRequest.routing(request.param("routing"));
4848
openRequest.preference(request.param("preference"));
4949
openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive"));
50+
openRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", false));
5051
if (request.hasParam("max_concurrent_shard_requests")) {
5152
final int maxConcurrentShardRequests = request.paramAsInt(
5253
"max_concurrent_shard_requests",

0 commit comments

Comments
 (0)