Skip to content

Commit 1472b06

Browse files
authored
Merge branch 'main' into esql/ds/snapshot_to_feat_flag
2 parents 44a139f + a7e2068 commit 1472b06

File tree

38 files changed

+6932
-165
lines changed

38 files changed

+6932
-165
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.fetch;
11+
12+
import org.elasticsearch.action.ActionFuture;
13+
import org.elasticsearch.action.search.SearchResponse;
14+
import org.elasticsearch.action.search.TransportSearchAction;
15+
import org.elasticsearch.common.breaker.CircuitBreaker;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
18+
import org.elasticsearch.script.Script;
19+
import org.elasticsearch.script.ScriptType;
20+
import org.elasticsearch.search.SearchService;
21+
import org.elasticsearch.test.AbstractSearchCancellationTestCase;
22+
import org.elasticsearch.test.ESIntegTestCase;
23+
24+
import java.util.Collections;
25+
26+
import static org.elasticsearch.test.AbstractSearchCancellationTestCase.ScriptedBlockPlugin.SEARCH_BLOCK_SCRIPT_NAME;
27+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
28+
29+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
30+
public class ChunkedFetchPhaseCancellationIT extends AbstractSearchCancellationTestCase {
31+
32+
@Override
33+
protected boolean enableConcurrentSearch() {
34+
return false;
35+
}
36+
37+
@Override
38+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
39+
return Settings.builder()
40+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
41+
.put("indices.breaker.request.type", "memory")
42+
.put("indices.breaker.request.limit", "100mb")
43+
.put(SearchService.FETCH_PHASE_CHUNKED_ENABLED.getKey(), true)
44+
.build();
45+
}
46+
47+
public void testTaskCancellationReleasesCoordinatorBreakerBytes() throws Exception {
48+
internalCluster().startNode();
49+
String coordinatorNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
50+
51+
createIndex("test", 2, 0);
52+
indexTestData();
53+
ensureGreen("test");
54+
55+
var plugins = initBlockFactory();
56+
long breakerBefore = getRequestBreakerUsed(coordinatorNode);
57+
58+
ActionFuture<SearchResponse> searchResponse = internalCluster().client(coordinatorNode)
59+
.prepareSearch("test")
60+
.addScriptField("test_field", new Script(ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap()))
61+
.setAllowPartialSearchResults(true)
62+
.execute();
63+
64+
awaitForBlock(plugins);
65+
cancelSearch(TransportSearchAction.TYPE.name());
66+
disableBlocks(plugins);
67+
ensureSearchWasCancelled(searchResponse);
68+
69+
assertBusy(
70+
() -> assertThat(
71+
"Coordinator breaker bytes should be released after cancellation",
72+
getRequestBreakerUsed(coordinatorNode),
73+
lessThanOrEqualTo(breakerBefore)
74+
)
75+
);
76+
}
77+
78+
private long getRequestBreakerUsed(String node) {
79+
CircuitBreakerService breakerService = internalCluster().getInstance(CircuitBreakerService.class, node);
80+
CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
81+
return breaker.getUsed();
82+
}
83+
}

0 commit comments

Comments
 (0)