Skip to content

Conversation

@DaveCTurner
Copy link
Contributor

The close-listeners are never completed exceptionally today so they do
not need the exception mangling of a ListenableFuture. The connect-
and remove-listeners sometimes see an exception if the connection
attempt fails, but they also do not need any exception-mangling.

This commit removes the exception-mangling by replacing these
ListenableFuture instances with SubscribableListener ones.

The close-listeners are never completed exceptionally today so they do
not need the exception mangling of a `ListenableFuture`. The connect-
and remove-listeners sometimes see an exception if the connection
attempt fails, but they also do not need any exception-mangling.

This commit removes the exception-mangling by replacing these
`ListenableFuture` instances with `SubscribableListener` ones.
@DaveCTurner DaveCTurner requested a review from schase-es May 9, 2025 07:30
@DaveCTurner DaveCTurner added >non-issue :Distributed Coordination/Network Http and internode communication implementations v9.1.0 labels May 9, 2025
@elasticsearchmachine elasticsearchmachine added the Team:Distributed Coordination Meta label for Distributed Coordination team label May 9, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@DaveCTurner
Copy link
Contributor Author

Relates #127895 - as implemented today we get all sorts of unnecessary junk in the logs:

[2025-05-09T09:31:38,350][INFO ][o.e.t.TcpTransport       ][node_t3][transport_worker][T#2] closed transport connection [{}] to [{}] with age [{}ms], exception:
org.elasticsearch.common.util.concurrent.UncategorizedExecutionException: Failed execution
	at org.elasticsearch.action.support.SubscribableListener.wrapAsExecutionException(SubscribableListener.java:306) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.common.util.concurrent.ListenableFuture.wrapException(ListenableFuture.java:39) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.common.util.concurrent.ListenableFuture.wrapException(ListenableFuture.java:28) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.action.support.SubscribableListener.onFailure(SubscribableListener.java:262) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.transport.netty4.Netty4TcpChannel.lambda$new$0(Netty4TcpChannel.java:51) ~[main/:?]
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:218) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.transport.netty4.Netty4Utils.lambda$addListener$2(Netty4Utils.java:232) ~[main/:?]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1161) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:753) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:729) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:619) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1299) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:755) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:733) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.handler.logging.LoggingHandler.close(LoggingHandler.java:256) ~[netty-handler-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:757) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:733) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:560) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:906) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannel.close(AbstractChannel.java:243) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at org.elasticsearch.core.Releasables.close(Releasables.java:39) ~[elasticsearch-core-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.core.Releasables.close(Releasables.java:48) ~[elasticsearch-core-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.transport.netty4.Netty4TcpChannel.rstAndClose(Netty4TcpChannel.java:68) ~[main/:?]
	at org.elasticsearch.transport.netty4.Netty4TcpChannel.close(Netty4TcpChannel.java:61) ~[main/:?]
	at org.elasticsearch.core.IOUtils.close(IOUtils.java:71) ~[elasticsearch-core-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.core.IOUtils.close(IOUtils.java:119) ~[elasticsearch-core-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.common.network.CloseableChannel.closeChannels(CloseableChannel.java:82) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.common.network.CloseableChannel.closeChannel(CloseableChannel.java:71) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.common.network.CloseableChannel.closeChannel(CloseableChannel.java:61) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.transport.TcpTransport.handleException(TcpTransport.java:777) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.transport.TcpTransport.onException(TcpTransport.java:701) ~[elasticsearch-9.1.0-SNAPSHOT.jar:9.1.0-SNAPSHOT]
	at org.elasticsearch.transport.netty4.Netty4MessageInboundHandler.exceptionCaught(Netty4MessageInboundHandler.java:70) ~[main/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:346) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:325) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:317) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.handler.logging.LoggingHandler.exceptionCaught(LoggingHandler.java:214) ~[netty-handler-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:346) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:325) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:317) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1324) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:346) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:325) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:856) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:177) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:697) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:660) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
	at java.lang.Thread.run(Thread.java:1447) ~[?:?]
Caused by: java.util.concurrent.ExecutionException: java.net.SocketException: Connection reset
	... 59 more
Caused by: java.net.SocketException: Connection reset
	at sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:399) ~[?:?]
	at sun.nio.ch.SocketChannelImpl.implRead(SocketChannelImpl.java:431) ~[?:?]
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:489) ~[?:?]
	at org.elasticsearch.transport.netty4.CopyBytesSocketChannel.readFromSocketChannel(CopyBytesSocketChannel.java:132) ~[main/:?]
	at org.elasticsearch.transport.netty4.CopyBytesSocketChannel.doReadBytes(CopyBytesSocketChannel.java:117) ~[main/:?]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
	... 7 more

With this change we don't do any of that unnecessary wrapping and get something more manageable instead:

[2025-05-09T08:58:55,854][DEBUG][o.e.t.TcpTransport       ][node_t3][transport_worker][T#1] closed transport connection [{}] to [{}] with age [{}ms], exception:
java.net.SocketException: Connection reset
    at sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:399) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.implRead(SocketChannelImpl.java:431) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:489) ~[?:?]
    at org.elasticsearch.transport.netty4.CopyBytesSocketChannel.readFromSocketChannel(CopyBytesSocketChannel.java:132) ~[main/:?]
    at org.elasticsearch.transport.netty4.CopyBytesSocketChannel.doReadBytes(CopyBytesSocketChannel.java:117) ~[main/:?]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:697) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:660) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.118.Final.jar:4.1.118.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
    at java.lang.Thread.run(Thread.java:1447) ~[?:?]

@DaveCTurner
Copy link
Contributor Author

@schase-es just checking you've seen I've asked for you to review this

@schase-es
Copy link
Contributor

Hi -- yes, this curiously didn't show up where it's expected. I also need to do some better github filtering with my email...

Copy link
Contributor

@schase-es schase-es left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. There may be a few merge conflicts with my PR.

Apologies for the delay.

@DaveCTurner DaveCTurner added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label May 21, 2025
@DaveCTurner
Copy link
Contributor Author

No probs; looks like it merges cleanly, tho I've kicked off one more CI run on the merged result just to be sure.

@elasticsearchmachine elasticsearchmachine merged commit 3504c27 into elastic:main May 22, 2025
18 checks passed
@DaveCTurner DaveCTurner deleted the 2025/05/09/transport-connect-exception-mangling branch May 22, 2025 11:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) :Distributed Coordination/Network Http and internode communication implementations >non-issue Team:Distributed Coordination Meta label for Distributed Coordination team v9.1.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants