Skip to content

Commit cb01dc3

Browse files
original-brownbearafoucret
authored andcommitted
Add safer conversion from RecyclerBytesStreamOutput to ReleasableBytesReference (elastic#127404)
We have a couple of places in the codebase where we do the transition from the stream to the reference. We can save some code and make this a little less error-prone by having a conversion method with move-style semantics and enabling the use of try-with-resources. Also, this enables a couple of optimizations down the line and unlinking the list of pages and moving it to the reference instead of nulling it out is a bit nicer to the CPU caches also.
1 parent 2544746 commit cb01dc3

File tree

5 files changed

+33
-45
lines changed

5 files changed

+33
-45
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.action.support.IndicesOptions;
2424
import org.elasticsearch.client.internal.Client;
2525
import org.elasticsearch.cluster.ClusterState;
26-
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2726
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2827
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2928
import org.elasticsearch.common.io.stream.StreamInput;
@@ -820,7 +819,7 @@ void onShardDone() {
820819
out.close();
821820
}
822821
}
823-
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out)));
822+
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference()));
824823
}
825824

826825
private void maybeFreeContext(SearchPhaseResult result, BitSet relevantShardIndices) {

server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,7 @@ private ReleasableBytesReference maybeSerializeClusterState(
391391
}
392392
assert clusterState.nodes().isLocalNodeElectedMaster();
393393

394-
final var bytesStream = transportService.newNetworkBytesStream();
395-
var success = false;
396-
try {
394+
try (var bytesStream = transportService.newNetworkBytesStream()) {
397395
try (
398396
var stream = new OutputStreamStreamOutput(
399397
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
@@ -404,22 +402,16 @@ private ReleasableBytesReference maybeSerializeClusterState(
404402
} catch (IOException e) {
405403
throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, discoveryNode);
406404
}
407-
final var newBytes = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
408405
logger.trace(
409406
"serialized join validation cluster state version [{}] for transport version [{}] with size [{}]",
410407
clusterState.version(),
411408
version,
412-
newBytes.length()
409+
bytesStream.position()
413410
);
411+
var newBytes = bytesStream.moveToBytesReference();
414412
final var previousBytes = statesByVersion.put(version, newBytes);
415413
assert previousBytes == null;
416-
success = true;
417414
return newBytes;
418-
} finally {
419-
if (success == false) {
420-
bytesStream.close();
421-
assert false;
422-
}
423415
}
424416
}
425417
}

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,7 @@ public PublicationContext newPublicationContext(ClusterStatePublicationEvent clu
262262
}
263263

264264
private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node, TransportVersion version) {
265-
final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
266-
boolean success = false;
267-
try {
265+
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
268266
final long uncompressedBytes;
269267
try (
270268
StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
@@ -278,20 +276,15 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
278276
} catch (IOException e) {
279277
throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
280278
}
281-
final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
282-
serializationStatsTracker.serializedFullState(uncompressedBytes, result.length());
279+
final int size = bytesStream.size();
280+
serializationStatsTracker.serializedFullState(uncompressedBytes, size);
283281
logger.trace(
284282
"serialized full cluster state version [{}] using transport version [{}] with size [{}]",
285283
clusterState.version(),
286284
version,
287-
result.length()
285+
size
288286
);
289-
success = true;
290-
return result;
291-
} finally {
292-
if (success == false) {
293-
bytesStream.close();
294-
}
287+
return bytesStream.moveToBytesReference();
295288
}
296289
}
297290

@@ -302,9 +295,7 @@ private ReleasableBytesReference serializeDiffClusterState(
302295
TransportVersion version
303296
) {
304297
final long clusterStateVersion = newState.version();
305-
final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
306-
boolean success = false;
307-
try {
298+
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
308299
final long uncompressedBytes;
309300
try (
310301
StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
@@ -322,20 +313,15 @@ private ReleasableBytesReference serializeDiffClusterState(
322313
} catch (IOException e) {
323314
throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
324315
}
325-
final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
326-
serializationStatsTracker.serializedDiff(uncompressedBytes, result.length());
316+
final int size = bytesStream.size();
317+
serializationStatsTracker.serializedDiff(uncompressedBytes, size);
327318
logger.trace(
328319
"serialized cluster state diff for version [{}] using transport version [{}] with size [{}]",
329320
clusterStateVersion,
330321
version,
331-
result.length()
322+
size
332323
);
333-
success = true;
334-
return result;
335-
} finally {
336-
if (success == false) {
337-
bytesStream.close();
338-
}
324+
return bytesStream.moveToBytesReference();
339325
}
340326
}
341327

server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.bytes.BytesArray;
1414
import org.elasticsearch.common.bytes.BytesReference;
1515
import org.elasticsearch.common.bytes.CompositeBytesReference;
16+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1617
import org.elasticsearch.common.recycler.Recycler;
1718
import org.elasticsearch.core.Releasable;
1819
import org.elasticsearch.core.Releasables;
@@ -37,7 +38,7 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
3738
static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
3839
static final VarHandle VH_LE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN);
3940

40-
private final ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>();
41+
private ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>();
4142
private final Recycler<BytesRef> recycler;
4243
private final int pageSize;
4344
private int pageIndex = -1;
@@ -237,13 +238,26 @@ public void skip(int length) {
237238

238239
@Override
239240
public void close() {
240-
try {
241+
var pages = this.pages;
242+
if (pages != null) {
243+
this.pages = null;
241244
Releasables.close(pages);
242-
} finally {
243-
pages.clear();
244245
}
245246
}
246247

248+
/**
249+
* Move the contents written to this stream to a {@link ReleasableBytesReference}. Closing this instance becomes a noop after
250+
* this method returns successfully and its buffers need to be released by releasing the returned bytes reference.
251+
*
252+
* @return a {@link ReleasableBytesReference} that must be released once no longer needed
253+
*/
254+
public ReleasableBytesReference moveToBytesReference() {
255+
var bytes = bytes();
256+
var pages = this.pages;
257+
this.pages = null;
258+
return new ReleasableBytesReference(bytes, () -> Releasables.close(pages));
259+
}
260+
247261
/**
248262
* Returns the current size of the buffer.
249263
*

server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBodyPart.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
166166
if (serialization.hasNext() == false) {
167167
builder.close();
168168
}
169-
final var result = new ReleasableBytesReference(
170-
chunkStream.bytes(),
171-
() -> Releasables.closeExpectNoException(chunkStream)
172-
);
169+
final var result = chunkStream.moveToBytesReference();
173170
target = null;
174171
return result;
175172
} catch (Exception e) {

0 commit comments

Comments
 (0)