Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,25 @@ public class Netty4TcpChannel implements TcpChannel {
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final ChannelStats stats = new ChannelStats();
private final boolean rstOnClose;
private volatile Exception channelError = null;

Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
this.channel = channel;
this.isServer = isServer;
this.profile = profile;
this.connectContext = new ListenableFuture<>();
this.rstOnClose = rstOnClose;
addListener(this.channel.closeFuture(), closeContext);
addListener(connectFuture, connectContext);
// The netty closeFuture is programmed to never return an exception.
// This listener takes close-causing exceptions reported during the
// channel lifetime, and reports them to the closeListener.
addListener(this.channel.closeFuture(), ActionListener.running(() -> {
if (channelError != null) {
closeContext.onFailure(channelError);
} else {
closeContext.onResponse(null);
}
}));
}

@Override
Expand Down Expand Up @@ -95,6 +105,11 @@ public void addConnectListener(ActionListener<Void> listener) {
connectContext.addListener(listener);
}

@Override
public void onException(Exception e) {
channelError = e;
}

@Override
public ChannelStats getChannelStats() {
return stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -281,6 +282,7 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile
rstOnClose,
connectFuture
);
addClosedExceptionLogger(nettyChannel);
channel.attr(CHANNEL_KEY).set(nettyChannel);

return nettyChannel;
Expand Down Expand Up @@ -312,14 +314,19 @@ protected class ClientChannelInitializer extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
setupPipeline(ch, false);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
if (cause instanceof Error) {
channel.onException(new Exception(cause));
} else {
channel.onException((Exception) cause);
}
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
}
Expand All @@ -337,17 +344,23 @@ protected ServerChannelInitializer(String name) {

@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture());
addClosedExceptionLogger(nettyTcpChannel);
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
setupPipeline(ch, isRemoteClusterServerChannel);
serverAcceptedChannel(nettyTcpChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
if (cause instanceof Error) {
channel.onException(new Exception(cause));
} else {
channel.onException((Exception) cause);
}
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
}
Expand Down Expand Up @@ -383,12 +396,12 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
);
}

private static void addClosedExceptionLogger(Channel channel) {
Netty4Utils.addListener(channel.closeFuture(), channelFuture -> {
if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) {
logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause());
private static void addClosedExceptionLogger(Netty4TcpChannel channel) {
channel.addCloseListener(ActionListener.wrap((ignored) -> {}, (e) -> {
if (logger.isDebugEnabled()) {
logger.debug(format("exception while closing channel: %s", channel), e);
}
});
}));
}

@ChannelHandler.Sharable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable {
* channel. If the channel is already closed when the listener is added the listener will immediately be
* executed by the thread that is attempting to add the listener.
*
* When the close completes but an exception prompted the closure, the exception will be passed to the
* listener's onFailure method.
*
* @param listener to be executed
*/
void addCloseListener(ActionListener<Void> listener);
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu
return curr;
});
if (tracker.registered.compareAndSet(false, true)) {
channel.addCloseListener(ActionListener.wrap(r -> {
channel.addCloseListener(ActionListener.running(() -> {
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
assert removedTracker == tracker;
onChannelClosed(tracker);
}, e -> { assert false : new AssertionError("must not be here", e); }));
}));
}
return tracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public void close() {
}
}

public void closeAndFail(Exception e) {
if (closed.compareAndSet(false, true)) {
closeContext.onFailure(e);
}
}

@Override
public void onRemoved() {
if (removed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message)
() -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel",
e
);
channel.onException(e);
channel.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ void sendResponse(
),
ex
);
channel.onException(ex);
channel.close();
} else {
sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex);
Expand Down Expand Up @@ -204,6 +205,7 @@ void sendErrorResponse(
} catch (Exception sendException) {
sendException.addSuppressed(error);
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
channel.onException(sendException);
channel.close();
}
}
Expand Down Expand Up @@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) {
}
});
} catch (RuntimeException ex) {
channel.onException(ex);
Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel));
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel {
*/
void addConnectListener(ActionListener<Void> listener);

/**
* Report an exception on this channel
*
* @param e the exception
*/
void onException(Exception e);

/**
* Returns stats about this channel
*/
Expand Down
39 changes: 36 additions & 3 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) {

@Override
public void close() {
handleClose(null);
}

@Override
public void closeAndFail(Exception e) {
handleClose(e);
}

private void handleClose(Exception e) {
if (isClosing.compareAndSet(false, true)) {
try {
boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false;
CloseableChannel.closeChannels(channels, block);
} finally {
// Call the super method to trigger listeners
super.close();
if (e == null) {
super.close();
} else {
super.closeAndFail(e);
}
}
}
}
Expand Down Expand Up @@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle
}
} finally {
if (closeChannel) {
channel.onException(e);
CloseableChannel.closeChannel(channel);
}
}
Expand Down Expand Up @@ -1120,7 +1134,17 @@ public void onResponse(Void v) {
nodeChannels.channels.forEach(ch -> {
// Mark the channel init time
ch.getChannelStats().markAccessed(relativeMillisTime);
ch.addCloseListener(ActionListener.running(nodeChannels::close));
ch.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
nodeChannels.close();
}

@Override
public void onFailure(Exception e) {
nodeChannels.closeAndFail(e);
}
});
});
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime));
Expand Down Expand Up @@ -1181,7 +1205,16 @@ public void onResponse(Void ignored) {

@Override
public void onFailure(Exception e) {
assert false : e; // never called
long closeTimeMillis = threadPool.relativeTimeInMillis();
logger.info(
() -> format(
"closed transport connection [{}] to [{}] with age [{}ms], exception:",
connectionId,
node,
closeTimeMillis - openTimeMillis
),
e
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp
TransportException;

/**
* The listener's {@link ActionListener#onResponse(Object)} method will be called when this
* connection is closed. No implementations currently throw an exception during close, so
* {@link ActionListener#onFailure(Exception)} will not be called.
* The listener will be called when this connection has completed closing. The {@link ActionListener#onResponse(Object)} method
* will be called when the connection closed gracefully, and the {@link ActionListener#onFailure(Exception)} method will be called
* when the connection has successfully closed, but an exception has prompted the close.
*
* @param listener to be called
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
Expand Down Expand Up @@ -238,6 +239,9 @@ public void testClosesChannelOnErrorInHandshake() throws Exception {
final AtomicBoolean isClosed = new AtomicBoolean();
channel.addCloseListener(ActionListener.running(() -> assertTrue(isClosed.compareAndSet(false, true))));

PlainActionFuture<Void> closeListener = new PlainActionFuture<>();
channel.addCloseListener(closeListener);

final TransportVersion remoteVersion = TransportVersionUtils.randomVersionBetween(
random(),
TransportVersionUtils.getFirstVersion(),
Expand All @@ -255,6 +259,11 @@ public void testClosesChannelOnErrorInHandshake() throws Exception {
requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
handler.inboundMessage(channel, requestMessage);
assertTrue(isClosed.get());
assertTrue(closeListener.isDone());
try {
closeListener.get();
assert false : "channel should have an exception reported";
} catch (Exception e) {}
assertNull(channel.getMessageCaptor().get());
mockLog.assertAllExpectationsMatched();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@ public class OutboundHandlerTests extends ESTestCase {
private InboundPipeline pipeline;
private OutboundHandler handler;
private FakeTcpChannel channel;
private PlainActionFuture<Void> closeListener;
private DiscoveryNode node;
private Compression.Scheme compressionScheme;

@Before
public void setUp() throws Exception {
super.setUp();
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
closeListener = new PlainActionFuture<>();
channel.addCloseListener(closeListener);
TransportAddress transportAddress = buildNewFakeTransportAddress();
node = DiscoveryNodeUtils.create("", transportAddress);
StatsTracker statsTracker = new StatsTracker();
Expand Down Expand Up @@ -378,6 +381,7 @@ public void onResponseSent(long requestId, String action, Exception error) {
assertThat(rme.getCause().getMessage(), equalTo("simulated cbe"));
}
assertTrue(channel.isOpen());
assertFalse(closeListener.isDone());
}

public void testFailToSendResponseThenFailToSendError() {
Expand All @@ -388,6 +392,8 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
throw new IllegalStateException("pipe broken");
}
};
closeListener = new PlainActionFuture<>();
channel.addCloseListener(closeListener);
TransportVersion version = TransportVersionUtils.randomVersion();
String action = randomAlphaOfLength(10);
long requestId = randomLongBetween(0, 300);
Expand Down Expand Up @@ -431,6 +437,11 @@ public void writeTo(StreamOutput out) {
assertNull(channel.getMessageCaptor().get());
assertNull(channel.getListenerCaptor().get());
assertFalse(channel.isOpen());
assertTrue(closeListener.isDone());
try {
closeListener.get();
assert false : "channel should have an exception reported";
} catch (Exception e) {}
}

public void testFailToSendHandshakeResponse() {
Expand Down Expand Up @@ -473,6 +484,11 @@ public void onResponseSent(long requestId, String action, Exception error) {
assertEquals(action, actionRef.get());
assertTrue(response.released.get());
assertFalse(channel.isOpen());
assertTrue(closeListener.isDone());
try {
closeListener.get();
assert false : "channel should have an exception reported";
} catch (Exception e) {}
}

public void testFailToSendErrorResponse() {
Expand All @@ -483,6 +499,8 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
throw new IllegalStateException("pipe broken");
}
};
closeListener = new PlainActionFuture<>();
channel.addCloseListener(closeListener);
TransportVersion version = TransportVersionUtils.randomVersion();
String action = randomAlphaOfLength(10);
long requestId = randomLongBetween(0, 300);
Expand Down Expand Up @@ -515,6 +533,11 @@ public void onResponseSent(long requestId, String action, Exception error) {
assertFalse(channel.isOpen());
assertNull(channel.getMessageCaptor().get());
assertNull(channel.getListenerCaptor().get());
assertTrue(closeListener.isDone());
try {
closeListener.get();
assert false : "channel should have an exception reported";
} catch (Exception e) {}
}

/**
Expand Down
Loading