Skip to content

Commit 03cb385

Browse files
authored
ESQL: Heap attack tests (#120313)
* ESQL: Heap attack tests Adds tests for requests that would fill up the heap and crash elasticsearch but for our circuit breakers. In an ideal world we'd stream these results back to and this wouldn't crash anything. But we don't have that at the moment. * Better date * Update
1 parent 04358fa commit 03cb385

File tree

2 files changed

+140
-9
lines changed

2 files changed

+140
-9
lines changed

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import org.elasticsearch.common.unit.ByteSizeValue;
2424
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2525
import org.elasticsearch.core.TimeValue;
26+
import org.elasticsearch.index.IndexMode;
27+
import org.elasticsearch.index.IndexSettings;
28+
import org.elasticsearch.index.mapper.DateFieldMapper;
2629
import org.elasticsearch.test.ListMatcher;
2730
import org.elasticsearch.test.MapMatcher;
2831
import org.elasticsearch.test.cluster.ElasticsearchCluster;
@@ -42,6 +45,7 @@
4245
import java.util.List;
4346
import java.util.Locale;
4447
import java.util.Map;
48+
import java.util.function.IntFunction;
4549
import java.util.stream.Collectors;
4650
import java.util.stream.IntStream;
4751

@@ -57,8 +61,8 @@
5761
import static org.hamcrest.Matchers.matchesRegex;
5862

5963
/**
60-
* Tests that run ESQL queries that have, in the past, used so much memory they
61-
* crash Elasticsearch.
64+
* Tests that run ESQL queries that use a ton of memory. We want to make
65+
* sure they don't consume the entire heap and crash Elasticsearch.
6266
*/
6367
public class HeapAttackIT extends ESRestTestCase {
6468
@ClassRule
@@ -624,6 +628,49 @@ private Response fetchMvLongs() throws IOException {
624628
return query(query.toString(), "columns");
625629
}
626630

631+
public void testLookupExplosion() throws IOException {
632+
int sensorDataCount = 7500;
633+
int lookupEntries = 10000;
634+
Map<?, ?> map = responseAsMap(lookupExplosion(sensorDataCount, lookupEntries));
635+
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
636+
}
637+
638+
public void testLookupExplosionManyMatches() throws IOException {
639+
assertCircuitBreaks(() -> lookupExplosion(8500, 10000));
640+
}
641+
642+
private Response lookupExplosion(int sensorDataCount, int lookupEntries) throws IOException {
643+
initSensorData(sensorDataCount, 1);
644+
initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484");
645+
StringBuilder query = startQuery();
646+
query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)\"}");
647+
return query(query.toString(), null);
648+
}
649+
650+
public void testEnrichExplosion() throws IOException {
651+
int sensorDataCount = 1000;
652+
int lookupEntries = 100;
653+
Map<?, ?> map = responseAsMap(enrichExplosion(sensorDataCount, lookupEntries));
654+
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount))));
655+
}
656+
657+
public void testEnrichExplosionManyMatches() throws IOException {
658+
assertCircuitBreaks(() -> enrichExplosion(1000, 10000));
659+
}
660+
661+
private Response enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException {
662+
initSensorData(sensorDataCount, 1);
663+
initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484");
664+
try {
665+
StringBuilder query = startQuery();
666+
query.append("FROM sensor_data | ENRICH sensor ON id | STATS COUNT(*)\"}");
667+
return query(query.toString(), null);
668+
} finally {
669+
Request delete = new Request("DELETE", "/_enrich/policy/sensor");
670+
assertMap(responseAsMap(client().performRequest(delete)), matchesMap().entry("acknowledged", true));
671+
}
672+
}
673+
627674
private void initManyLongs() throws IOException {
628675
logger.info("loading many documents with longs");
629676
StringBuilder bulk = new StringBuilder();
@@ -647,7 +694,7 @@ private void initManyLongs() throws IOException {
647694
}
648695

649696
private void initSingleDocIndex() throws IOException {
650-
logger.info("loading many documents with a single document");
697+
logger.info("loading a single document");
651698
initIndex("single", """
652699
{"create":{}}
653700
{"a":1}
@@ -730,6 +777,77 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx
730777
initIndex("mv_longs", bulk.toString());
731778
}
732779

780+
private void initSensorData(int docCount, int sensorCount) throws IOException {
781+
logger.info("loading sensor data");
782+
createIndex("sensor_data", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
783+
{
784+
"properties": {
785+
"@timestamp": { "type": "date" },
786+
"id": { "type": "long" },
787+
"value": { "type": "double" }
788+
}
789+
}""");
790+
int docsPerBulk = 1000;
791+
long firstDate = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z");
792+
793+
StringBuilder data = new StringBuilder();
794+
for (int i = 0; i < docCount; i++) {
795+
data.append(String.format(Locale.ROOT, """
796+
{"create":{}}
797+
{"timestamp":"%s", "id": %d, "value": %f}
798+
""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate), i % sensorCount, i * 1.1));
799+
if (i % docsPerBulk == docsPerBulk - 1) {
800+
bulk("sensor_data", data.toString());
801+
data.setLength(0);
802+
}
803+
}
804+
initIndex("sensor_data", data.toString());
805+
}
806+
807+
private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
808+
logger.info("loading sensor lookup");
809+
createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
810+
{
811+
"properties": {
812+
"id": { "type": "long" },
813+
"location": { "type": "geo_point" }
814+
}
815+
}""");
816+
int docsPerBulk = 1000;
817+
StringBuilder data = new StringBuilder();
818+
for (int i = 0; i < lookupEntries; i++) {
819+
int sensor = i % sensorCount;
820+
data.append(String.format(Locale.ROOT, """
821+
{"create":{}}
822+
{"id": %d, "location": "POINT(%s)"}
823+
""", sensor, location.apply(sensor)));
824+
if (i % docsPerBulk == docsPerBulk - 1) {
825+
bulk("sensor_lookup", data.toString());
826+
data.setLength(0);
827+
}
828+
}
829+
initIndex("sensor_lookup", data.toString());
830+
}
831+
832+
private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
833+
initSensorLookup(lookupEntries, sensorCount, location);
834+
logger.info("loading sensor enrich");
835+
836+
Request create = new Request("PUT", "/_enrich/policy/sensor");
837+
create.setJsonEntity("""
838+
{
839+
"match": {
840+
"indices": "sensor_lookup",
841+
"match_field": "id",
842+
"enrich_fields": ["location"]
843+
}
844+
}
845+
""");
846+
assertMap(responseAsMap(client().performRequest(create)), matchesMap().entry("acknowledged", true));
847+
Request execute = new Request("POST", "/_enrich/policy/sensor/_execute");
848+
assertMap(responseAsMap(client().performRequest(execute)), matchesMap().entry("status", Map.of("phase", "COMPLETE")));
849+
}
850+
733851
private void bulk(String name, String bulk) throws IOException {
734852
Request request = new Request("POST", "/" + name + "/_bulk");
735853
request.setJsonEntity(bulk);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.routing.ShardRouting;
2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.CheckedBiFunction;
24+
import org.elasticsearch.common.collect.Iterators;
2425
import org.elasticsearch.common.io.stream.StreamInput;
2526
import org.elasticsearch.common.settings.Settings;
2627
import org.elasticsearch.common.util.BigArrays;
@@ -409,13 +410,25 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
409410
driver.cancel(reason);
410411
});
411412
var threadContext = transportService.getThreadPool().getThreadContext();
412-
Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, listener.map(ignored -> {
413-
List<Page> out = collectedPages;
414-
if (mergePages && out.isEmpty()) {
415-
out = List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields));
413+
Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, new ActionListener<Void>() {
414+
@Override
415+
public void onResponse(Void unused) {
416+
List<Page> out = collectedPages;
417+
if (mergePages && out.isEmpty()) {
418+
out = List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields));
419+
}
420+
listener.onResponse(out);
421+
}
422+
423+
@Override
424+
public void onFailure(Exception e) {
425+
Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(collectedPages.iterator(), p -> () -> {
426+
p.allowPassingToDifferentDriver();
427+
p.releaseBlocks();
428+
})));
429+
listener.onFailure(e);
416430
}
417-
return out;
418-
}));
431+
});
419432
started = true;
420433
} catch (Exception e) {
421434
listener.onFailure(e);

0 commit comments

Comments
 (0)