Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,31 @@ public class Netty4TcpChannel implements TcpChannel {
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final ChannelStats stats = new ChannelStats();
private final boolean rstOnClose;
// Exception causing a close, reported to the closeContext listener. Stays null unless setCloseException is
// called; when the netty close future completes, a non-null value is delivered to closeContext via onFailure
// instead of onResponse.
// NOTE(review): presumably volatile because the setter and the close listener can run on different threads - confirm.
private volatile Exception closeException = null;

/**
 * Wraps a netty {@link Channel} and wires its connect and close futures to this channel's listeners.
 *
 * @param channel       the underlying netty channel
 * @param isServer      stored in the {@code isServer} field
 * @param profile       stored in the {@code profile} field
 * @param rstOnClose    stored in the {@code rstOnClose} field
 * @param connectFuture netty future whose completion is forwarded to {@code connectContext}
 */
Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
    this.channel = channel;
    this.isServer = isServer;
    this.profile = profile;
    this.connectContext = new ListenableFuture<>();
    this.rstOnClose = rstOnClose;
    addListener(connectFuture, connectContext);
    // closeContext is completed from this single listener only. Forwarding closeFuture directly to closeContext
    // as well would report every close as successful and hide the exception recorded by setCloseException.
    addListener(this.channel.closeFuture(), new ActionListener<>() {
        @Override
        public void onResponse(Void ignored) {
            // Read the volatile field once so the null check and the reported value cannot disagree.
            final Exception failure = closeException;
            if (failure != null) {
                closeContext.onFailure(failure);
            } else {
                closeContext.onResponse(null);
            }
        }

        @Override
        public void onFailure(Exception e) {
            assert false : new AssertionError("netty channel closeFuture should never report a failure", e);
            // With assertions disabled, still complete closeContext so close listeners are not left waiting.
            closeContext.onFailure(e);
        }
    });
}

@Override
Expand Down Expand Up @@ -95,6 +111,11 @@ public void addConnectListener(ActionListener<Void> listener) {
connectContext.addListener(listener);
}

/**
 * Stores the exception that is causing this channel to close. The stored value is read when the netty close
 * future completes and, if non-null, is passed to the close listeners' {@code onFailure}.
 *
 * @param e the close-causing exception
 */
@Override
public void setCloseException(Exception e) {
    this.closeException = e;
}

@Override
public ChannelStats getChannelStats() {
return stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.Map;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;

Expand Down Expand Up @@ -308,18 +307,27 @@ protected void stopInternal() {
}, serverBootstraps::clear, () -> clientBootstrap = null);
}

private Exception exceptionFromThrowable(Throwable cause) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a slight preference for including the ExceptionsHelper.maybeDieOnAnotherThread(cause); call in here too, just to make it clearer that we're not silently swallowing an Error here (which would be a bad bug). I think I'd have moved the channel.setCloseException call in here too. I could be persuaded to leave it like this if you feel strongly, though.

Copy link
Contributor Author

@schase-es schase-es May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of the reasoning for separating out just this bit was that this pattern is potentially used widely enough to belong in ExceptionsHelper. There's actually another exception handler at the bottom of this file that uses it, and it appears in several other netty error receivers. Another reason to wait may be that there is a parallel set of channel wrappers in the HTTP domain that may have similar issues around exception reporting on close.

I also thought about making the TcpChannel setCloseException take a throwable instead, and do this internally.

Most of these exception handlers are short and have the same 2-3 things, depending on the context. Even though it's boilerplate and could be abstracted (plus or minus the channel type and netty pattern), I gravitate more towards being able to see those few things in an important exception handler instead of having to follow the code. I think this is okay for now?

// An Exception (or null) is returned unchanged. Any other Throwable - an Error, or a direct Throwable
// subclass - is wrapped, so the cast below can never fail with a ClassCastException.
if (cause == null || cause instanceof Exception) {
    return (Exception) cause;
}
return new Exception(cause);
}

protected class ClientChannelInitializer extends ChannelInitializer<Channel> {

/**
 * Prepares a newly created client channel: registers the close-exception logger, applies the keep-alive
 * configuration to the underlying java channel and installs the pipeline in client mode.
 *
 * @param ch the channel being initialised; asserted to be a {@code Netty4NioSocketChannel}
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    addClosedExceptionLogger(ch);
    assert ch instanceof Netty4NioSocketChannel;
    final Netty4NioSocketChannel nioChannel = (Netty4NioSocketChannel) ch;
    NetUtils.tryEnsureReasonableKeepAliveConfig(nioChannel.javaChannel());
    setupPipeline(ch, false);
}

/**
 * Records the failure on the transport channel, if one is attached, so it can be reported to close
 * listeners, then delegates to the default handling.
 *
 * @param ctx   the handler context of the failing channel
 * @param cause the throwable that was caught
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // Escalate fatal errors first so that a problem while recording the close exception cannot skip this step.
    ExceptionsHelper.maybeDieOnAnotherThread(cause);
    Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
    // NOTE(review): the attribute is presumably unset if the failure happens before the Netty4TcpChannel is
    // attached; guard against null so an NPE does not mask the original cause.
    if (channel != null) {
        channel.setCloseException(exceptionFromThrowable(cause));
    }
    super.exceptionCaught(ctx, cause);
}
Expand All @@ -337,7 +345,6 @@ protected ServerChannelInitializer(String name) {

@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture());
Expand All @@ -348,6 +355,8 @@ protected void initChannel(Channel ch) throws Exception {

/**
 * Records the failure on the transport channel, if one is attached, so it can be reported to close
 * listeners, then delegates to the default handling.
 *
 * @param ctx   the handler context of the failing channel
 * @param cause the throwable that was caught
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // Escalate fatal errors first so that a problem while recording the close exception cannot skip this step.
    ExceptionsHelper.maybeDieOnAnotherThread(cause);
    Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
    // initChannel creates the Netty4TcpChannel only after its earlier steps have run, so a failure in those
    // steps can reach this handler before a channel is attached; guard against null so an NPE does not mask
    // the original cause.
    if (channel != null) {
        channel.setCloseException(exceptionFromThrowable(cause));
    }
    super.exceptionCaught(ctx, cause);
}
Expand Down Expand Up @@ -383,14 +392,6 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
);
}

/**
 * Registers a listener on the channel's close future that logs, at debug level, the cause of an unsuccessful close.
 *
 * @param channel the channel whose close future is observed
 */
private static void addClosedExceptionLogger(Channel channel) {
    Netty4Utils.addListener(channel.closeFuture(), closeFuture -> {
        // Nothing to report for a clean close, or when debug logging is off.
        if (closeFuture.isSuccess() || logger.isDebugEnabled() == false) {
            return;
        }
        logger.debug(format("exception while closing channel: %s", closeFuture.channel()), closeFuture.cause());
    });
}

@ChannelHandler.Sharable
private static class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable {
* channel. If the channel is already closed when the listener is added the listener will immediately be
* executed by the thread that is attempting to add the listener.
*
* When the close completes but an exception prompted the closure, the exception will be passed to the
* listener's onFailure method.
*
* @param listener to be executed
*/
void addCloseListener(ActionListener<Void> listener);
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu
return curr;
});
if (tracker.registered.compareAndSet(false, true)) {
channel.addCloseListener(ActionListener.wrap(r -> {
channel.addCloseListener(ActionListener.running(() -> {
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
assert removedTracker == tracker;
onChannelClosed(tracker);
}, e -> { assert false : new AssertionError("must not be here", e); }));
}));
}
return tracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public void close() {
}
}

/**
 * Marks this connection as closed and completes the close context with a failure. Only the call that flips
 * the {@code closed} flag from false to true notifies the close context; later calls do nothing.
 *
 * @param e the exception handed to the close context's {@code onFailure}
 */
public void closeAndFail(Exception e) {
    final boolean firstClose = closed.compareAndSet(false, true);
    if (firstClose == false) {
        return;
    }
    closeContext.onFailure(e);
}

@Override
public void onRemoved() {
if (removed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message)
() -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel",
e
);
channel.setCloseException(e);
channel.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ void sendResponse(
),
ex
);
channel.setCloseException(ex);
channel.close();
} else {
sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex);
Expand Down Expand Up @@ -204,6 +205,7 @@ void sendErrorResponse(
} catch (Exception sendException) {
sendException.addSuppressed(error);
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
channel.setCloseException(sendException);
channel.close();
}
}
Expand Down Expand Up @@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) {
}
});
} catch (RuntimeException ex) {
channel.setCloseException(ex);
Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel));
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel {
*/
void addConnectListener(ActionListener<Void> listener);

/**
 * Report a close-causing exception on this channel. Callers invoke this immediately before closing the
 * channel in response to an error.
 * NOTE(review): implementations are presumably expected to pass the reported exception to the close
 * listeners' {@code onFailure} once the close completes - confirm against each implementation.
 *
 * @param e the exception that prompted the close
 */
void setCloseException(Exception e);

/**
* Returns stats about this channel
*/
Expand Down
39 changes: 36 additions & 3 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) {

/**
 * Closes this connection without an exception to report; delegates to {@code handleClose} with {@code null}.
 */
@Override
public void close() {
handleClose(null);
}

/**
 * Closes this connection because of the given exception; delegates to {@code handleClose}.
 *
 * @param e the exception that prompted the close
 */
@Override
public void closeAndFail(Exception e) {
handleClose(e);
}

/**
 * Closes the underlying channels at most once and then notifies close listeners through the superclass.
 *
 * @param e the exception that prompted the close, or {@code null} for a graceful close
 */
private void handleClose(Exception e) {
    if (isClosing.compareAndSet(false, true)) {
        try {
            boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false;
            CloseableChannel.closeChannels(channels, block);
        } finally {
            // Trigger the listeners through exactly one super call. An unconditional super.close() ahead of
            // this branch would complete the close as graceful and drop the exception.
            if (e == null) {
                super.close();
            } else {
                super.closeAndFail(e);
            }
        }
    }
}
Expand Down Expand Up @@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle
}
} finally {
if (closeChannel) {
channel.setCloseException(e);
CloseableChannel.closeChannel(channel);
}
}
Expand Down Expand Up @@ -1120,7 +1134,17 @@ public void onResponse(Void v) {
nodeChannels.channels.forEach(ch -> {
// Mark the channel init time
ch.getChannelStats().markAccessed(relativeMillisTime);
ch.addCloseListener(ActionListener.running(nodeChannels::close));
ch.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
nodeChannels.close();
}

@Override
public void onFailure(Exception e) {
nodeChannels.closeAndFail(e);
}
});
});
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime));
Expand Down Expand Up @@ -1181,7 +1205,16 @@ public void onResponse(Void ignored) {

/**
 * Logs, at debug level, that the connection closed because of an exception, together with its age.
 *
 * @param e the exception that prompted the close
 */
@Override
public void onFailure(Exception e) {
    // No "never called" assertion here: a close prompted by an exception is reported through onFailure.
    long closeTimeMillis = threadPool.relativeTimeInMillis();
    logger.debug(
        // format is printf-style, so the placeholders must be %s; "{}" would be printed literally.
        () -> format(
            "closed transport connection [%s] to [%s] with age [%sms], exception:",
            connectionId,
            node,
            closeTimeMillis - openTimeMillis
        ),
        e
    );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp
TransportException;

/**
* The listener will be called when this connection has completed closing. The {@link ActionListener#onResponse(Object)} method
* will be called when the connection closed gracefully, and the {@link ActionListener#onFailure(Exception)} method will be called
* when the connection has successfully closed, but an exception has prompted the close.
*
* @param listener to be called
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -107,6 +108,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti

DiscoveryNode node = DiscoveryNodeUtils.create("", new TransportAddress(InetAddress.getLoopbackAddress(), 0));
Transport.Connection connection = new TestConnect(node);
PlainActionFuture<Void> closeListener = new PlainActionFuture<>();
connection.addCloseListener(closeListener);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
Expand Down Expand Up @@ -137,6 +140,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
connection.close();
}
assertTrue(connection.isClosed());
assertThat(closeListener.actionGet(), nullValue());
assertEquals(0, connectionManager.size());
assertEquals(1, nodeConnectedCount.get());
assertEquals(1, nodeDisconnectedCount.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
Expand Down Expand Up @@ -238,6 +239,9 @@ public void testClosesChannelOnErrorInHandshake() throws Exception {
final AtomicBoolean isClosed = new AtomicBoolean();
channel.addCloseListener(ActionListener.running(() -> assertTrue(isClosed.compareAndSet(false, true))));

PlainActionFuture<Void> closeListener = new PlainActionFuture<>();
channel.addCloseListener(closeListener);

final TransportVersion remoteVersion = TransportVersionUtils.randomVersionBetween(
random(),
TransportVersionUtils.getFirstVersion(),
Expand All @@ -255,6 +259,8 @@ public void testClosesChannelOnErrorInHandshake() throws Exception {
requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
handler.inboundMessage(channel, requestMessage);
assertTrue(isClosed.get());
assertTrue(closeListener.isDone());
expectThrows(Exception.class, () -> closeListener.get());
assertNull(channel.getMessageCaptor().get());
mockLog.assertAllExpectationsMatched();
}
Expand Down
Loading