diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 1a9043d093feb..9b829fcaf837e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -44,49 +44,17 @@ final class TransportHandshaker { * ignores the body of the request. After the handshake, the OutboundHandler uses the min(local,remote) protocol version for all later * messages. * - * This version supports three handshake protocols, v6080099, v7170099 and v8800000, which respectively have the same message structure - * as the transport protocols of v6.8.0, v7.17.0, and v8.18.0. This node only sends v7170099 requests, but it can send a valid response - * to any v6080099 or v8800000 requests that it receives. + * This version supports two handshake protocols, v7170099 and v8800000, which respectively have the same message structure as the + * transport protocols of v7.17.0, and v8.18.0. This node only sends v8800000 requests, but it can send a valid response to any v7170099 + * requests that it receives. * * Note that these are not really TransportVersion constants as used elsewhere in ES, they're independent things that just happen to be - * stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did - * rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer + * stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did rely + * on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer * true. * * Here are some example messages, broken down to show their structure. See TransportHandshakerRawMessageTests for supporting tests. * - * ## v6080099 Request: - * - * 45 53 -- 'ES' marker - * 00 00 00 34 -- total message length - * 00 00 00 00 00 00 00 01 -- request ID - * 08 -- status flags (0b1000 == handshake request) - * 00 5c c6 63 -- handshake protocol version (0x5cc663 == 6080099) - * 00 -- no request headers [1] - * 00 -- no response headers [1] - * 01 -- one feature [2] - * 06 -- feature name length - * 78 2d 70 61 63 6b -- feature name 'x-pack' - * 16 -- action string size - * 69 6e 74 65 72 6e 61 6c } - * 3a 74 63 70 2f 68 61 6e }- ASCII representation of HANDSHAKE_ACTION_NAME - * 64 73 68 61 6b 65 } - * 00 -- no parent task ID [3] - * 04 -- payload length - * 8b d5 b5 03 -- max acceptable protocol version (vInt: 00000011 10110101 11010101 10001011 == 7170699) - * - * ## v6080099 Response: - * - * 45 53 -- 'ES' marker - * 00 00 00 13 -- total message length - * 00 00 00 00 00 00 00 01 -- request ID (copied from request) - * 09 -- status flags (0b1001 == handshake response) - * 00 5c c6 63 -- handshake protocol version (0x5cc663 == 6080099, copied from request) - * 00 -- no request headers [1] - * 00 -- no response headers [1] - * c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099) - * - * * ## v7170099 Requests: * * 45 53 -- 'ES' marker @@ -158,14 +126,9 @@ final class TransportHandshaker { * [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure. */ - static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion.fromId(6_08_00_99); static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99); static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0); - static final Set ALLOWED_HANDSHAKE_VERSIONS = Set.of( - V7_HANDSHAKE_VERSION, - V8_HANDSHAKE_VERSION, - V9_HANDSHAKE_VERSION - ); + static final Set ALLOWED_HANDSHAKE_VERSIONS = Set.of(V8_HANDSHAKE_VERSION, V9_HANDSHAKE_VERSION); static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake"; private final ConcurrentMap pendingHandshakes = new ConcurrentHashMap<>(); @@ -203,7 +166,7 @@ void sendHandshake( ); boolean success = false; try { - handshakeRequestSender.sendRequest(node, channel, requestId, V8_HANDSHAKE_VERSION); + handshakeRequestSender.sendRequest(node, channel, requestId, V9_HANDSHAKE_VERSION); threadPool.schedule( () -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")), diff --git a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java index 9b56cd3bde53c..cfb3cc68e035f 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.transport.InboundDecoder.ChannelType; @@ -126,105 +125,6 @@ public void testDecode() throws IOException { } - @UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // can delete test in v9 - public void testDecodePreHeaderSizeVariableInt() throws IOException { - Compression.Scheme compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.DEFLATE, null); - String action = "test-request"; - long requestId = randomNonNegativeLong(); - final TransportVersion preHeaderVariableInt = TransportHandshaker.V7_HANDSHAKE_VERSION; - final String contentValue = randomAlphaOfLength(100); - // 8.0 is only compatible with handshakes on a pre-variable int version - final OutboundMessage message = new OutboundMessage.Request( - threadContext, - new TestRequest(contentValue), - preHeaderVariableInt, - action, - requestId, - true, - compressionScheme - ); - - try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { - final BytesReference totalBytes = message.serialize(os); - int partialHeaderSize = TcpHeader.headerSize(preHeaderVariableInt); - - InboundDecoder decoder = new InboundDecoder(recycler); - final ArrayList fragments = new ArrayList<>(); - final ReleasableBytesReference releasable1 = wrapAsReleasable(totalBytes); - int bytesConsumed = decoder.decode(releasable1, fragments::add); - assertEquals(partialHeaderSize, bytesConsumed); - assertTrue(releasable1.hasReferences()); - - final Header header = (Header) fragments.get(0); - assertEquals(requestId, header.getRequestId()); - assertEquals(preHeaderVariableInt, header.getVersion()); - if (compressionScheme == null) { - assertFalse(header.isCompressed()); - } else { - assertTrue(header.isCompressed()); - } - assertTrue(header.isHandshake()); - assertTrue(header.isRequest()); - assertTrue(header.needsToReadVariableHeader()); - fragments.clear(); - - final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed); - final ReleasableBytesReference releasable2 = wrapAsReleasable(bytes2); - int bytesConsumed2 = decoder.decode(releasable2, fragments::add); - if (compressionScheme == null) { - assertEquals(2, fragments.size()); - } else { - assertEquals(3, fragments.size()); - final Object body = fragments.get(1); - assertThat(body, instanceOf(ReleasableBytesReference.class)); - ((ReleasableBytesReference) body).close(); - } - assertEquals(InboundDecoder.END_CONTENT, fragments.get(fragments.size() - 1)); - assertEquals(totalBytes.length() - bytesConsumed, bytesConsumed2); - } - } - - public void testDecodeHandshakeV7Compatibility() throws IOException { - String action = "test-request"; - long requestId = randomNonNegativeLong(); - final String headerKey = randomAlphaOfLength(10); - final String headerValue = randomAlphaOfLength(20); - threadContext.putHeader(headerKey, headerValue); - TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION; - OutboundMessage message = new OutboundMessage.Request( - threadContext, - new TestRequest(randomAlphaOfLength(100)), - handshakeCompat, - action, - requestId, - true, - null - ); - - try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { - final BytesReference bytes = message.serialize(os); - int totalHeaderSize = TcpHeader.headerSize(handshakeCompat); - - InboundDecoder decoder = new InboundDecoder(recycler); - final ArrayList fragments = new ArrayList<>(); - final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes); - int bytesConsumed = decoder.decode(releasable1, fragments::add); - assertEquals(totalHeaderSize, bytesConsumed); - assertTrue(releasable1.hasReferences()); - - final Header header = (Header) fragments.get(0); - assertEquals(requestId, header.getRequestId()); - assertEquals(handshakeCompat, header.getVersion()); - assertFalse(header.isCompressed()); - assertTrue(header.isHandshake()); - assertTrue(header.isRequest()); - // TODO: On 9.0 this will be true because all compatible versions with contain the variable header int - assertTrue(header.needsToReadVariableHeader()); - fragments.clear(); - } - - } - public void testDecodeHandshakeV8Compatibility() throws IOException { doHandshakeCompatibilityTest(TransportHandshaker.V8_HANDSHAKE_VERSION, null); doHandshakeCompatibilityTest(TransportHandshaker.V8_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE); @@ -453,46 +353,6 @@ public void testCompressedDecode() throws IOException { } - public void testCompressedDecodeHandshakeCompatibility() throws IOException { - String action = "test-request"; - long requestId = randomNonNegativeLong(); - final String headerKey = randomAlphaOfLength(10); - final String headerValue = randomAlphaOfLength(20); - threadContext.putHeader(headerKey, headerValue); - TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION; - OutboundMessage message = new OutboundMessage.Request( - threadContext, - new TestRequest(randomAlphaOfLength(100)), - handshakeCompat, - action, - requestId, - true, - Compression.Scheme.DEFLATE - ); - - try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { - final BytesReference bytes = message.serialize(os); - int totalHeaderSize = TcpHeader.headerSize(handshakeCompat); - - InboundDecoder decoder = new InboundDecoder(recycler); - final ArrayList fragments = new ArrayList<>(); - final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes); - int bytesConsumed = decoder.decode(releasable1, fragments::add); - assertEquals(totalHeaderSize, bytesConsumed); - assertTrue(releasable1.hasReferences()); - - final Header header = (Header) fragments.get(0); - assertEquals(requestId, header.getRequestId()); - assertEquals(handshakeCompat, header.getVersion()); - assertTrue(header.isCompressed()); - assertTrue(header.isHandshake()); - assertTrue(header.isRequest()); - // TODO: On 9.0 this will be true because all compatible versions with contain the variable header int - assertTrue(header.needsToReadVariableHeader()); - fragments.clear(); - } - } - public void testVersionIncompatibilityDecodeException() throws IOException { String action = "test-request"; long requestId = randomNonNegativeLong(); diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java index de44ca70f2005..2bac41199ab83 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerRawMessageTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.UpdateForV10; -import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.TransportVersionUtils; @@ -38,56 +37,6 @@ public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase { - @UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // remove support for v7 handshakes in v9 - public void testV7Handshake() throws Exception { - final BytesRef handshakeRequestBytes; - final var requestId = randomNonNegativeLong(); - try (var outputStream = new BytesStreamOutput()) { - outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - outputStream.writeLong(requestId); - outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0))); - outputStream.writeInt(TransportHandshaker.V7_HANDSHAKE_VERSION.id()); - outputStream.writeByte((byte) 0); // no request headers; - outputStream.writeByte((byte) 0); // no response headers; - outputStream.writeStringArray(new String[] { "x-pack" }); // one feature - outputStream.writeString("internal:tcp/handshake"); - outputStream.writeByte((byte) 0); // no parent task ID; - - final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id(); - assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt - outputStream.writeByte((byte) 4); // payload length - outputStream.writeVInt(requestNodeTransportVersionId); - - handshakeRequestBytes = outputStream.bytes().toBytesRef(); - } - - final BytesRef handshakeResponseBytes; - try (var socket = openTransportConnection()) { - var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream()); - streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII)); - streamOutput.writeInt(handshakeRequestBytes.length); - streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length); - streamOutput.flush(); - - var streamInput = new InputStreamStreamInput(socket.getInputStream()); - assertEquals((byte) 'E', streamInput.readByte()); - assertEquals((byte) 'S', streamInput.readByte()); - var responseLength = streamInput.readInt(); - handshakeResponseBytes = streamInput.readBytesRef(responseLength); - } - - try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) { - assertEquals(requestId, inputStream.readLong()); - assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte()); - assertEquals(TransportHandshaker.V7_HANDSHAKE_VERSION.id(), inputStream.readInt()); - assertEquals((byte) 0, inputStream.readByte()); // no request headers - assertEquals((byte) 0, inputStream.readByte()); // no response headers - inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - assertEquals(TransportVersion.current().id(), inputStream.readVInt()); - assertEquals(-1, inputStream.read()); - } - } - @UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // remove support for v8 handshakes in v10 public void testV8Handshake() throws Exception { final BytesRef handshakeRequestBytes; @@ -223,11 +172,10 @@ public void testOutboundHandshake() throws Exception { try (var inputStream = new BytesArray(handshakeRequestBytes).streamInput()) { assertThat(inputStream.readLong(), greaterThan(0L)); assertEquals(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), inputStream.readByte()); - assertEquals(TransportHandshaker.V8_HANDSHAKE_VERSION.id(), inputStream.readInt()); - assertEquals(0x1a, inputStream.readInt()); // length of variable-length header, always 0x1a + assertEquals(TransportHandshaker.V9_HANDSHAKE_VERSION.id(), inputStream.readInt()); + assertEquals(0x19, inputStream.readInt()); // length of variable-length header, always 0x19 assertEquals((byte) 0, inputStream.readByte()); // no request headers assertEquals((byte) 0, inputStream.readByte()); // no response headers - assertEquals((byte) 0, inputStream.readByte()); // no features assertEquals("internal:tcp/handshake", inputStream.readString()); assertEquals((byte) 0, inputStream.readByte()); // no parent task inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION); @@ -236,8 +184,9 @@ public void testOutboundHandshake() throws Exception { } try (var inputStream = new BytesArray(payloadBytes).streamInput()) { - inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION); + inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION); assertEquals(TransportVersion.current().id(), inputStream.readVInt()); + assertEquals(Build.current().version(), inputStream.readString()); assertEquals(-1, inputStream.read()); } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index d260d66157651..4fd82480c4e5e 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -16,7 +16,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.UpdateForV9; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; @@ -39,8 +39,8 @@ public class TransportHandshakerTests extends ESTestCase { private TestThreadPool threadPool; private TransportHandshaker.HandshakeRequestSender requestSender; - @UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) - private static final TransportVersion HANDSHAKE_REQUEST_VERSION = TransportHandshaker.V8_HANDSHAKE_VERSION; + @UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // new handshake version required in v10 + private static final TransportVersion HANDSHAKE_REQUEST_VERSION = TransportHandshaker.V9_HANDSHAKE_VERSION; @Override public void setUp() throws Exception { @@ -133,10 +133,8 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException verify(requestSender).sendRequest(node, channel, reqId, HANDSHAKE_REQUEST_VERSION); - TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest( - TransportVersion.current(), - randomIdentifier() - ); + final var buildVersion = randomIdentifier(); + final var handshakeRequest = new TransportHandshaker.HandshakeRequest(TransportVersion.current(), buildVersion); BytesStreamOutput currentHandshakeBytes = new BytesStreamOutput(); currentHandshakeBytes.setTransportVersion(HANDSHAKE_REQUEST_VERSION); handshakeRequest.writeTo(currentHandshakeBytes); @@ -145,17 +143,27 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException BytesStreamOutput futureHandshake = new BytesStreamOutput(); TaskId.EMPTY_TASK_ID.writeTo(lengthCheckingHandshake); TaskId.EMPTY_TASK_ID.writeTo(futureHandshake); + final var extraDataSize = between(0, 1024); try (BytesStreamOutput internalMessage = new BytesStreamOutput()) { Version.writeVersion(Version.CURRENT, internalMessage); + internalMessage.writeString(buildVersion); lengthCheckingHandshake.writeBytesReference(internalMessage.bytes()); - internalMessage.write(new byte[1024]); + internalMessage.write(new byte[extraDataSize]); futureHandshake.writeBytesReference(internalMessage.bytes()); } StreamInput futureHandshakeStream = futureHandshake.bytes().streamInput(); // We check that the handshake we serialize for this test equals the actual request. // Otherwise, we need to update the test. assertEquals(currentHandshakeBytes.bytes().length(), lengthCheckingHandshake.bytes().length()); - assertEquals(1031, futureHandshakeStream.available()); + final var expectedInternalMessageSize = 4 /* transport version id */ + + (1 + buildVersion.length()) /* length prefixed release version string */ + + extraDataSize; + assertEquals( + 1 /* EMPTY_TASK_ID */ + + (expectedInternalMessageSize < 0x80 ? 1 : 2) /* internalMessage size vInt */ + + expectedInternalMessageSize /* internalMessage */, + futureHandshakeStream.available() + ); final PlainActionFuture responseFuture = new PlainActionFuture<>(); final TestTransportChannel channel = new TestTransportChannel(responseFuture); handshaker.handleHandshake(channel, reqId, futureHandshakeStream); @@ -166,43 +174,6 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException assertEquals(TransportVersion.current(), response.getTransportVersion()); } - @UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // v7 handshakes are not supported in v9 - public void testReadV7HandshakeRequest() throws IOException { - final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random()); - - final var requestPayloadStreamOutput = new BytesStreamOutput(); - requestPayloadStreamOutput.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - requestPayloadStreamOutput.writeVInt(transportVersion.id()); - - final var requestBytesStreamOutput = new BytesStreamOutput(); - requestBytesStreamOutput.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - TaskId.EMPTY_TASK_ID.writeTo(requestBytesStreamOutput); - requestBytesStreamOutput.writeBytesReference(requestPayloadStreamOutput.bytes()); - - final var requestBytesStream = requestBytesStreamOutput.bytes().streamInput(); - requestBytesStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - final var handshakeRequest = new TransportHandshaker.HandshakeRequest(requestBytesStream); - - assertEquals(transportVersion, handshakeRequest.transportVersion); - assertEquals(transportVersion.toReleaseVersion(), handshakeRequest.releaseVersion); - } - - @UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // v7 handshakes are not supported in v9 - public void testReadV7HandshakeResponse() throws IOException { - final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random()); - - final var responseBytesStreamOutput = new BytesStreamOutput(); - responseBytesStreamOutput.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - responseBytesStreamOutput.writeVInt(transportVersion.id()); - - final var responseBytesStream = responseBytesStreamOutput.bytes().streamInput(); - responseBytesStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION); - final var handshakeResponse = new TransportHandshaker.HandshakeResponse(responseBytesStream); - - assertEquals(transportVersion, handshakeResponse.getTransportVersion()); - assertEquals(transportVersion.toReleaseVersion(), handshakeResponse.getReleaseVersion()); - } - public void testReadV8HandshakeRequest() throws IOException { final var transportVersion = TransportVersionUtils.randomCompatibleVersion(random()); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 4595fbf286077..0df978fe4937e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2758,8 +2758,8 @@ public void handleException(TransportException exp) { TransportStats transportStats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake assertEquals(1, transportStats.getRxCount()); assertEquals(1, transportStats.getTxCount()); - assertEquals(29, transportStats.getRxSize().getBytes()); - assertEquals(55, transportStats.getTxSize().getBytes()); + assertEquals(35, transportStats.getRxSize().getBytes()); + assertEquals(60, transportStats.getTxSize().getBytes()); }); serviceC.sendRequest( connection, @@ -2773,16 +2773,16 @@ public void handleException(TransportException exp) { TransportStats transportStats = serviceC.transport.getStats(); // request has been send assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); - assertEquals(29, transportStats.getRxSize().getBytes()); - assertEquals(114, transportStats.getTxSize().getBytes()); + assertEquals(35, transportStats.getRxSize().getBytes()); + assertEquals(119, transportStats.getTxSize().getBytes()); }); sendResponseLatch.countDown(); responseLatch.await(); stats = serviceC.transport.getStats(); // response has been received assertEquals(2, stats.getRxCount()); assertEquals(2, stats.getTxCount()); - assertEquals(54, stats.getRxSize().getBytes()); - assertEquals(114, stats.getTxSize().getBytes()); + assertEquals(60, stats.getRxSize().getBytes()); + assertEquals(119, stats.getTxSize().getBytes()); } finally { serviceC.close(); } @@ -2873,8 +2873,8 @@ public void handleException(TransportException exp) { TransportStats transportStats = serviceC.transport.getStats(); // request has been sent assertEquals(1, transportStats.getRxCount()); assertEquals(1, transportStats.getTxCount()); - assertEquals(29, transportStats.getRxSize().getBytes()); - assertEquals(55, transportStats.getTxSize().getBytes()); + assertEquals(35, transportStats.getRxSize().getBytes()); + assertEquals(60, transportStats.getTxSize().getBytes()); }); serviceC.sendRequest( connection, @@ -2888,8 +2888,8 @@ public void handleException(TransportException exp) { TransportStats transportStats = serviceC.transport.getStats(); // request has been sent assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); - assertEquals(29, transportStats.getRxSize().getBytes()); - assertEquals(114, transportStats.getTxSize().getBytes()); + assertEquals(35, transportStats.getRxSize().getBytes()); + assertEquals(119, transportStats.getTxSize().getBytes()); }); sendResponseLatch.countDown(); responseLatch.await(); @@ -2904,8 +2904,8 @@ public void handleException(TransportException exp) { String failedMessage = "Unexpected read bytes size. The transport exception that was received=" + exception; // 57 bytes are the non-exception message bytes that have been received. It should include the initial // handshake message and the header, version, etc bytes in the exception message. - assertEquals(failedMessage, 57 + streamOutput.bytes().length(), stats.getRxSize().getBytes()); - assertEquals(114, stats.getTxSize().getBytes()); + assertEquals(failedMessage, 63 + streamOutput.bytes().length(), stats.getRxSize().getBytes()); + assertEquals(119, stats.getTxSize().getBytes()); } finally { serviceC.close(); }