Skip to content

Commit d5d9d3a

Browse files
[ES|QL] Limit the number of concurrent subquery and fork branches at execution time (#144687)
* limit the number of concurrent subquery and fork branches at execution time.
1 parent fab219c commit d5d9d3a

File tree

6 files changed

+1290
-136
lines changed

6 files changed

+1290
-136
lines changed

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackSubqueryIT.java

Lines changed: 109 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,22 @@ public class HeapAttackSubqueryIT extends HeapAttackTestCase {
4242

4343
private static final int MAX_SUBQUERIES_SERVERLESS = 5;
4444

45-
private static final int MAX_STRING_FIELDS = 1000;
45+
private static final int STRING_FIELDS_1K = 1000;
4646

47-
private static final int MAX_STRING_FIELD_SERVERLESS = 700;
47+
private static final int STRING_FIELD_700 = 700;
4848

49-
private static final int MAX_DOC = 200;
50-
51-
private static final int MAX_DOC_SERVERLESS = 100;
49+
private static final int MAX_DOC = 100;
5250

5351
@Before
5452
public void checkCapability() {
5553
assumeTrue("Subquery is behind snapshot", Build.current().isSnapshot());
5654
}
5755

5856
public void testManyKeywordFieldsWith10UniqueValuesInSubqueryIntermediateResults() throws IOException {
59-
heapAttackIT.initManyBigFieldsIndex(MAX_DOC, "keyword", false, MAX_STRING_FIELDS);
57+
heapAttackIT.initManyBigFieldsIndex(MAX_DOC, "keyword", false, STRING_FIELDS_1K);
6058
for (int subquery : List.of(MIN_SUBQUERIES, MAX_SUBQUERIES)) {
6159
ListMatcher columns = matchesList();
62-
int fieldsToRead = subquery < MAX_SUBQUERIES ? 1000 : 600; // with 1000 fields we circuit break
60+
int fieldsToRead = subquery < MAX_SUBQUERIES ? STRING_FIELDS_1K : STRING_FIELD_700; // with 1000 fields we circuit break
6361
StringBuilder query = new StringBuilder("manybigfields | KEEP ");
6462
for (int f = 0; f < fieldsToRead; f++) {
6563
String fieldName = "f" + String.format(Locale.ROOT, "%03d", f);
@@ -76,54 +74,51 @@ public void testManyKeywordFieldsWith10UniqueValuesInSubqueryIntermediateResults
7674

7775
public void testManyRandomKeywordFieldsInSubqueryIntermediateResults() throws IOException {
7876
int docs = docs();
79-
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, fields());
80-
assertCircuitBreaks(attempt -> buildSubqueries(maxSubqueries(), "manybigfields"));
77+
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, STRING_FIELD_700);
78+
ListMatcher columns = matchesList();
79+
for (int f = 0; f < STRING_FIELD_700; f++) {
80+
columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "keyword"));
81+
}
82+
try {
83+
Map<?, ?> response = buildSubqueries(subqueries(false), "manybigfields");
84+
assertMap(response, matchesMap().entry("columns", columns));
85+
} catch (ResponseException e) {
86+
verifyCircuitBreakingException(e);
87+
}
8188
}
8289

8390
public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithSortOneField() throws IOException {
8491
int docs = docs();
85-
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, fields());
86-
int subqueries = isServerless() ? MIN_SUBQUERIES : MAX_SUBQUERIES;
92+
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, STRING_FIELD_700);
8793
ListMatcher columns = matchesList();
88-
for (int f = 0; f < fields(); f++) {
94+
for (int f = 0; f < STRING_FIELD_700; f++) {
8995
columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "keyword"));
9096
}
91-
// serverless behaves differently from stateful
9297
try {
93-
Map<?, ?> response = buildSubqueriesWithSort(subqueries, "manybigfields", "f000");
98+
Map<?, ?> response = buildSubqueriesWithSort(subqueries(true), "manybigfields", "f000");
9499
assertMap(response, matchesMap().entry("columns", columns));
95100
} catch (ResponseException e) {
96-
Map<?, ?> map = responseAsMap(e.getResponse());
97-
assertMap(
98-
map,
99-
matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception"))
100-
);
101+
verifyCircuitBreakingException(e);
101102
}
102103
}
103104

104105
public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithSortManyFields() throws IOException {
105106
int docs = docs();
106-
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, fields());
107+
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, STRING_FIELD_700);
107108
StringBuilder sortKeys = new StringBuilder();
108109
sortKeys.append("f000");
109110
for (int f = 1; f < 11; f++) {
110111
sortKeys.append(", f").append(String.format(Locale.ROOT, "%03d", f));
111112
}
112-
int subqueries = isServerless() ? MIN_SUBQUERIES : MAX_SUBQUERIES;
113113
ListMatcher columns = matchesList();
114-
for (int f = 0; f < fields(); f++) {
114+
for (int f = 0; f < STRING_FIELD_700; f++) {
115115
columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "keyword"));
116116
}
117-
// serverless behaves differently from stateful
118117
try {
119-
Map<?, ?> response = buildSubqueriesWithSort(subqueries, "manybigfields", sortKeys.toString());
118+
Map<?, ?> response = buildSubqueriesWithSort(subqueries(true), "manybigfields", sortKeys.toString());
120119
assertMap(response, matchesMap().entry("columns", columns));
121120
} catch (ResponseException e) {
122-
Map<?, ?> map = responseAsMap(e.getResponse());
123-
assertMap(
124-
map,
125-
matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception"))
126-
);
121+
verifyCircuitBreakingException(e);
127122
}
128123
}
129124

@@ -146,40 +141,63 @@ public void testManyRandomNumericFieldsInSubqueryIntermediateResultsWithSortMany
146141
Map<?, ?> response = buildSubqueriesWithSort(MAX_SUBQUERIES, "manybigfields", sortKeys.toString());
147142
assertMap(response, matchesMap().entry("columns", columns));
148143
} catch (ResponseException e) {
149-
Map<?, ?> map = responseAsMap(e.getResponse());
150-
assertMap(
151-
map,
152-
matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception"))
153-
);
144+
verifyCircuitBreakingException(e);
154145
}
155146
}
156147

157148
public void testManyRandomTextFieldsInSubqueryIntermediateResults() throws IOException {
158149
int docs = docs();
159-
heapAttackIT.initManyBigFieldsIndex(docs, "text", true, fields());
160-
assertCircuitBreaks(attempt -> buildSubqueries(maxSubqueries(), "manybigfields"));
150+
heapAttackIT.initManyBigFieldsIndex(docs, "text", true, STRING_FIELD_700);
151+
ListMatcher columns = matchesList();
152+
for (int f = 0; f < STRING_FIELD_700; f++) {
153+
columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "text"));
154+
}
155+
try {
156+
Map<?, ?> response = buildSubqueries(subqueries(false), "manybigfields");
157+
assertMap(response, matchesMap().entry("columns", columns));
158+
} catch (ResponseException e) {
159+
verifyCircuitBreakingException(e);
160+
}
161161
}
162162

163163
public void testManyRandomTextFieldsInSubqueryIntermediateResultsWithSortOneField() throws IOException {
164164
int docs = docs();
165-
heapAttackIT.initManyBigFieldsIndex(docs, "text", true, fields());
166-
assertCircuitBreaks(attempt -> buildSubqueriesWithSort(maxSubqueries(), "manybigfields", " substring(f000, 5) "));
165+
heapAttackIT.initManyBigFieldsIndex(docs, "text", true, STRING_FIELD_700);
166+
ListMatcher columns = matchesList();
167+
for (int f = 0; f < STRING_FIELD_700; f++) {
168+
columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "text"));
169+
}
170+
try {
171+
Map<?, ?> response = buildSubqueriesWithSort(subqueries(true), "manybigfields", " substring(f000, 5) ");
172+
assertMap(response, matchesMap().entry("columns", columns));
173+
} catch (ResponseException e) {
174+
verifyCircuitBreakingException(e);
175+
}
167176
}
168177

169178
public void testManyRandomTextFieldsInSubqueryIntermediateResultsWithSortManyFields() throws IOException {
170179
int docs = docs();
171-
heapAttackIT.initManyBigFieldsIndex(docs, "text", true, fields());
180+
heapAttackIT.initManyBigFieldsIndex(docs, "text", true, STRING_FIELD_700);
172181
StringBuilder sortKeys = new StringBuilder();
173182
sortKeys.append(" substring(f000, 5) ");
174183
for (int f = 1; f < 5; f++) {
175184
sortKeys.append(", substring(f").append(String.format(Locale.ROOT, "%03d", f)).append(", 5) ");
176185
}
177-
assertCircuitBreaks(attempt -> buildSubqueriesWithSort(maxSubqueries(), "manybigfields", sortKeys.toString()));
186+
ListMatcher columns = matchesList();
187+
for (int f = 0; f < STRING_FIELD_700; f++) {
188+
columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "text"));
189+
}
190+
try {
191+
Map<?, ?> response = buildSubqueriesWithSort(subqueries(true), "manybigfields", sortKeys.toString());
192+
assertMap(response, matchesMap().entry("columns", columns));
193+
} catch (ResponseException e) {
194+
verifyCircuitBreakingException(e);
195+
}
178196
}
179197

180198
public void testManyKeywordFieldsWith10UniqueValuesInSubqueryIntermediateResultsWithAggNoGrouping() throws IOException {
181199
int docs = docs();
182-
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", false, MAX_STRING_FIELDS);
200+
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", false, STRING_FIELDS_1K);
183201
ListMatcher columns = matchesList().item(matchesMap().entry("name", "sum").entry("type", "long"));
184202
Map<?, ?> response = buildSubqueriesWithAgg(MAX_SUBQUERIES, "manybigfields", "sum = SUM(LENGTH(f999))", null);
185203
ListMatcher values = matchesList();
@@ -191,7 +209,7 @@ public void testManyKeywordFieldsWith10UniqueValuesInSubqueryIntermediateResults
191209

192210
public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithAggNoGrouping() throws IOException {
193211
int docs = docs();
194-
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, MAX_STRING_FIELDS);
212+
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, STRING_FIELDS_1K);
195213
ListMatcher columns = matchesList().item(matchesMap().entry("name", "sum").entry("type", "long"));
196214
Map<?, ?> response = buildSubqueriesWithAgg(MAX_SUBQUERIES, "manybigfields", "sum = SUM(LENGTH(f999))", null);
197215
ListMatcher values = matchesList();
@@ -203,7 +221,7 @@ public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithAggNoGro
203221

204222
public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithAggWithGBYOneField() throws IOException {
205223
int docs = docs();
206-
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, MAX_STRING_FIELDS);
224+
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, STRING_FIELDS_1K);
207225
var columns = List.of(Map.of("name", "sum", "type", "long"), Map.of("name", "f000", "type", "keyword"));
208226
Map<?, ?> response = buildSubqueriesWithAgg(MAX_SUBQUERIES, "manybigfields", "sum = SUM(LENGTH(f999))", "f000");
209227
var values = response.get("values");
@@ -213,35 +231,41 @@ public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithAggWithG
213231

214232
public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithAggGBYManyFields() throws IOException {
215233
int docs = docs();
216-
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, fields());
234+
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true, STRING_FIELD_700);
217235
StringBuilder grouping = new StringBuilder();
218236
grouping.append("f000");
219237
int groupBySize = 100;
220238
for (int f = 1; f < groupBySize; f++) {
221239
grouping.append(", f").append(String.format(Locale.ROOT, "%03d", f));
222240
}
223241
try {
224-
Map<?, ?> response = buildSubqueriesWithAgg(maxSubqueries(), "manybigfields", "c = COUNT_DISTINCT(f499)", grouping.toString());
242+
Map<?, ?> response = buildSubqueriesWithAgg(
243+
subqueries(false),
244+
"manybigfields",
245+
"c = COUNT_DISTINCT(f499)",
246+
grouping.toString()
247+
);
225248
assertTrue(response.get("columns") instanceof List<?> l && l.size() == (groupBySize + 1));
226249
} catch (ResponseException e) {
227-
Map<?, ?> map = responseAsMap(e.getResponse());
228-
assertMap(
229-
map,
230-
matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception"))
231-
);
250+
verifyCircuitBreakingException(e);
232251
}
233252
}
234253

235254
public void testGiantTextFieldInSubqueryIntermediateResults() throws IOException {
236255
int docs = 20;
237256
heapAttackIT.initGiantTextField(docs, false, 5);
238-
assertCircuitBreaks(attempt -> buildSubqueries(maxSubqueries(), "bigtext"));
257+
assertCircuitBreaks(attempt -> buildSubqueries(subqueries(false), "bigtext"));
239258
}
240259

241260
public void testGiantTextFieldInSubqueryIntermediateResultsWithSort() throws IOException {
242-
int docs = 20;
261+
int docs = 10;
243262
heapAttackIT.initGiantTextField(docs, false, 5);
244-
assertCircuitBreaks(attempt -> buildSubqueriesWithSort(maxSubqueries(), "bigtext", " substring(f, 5) "));
263+
try {
264+
Map<?, ?> response = buildSubqueriesWithSort(subqueries(true), "bigtext", " substring(f, 5) ");
265+
assertTrue(response.get("columns") instanceof List<?> l && l.size() == 1);
266+
} catch (ResponseException e) {
267+
verifyCircuitBreakingException(e);
268+
}
245269
}
246270

247271
public void testGiantTextFieldInSubqueryIntermediateResultsWithAggNoGrouping() throws IOException {
@@ -258,19 +282,19 @@ public void testGiantTextFieldInSubqueryIntermediateResultsWithAggNoGrouping() t
258282

259283
public void testLoadDocSequenceReturnsCorrectResultsKeyword() throws IOException {
260284
int docs = 20;
261-
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", false, MAX_STRING_FIELDS);
285+
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", false, STRING_FIELDS_1K);
262286
verifyLoadDocSequenceResults(docs, "keyword");
263287
}
264288

265289
public void testLoadDocSequenceReturnsCorrectResultsText() throws IOException {
266290
int docs = 20;
267-
heapAttackIT.initManyBigFieldsIndex(docs, "text", false, MAX_STRING_FIELDS);
291+
heapAttackIT.initManyBigFieldsIndex(docs, "text", false, STRING_FIELDS_1K);
268292
verifyLoadDocSequenceResults(docs, "text");
269293
}
270294

271295
private void verifyLoadDocSequenceResults(int docs, String dataType) throws IOException {
272296
List<Map<String, String>> columns = new ArrayList<>();
273-
for (int f = 0; f < MAX_STRING_FIELDS; f++) {
297+
for (int f = 0; f < STRING_FIELDS_1K; f++) {
274298
Map<String, String> column = Map.of("name", "f" + String.format(Locale.ROOT, "%03d", f), "type", dataType);
275299
columns.add(column);
276300
}
@@ -291,11 +315,7 @@ private void verifyLoadDocSequenceResults(int docs, String dataType) throws IOEx
291315
}
292316
}
293317
} catch (ResponseException e) {
294-
Map<?, ?> map = responseAsMap(e.getResponse());
295-
assertMap(
296-
map,
297-
matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception"))
298-
);
318+
verifyCircuitBreakingException(e);
299319
}
300320
}
301321

@@ -306,7 +326,7 @@ private Map<String, Object> buildSubqueries(int subqueries, String indexName) th
306326
for (int i = 1; i < subqueries; i++) {
307327
query.append(", ").append(subquery);
308328
}
309-
query.append(" \"}");
329+
query.append(endQuery());
310330
return responseAsMap(query(query.toString(), "columns"));
311331
}
312332

@@ -323,7 +343,7 @@ private Map<String, Object> buildSubqueriesWithAgg(int subqueries, String indexN
323343
for (int i = 1; i < subqueries; i++) {
324344
query.append(", ").append(subquery);
325345
}
326-
query.append(" \"}");
346+
query.append(endQuery());
327347
return responseAsMap(query(query.toString(), "columns,values"));
328348
}
329349

@@ -335,7 +355,7 @@ private Map<String, Object> buildSubqueriesWithSort(int subqueries, String index
335355
for (int i = 1; i < subqueries; i++) {
336356
query.append(", ").append(subquery);
337357
}
338-
query.append(" \"}");
358+
query.append(endQuery());
339359
return responseAsMap(query(query.toString(), "columns"));
340360
}
341361

@@ -346,25 +366,37 @@ private Map<String, Object> buildSubqueriesWithSortInMainQuery(int subqueries, S
346366
for (int i = 1; i < subqueries; i++) {
347367
query.append(", ").append(subquery);
348368
}
349-
query.append(" | SORT ").append(sortKeys).append(" \"}");
369+
query.append(" | SORT ").append(sortKeys).append(endQuery());
350370
return responseAsMap(query(query.toString(), "columns, values"));
351371
}
352372

353-
private static int docs() throws IOException {
354-
// serverless has 6 shards, non-serverless has 1 shard,
355-
// limiting the number of docs to reduce the gc lagging and intermittent OOMs in serverless
356-
return isServerless() ? MAX_DOC_SERVERLESS : MAX_DOC;
373+
private static String endQuery() {
374+
return " \"}";
375+
}
376+
377+
private static int docs() {
378+
return MAX_DOC;
357379
}
358380

359-
private static int fields() throws IOException {
360-
// serverless has 6 shards, non-serverless has 1 shard,
361-
// limiting the number of keyword/text fields to reduce the gc lagging and intermittent OOMs in serverless
362-
return isServerless() ? MAX_STRING_FIELD_SERVERLESS : MAX_STRING_FIELDS;
381+
/*
382+
* Serverless has 6 shards, non-serverless has 1 shard.
383+
* The number of exchange operators increases as the number of subqueries increases.
384+
* Sort is expensive compared to stats, as these tests preserve all fields.
385+
* So, limit the number of subqueries to reduce GC lag and intermittent OOMs.
386+
*/
387+
private static int subqueries(boolean hasSort) throws IOException {
388+
if (isServerless()) {
389+
return hasSort ? MIN_SUBQUERIES : MAX_SUBQUERIES_SERVERLESS;
390+
} else {
391+
return MAX_SUBQUERIES;
392+
}
363393
}
364394

365-
private static int maxSubqueries() throws IOException {
366-
// serverless has 6 shards, non-serverless has 1 shard, the number of exchange operators increases when the number of subqueries
367-
// increase, limiting the number of subqueries to reduce the gc lagging and intermittent OOMs in serverless
368-
return isServerless() ? MAX_SUBQUERIES_SERVERLESS : MAX_SUBQUERIES;
395+
private static void verifyCircuitBreakingException(ResponseException re) throws IOException {
396+
Map<?, ?> map = responseAsMap(re.getResponse());
397+
assertMap(
398+
map,
399+
matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception"))
400+
);
369401
}
370402
}

0 commit comments

Comments
 (0)