- 
                Notifications
    
You must be signed in to change notification settings  - Fork 25.6k
 
Stop retaining transport responses for test-only listener #125163
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
3791e17
              fd49527
              9897475
              b034274
              d5534c3
              681783e
              79cb99e
              04cb7d6
              17c451e
              4bee100
              7a34d93
              835639a
              af89489
              410040b
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -17,25 +17,35 @@ | |
| import org.elasticsearch.TransportVersions; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.bytes.BytesReference; | ||
| import org.elasticsearch.common.bytes.CompositeBytesReference; | ||
| import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
| import org.elasticsearch.common.compress.CompressorFactory; | ||
| import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; | ||
| import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
| import org.elasticsearch.common.network.CloseableChannel; | ||
| import org.elasticsearch.common.network.HandlingTimeTracker; | ||
| import org.elasticsearch.common.recycler.Recycler; | ||
| import org.elasticsearch.common.transport.NetworkExceptionHelper; | ||
| import org.elasticsearch.common.util.concurrent.ThreadContext; | ||
| import org.elasticsearch.core.Nullable; | ||
| import org.elasticsearch.core.RefCounted; | ||
| import org.elasticsearch.core.Releasable; | ||
| import org.elasticsearch.core.Releasables; | ||
| import org.elasticsearch.core.Streams; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.core.UpdateForV10; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| 
     | 
||
| import java.io.IOException; | ||
| import java.util.function.Supplier; | ||
| 
     | 
||
| import static org.elasticsearch.core.Strings.format; | ||
| 
     | 
||
| final class OutboundHandler { | ||
| public final class OutboundHandler { | ||
| 
     | 
||
| private static final Logger logger = LogManager.getLogger(OutboundHandler.class); | ||
| 
     | 
||
| 
          
            
          
           | 
    @@ -85,7 +95,7 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) { | |
| * thread. | ||
| */ | ||
| void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) { | ||
| internalSend(channel, bytes, null, listener); | ||
| internalSend(channel, bytes, () -> "", listener); | ||
| } | ||
| 
     | 
||
| /** | ||
| 
        
          
        
         | 
    @@ -104,18 +114,14 @@ void sendRequest( | |
| final boolean isHandshake | ||
| ) throws IOException, TransportException { | ||
| assert assertValidTransportVersion(transportVersion); | ||
| final OutboundMessage.Request message = new OutboundMessage.Request( | ||
| threadPool.getThreadContext(), | ||
| request, | ||
| transportVersion, | ||
| sendMessage( | ||
| channel, | ||
| action, | ||
| request, | ||
| requestId, | ||
| isHandshake, | ||
| compressionScheme | ||
| ); | ||
| sendMessage( | ||
| channel, | ||
| message, | ||
| compressionScheme, | ||
| transportVersion, | ||
| ResponseStatsConsumer.NONE, | ||
| () -> messageListener.onRequestSent(node, requestId, action, request, options) | ||
| ); | ||
| 
        
          
        
         | 
    @@ -138,17 +144,19 @@ void sendResponse( | |
| final ResponseStatsConsumer responseStatsConsumer | ||
| ) { | ||
| assert assertValidTransportVersion(transportVersion); | ||
| OutboundMessage.Response message = new OutboundMessage.Response( | ||
| threadPool.getThreadContext(), | ||
| response, | ||
| transportVersion, | ||
| requestId, | ||
| isHandshake, | ||
| compressionScheme | ||
| ); | ||
| assert response.hasReferences(); | ||
| try { | ||
| sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action)); | ||
| sendMessage( | ||
| channel, | ||
| null, | ||
| response, | ||
| requestId, | ||
| isHandshake, | ||
| compressionScheme, | ||
| transportVersion, | ||
| responseStatsConsumer, | ||
| () -> messageListener.onResponseSent(requestId, action) | ||
| ); | ||
| } catch (Exception ex) { | ||
| if (isHandshake) { | ||
| logger.error( | ||
| 
          
            
          
           | 
    @@ -178,16 +186,19 @@ void sendErrorResponse( | |
| final Exception error | ||
| ) { | ||
| assert assertValidTransportVersion(transportVersion); | ||
| OutboundMessage.Response message = new OutboundMessage.Response( | ||
| threadPool.getThreadContext(), | ||
| new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error), | ||
| transportVersion, | ||
| requestId, | ||
| false, | ||
| null | ||
| ); | ||
| var msg = new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error); | ||
| try { | ||
| sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action, error)); | ||
| sendMessage( | ||
| channel, | ||
| null, | ||
| msg, | ||
| requestId, | ||
| false, | ||
| null, | ||
| transportVersion, | ||
| responseStatsConsumer, | ||
| () -> messageListener.onResponseSent(requestId, action, error) | ||
| ); | ||
| } catch (Exception sendException) { | ||
| sendException.addSuppressed(error); | ||
| logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException); | ||
| 
        
          
        
         | 
    @@ -197,38 +208,50 @@ void sendErrorResponse( | |
| 
     | 
||
| private void sendMessage( | ||
| TcpChannel channel, | ||
| OutboundMessage networkMessage, | ||
| @Nullable String requestAction, | ||
| Writeable writeable, | ||
| long requestId, | ||
| boolean isHandshake, | ||
| Compression.Scheme compressionScheme, | ||
| TransportVersion version, | ||
| ResponseStatsConsumer responseStatsConsumer, | ||
| Releasable onAfter | ||
| ) throws IOException { | ||
| final RecyclerBytesStreamOutput byteStreamOutput; | ||
| boolean bufferSuccess = false; | ||
| try { | ||
| byteStreamOutput = new RecyclerBytesStreamOutput(recycler); | ||
| bufferSuccess = true; | ||
| } finally { | ||
| if (bufferSuccess == false) { | ||
| Releasables.closeExpectNoException(onAfter); | ||
| } | ||
| } | ||
| compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme; | ||
| final BytesReference message; | ||
| boolean serializeSuccess = false; | ||
| final boolean isError = writeable instanceof RemoteTransportException; | ||
| final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler); | ||
| try { | ||
| message = networkMessage.serialize(byteStreamOutput); | ||
| message = serialize( | ||
| requestAction, | ||
| requestId, | ||
| isHandshake, | ||
| version, | ||
| isError, | ||
| compressionScheme, | ||
| writeable, | ||
| threadPool.getThreadContext(), | ||
| byteStreamOutput | ||
| ); | ||
| serializeSuccess = true; | ||
| } catch (Exception e) { | ||
| logger.warn(() -> "failed to serialize outbound message [" + networkMessage + "]", e); | ||
| logger.warn(() -> "failed to serialize outbound message [" + writeable + "]", e); | ||
| throw e; | ||
| } finally { | ||
| if (serializeSuccess == false) { | ||
| Releasables.close(byteStreamOutput, onAfter); | ||
| } | ||
| } | ||
| responseStatsConsumer.addResponseStats(message.length()); | ||
| final var responseType = writeable.getClass(); | ||
| final boolean compress = compressionScheme != null; | ||
| internalSend( | ||
| channel, | ||
| message, | ||
| networkMessage, | ||
| requestAction == null | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I said above, not 100% sure about this one, but then again, either we want this information or not though and this still seems cheaper than redundantly building the string. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep I think we want this info. Looks easy enough to plumb in the   | 
||
| ? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}" | ||
| : () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}", | ||
| ActionListener.releasing( | ||
| message instanceof ReleasableBytesReference r | ||
| ? Releasables.wrap(byteStreamOutput, onAfter, r) | ||
| 
        
          
        
         | 
    @@ -237,10 +260,101 @@ private void sendMessage( | |
| ); | ||
| } | ||
| 
     | 
||
| // public for tests | ||
| public static BytesReference serialize( | ||
| @Nullable String requestAction, | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I get your point @DaveCTurner, setting request or response based on a null is not great, but then again, this method already has an absurd number of parameters? :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it needs a parameters object? I think I'd call it...  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll follow up with something like this: drops the   | 
||
| long requestId, | ||
| boolean isHandshake, | ||
| TransportVersion version, | ||
| boolean isError, | ||
| Compression.Scheme compressionScheme, | ||
| Writeable writeable, | ||
| ThreadContext threadContext, | ||
| RecyclerBytesStreamOutput byteStreamOutput | ||
| ) throws IOException { | ||
| byteStreamOutput.setTransportVersion(version); | ||
| byteStreamOutput.skip(TcpHeader.HEADER_SIZE); | ||
| threadContext.writeTo(byteStreamOutput); | ||
| if (requestAction != null) { | ||
| if (version.before(TransportVersions.V_8_0_0)) { | ||
| // empty features array | ||
| byteStreamOutput.writeStringArray(Strings.EMPTY_ARRAY); | ||
| } | ||
| byteStreamOutput.writeString(requestAction); | ||
| } | ||
| 
     | 
||
| final int variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - TcpHeader.HEADER_SIZE); | ||
| BytesReference message = serializeMessageBody(writeable, compressionScheme, version, byteStreamOutput); | ||
| byte status = requestAction != null ? 0 : TransportStatus.setResponse((byte) 0); | ||
                
       | 
||
| if (isHandshake) { | ||
| status = TransportStatus.setHandshake(status); | ||
| } | ||
| if (isError) { | ||
| status = TransportStatus.setError(status); | ||
| } | ||
| if (compressionScheme != null) { | ||
| status = TransportStatus.setCompress(status); | ||
| } | ||
| byteStreamOutput.seek(0); | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should at least assert that   | 
||
| TcpHeader.writeHeader(byteStreamOutput, requestId, status, version, message.length() - TcpHeader.HEADER_SIZE, variableHeaderLength); | ||
| return message; | ||
| } | ||
| 
     | 
||
| private static BytesReference serializeMessageBody( | ||
| Writeable writeable, | ||
| Compression.Scheme compressionScheme, | ||
| TransportVersion version, | ||
| RecyclerBytesStreamOutput byteStreamOutput | ||
| ) throws IOException { | ||
| // The compressible bytes stream will not close the underlying bytes stream | ||
| final StreamOutput stream = compressionScheme != null ? wrapCompressed(compressionScheme, byteStreamOutput) : byteStreamOutput; | ||
| final ReleasableBytesReference zeroCopyBuffer; | ||
| try { | ||
| stream.setTransportVersion(version); | ||
| if (writeable instanceof BytesTransportRequest bRequest) { | ||
| bRequest.writeThin(stream); | ||
| zeroCopyBuffer = bRequest.bytes; | ||
| } else if (writeable instanceof RemoteTransportException remoteTransportException) { | ||
| stream.writeException(remoteTransportException); | ||
| zeroCopyBuffer = ReleasableBytesReference.empty(); | ||
| } else { | ||
| writeable.writeTo(stream); | ||
| zeroCopyBuffer = ReleasableBytesReference.empty(); | ||
| } | ||
| } finally { | ||
| // We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker) | ||
| // are written. | ||
| if (compressionScheme != null) { | ||
| stream.close(); | ||
| } | ||
| } | ||
| final BytesReference msg = byteStreamOutput.bytes(); | ||
| if (zeroCopyBuffer.length() == 0) { | ||
| return msg; | ||
| } | ||
| zeroCopyBuffer.mustIncRef(); | ||
| return new ReleasableBytesReference(CompositeBytesReference.of(msg, zeroCopyBuffer), (RefCounted) zeroCopyBuffer); | ||
| } | ||
| 
     | 
||
| // compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release | ||
| // resources and write EOS marker bytes but must not yet release the bytes themselves | ||
| private static StreamOutput wrapCompressed(Compression.Scheme compressionScheme, RecyclerBytesStreamOutput bytesStream) | ||
| throws IOException { | ||
| if (compressionScheme == Compression.Scheme.DEFLATE) { | ||
| return new OutputStreamStreamOutput( | ||
| CompressorFactory.COMPRESSOR.threadLocalOutputStream(org.elasticsearch.core.Streams.noCloseStream(bytesStream)) | ||
| ); | ||
| } else if (compressionScheme == Compression.Scheme.LZ4) { | ||
| return new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream(Streams.noCloseStream(bytesStream))); | ||
| } else { | ||
| throw new IllegalArgumentException("Invalid compression scheme: " + compressionScheme); | ||
| } | ||
| } | ||
| 
     | 
||
| private void internalSend( | ||
| TcpChannel channel, | ||
| BytesReference reference, | ||
| @Nullable OutboundMessage message, | ||
| Supplier<String> messageDescription, | ||
| ActionListener<Void> listener | ||
| ) { | ||
| final long startTime = threadPool.rawRelativeTimeInMillis(); | ||
| 
          
            
          
           | 
    @@ -280,7 +394,7 @@ private void maybeLogSlowMessage(boolean success) { | |
| logger.warn( | ||
| "sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn " | ||
| + "threshold of [{}ms] with success [{}]", | ||
| message, | ||
| messageDescription.get(), | ||
| messageSize, | ||
| channel, | ||
| took, | ||
| 
          
            
          
           | 
    ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.