diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
index 3912a63ef1514..74b7d307c384c 100644
--- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
+++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
@@ -585,6 +585,56 @@ public void testFetchTooManyBigFields() throws IOException {
         assertCircuitBreaks(attempt -> fetchManyBigFields(attempt * 500));
     }
 
+    public void testStatsOnLargeKeywords() throws IOException {
+        initVeryLargeText(450, 1, 3_000_000, 1);
+        StringBuilder query = startQuery();
+        query.append("FROM large_text_idx | STATS SUM(LENGTH(large_text0))\"}");
+        assertCircuitBreaks(attempt -> responseAsMap(query(query.toString(), "columns")));
+    }
+
+    private void initVeryLargeText(int docs, int nFields, int fieldSize, int docsPerBulk) throws IOException {
+        logger.info("loading many documents with a very large text field");
+        Request request = new Request("PUT", "/large_text_idx");
+        XContentBuilder config = JsonXContent.contentBuilder().startObject();
+        config.startObject("mappings").startObject("properties");
+        for (int i = 0; i < nFields; i++) {
+            config.startObject("large_text" + i).field("type", "text").endObject();
+        }
+
+        config.endObject().endObject();
+        request.setJsonEntity(Strings.toString(config.endObject()));
+        Response response = client().performRequest(request);
+        assertThat(
+            EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8),
+            equalTo("{\"acknowledged\":true,\"shards_acknowledged\":true,\"index\":\"large_text_idx\"}")
+        );
+
+        StringBuilder bulk = new StringBuilder();
+        for (int d = 0; d < docs; d++) {
+            bulk.append("{ \"index\" : { \"_index\" : \"large_text_idx\"} }\n");
+
+            bulk.append('{');
+            for (int i = 0; i < nFields; i++) {
+                if (i > 0) {
+                    bulk.append(',');
+                }
+                bulk.append('"').append("large_text").append(i).append("\": \"");
+                bulk.append(randomAlphanumericOfLength(fieldSize));
+                bulk.append('"');
+            }
+
+            bulk.append("}\n");
+            if (d % docsPerBulk == docsPerBulk - 1 && d != docs - 1) {
+                bulk("large_text_idx", bulk.toString());
+                bulk.setLength(0);
+                logger.info("loaded {} docs", d);
+            }
+        }
+        if (bulk.length() > 0) {
+            bulk("large_text_idx", bulk.toString());
+        }
+    }
+
     /**
      * Fetches documents containing 1000 fields which are {@code 1kb} each.
     */
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java
index 4755b1d609cfb..31b2af8750fe4 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java
@@ -166,7 +166,7 @@ public final void close() {
      */
     protected void extraClose() {}
 
-    protected void adjustBreaker(long deltaBytes) {
+    public void adjustBreaker(long deltaBytes) {
         blockFactory.adjustBreaker(deltaBytes);
         estimatedBytes += deltaBytes;
         assert estimatedBytes >= 0;
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
index 500bef6d2a597..2aa8ac6274ca5 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
@@ -16,6 +16,7 @@
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.compute.data.AbstractBlockBuilder;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
@@ -517,7 +518,20 @@
     private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset)
         implements Releasable {
         void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException {
-            reader.read(doc, storedFields, builder);
+            int size = storedFields.source() == null ? -1 : storedFields.source().internalSourceRef().length();
+            if (size > 500_000 && builder instanceof AbstractBlockBuilder ab) {
+                // account for twice the source size: once for the binary _source bytes, once for the parsed map
+                long reserved = size * 2L;
+                ab.adjustBreaker(reserved);
+                try {
+                    reader.read(doc, storedFields, builder);
+                } finally {
+                    // always release the reservation, even if the read trips another breaker
+                    ab.adjustBreaker(-reserved);
+                }
+            } else {
+                reader.read(doc, storedFields, builder);
+            }
         }
 
         Block build() {
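
Reviewer note: the breaker dance in `RowStrideReaderWork.read` above is easy to get wrong, so here is a minimal, self-contained sketch of the reserve/release pattern the change relies on. `SimpleBreaker` and `ReserveReleaseDemo` are hypothetical names invented for illustration; they are not part of this diff, which goes through `AbstractBlockBuilder.adjustBreaker` and the real circuit-breaker accounting instead.

```java
// Hypothetical stand-in for the real circuit breaker: tracks reserved bytes
// against a fixed limit and throws before an over-limit reservation succeeds.
final class SimpleBreaker {
    private final long limitBytes;
    private long usedBytes;

    SimpleBreaker(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Positive deltas reserve, negative deltas release (mirrors adjustBreaker).
    void adjust(long deltaBytes) {
        if (usedBytes + deltaBytes > limitBytes) {
            throw new IllegalStateException("circuit breaking: " + (usedBytes + deltaBytes) + " > " + limitBytes);
        }
        usedBytes += deltaBytes;
    }

    long used() {
        return usedBytes;
    }
}

final class ReserveReleaseDemo {
    public static void main(String[] args) {
        SimpleBreaker breaker = new SimpleBreaker(10_000_000);
        int sourceSize = 3_000_000; // e.g. one 3 MB _source document
        long reserved = sourceSize * 2L; // binary bytes + parsed map, as in the diff

        breaker.adjust(reserved); // fail fast, before the expensive read allocates anything
        try {
            // ... the allocation-heavy row-stride read would happen here ...
        } finally {
            breaker.adjust(-reserved); // always undo the reservation
        }
        System.out.println("bytes still accounted: " + breaker.used()); // prints 0
    }
}
```

The point of the pattern is to surface a circuit-breaking exception before the untracked allocation occurs, and to release in `finally` so a failure mid-read cannot leak the reservation.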