Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,23 +167,17 @@ private static int headerBytesToRead(BytesReference reference, ByteSizeValue max
return 0;
}

TransportVersion remoteVersion = TransportVersion.fromId(reference.getInt(TcpHeader.VERSION_POSITION));
int fixedHeaderSize = TcpHeader.headerSize(remoteVersion);
if (fixedHeaderSize > reference.length()) {
if (reference.length() <= TcpHeader.HEADER_SIZE) {
return 0;
} else if (remoteVersion.before(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
return fixedHeaderSize;
} else {
int variableHeaderSize = reference.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
if (variableHeaderSize < 0) {
throw new StreamCorruptedException("invalid negative variable header size: " + variableHeaderSize);
}
if (variableHeaderSize > maxHeaderSize.getBytes() - fixedHeaderSize) {
throw new StreamCorruptedException(
"header size [" + (fixedHeaderSize + variableHeaderSize) + "] exceeds limit of [" + maxHeaderSize + "]"
);
int totalHeaderSize = TcpHeader.HEADER_SIZE + variableHeaderSize;
if (totalHeaderSize > maxHeaderSize.getBytes()) {
throw new StreamCorruptedException("header size [" + totalHeaderSize + "] exceeds limit of [" + maxHeaderSize + "]");
}
int totalHeaderSize = fixedHeaderSize + variableHeaderSize;
if (totalHeaderSize > reference.length()) {
return 0;
} else {
Expand Down Expand Up @@ -211,11 +205,9 @@ private static Header readHeader(int networkMessageSize, BytesReference bytesRef
checkVersionCompatibility(header.getVersion());
}

if (header.getVersion().onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
// Skip since we already have ensured enough data available
streamInput.readInt();
header.finishParsingHeader(streamInput);
}
// Skip since we already have ensured enough data available
streamInput.readInt();
header.finishParsingHeader(streamInput);
return header;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,20 @@ abstract class OutboundMessage extends NetworkMessage {

BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOException {
bytesStream.setTransportVersion(version);
bytesStream.skip(TcpHeader.headerSize(version));
bytesStream.skip(TcpHeader.HEADER_SIZE);

// The compressible bytes stream will not close the underlying bytes stream
BytesReference reference;
int variableHeaderLength = -1;
final long preHeaderPosition = bytesStream.position();

if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
writeVariableHeader(bytesStream);
variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition);
}
writeVariableHeader(bytesStream);
int variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition);

final boolean compress = TransportStatus.isCompress(status);
final StreamOutput stream = compress ? wrapCompressed(bytesStream) : bytesStream;
final ReleasableBytesReference zeroCopyBuffer;
try {
stream.setTransportVersion(version);
if (variableHeaderLength == -1) {
writeVariableHeader(stream);
}
if (message instanceof BytesTransportRequest bRequest) {
bRequest.writeThin(stream);
zeroCopyBuffer = bRequest.bytes;
Expand All @@ -89,7 +83,7 @@ BytesReference serialize(RecyclerBytesStreamOutput bytesStream) throws IOExcepti
}

bytesStream.seek(0);
final int contentSize = reference.length() - TcpHeader.headerSize(version);
final int contentSize = reference.length() - TcpHeader.HEADER_SIZE;
TcpHeader.writeHeader(bytesStream, requestId, status, version, contentSize, variableHeaderLength);
return reference;
}
Expand Down
28 changes: 4 additions & 24 deletions server/src/main/java/org/elasticsearch/transport/TcpHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@
package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class TcpHeader {

public static final TransportVersion VERSION_WITH_HEADER_SIZE = TransportVersions.V_7_6_0;

public static final int MARKER_BYTES_SIZE = 2;

public static final int MESSAGE_LENGTH_SIZE = 4;
Expand All @@ -37,19 +34,9 @@ public class TcpHeader {

public static final int VARIABLE_HEADER_SIZE_POSITION = VERSION_POSITION + VERSION_ID_SIZE;

private static final int PRE_76_HEADER_SIZE = VERSION_POSITION + VERSION_ID_SIZE;

public static final int BYTES_REQUIRED_FOR_VERSION = PRE_76_HEADER_SIZE;
public static final int BYTES_REQUIRED_FOR_VERSION = VERSION_POSITION + VERSION_ID_SIZE;

private static final int HEADER_SIZE = PRE_76_HEADER_SIZE + VARIABLE_HEADER_SIZE;

public static int headerSize(TransportVersion version) {
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
return HEADER_SIZE;
} else {
return PRE_76_HEADER_SIZE;
}
}
public static final int HEADER_SIZE = BYTES_REQUIRED_FOR_VERSION + VARIABLE_HEADER_SIZE;

private static final byte[] PREFIX = { (byte) 'E', (byte) 'S' };

Expand All @@ -63,17 +50,10 @@ public static void writeHeader(
) throws IOException {
output.writeBytes(PREFIX);
// write the size, the size indicates the remaining message size, not including the size int
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE);
} else {
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE);
}
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE);
output.writeLong(requestId);
output.writeByte(status);
output.writeInt(version.id());
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
assert variableHeaderSize != -1 : "Variable header size not set";
output.writeInt(variableHeaderSize);
}
output.writeInt(variableHeaderSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,7 @@ private static String format(TcpChannel channel, BytesReference message, String
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);

if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
sb.append(", header size: ").append(streamInput.readInt()).append('B');
} else {
streamInput = decompressingStream(status, streamInput);
assert InboundHandler.assertRemoteVersion(streamInput, version);
}
sb.append(", header size: ").append(streamInput.readInt()).append('B');

// read and discard headers
ThreadContext.readHeadersFromStream(streamInput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ public void testDecode() throws IOException {

try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
final BytesReference totalBytes = message.serialize(os);
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + totalBytes.getInt(
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
);
int totalHeaderSize = TcpHeader.HEADER_SIZE + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
final BytesReference messageBytes = totalBytes.slice(totalHeaderSize, totalBytes.length() - totalHeaderSize);

InboundDecoder decoder = new InboundDecoder(recycler);
Expand Down Expand Up @@ -151,13 +149,12 @@ private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Com

try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
final BytesReference bytes = message.serialize(os);
int totalHeaderSize = TcpHeader.headerSize(transportVersion);

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertThat(bytesConsumed, greaterThan(totalHeaderSize));
assertThat(bytesConsumed, greaterThan(TcpHeader.HEADER_SIZE));
assertTrue(releasable1.hasReferences());

final Header header = (Header) fragments.get(0);
Expand Down Expand Up @@ -213,9 +210,7 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception {
try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.SERVER, ChannelType.MIX))) {
final ArrayList<Object> fragments = new ArrayList<>();
int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add);
int totalHeaderSize = TcpHeader.headerSize(version) + (version.onOrAfter(TransportVersions.V_7_6_0)
? bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION)
: 0);
int totalHeaderSize = TcpHeader.HEADER_SIZE + bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
assertEquals(totalHeaderSize, bytesConsumed);
final Header header = (Header) fragments.get(0);
assertEquals(requestId, header.getRequestId());
Expand Down Expand Up @@ -259,9 +254,7 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception {
try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.CLIENT, ChannelType.MIX))) {
final ArrayList<Object> fragments = new ArrayList<>();
int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add);
int totalHeaderSize = TcpHeader.headerSize(version) + (version.onOrAfter(TransportVersions.V_7_6_0)
? bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION)
: 0);
int totalHeaderSize = TcpHeader.HEADER_SIZE + bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
assertEquals(totalHeaderSize, bytesConsumed);
final Header header = (Header) fragments.get(0);
assertEquals(requestId, header.getRequestId());
Expand Down Expand Up @@ -304,9 +297,7 @@ public void testCompressedDecode() throws IOException {
final BytesStreamOutput out = new BytesStreamOutput();
transportMessage.writeTo(out);
final BytesReference uncompressedBytes = out.bytes();
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + totalBytes.getInt(
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
);
int totalHeaderSize = TcpHeader.HEADER_SIZE + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public void testPing() throws Exception {

public void testRequestAndResponse() throws Exception {
String action = "test-request";
int headerSize = TcpHeader.headerSize(TransportVersion.current());
boolean isError = randomBoolean();
AtomicReference<TestRequest> requestCaptor = new AtomicReference<>();
AtomicReference<TestResponse> responseCaptor = new AtomicReference<>();
Expand Down Expand Up @@ -183,7 +182,7 @@ public TestResponse read(StreamInput in) throws IOException {

BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
BytesReference fullRequestBytes = request.serialize(new RecyclerBytesStreamOutput(recycler));
BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize);
BytesReference requestContent = fullRequestBytes.slice(TcpHeader.HEADER_SIZE, fullRequestBytes.length() - TcpHeader.HEADER_SIZE);
Header requestHeader = new Header(
fullRequestBytes.length() - 6,
requestId,
Expand All @@ -208,7 +207,7 @@ public TestResponse read(StreamInput in) throws IOException {
}

BytesReference fullResponseBytes = channel.getMessageCaptor().get();
BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize);
BytesReference responseContent = fullResponseBytes.slice(TcpHeader.HEADER_SIZE, fullResponseBytes.length() - TcpHeader.HEADER_SIZE);
Header responseHeader = new Header(fullRequestBytes.length() - 6, requestId, responseStatus, TransportVersion.current());
InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {});
responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,8 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException {
}

final BytesReference reference = message.serialize(streamOutput);
final int fixedHeaderSize = TcpHeader.headerSize(TransportVersion.current());
final int variableHeaderSize = reference.getInt(fixedHeaderSize - 4);
final int totalHeaderSize = fixedHeaderSize + variableHeaderSize;
final int variableHeaderSize = reference.getInt(TcpHeader.HEADER_SIZE - 4);
final int totalHeaderSize = TcpHeader.HEADER_SIZE + variableHeaderSize;
final AtomicBoolean bodyReleased = new AtomicBoolean(false);
for (int i = 0; i < totalHeaderSize - 1; ++i) {
try (ReleasableBytesReference slice = ReleasableBytesReference.wrap(reference.slice(i, 1))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void terminateThreadPool() {
public void testThatAcceptableHeaderSizeGoesThroughTheRemoteClusterPort() throws Exception {
int messageLength = randomIntBetween(128, 256);
long requestId = randomLongBetween(1L, 1000L);
int acceptableHeaderSize = randomIntBetween(0, maxHeaderSize - TcpHeader.headerSize(TransportVersion.current()));
int acceptableHeaderSize = randomIntBetween(0, maxHeaderSize - TcpHeader.HEADER_SIZE);
try (
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE,
Expand All @@ -163,8 +163,8 @@ public void testThatLargerHeaderSizeClosesTheRemoteClusterPort() throws Exceptio
int messageLength = randomIntBetween(128, 256);
long requestId = randomLongBetween(1L, 1000L);
int largeHeaderSize = randomIntBetween(
maxHeaderSize - TcpHeader.headerSize(TransportVersion.current()) + 1,
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.headerSize(TransportVersion.current())
maxHeaderSize - TcpHeader.HEADER_SIZE + 1,
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.HEADER_SIZE
);
try (
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(
Expand All @@ -190,8 +190,8 @@ public void testThatLargerHeaderSizeIsAcceptableForDefaultTransportPort() throws
int messageLength = randomIntBetween(128, 256);
long requestId = randomLongBetween(1L, 1000L);
int largeHeaderSize = randomIntBetween(
maxHeaderSize - TcpHeader.headerSize(TransportVersion.current()) + 1,
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.headerSize(TransportVersion.current())
maxHeaderSize - TcpHeader.HEADER_SIZE + 1,
messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE - TcpHeader.HEADER_SIZE
);
try (
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(
Expand Down