Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_QUERY_PLANNING_DURATION = def(9_051_0_00);
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED = def(9_052_0_00);
public static final TransportVersion BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE = def(9_053_0_00);
public static final TransportVersion COMPRESS_DELAYABLE_WRITEABLE = def(9_054_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ public ReleasableBytesReference readReleasableBytesReference() throws IOExceptio
return retainAndSkip(len);
}

@Override
public ReleasableBytesReference readReleasableBytesReference(int len) throws IOException {
return retainAndSkip(len);
}

@Override
public ReleasableBytesReference readAllToReleasableBytesReference() throws IOException {
return retainAndSkip(length() - offset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ public void close() throws IOException {
public BytesReference bytes() {
return delegate.bytes();
}

@Override
public void seek(long position) {
delegate.seek(position);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,28 @@
package org.elasticsearch.common.io.stream;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.core.Streams;

import java.io.IOException;

public abstract class BytesStream extends StreamOutput {

@Override
public void writeWithSizePrefix(Writeable writeable) throws IOException {
long pos = position();
seek(pos + Integer.BYTES);
try (var out = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(this)))) {
out.setTransportVersion(getTransportVersion());
writeable.writeTo(out);
}
long newPos = position();
seek(pos);
writeInt(Math.toIntExact(newPos - pos - Integer.BYTES));
seek(newPos);
}

public abstract BytesReference bytes();

public abstract void seek(long position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public void flush() {
// nothing to do
}

@Override
public void seek(long position) {
ensureCapacity(position);
count = (int) position;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
package org.elasticsearch.common.io.stream;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;

Expand Down Expand Up @@ -53,11 +55,22 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(T referenc
* when {@link #expand()} is called.
*/
public static <T extends Writeable> DelayableWriteable<T> delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
return new Serialized<>(reader, in.getTransportVersion(), in.namedWriteableRegistry(), in.readReleasableBytesReference());
return new Serialized<>(
reader,
in.getTransportVersion(),
in.namedWriteableRegistry(),
in.getTransportVersion().onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)
? in.readReleasableBytesReference(in.readInt())
: in.readReleasableBytesReference()
);
}

public static <T extends Writeable> DelayableWriteable<T> referencing(Writeable.Reader<T> reader, StreamInput in) throws IOException {
try (ReleasableBytesReference serialized = in.readReleasableBytesReference()) {
try (
ReleasableBytesReference serialized = in.getTransportVersion().onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)
? in.readReleasableBytesReference(in.readInt())
: in.readReleasableBytesReference()
) {
return new Referencing<>(deserialize(reader, in.getTransportVersion(), in.namedWriteableRegistry(), serialized));
}
}
Expand Down Expand Up @@ -95,7 +108,11 @@ private Referencing(T reference) {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeWithSizePrefix(reference);
if (out.getTransportVersion().onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)) {
out.writeWithSizePrefix(reference);
} else {
out.legacyWriteWithSizePrefix(reference);
}
}

@Override
Expand All @@ -105,13 +122,14 @@ public T expand() {

@Override
public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry registry) {
BytesStreamOutput buffer;
try {
buffer = writeToBuffer(TransportVersion.current());
// TODO: this path is currently not used in production code, if it ever is this should start using pooled buffers
BytesStreamOutput buffer = new BytesStreamOutput();
try (var out = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(buffer))) {
out.setTransportVersion(TransportVersion.current());
reference.writeTo(out);
} catch (IOException e) {
throw new RuntimeException("unexpected error writing writeable to buffer", e);
}
// TODO: this path is currently not used in production code, if it ever is this should start using pooled buffers
return new Serialized<>(reader, TransportVersion.current(), registry, ReleasableBytesReference.wrap(buffer.bytes()));
}

Expand All @@ -125,14 +143,6 @@ public long getSerializedSize() {
return DelayableWriteable.getSerializedSize(reference);
}

private BytesStreamOutput writeToBuffer(TransportVersion version) throws IOException {
try (BytesStreamOutput buffer = new BytesStreamOutput()) {
buffer.setTransportVersion(version);
reference.writeTo(buffer);
return buffer;
}
}

@Override
public void close() {
// noop
Expand Down Expand Up @@ -169,7 +179,12 @@ public void writeTo(StreamOutput out) throws IOException {
* which is good because this is how shard request caching
* works.
*/
out.writeBytesReference(serialized);
if (out.getTransportVersion().onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)) {
out.writeInt(serialized.length());
serialized.writeTo(out);
} else {
out.writeBytesReference(serialized);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the above comment is no longer accurate with the new branch since we will no longer copy the raw bytes, but instead reencode?

Copy link
Contributor Author

@original-brownbear original-brownbear Apr 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still copy the raw bytes, it's just that the prefix is a full int now.
writeBytesReference is nothing else but:

                 out.writeVInt(serialized.length());
                 serialized.writeTo(out);

all that happens here is going from a vint to an int?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, I understand now.

}
} else {
/*
* If the version doesn't line up then we have to deserialize
Expand Down Expand Up @@ -211,6 +226,7 @@ public long getSerializedSize() {
public void close() {
serialized.close();
}

}

/**
Expand All @@ -232,7 +248,11 @@ private static <T> T deserialize(
NamedWriteableRegistry registry,
BytesReference serialized
) throws IOException {
try (StreamInput in = serialized.streamInput()) {
try (
StreamInput in = serializedAtVersion.onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)
? CompressorFactory.COMPRESSOR.threadLocalStreamInput(serialized.streamInput())
: serialized.streamInput()
) {
return reader.read(wrapWithDeduplicatorStreamInput(in, serializedAtVersion, registry));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public ReleasableBytesReference readReleasableBytesReference() throws IOExceptio
return delegate.readReleasableBytesReference();
}

@Override
public ReleasableBytesReference readReleasableBytesReference(int length) throws IOException {
return delegate.readReleasableBytesReference(length);
}

@Override
public boolean supportReadAllToReleasableBytesReference() {
return delegate.supportReadAllToReleasableBytesReference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void writeLongLE(long i) throws IOException {
}

@Override
public void writeWithSizePrefix(Writeable writeable) throws IOException {
public void legacyWriteWithSizePrefix(Writeable writeable) throws IOException {
// TODO: do this without copying the bytes from tmp by calling writeBytes and just use the pages in tmp directly through
// manipulation of the offsets on the pages after writing to tmp. This will require adjustments to the places in this class
// that make assumptions about the page size
Expand Down Expand Up @@ -214,6 +214,7 @@ public void flush() {
// nothing to do
}

@Override
public void seek(long position) {
ensureCapacityFromPosition(position);
int offsetInPage = (int) (position % pageSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ public ReleasableBytesReference readReleasableBytesReference() throws IOExceptio
return ReleasableBytesReference.wrap(readBytesReference());
}

/**
* Same as {@link #readBytesReference()} but with an explicitly provided length.
* @param length number of bytes to read
*/
public ReleasableBytesReference readReleasableBytesReference(int length) throws IOException {
return ReleasableBytesReference.wrap(readBytesReference(length));
}

/**
* Reads the same bytes returned by {@link #readReleasableBytesReference()} but does not retain a reference to these bytes.
* The returned {@link BytesReference} thus only contains valid content as long as the underlying buffer has not been released.
Expand All @@ -138,6 +146,10 @@ public BytesReference readSlicedBytesReference() throws IOException {
return readBytesReference();
}

public BytesReference readSlicedBytesReference(int bytes) throws IOException {
return readBytesReference(bytes);
}

/**
* Checks if this {@link InputStream} supports {@link #readAllToReleasableBytesReference()}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Writer;
import org.elasticsearch.common.settings.SecureString;
Expand Down Expand Up @@ -131,14 +132,34 @@ public void writeByteArray(byte[] b) throws IOException {
* Serializes a writable just like {@link Writeable#writeTo(StreamOutput)} would but prefixes it with the serialized size of the result.
*
* @param writeable {@link Writeable} to serialize
* @deprecated use {@link #writeWithSizePrefix} instead
*/
public void writeWithSizePrefix(Writeable writeable) throws IOException {
@Deprecated
public void legacyWriteWithSizePrefix(Writeable writeable) throws IOException {
final BytesStreamOutput tmp = new BytesStreamOutput();
tmp.setTransportVersion(version);
writeable.writeTo(tmp);
writeBytesReference(tmp.bytes());
}

/**
* Serializes a writable just like {@link Writeable#writeTo(StreamOutput)} would but also compresses and prefixes it with the serialized
* size of the result.

*
* @param writeable {@link Writeable} to serialize
*/
public void writeWithSizePrefix(Writeable writeable) throws IOException {
final BytesStreamOutput tmp = new BytesStreamOutput();
try (var o = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(tmp))) {
o.setTransportVersion(version);
writeable.writeTo(o);
}
var bytes = tmp.bytes();
this.writeInt(bytes.length());
bytes.writeTo(this);
}

/**
* Writes the bytes reference, including a length header.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,11 @@ public BytesReference bytes() {
return bytesReference;
}

@Override
public void seek(long position) {
output.seek(position);
}

public int size() {
int size = output.size();
assertThat((long) size, equalTo(counting.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -1971,11 +1970,10 @@ protected static <T extends Writeable> T copyInstance(
} else {
BytesReference bytesReference = output.copyBytes();
output.reset();
output.writeBytesReference(bytesReference);
bytesReference.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
in.setTransportVersion(version);
DelayableWriteable<T> delayableWriteable = DelayableWriteable.delayed(reader, in);
return delayableWriteable.expand();
return reader.read(in);
}
}
}
Expand Down