Skip to content

Commit 997dc84

Browse files
Merge remote-tracking branch 'elastic/main' into drop-redundant-single-b-interface
2 parents 0349803 + c662590 commit 997dc84

File tree

12 files changed

+117
-23
lines changed

12 files changed

+117
-23
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ static TransportVersion def(int id) {
230230
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED = def(9_056_0_00);
231231
public static final TransportVersion BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE = def(9_057_0_00);
232232
public static final TransportVersion SEARCH_INCREMENTAL_TOP_DOCS_NULL = def(9_058_0_00);
233+
public static final TransportVersion COMPRESS_DELAYABLE_WRITEABLE = def(9_059_0_00);
233234

234235
/*
235236
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,11 @@ public ReleasableBytesReference readReleasableBytesReference() throws IOExceptio
178178
return retainAndSkip(len);
179179
}
180180

181+
@Override
182+
public ReleasableBytesReference readReleasableBytesReference(int len) throws IOException {
183+
return retainAndSkip(len);
184+
}
185+
181186
@Override
182187
public ReleasableBytesReference readAllToReleasableBytesReference() throws IOException {
183188
return retainAndSkip(length() - offset());

server/src/main/java/org/elasticsearch/common/io/Streams.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ public void close() throws IOException {
235235
public BytesReference bytes() {
236236
return delegate.bytes();
237237
}
238+
239+
@Override
240+
public void seek(long position) {
241+
delegate.seek(position);
242+
}
238243
}
239244

240245
/**

server/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,28 @@
1010
package org.elasticsearch.common.io.stream;
1111

1212
import org.elasticsearch.common.bytes.BytesReference;
13+
import org.elasticsearch.common.compress.CompressorFactory;
14+
import org.elasticsearch.core.Streams;
15+
16+
import java.io.IOException;
1317

1418
public abstract class BytesStream extends StreamOutput {
1519

20+
@Override
21+
public void writeWithSizePrefix(Writeable writeable) throws IOException {
22+
long pos = position();
23+
seek(pos + Integer.BYTES);
24+
try (var out = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(this)))) {
25+
out.setTransportVersion(getTransportVersion());
26+
writeable.writeTo(out);
27+
}
28+
long newPos = position();
29+
seek(pos);
30+
writeInt(Math.toIntExact(newPos - pos - Integer.BYTES));
31+
seek(newPos);
32+
}
33+
1634
public abstract BytesReference bytes();
35+
36+
public abstract void seek(long position);
1737
}

server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public void flush() {
106106
// nothing to do
107107
}
108108

109+
@Override
109110
public void seek(long position) {
110111
ensureCapacity(position);
111112
count = (int) position;

server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
package org.elasticsearch.common.io.stream;
1111

1212
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.TransportVersions;
1314
import org.elasticsearch.common.bytes.BytesReference;
1415
import org.elasticsearch.common.bytes.ReleasableBytesReference;
16+
import org.elasticsearch.common.compress.CompressorFactory;
1517
import org.elasticsearch.core.Nullable;
1618
import org.elasticsearch.core.Releasable;
1719

@@ -53,11 +55,22 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(T referenc
5355
* when {@link #expand()} is called.
5456
*/
5557
public static <T extends Writeable> DelayableWriteable<T> delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
56-
return new Serialized<>(reader, in.getTransportVersion(), in.namedWriteableRegistry(), in.readReleasableBytesReference());
58+
return new Serialized<>(
59+
reader,
60+
in.getTransportVersion(),
61+
in.namedWriteableRegistry(),
62+
in.getTransportVersion().onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)
63+
? in.readReleasableBytesReference(in.readInt())
64+
: in.readReleasableBytesReference()
65+
);
5766
}
5867

5968
public static <T extends Writeable> DelayableWriteable<T> referencing(Writeable.Reader<T> reader, StreamInput in) throws IOException {
60-
try (ReleasableBytesReference serialized = in.readReleasableBytesReference()) {
69+
try (
70+
ReleasableBytesReference serialized = in.getTransportVersion().onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)
71+
? in.readReleasableBytesReference(in.readInt())
72+
: in.readReleasableBytesReference()
73+
) {
6174
return new Referencing<>(deserialize(reader, in.getTransportVersion(), in.namedWriteableRegistry(), serialized));
6275
}
6376
}
@@ -95,7 +108,11 @@ private Referencing(T reference) {
95108

96109
@Override
97110
public void writeTo(StreamOutput out) throws IOException {
98-
out.writeWithSizePrefix(reference);
111+
if (out.getTransportVersion().onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)) {
112+
out.writeWithSizePrefix(reference);
113+
} else {
114+
out.legacyWriteWithSizePrefix(reference);
115+
}
99116
}
100117

101118
@Override
@@ -105,13 +122,14 @@ public T expand() {
105122

106123
@Override
107124
public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry registry) {
108-
BytesStreamOutput buffer;
109-
try {
110-
buffer = writeToBuffer(TransportVersion.current());
125+
// TODO: this path is currently not used in production code, if it ever is this should start using pooled buffers
126+
BytesStreamOutput buffer = new BytesStreamOutput();
127+
try (var out = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(buffer))) {
128+
out.setTransportVersion(TransportVersion.current());
129+
reference.writeTo(out);
111130
} catch (IOException e) {
112131
throw new RuntimeException("unexpected error writing writeable to buffer", e);
113132
}
114-
// TODO: this path is currently not used in production code, if it ever is this should start using pooled buffers
115133
return new Serialized<>(reader, TransportVersion.current(), registry, ReleasableBytesReference.wrap(buffer.bytes()));
116134
}
117135

@@ -125,14 +143,6 @@ public long getSerializedSize() {
125143
return DelayableWriteable.getSerializedSize(reference);
126144
}
127145

128-
private BytesStreamOutput writeToBuffer(TransportVersion version) throws IOException {
129-
try (BytesStreamOutput buffer = new BytesStreamOutput()) {
130-
buffer.setTransportVersion(version);
131-
reference.writeTo(buffer);
132-
return buffer;
133-
}
134-
}
135-
136146
@Override
137147
public void close() {
138148
// noop
@@ -169,7 +179,12 @@ public void writeTo(StreamOutput out) throws IOException {
169179
* which is good because this is how shard request caching
170180
* works.
171181
*/
172-
out.writeBytesReference(serialized);
182+
if (out.getTransportVersion().onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)) {
183+
out.writeInt(serialized.length());
184+
serialized.writeTo(out);
185+
} else {
186+
out.writeBytesReference(serialized);
187+
}
173188
} else {
174189
/*
175190
* If the version doesn't line up then we have to deserialize
@@ -211,6 +226,7 @@ public long getSerializedSize() {
211226
public void close() {
212227
serialized.close();
213228
}
229+
214230
}
215231

216232
/**
@@ -232,7 +248,11 @@ private static <T> T deserialize(
232248
NamedWriteableRegistry registry,
233249
BytesReference serialized
234250
) throws IOException {
235-
try (StreamInput in = serialized.streamInput()) {
251+
try (
252+
StreamInput in = serializedAtVersion.onOrAfter(TransportVersions.COMPRESS_DELAYABLE_WRITEABLE)
253+
? CompressorFactory.COMPRESSOR.threadLocalStreamInput(serialized.streamInput())
254+
: serialized.streamInput()
255+
) {
236256
return reader.read(wrapWithDeduplicatorStreamInput(in, serializedAtVersion, registry));
237257
}
238258
}

server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ public ReleasableBytesReference readReleasableBytesReference() throws IOExceptio
4747
return delegate.readReleasableBytesReference();
4848
}
4949

50+
@Override
51+
public ReleasableBytesReference readReleasableBytesReference(int length) throws IOException {
52+
return delegate.readReleasableBytesReference(length);
53+
}
54+
5055
@Override
5156
public boolean supportReadAllToReleasableBytesReference() {
5257
return delegate.supportReadAllToReleasableBytesReference();

server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void writeLongLE(long i) throws IOException {
155155
}
156156

157157
@Override
158-
public void writeWithSizePrefix(Writeable writeable) throws IOException {
158+
public void legacyWriteWithSizePrefix(Writeable writeable) throws IOException {
159159
// TODO: do this without copying the bytes from tmp by calling writeBytes and just use the pages in tmp directly through
160160
// manipulation of the offsets on the pages after writing to tmp. This will require adjustments to the places in this class
161161
// that make assumptions about the page size
@@ -214,6 +214,7 @@ public void flush() {
214214
// nothing to do
215215
}
216216

217+
@Override
217218
public void seek(long position) {
218219
ensureCapacityFromPosition(position);
219220
int offsetInPage = (int) (position % pageSize);

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ public ReleasableBytesReference readReleasableBytesReference() throws IOExceptio
127127
return ReleasableBytesReference.wrap(readBytesReference());
128128
}
129129

130+
/**
131+
* Same as {@link #readBytesReference()} but with an explicitly provided length.
132+
* @param length number of bytes to read
133+
*/
134+
public ReleasableBytesReference readReleasableBytesReference(int length) throws IOException {
135+
return ReleasableBytesReference.wrap(readBytesReference(length));
136+
}
137+
130138
/**
131139
* Reads the same bytes returned by {@link #readReleasableBytesReference()} but does not retain a reference to these bytes.
132140
* The returned {@link BytesReference} thus only contains valid content as long as the underlying buffer has not been released.
@@ -138,6 +146,10 @@ public BytesReference readSlicedBytesReference() throws IOException {
138146
return readBytesReference();
139147
}
140148

149+
public BytesReference readSlicedBytesReference(int bytes) throws IOException {
150+
return readBytesReference(bytes);
151+
}
152+
141153
/**
142154
* Checks if this {@link InputStream} supports {@link #readAllToReleasableBytesReference()}.
143155
*/

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.common.Strings;
1919
import org.elasticsearch.common.bytes.BytesArray;
2020
import org.elasticsearch.common.bytes.BytesReference;
21+
import org.elasticsearch.common.compress.CompressorFactory;
2122
import org.elasticsearch.common.geo.GeoPoint;
2223
import org.elasticsearch.common.io.stream.Writeable.Writer;
2324
import org.elasticsearch.common.settings.SecureString;
@@ -131,14 +132,34 @@ public void writeByteArray(byte[] b) throws IOException {
131132
* Serializes a writable just like {@link Writeable#writeTo(StreamOutput)} would but prefixes it with the serialized size of the result.
132133
*
133134
* @param writeable {@link Writeable} to serialize
135+
* @deprecated use {@link #writeWithSizePrefix} instead
134136
*/
135-
public void writeWithSizePrefix(Writeable writeable) throws IOException {
137+
@Deprecated
138+
public void legacyWriteWithSizePrefix(Writeable writeable) throws IOException {
136139
final BytesStreamOutput tmp = new BytesStreamOutput();
137140
tmp.setTransportVersion(version);
138141
writeable.writeTo(tmp);
139142
writeBytesReference(tmp.bytes());
140143
}
141144

145+
/**
146+
* Serializes a writable just like {@link Writeable#writeTo(StreamOutput)} would but also compresses and prefixes it with the serialized
147+
* size of the result.
148+
149+
*
150+
* @param writeable {@link Writeable} to serialize
151+
*/
152+
public void writeWithSizePrefix(Writeable writeable) throws IOException {
153+
final BytesStreamOutput tmp = new BytesStreamOutput();
154+
try (var o = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(tmp))) {
155+
o.setTransportVersion(version);
156+
writeable.writeTo(o);
157+
}
158+
var bytes = tmp.bytes();
159+
this.writeInt(bytes.length());
160+
bytes.writeTo(this);
161+
}
162+
142163
/**
143164
* Writes the bytes reference, including a length header.
144165
*/

0 commit comments

Comments
 (0)