Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase {
protected static final float DEFAULT_INDEX_BOOST = 1.0f;
private final Logger logger;
private final NamedWriteableRegistry namedWriteableRegistry;
protected final NamedWriteableRegistry namedWriteableRegistry;
protected final SearchTransportService searchTransportService;
private final Executor executor;
private final ActionListener<SearchResponse> listener;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -289,4 +290,16 @@ public boolean markSupported() {

@Override
public void close() throws IOException {}

@Override
public boolean supportReadAllToReleasableBytesReference() {
return true;
}

@Override
public ReleasableBytesReference readAllToReleasableBytesReference() {
final byte[] res = new byte[buffer.remaining()];
buffer.get(res);
return ReleasableBytesReference.wrap(new BytesArray(res));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public interface BytesTransportMessage {

TransportVersion version();

ReleasableBytesReference bytes();

/**
* Writes the data in a "thin" manner, without the actual bytes, assumes
* the actual bytes will be appended right after this content.
*/
void writeThin(StreamOutput out) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -21,7 +20,7 @@
* A specialized, bytes only request, that can potentially be optimized on the network
* layer, specifically for the same large buffer send to several nodes.
*/
public class BytesTransportRequest extends AbstractTransportRequest {
public class BytesTransportRequest extends AbstractTransportRequest implements BytesTransportMessage {

final ReleasableBytesReference bytes;
private final TransportVersion version;
Expand All @@ -37,18 +36,17 @@ public BytesTransportRequest(ReleasableBytesReference bytes, TransportVersion ve
this.version = version;
}

@Override
public TransportVersion version() {
return this.version;
}

public BytesReference bytes() {
@Override
public ReleasableBytesReference bytes() {
return this.bytes;
}

/**
* Writes the data in a "thin" manner, without the actual bytes, assumes
* the actual bytes will be appended right after this content.
*/
@Override
public void writeThin(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(bytes.length());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* A specialized, bytes only response, that can potentially be optimized on the network layer.
*/
public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage {

private final ReleasableBytesReference bytes;
private final TransportVersion version;

public BytesTransportResponse(StreamInput in) throws IOException {
bytes = in.readAllToReleasableBytesReference();
version = in.getTransportVersion();
}

public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version) {
this.bytes = bytes;
this.version = version;
}

@Override
public TransportVersion version() {
return this.version;
}

@Override
public ReleasableBytesReference bytes() {
return this.bytes;
}

@Override
public void writeThin(StreamOutput out) throws IOException {}

@Override
public void writeTo(StreamOutput out) throws IOException {
bytes.writeTo(out);
}

@Override
public void incRef() {
bytes.incRef();
}

@Override
public boolean tryIncRef() {
return bytes.tryIncRef();
}

@Override
public boolean decRef() {
return bytes.decRef();
}

@Override
public boolean hasReferences() {
return bytes.hasReferences();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void sendMessage(
Releasable onAfter
) throws IOException {
assert action != null;
final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme;
final var compressionScheme = writeable instanceof BytesTransportMessage ? null : possibleCompressionScheme;
final BytesReference message;
boolean serializeSuccess = false;
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
Expand Down Expand Up @@ -334,11 +334,11 @@ private static BytesReference serializeMessageBody(
final ReleasableBytesReference zeroCopyBuffer;
try {
stream.setTransportVersion(version);
if (writeable instanceof BytesTransportRequest bRequest) {
if (writeable instanceof BytesTransportMessage bRequest) {
assert stream == byteStreamOutput;
assert compressionScheme == null;
bRequest.writeThin(stream);
zeroCopyBuffer = bRequest.bytes;
zeroCopyBuffer = bRequest.bytes();
} else if (writeable instanceof RemoteTransportException remoteTransportException) {
stream.writeException(remoteTransportException);
zeroCopyBuffer = ReleasableBytesReference.empty();
Expand Down