Skip to content

Commit b9940e0

Browse files
Fix leak in DfsQueryPhase and introduce search disconnect stress test (#116060) (#117384)
Fixing an obvious leak and finally adding a stress test for search disconnects.
1 parent 20e02fa commit b9940e0

File tree

4 files changed

+112
-3
lines changed

4 files changed

+112
-3
lines changed

docs/changelog/116060.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 116060
2+
summary: Fix leak in `DfsQueryPhase` and introduce search disconnect stress test
3+
area: Search
4+
type: bug
5+
issues:
6+
- 115056
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.search.basic;
10+
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
13+
import org.elasticsearch.action.search.SearchRequestBuilder;
14+
import org.elasticsearch.action.search.SearchResponse;
15+
import org.elasticsearch.action.support.PlainActionFuture;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.discovery.AbstractDisruptionTestCase;
18+
import org.elasticsearch.index.IndexModule;
19+
import org.elasticsearch.index.IndexSettings;
20+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
21+
import org.elasticsearch.test.disruption.NetworkDisruption;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.stream.IntStream;
28+
29+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
30+
31+
public class SearchWithRandomDisconnectsIT extends AbstractDisruptionTestCase {
32+
33+
public void testSearchWithRandomDisconnects() throws InterruptedException, ExecutionException {
34+
// make sure we have a couple data nodes
35+
int minDataNodes = randomIntBetween(3, 7);
36+
internalCluster().ensureAtLeastNumDataNodes(minDataNodes);
37+
final int indexCount = randomIntBetween(minDataNodes, 10 * minDataNodes);
38+
final String[] indexNames = IntStream.range(0, indexCount).mapToObj(i -> "test-" + i).toArray(String[]::new);
39+
final Settings indexSettings = indexSettings(1, 0).put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
40+
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), false)
41+
.build();
42+
for (String indexName : indexNames) {
43+
createIndex(indexName, indexSettings);
44+
}
45+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
46+
for (String indexName : indexNames) {
47+
for (int i = 0; i < randomIntBetween(1, 10); i++) {
48+
bulkRequestBuilder = bulkRequestBuilder.add(prepareIndex(indexName).setCreate(false).setSource("foo", "bar-" + i));
49+
}
50+
}
51+
assertFalse(bulkRequestBuilder.get().hasFailures());
52+
final AtomicBoolean done = new AtomicBoolean();
53+
final int concurrentSearches = randomIntBetween(2, 5);
54+
final List<PlainActionFuture<Void>> futures = new ArrayList<>(concurrentSearches);
55+
for (int i = 0; i < concurrentSearches; i++) {
56+
final PlainActionFuture<Void> finishFuture = new PlainActionFuture<>();
57+
futures.add(finishFuture);
58+
prepareRandomSearch().execute(new ActionListener<>() {
59+
@Override
60+
public void onResponse(SearchResponse searchResponse) {
61+
runMoreSearches();
62+
}
63+
64+
@Override
65+
public void onFailure(Exception e) {
66+
runMoreSearches();
67+
}
68+
69+
private void runMoreSearches() {
70+
if (done.get() == false) {
71+
prepareRandomSearch().execute(this);
72+
} else {
73+
finishFuture.onResponse(null);
74+
}
75+
}
76+
});
77+
}
78+
for (int i = 0, n = randomIntBetween(50, 100); i < n; i++) {
79+
NetworkDisruption networkDisruption = new NetworkDisruption(
80+
isolateNode(internalCluster().getRandomNodeName()),
81+
NetworkDisruption.DISCONNECT
82+
);
83+
setDisruptionScheme(networkDisruption);
84+
networkDisruption.startDisrupting();
85+
networkDisruption.stopDisrupting();
86+
internalCluster().clearDisruptionScheme();
87+
ensureFullyConnectedCluster();
88+
}
89+
done.set(true);
90+
for (PlainActionFuture<Void> future : futures) {
91+
future.get();
92+
}
93+
ensureGreen(DISRUPTION_HEALING_OVERHEAD, indexNames);
94+
assertAcked(indicesAdmin().prepareDelete(indexNames));
95+
}
96+
97+
private static SearchRequestBuilder prepareRandomSearch() {
98+
return prepareSearch("*").setQuery(new MatchAllQueryBuilder())
99+
.setSize(9999)
100+
.setFetchSource(true)
101+
.setAllowPartialSearchResults(randomBoolean());
102+
}
103+
}

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void run() {
9696
connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
9797
} catch (Exception e) {
9898
shardFailure(e, querySearchRequest, shardIndex, shardTarget, counter);
99-
return;
99+
continue;
100100
}
101101
searchTransportService.sendExecuteQuery(
102102
connection,

server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
5050

51-
static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
51+
public static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
5252

5353
@Override
5454
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
@@ -220,7 +220,7 @@ NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) {
220220
return partition;
221221
}
222222

223-
TwoPartitions isolateNode(String isolatedNode) {
223+
protected TwoPartitions isolateNode(String isolatedNode) {
224224
Set<String> side1 = new HashSet<>();
225225
Set<String> side2 = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
226226
side1.add(isolatedNode);

0 commit comments

Comments
 (0)