Skip to content

Commit 70aea84

Browse files
Fix OpenPIT with older versions
1 parent a7d26f8 commit 70aea84

File tree

2 files changed

+63
-13
lines changed

2 files changed

+63
-13
lines changed

x-pack/plugin/eql/qa/mixed-node/src/javaRestTest/java/org/elasticsearch/xpack/eql/qa/mixed_node/EqlSearchIT.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,19 @@ public void cleanUpIndex() throws IOException {
7777
}
7878

7979
public void testEventsWithRequestToOldNodes() throws Exception {
80-
assertEventsQueryOnNodes(bwcNodes);
80+
assertEventsQueryOnNodes(bwcNodes, true);
8181
}
8282

8383
public void testEventsWithRequestToUpgradedNodes() throws Exception {
84-
assertEventsQueryOnNodes(newNodes);
84+
assertEventsQueryOnNodes(newNodes, false);
8585
}
8686

8787
public void testSequencesWithRequestToOldNodes() throws Exception {
88-
assertSequncesQueryOnNodes(bwcNodes);
88+
assertSequncesQueryOnNodes(bwcNodes, true);
8989
}
9090

9191
public void testSequencesWithRequestToUpgradedNodes() throws Exception {
92-
assertSequncesQueryOnNodes(newNodes);
92+
assertSequncesQueryOnNodes(newNodes, false);
9393
}
9494

9595
/**
@@ -268,7 +268,7 @@ public void testMultiValueFields() throws Exception {
268268
assertTrue(testedFunctions.containsAll(availableFunctions));
269269
}
270270

271-
private void assertEventsQueryOnNodes(List<TestNode> nodesList) throws Exception {
271+
private void assertEventsQueryOnNodes(List<TestNode> nodesList, boolean oldNodes) throws Exception {
272272
final String event = randomEvent();
273273
Map<String, Object> expectedResponse = prepareEventsTestData(event);
274274
try (
@@ -278,12 +278,25 @@ private void assertEventsQueryOnNodes(List<TestNode> nodesList) throws Exception
278278
String filterPath = "filter_path=hits.events._source.@timestamp,hits.events._source.event_type,hits.events._source.sequence";
279279

280280
Request request = new Request("POST", index + "/_eql/search?" + filterPath);
281-
request.setJsonEntity("{\"query\":\"" + event + " where true\",\"size\":15}");
282-
assertBusy(() -> { assertResponse(expectedResponse, runEql(client, request)); });
281+
StringBuilder payload = new StringBuilder("{\"query\":\"" + event + " where true\",\"size\":15");
282+
// Old versions don't support this option
283+
if (oldNodes == false) {
284+
if (randomBoolean()) {
285+
payload.append(", \"allow_partial_search_results\": " + randomBoolean());
286+
}
287+
if (randomBoolean()) {
288+
payload.append(", \"allow_partial_sequence_results\": " + randomBoolean());
289+
}
290+
}
291+
payload.append("}");
292+
request.setJsonEntity(payload.toString());
293+
assertBusy(() -> {
294+
assertResponse(expectedResponse, runEql(client, request));
295+
});
283296
}
284297
}
285298

286-
private void assertSequncesQueryOnNodes(List<TestNode> nodesList) throws Exception {
299+
private void assertSequncesQueryOnNodes(List<TestNode> nodesList, boolean oldNodes) throws Exception {
287300
Map<String, Object> expectedResponse = prepareSequencesTestData();
288301
try (
289302
RestClient client = buildClient(restClientSettings(), nodesList.stream().map(TestNode::publishAddress).toArray(HttpHost[]::new))
@@ -294,7 +307,19 @@ private void assertSequncesQueryOnNodes(List<TestNode> nodesList) throws Excepti
294307
String filter = "{\"range\":{\"@timestamp\":{\"gte\":\"1970-05-01\"}}}";
295308

296309
Request request = new Request("POST", index + "/_eql/search?" + filterPath);
297-
request.setJsonEntity("{\"query\":\"" + query + "\",\"filter\":" + filter + "}");
310+
311+
StringBuilder payload = new StringBuilder("{\"query\":\"" + query + "\",\"filter\":" + filter);
312+
// Old versions don't support this option
313+
if (oldNodes == false) {
314+
if (randomBoolean()) {
315+
payload.append(", \"allow_partial_search_results\": " + randomBoolean());
316+
}
317+
if (randomBoolean()) {
318+
payload.append(", \"allow_partial_sequence_results\": " + randomBoolean());
319+
}
320+
}
321+
payload.append("}");
322+
request.setJsonEntity(payload.toString());
298323
assertBusy(() -> { assertResponse(expectedResponse, runEql(client, request)); });
299324
}
300325
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77

88
package org.elasticsearch.xpack.eql.execution.search;
99

10+
import org.elasticsearch.ElasticsearchStatusException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.search.ClosePointInTimeRequest;
1213
import org.elasticsearch.action.search.ClosePointInTimeResponse;
1314
import org.elasticsearch.action.search.MultiSearchRequest;
1415
import org.elasticsearch.action.search.MultiSearchResponse;
1516
import org.elasticsearch.action.search.OpenPointInTimeRequest;
17+
import org.elasticsearch.action.search.OpenPointInTimeResponse;
1618
import org.elasticsearch.action.search.SearchRequest;
1719
import org.elasticsearch.action.search.SearchResponse;
1820
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
@@ -140,10 +142,33 @@ private <Response> void openPIT(ActionListener<Response> listener, Runnable runn
140142
.keepAlive(keepAlive)
141143
.allowPartialSearchResults(allowPartialSearchResults);
142144
request.indexFilter(filter);
143-
client.execute(TransportOpenPointInTimeAction.TYPE, request, listener.delegateFailureAndWrap((l, r) -> {
144-
pitId = r.getPointInTimeId();
145-
runnable.run();
146-
}));
145+
146+
client.execute(TransportOpenPointInTimeAction.TYPE, request, new ActionListener<>() {
147+
@Override
148+
public void onResponse(OpenPointInTimeResponse r) {
149+
try {
150+
pitId = r.getPointInTimeId();
151+
runnable.run();
152+
} catch (Exception e) {
153+
onFailure(e);
154+
}
155+
}
156+
157+
@Override
158+
public void onFailure(Exception e) {
159+
if (allowPartialSearchResults
160+
&& e instanceof ElasticsearchStatusException
161+
&& e.getMessage()
162+
.contains("The [allow_partial_search_results] parameter cannot be used while the cluster is still upgrading.")) {
163+
// This is for pre-8.16
164+
// We cannot use allow_partial_search_results during upgrades, so let's try without and hope to get no shard failures,
165+
// it's the best we can do, the query would fail anyway
166+
openPIT(listener, runnable, false);
167+
} else {
168+
listener.onFailure(e);
169+
}
170+
}
171+
});
147172
}
148173

149174
@Override

0 commit comments

Comments
 (0)