
Commit c38d438

ESQL: Heap attack tests (elastic#120313) (elastic#120525)
Adds tests for requests that would fill up the heap and crash Elasticsearch but for our circuit breakers. In an ideal world we'd stream these results back to the client and this wouldn't crash anything. But we don't have that at the moment.
1 parent 229a4ae commit c38d438
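
The new tests lean on the assertCircuitBreaks helper that HeapAttackIT already has (its body is not part of this diff): the point is that an oversized request should come back as a circuit_breaking_exception rather than take the node down with an OutOfMemoryError. Roughly the shape such a check takes, as a hedged sketch rather than the helper's actual code:

    // Illustrative only: an oversized request is expected to trip the breaker and
    // surface as HTTP 429 with a circuit_breaking_exception, not crash the node.
    ResponseException e = expectThrows(ResponseException.class, () -> lookupExplosion(8500, 10000));
    assertEquals(429, e.getResponse().getStatusLine().getStatusCode());
    assertThat(e.getMessage(), containsString("circuit_breaking_exception"));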

File tree

2 files changed: +140 −9 lines changed


test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 121 additions & 3 deletions
@@ -23,6 +23,9 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.test.ListMatcher;
 import org.elasticsearch.test.MapMatcher;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
@@ -42,6 +45,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

@@ -57,8 +61,8 @@
 import static org.hamcrest.Matchers.matchesRegex;

 /**
- * Tests that run ESQL queries that have, in the past, used so much memory they
- * crash Elasticsearch.
+ * Tests that run ESQL queries that use a ton of memory. We want to make
+ * sure they don't consume the entire heap and crash Elasticsearch.
  */
 public class HeapAttackIT extends ESRestTestCase {
     @ClassRule
@@ -624,6 +628,49 @@ private Response fetchMvLongs() throws IOException {
         return query(query.toString(), "columns");
     }

+    public void testLookupExplosion() throws IOException {
+        int sensorDataCount = 7500;
+        int lookupEntries = 10000;
+        Map<?, ?> map = responseAsMap(lookupExplosion(sensorDataCount, lookupEntries));
+        assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
+    }
+
+    public void testLookupExplosionManyMatches() throws IOException {
+        assertCircuitBreaks(() -> lookupExplosion(8500, 10000));
+    }
+
+    private Response lookupExplosion(int sensorDataCount, int lookupEntries) throws IOException {
+        initSensorData(sensorDataCount, 1);
+        initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484");
+        StringBuilder query = startQuery();
+        query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)\"}");
+        return query(query.toString(), null);
+    }
+
+    public void testEnrichExplosion() throws IOException {
+        int sensorDataCount = 1000;
+        int lookupEntries = 100;
+        Map<?, ?> map = responseAsMap(enrichExplosion(sensorDataCount, lookupEntries));
+        assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount))));
+    }
+
+    public void testEnrichExplosionManyMatches() throws IOException {
+        assertCircuitBreaks(() -> enrichExplosion(1000, 10000));
+    }
+
+    private Response enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException {
+        initSensorData(sensorDataCount, 1);
+        initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484");
+        try {
+            StringBuilder query = startQuery();
+            query.append("FROM sensor_data | ENRICH sensor ON id | STATS COUNT(*)\"}");
+            return query(query.toString(), null);
+        } finally {
+            Request delete = new Request("DELETE", "/_enrich/policy/sensor");
+            assertMap(responseAsMap(client().performRequest(delete)), matchesMap().entry("acknowledged", true));
+        }
+    }
+
     private void initManyLongs() throws IOException {
         logger.info("loading many documents with longs");
         StringBuilder bulk = new StringBuilder();
@@ -647,7 +694,7 @@ private void initManyLongs() throws IOException {
     }

     private void initSingleDocIndex() throws IOException {
-        logger.info("loading many documents with a single document");
+        logger.info("loading a single document");
         initIndex("single", """
             {"create":{}}
             {"a":1}
@@ -730,6 +777,77 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx
         initIndex("mv_longs", bulk.toString());
     }

+    private void initSensorData(int docCount, int sensorCount) throws IOException {
+        logger.info("loading sensor data");
+        createIndex("sensor_data", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
+            {
+                "properties": {
+                    "@timestamp": { "type": "date" },
+                    "id": { "type": "long" },
+                    "value": { "type": "double" }
+                }
+            }""");
+        int docsPerBulk = 1000;
+        long firstDate = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z");
+
+        StringBuilder data = new StringBuilder();
+        for (int i = 0; i < docCount; i++) {
+            data.append(String.format(Locale.ROOT, """
+                {"create":{}}
+                {"timestamp":"%s", "id": %d, "value": %f}
+                """, DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate), i % sensorCount, i * 1.1));
+            if (i % docsPerBulk == docsPerBulk - 1) {
+                bulk("sensor_data", data.toString());
+                data.setLength(0);
+            }
+        }
+        initIndex("sensor_data", data.toString());
+    }
+
+    private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
+        logger.info("loading sensor lookup");
+        createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
+            {
+                "properties": {
+                    "id": { "type": "long" },
+                    "location": { "type": "geo_point" }
+                }
+            }""");
+        int docsPerBulk = 1000;
+        StringBuilder data = new StringBuilder();
+        for (int i = 0; i < lookupEntries; i++) {
+            int sensor = i % sensorCount;
+            data.append(String.format(Locale.ROOT, """
+                {"create":{}}
+                {"id": %d, "location": "POINT(%s)"}
+                """, sensor, location.apply(sensor)));
+            if (i % docsPerBulk == docsPerBulk - 1) {
+                bulk("sensor_lookup", data.toString());
+                data.setLength(0);
+            }
+        }
+        initIndex("sensor_lookup", data.toString());
+    }
+
+    private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
+        initSensorLookup(lookupEntries, sensorCount, location);
+        logger.info("loading sensor enrich");
+
+        Request create = new Request("PUT", "/_enrich/policy/sensor");
+        create.setJsonEntity("""
+            {
+              "match": {
+                "indices": "sensor_lookup",
+                "match_field": "id",
+                "enrich_fields": ["location"]
+              }
+            }
+            """);
+        assertMap(responseAsMap(client().performRequest(create)), matchesMap().entry("acknowledged", true));
+        Request execute = new Request("POST", "/_enrich/policy/sensor/_execute");
+        assertMap(responseAsMap(client().performRequest(execute)), matchesMap().entry("status", Map.of("phase", "COMPLETE")));
+    }
+
     private void bulk(String name, String bulk) throws IOException {
         Request request = new Request("POST", "/" + name + "/_bulk");
         request.setJsonEntity(bulk);
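
The query strings above end in \"} because startQuery() (already present in HeapAttackIT, not shown in this diff) builds the JSON request body around them. Assuming it opens the body with {"query":" and that the query() helper POSTs to the ES|QL _query endpoint, the lookupExplosion request works out to roughly the following. This is a simplified sketch, not the helper's actual code:

    // Sketch of the request lookupExplosion() ends up issuing. With all 7500
    // sensor_data rows and all 10000 sensor_lookup rows sharing id 0, the join
    // produces 7500 * 10000 = 75,000,000 rows before STATS COUNT(*) collapses them.
    Request request = new Request("POST", "/_query");
    request.setJsonEntity("""
        {"query":"FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)"}
        """);
    Response response = client().performRequest(request);

That row multiplication is why testLookupExplosion asserts a value of sensorDataCount * lookupEntries, and why bumping sensorDataCount to 8500 in testLookupExplosionManyMatches is expected to trip the circuit breaker instead.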

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 19 additions & 6 deletions
@@ -21,6 +21,7 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedBiFunction;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
@@ -410,13 +411,25 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
                 driver.cancel(reason);
             });
             var threadContext = transportService.getThreadPool().getThreadContext();
-            Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, listener.map(ignored -> {
-                List<Page> out = collectedPages;
-                if (mergePages && out.isEmpty()) {
-                    out = List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields));
+            Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, new ActionListener<Void>() {
+                @Override
+                public void onResponse(Void unused) {
+                    List<Page> out = collectedPages;
+                    if (mergePages && out.isEmpty()) {
+                        out = List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields));
+                    }
+                    listener.onResponse(out);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(collectedPages.iterator(), p -> () -> {
+                        p.allowPassingToDifferentDriver();
+                        p.releaseBlocks();
+                    })));
+                    listener.onFailure(e);
                 }
-                return out;
-            }));
+            });
             started = true;
         } catch (Exception e) {
             listener.onFailure(e);
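
The new onFailure branch is the substance of this file's change: when the lookup driver fails, for example because a circuit breaker trips mid-join, the pages already buffered in collectedPages still hold breaker-accounted blocks and would leak if the failure were simply propagated. The Iterators.map construction turns each buffered page into a Releasable; spelled out as a plain loop, the cleanup amounts to roughly this (a sketch for clarity, ignoring the close-everything-even-on-exception behavior that Releasables.wrap provides):

    // Roughly equivalent cleanup, written as a loop: mark each buffered page as
    // safe to touch from this thread, then release its blocks so the memory is
    // returned to the breaker before the failure is passed on.
    for (Page p : collectedPages) {
        p.allowPassingToDifferentDriver();
        p.releaseBlocks();
    }
    listener.onFailure(e);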
