diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index dcc4e08b52c20..a83d1019e7c64 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -116,6 +115,7 @@ void sendRequest( assert assertValidTransportVersion(transportVersion); sendMessage( channel, + MessageDirection.REQUEST, action, request, requestId, @@ -148,7 +148,8 @@ void sendResponse( try { sendMessage( channel, - null, + MessageDirection.RESPONSE, + action, response, requestId, isHandshake, @@ -190,7 +191,8 @@ void sendErrorResponse( try { sendMessage( channel, - null, + MessageDirection.RESPONSE_ERROR, + action, msg, requestId, false, @@ -206,29 +208,36 @@ void sendErrorResponse( } } + public enum MessageDirection { + REQUEST, + RESPONSE, + RESPONSE_ERROR + } + private void sendMessage( TcpChannel channel, - @Nullable String requestAction, + MessageDirection messageDirection, + String action, Writeable writeable, long requestId, boolean isHandshake, - Compression.Scheme compressionScheme, + Compression.Scheme possibleCompressionScheme, TransportVersion version, ResponseStatsConsumer responseStatsConsumer, Releasable onAfter ) throws IOException { - compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme; + assert action != null; + final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme; final BytesReference message; boolean serializeSuccess = false; - final boolean isError = writeable instanceof RemoteTransportException; final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler); try { message = serialize( - requestAction, + messageDirection, + action, requestId, isHandshake, version, - isError, compressionScheme, writeable, threadPool.getThreadContext(), @@ -244,14 +253,23 @@ private void sendMessage( } } responseStatsConsumer.addResponseStats(message.length()); - final var responseType = writeable.getClass(); - final boolean compress = compressionScheme != null; + final var messageType = writeable.getClass(); internalSend( channel, message, - requestAction == null - ? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}" - : () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}", + () -> (messageDirection == MessageDirection.REQUEST ? "Request{" : "Response{") + + action + + "}{id=" + + requestId + + "}{err=" + + (messageDirection == MessageDirection.RESPONSE_ERROR) + + "}{cs=" + + compressionScheme + + "}{hs=" + + isHandshake + + "}{t=" + + messageType + + "}", ActionListener.releasing( message instanceof ReleasableBytesReference r ? Releasables.wrap(byteStreamOutput, onAfter, r) @@ -262,38 +280,39 @@ private void sendMessage( // public for tests public static BytesReference serialize( - @Nullable String requestAction, + MessageDirection messageDirection, + String action, long requestId, boolean isHandshake, TransportVersion version, - boolean isError, Compression.Scheme compressionScheme, Writeable writeable, ThreadContext threadContext, RecyclerBytesStreamOutput byteStreamOutput ) throws IOException { + assert action != null; assert byteStreamOutput.position() == 0; byteStreamOutput.setTransportVersion(version); byteStreamOutput.skip(TcpHeader.HEADER_SIZE); threadContext.writeTo(byteStreamOutput); - if (requestAction != null) { + if (messageDirection == MessageDirection.REQUEST) { if (version.before(TransportVersions.V_8_0_0)) { // empty features array byteStreamOutput.writeStringArray(Strings.EMPTY_ARRAY); } - byteStreamOutput.writeString(requestAction); + byteStreamOutput.writeString(action); } final int variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - TcpHeader.HEADER_SIZE); BytesReference message = serializeMessageBody(writeable, compressionScheme, version, byteStreamOutput); byte status = 0; - if (requestAction == null) { + if (messageDirection != MessageDirection.REQUEST) { status = TransportStatus.setResponse(status); } if (isHandshake) { status = TransportStatus.setHandshake(status); } - if (isError) { + if (messageDirection == MessageDirection.RESPONSE_ERROR) { status = TransportStatus.setError(status); } if (compressionScheme != null) { @@ -316,6 +335,8 @@ private static BytesReference serializeMessageBody( try { stream.setTransportVersion(version); if (writeable instanceof BytesTransportRequest bRequest) { + assert stream == byteStreamOutput; + assert compressionScheme == null; bRequest.writeThin(stream); zeroCopyBuffer = bRequest.bytes; } else if (writeable instanceof RemoteTransportException remoteTransportException) { diff --git a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java index 50fbb2ae4895e..63599516ec2c1 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java @@ -56,32 +56,17 @@ public void testDecode() throws IOException { } try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { - final BytesReference totalBytes; - if (isRequest) { - totalBytes = OutboundHandler.serialize( - action, - requestId, - false, - TransportVersion.current(), - false, - null, - new TestRequest(randomAlphaOfLength(100)), - threadContext, - os - ); - } else { - totalBytes = OutboundHandler.serialize( - null, - requestId, - false, - TransportVersion.current(), - false, - null, - new TestResponse(randomAlphaOfLength(100)), - threadContext, - os - ); - } + final BytesReference totalBytes = OutboundHandler.serialize( + isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE, + action, + requestId, + false, + TransportVersion.current(), + null, + isRequest ? new TestRequest(randomAlphaOfLength(100)) : new TestResponse(randomAlphaOfLength(100)), + threadContext, + os + ); int totalHeaderSize = TcpHeader.HEADER_SIZE + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION); final BytesReference messageBytes = totalBytes.slice(totalHeaderSize, totalBytes.length() - totalHeaderSize); @@ -144,11 +129,11 @@ private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Com try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { final BytesReference bytes = OutboundHandler.serialize( + OutboundHandler.MessageDirection.REQUEST, action, requestId, true, transportVersion, - false, compressionScheme, new TestRequest(randomAlphaOfLength(100)), threadContext, @@ -195,11 +180,11 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception { try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { final BytesReference bytes = OutboundHandler.serialize( + OutboundHandler.MessageDirection.REQUEST, action, requestId, isHandshake, version, - false, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null), new TestRequest(randomAlphaOfLength(100)), threadContext, @@ -243,11 +228,11 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception { try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { final BytesReference bytes = OutboundHandler.serialize( - null, + OutboundHandler.MessageDirection.RESPONSE, + "test:action", requestId, isHandshake, version, - false, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null), new TestRequest(randomAlphaOfLength(100)), threadContext, @@ -281,38 +266,23 @@ public void testCompressedDecode() throws IOException { } else { threadContext.addResponseHeader(headerKey, headerValue); } - final BytesReference totalBytes; - TransportMessage transportMessage; Compression.Scheme scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { - if (isRequest) { - transportMessage = new TestRequest(randomAlphaOfLength(100)); - totalBytes = OutboundHandler.serialize( - action, - requestId, - false, - TransportVersion.current(), - false, - scheme, - transportMessage, - threadContext, - os - ); - } else { - transportMessage = new TestResponse(randomAlphaOfLength(100)); - totalBytes = OutboundHandler.serialize( - null, - requestId, - false, - TransportVersion.current(), - false, - scheme, - transportMessage, - threadContext, - os - ); - } + final TransportMessage transportMessage = isRequest + ? new TestRequest(randomAlphaOfLength(100)) + : new TestResponse(randomAlphaOfLength(100)); + final BytesReference totalBytes = OutboundHandler.serialize( + isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE, + action, + requestId, + false, + TransportVersion.current(), + scheme, + transportMessage, + threadContext, + os + ); final BytesStreamOutput out = new BytesStreamOutput(); transportMessage.writeTo(out); final BytesReference uncompressedBytes = out.bytes(); @@ -373,11 +343,11 @@ public void testVersionIncompatibilityDecodeException() throws IOException { final ReleasableBytesReference releasable1; try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { final BytesReference bytes = OutboundHandler.serialize( + OutboundHandler.MessageDirection.REQUEST, action, requestId, false, incompatibleVersion, - false, Compression.Scheme.DEFLATE, new TestRequest(randomAlphaOfLength(100)), threadContext, diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index 0a11c413a43bf..a683c2332c451 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -172,11 +172,11 @@ public TestResponse read(StreamInput in) throws IOException { String requestValue = randomAlphaOfLength(10); BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE); BytesReference fullRequestBytes = OutboundHandler.serialize( + OutboundHandler.MessageDirection.REQUEST, action, requestId, false, TransportVersion.current(), - false, null, new TestRequest(requestValue), threadPool.getThreadContext(), diff --git a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java index 9f4ce4811a9e7..60926f28eadbe 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java @@ -114,11 +114,11 @@ public void testPipelineHandling() throws IOException { if (rarely()) { messageData = new MessageData(version, requestId, true, compressionScheme, breakThisAction, null); message = OutboundHandler.serialize( + OutboundHandler.MessageDirection.REQUEST, breakThisAction, requestId, false, version, - false, compressionScheme, new TestRequest(value), threadContext, @@ -128,11 +128,11 @@ public void testPipelineHandling() throws IOException { } else { messageData = new MessageData(version, requestId, true, compressionScheme, actionName, value); message = OutboundHandler.serialize( + OutboundHandler.MessageDirection.REQUEST, actionName, requestId, false, version, - false, compressionScheme, new TestRequest(value), threadContext, @@ -142,11 +142,11 @@ public void testPipelineHandling() throws IOException { } else { messageData = new MessageData(version, requestId, false, compressionScheme, null, value); message = OutboundHandler.serialize( - null, + OutboundHandler.MessageDirection.RESPONSE, + actionName, requestId, false, version, - false, compressionScheme, new TestResponse(value), threadContext, @@ -219,32 +219,17 @@ public void testDecodeExceptionIsPropagated() throws IOException { final boolean isRequest = randomBoolean(); final long requestId = randomNonNegativeLong(); - BytesReference message; - if (isRequest) { - message = OutboundHandler.serialize( - actionName, - requestId, - false, - invalidVersion, - false, - null, - new TestRequest(value), - threadContext, - streamOutput - ); - } else { - message = OutboundHandler.serialize( - null, - requestId, - false, - invalidVersion, - false, - null, - new TestResponse(value), - threadContext, - streamOutput - ); - } + final BytesReference message = OutboundHandler.serialize( + isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE, + actionName, + requestId, + false, + invalidVersion, + null, + isRequest ? new TestRequest(value) : new TestResponse(value), + threadContext, + streamOutput + ); try (ReleasableBytesReference releasable = ReleasableBytesReference.wrap(message)) { expectThrows(IllegalStateException.class, () -> pipeline.handleBytes(new FakeTcpChannel(), releasable)); @@ -275,32 +260,17 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException { final boolean isRequest = randomBoolean(); final long requestId = randomNonNegativeLong(); - final BytesReference reference; - if (isRequest) { - reference = OutboundHandler.serialize( - actionName, - requestId, - false, - version, - false, - null, - new TestRequest(value), - threadContext, - streamOutput - ); - } else { - reference = OutboundHandler.serialize( - null, - requestId, - false, - version, - false, - null, - new TestResponse(value), - threadContext, - streamOutput - ); - } + final BytesReference reference = OutboundHandler.serialize( + isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE, + actionName, + requestId, + false, + version, + null, + isRequest ? new TestRequest(value) : new TestResponse(value), + threadContext, + streamOutput + ); final int variableHeaderSize = reference.getInt(TcpHeader.HEADER_SIZE - 4); final int totalHeaderSize = TcpHeader.HEADER_SIZE + variableHeaderSize; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java index 2167a9d6bcad7..a361f08009955 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java @@ -71,11 +71,11 @@ private BytesReference buildRequest() throws IOException { Compression.Scheme compress = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null); try (RecyclerBytesStreamOutput bytesStreamOutput = new RecyclerBytesStreamOutput(recycler)) { return OutboundHandler.serialize( + OutboundHandler.MessageDirection.REQUEST, "internal:test", randomInt(30), false, TransportVersion.current(), - false, compress, new EmptyRequest(), new ThreadContext(Settings.EMPTY), diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportAuthenticationTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportAuthenticationTests.java index 8fb4d5c004a3f..fcf95f2176c93 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportAuthenticationTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransportAuthenticationTests.java @@ -334,11 +334,11 @@ public void testConnectionDisconnectedWhenAuthnFails() throws Exception { Recycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE); RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler); BytesReference bytesReference = OutboundHandler.serialize( + OutboundHandler.MessageDirection.REQUEST, "internal:whatever", randomNonNegativeLong(), false, TransportVersion.current(), - false, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null), new EmptyRequest(), threadPool.getThreadContext(),