From 53bc41cb1fbc99d2c8bdb264a56addec5ad893c9 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 19 Aug 2025 11:49:35 -0400 Subject: [PATCH] ESQL: Add a stream view to BreakingBytesRefBuilder Adds a `.stream()` method to get a `StreamOutput` view into `BreakingBytesRefBuilder`. This is useful in `COLLECT` because we need to write `XContent` into a `BreakingBytesRefBuilder` per document. --- .../operator/BreakingBytesRefBuilder.java | 36 ++++++ .../BreakingBytesRefBuilderTests.java | 113 +++++++++++++++--- 2 files changed, 130 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilder.java index 2578452ad9062..06a6a2c98dea7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilder.java @@ -12,6 +12,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Releasable; /** @@ -162,6 +163,13 @@ public long ramBytesUsed() { return SHALLOW_SIZE + bytesArrayRamBytesUsed(bytes.bytes.length); } + /** + * Builds a {@link StreamOutput} view into the {@link BreakingBytesRefBuilder}. + */ + public StreamOutput stream() { + return new Stream(); + } + private static long bytesArrayRamBytesUsed(long capacity) { return RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + capacity); } @@ -170,4 +178,32 @@ private static long bytesArrayRamBytesUsed(long capacity) { public void close() { breaker.addWithoutBreaking(-ramBytesUsed()); } + + private class Stream extends StreamOutput { + @Override + public long position() { + return length(); + } + + @Override + public void writeByte(byte b) { + append(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) { + append(b, offset, length); + } + + @Override + public void flush() {} + + /** + * Closes this stream to further operations. NOOP because we don't want to + * close the builder when we close. + */ + @Override + public void close() {} + } + } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilderTests.java index 266c17febc5b3..8c2257a63168e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilderTests.java @@ -14,10 +14,16 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.io.stream.BytesRefStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; @@ -34,11 +40,6 @@ public void testAddByte() { testAgainstOracle(() -> new TestIteration() { final byte b = randomByte(); - @Override - public int size() { - return 1; - } - @Override public void applyToBuilder(BreakingBytesRefBuilder builder) { builder.append(b); @@ -55,11 +56,6 @@ public void testAddBytesRef() { testAgainstOracle(() -> new TestIteration() { final BytesRef ref = new BytesRef(randomAlphaOfLengthBetween(1, 100)); - @Override - public int size() { - return ref.length; - } - @Override public void applyToBuilder(BreakingBytesRefBuilder builder) { builder.append(ref); @@ -90,11 +86,6 @@ public void testGrow() { final int length = between(1, 100); final byte b = randomByte(); - @Override - public int size() { - return length; - } - @Override public void applyToBuilder(BreakingBytesRefBuilder builder) { builder.grow(builder.length() + length); @@ -111,9 +102,90 @@ public void applyToOracle(BytesRefBuilder oracle) { }); } - interface TestIteration { - int size(); + public void testStream() { + testAgainstOracle(() -> switch (between(0, 3)) { + case 0 -> new XContentTestIteration() { + @Override + protected void apply(XContentBuilder builder) throws IOException { + // Noop + } + + @Override + public String toString() { + return "noop"; + } + }; + case 1 -> new XContentTestIteration() { + private final String value = randomAlphanumericOfLength(10); + + @Override + protected void apply(XContentBuilder builder) throws IOException { + builder.value(value); + } + + @Override + public String toString() { + return '"' + value + '"'; + } + }; + case 2 -> new XContentTestIteration() { + private final long value = randomLong(); + + @Override + protected void apply(XContentBuilder builder) throws IOException { + builder.value(value); + } + + @Override + public String toString() { + return Long.toString(value); + } + }; + case 3 -> new XContentTestIteration() { + private final String name = randomAlphanumericOfLength(5); + private final String value = randomAlphanumericOfLength(5); + + @Override + protected void apply(XContentBuilder builder) throws IOException { + builder.startObject().field(name, value).endObject(); + } + + @Override + public String toString() { + return name + ": " + value; + } + }; + default -> throw new UnsupportedOperationException(); + }); + } + + private abstract static class XContentTestIteration implements TestIteration { + protected abstract void apply(XContentBuilder builder) throws IOException; + + @Override + public void applyToBuilder(BreakingBytesRefBuilder builder) { + applyToStream(builder.stream()); + } + + @Override + public void applyToOracle(BytesRefBuilder oracle) { + BytesRefStreamOutput out = new BytesRefStreamOutput(); + applyToStream(out); + oracle.append(out.get()); + } + + private void applyToStream(StreamOutput out) { + try { + try (XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, out)) { + apply(builder); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + interface TestIteration { void applyToBuilder(BreakingBytesRefBuilder builder); void applyToOracle(BytesRefBuilder oracle); @@ -131,7 +203,11 @@ private void testAgainstOracle(Supplier iterations) { assertThat(builder.bytesRefView(), equalTo(oracle.get())); while (true) { TestIteration iteration = iterations.get(); - int targetSize = builder.length() + iteration.size(); + + int prevOracle = oracle.length(); + iteration.applyToOracle(oracle); + int size = oracle.length() - prevOracle; + int targetSize = builder.length() + size; boolean willResize = targetSize >= builder.bytes().length; if (willResize) { long resizeMemoryUsage = BreakingBytesRefBuilder.SHALLOW_SIZE + ramForArray(builder.bytes().length); @@ -143,7 +219,6 @@ private void testAgainstOracle(Supplier iterations) { } } iteration.applyToBuilder(builder); - iteration.applyToOracle(oracle); assertThat(builder.bytesRefView(), equalTo(oracle.get())); assertThat( builder.ramBytesUsed(),