Skip to content

Commit 3ffdba7

Browse files
committed
Addressed review feedback:
- renamed channelError and onException - removed dead channel error logging code - used expectThrows pattern in tests - added closeListener test - added assert to onFailure branch of netty channel future/listener adapter - de-duplicated throwable/exception adapting code - log transport errors at debug
1 parent 7498303 commit 3ffdba7

File tree

11 files changed

+50
-67
lines changed

11 files changed

+50
-67
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public class Netty4TcpChannel implements TcpChannel {
3434
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
3535
private final ChannelStats stats = new ChannelStats();
3636
private final boolean rstOnClose;
37-
private volatile Exception channelError = null;
37+
// Exception causing a close, reported to the closeContext listener
38+
private volatile Exception closeException = null;
3839

3940
Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
4041
this.channel = channel;
@@ -43,16 +44,21 @@ public class Netty4TcpChannel implements TcpChannel {
4344
this.connectContext = new ListenableFuture<>();
4445
this.rstOnClose = rstOnClose;
4546
addListener(connectFuture, connectContext);
46-
// The netty closeFuture is programmed to never return an exception.
47-
// This listener takes close-causing exceptions reported during the
48-
// channel lifetime, and reports them to the closeListener.
49-
addListener(this.channel.closeFuture(), ActionListener.running(() -> {
50-
if (channelError != null) {
51-
closeContext.onFailure(channelError);
52-
} else {
53-
closeContext.onResponse(null);
47+
addListener(this.channel.closeFuture(), new ActionListener<>() {
48+
@Override
49+
public void onResponse(Void ignored) {
50+
if (closeException != null) {
51+
closeContext.onFailure(closeException);
52+
} else {
53+
closeContext.onResponse(null);
54+
}
55+
}
56+
57+
@Override
58+
public void onFailure(Exception e) {
59+
assert false : new AssertionError("netty channel closeFuture should never report a failure");
5460
}
55-
}));
61+
});
5662
}
5763

5864
@Override
@@ -106,8 +112,8 @@ public void addConnectListener(ActionListener<Void> listener) {
106112
}
107113

108114
@Override
109-
public void onException(Exception e) {
110-
channelError = e;
115+
public void setCloseException(Exception e) {
116+
closeException = e;
111117
}
112118

113119
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.lucene.util.BytesRef;
2929
import org.elasticsearch.ExceptionsHelper;
3030
import org.elasticsearch.TransportVersion;
31-
import org.elasticsearch.action.ActionListener;
3231
import org.elasticsearch.cluster.node.DiscoveryNode;
3332
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3433
import org.elasticsearch.common.network.NetworkService;
@@ -55,7 +54,6 @@
5554
import java.util.Map;
5655

5756
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
58-
import static org.elasticsearch.core.Strings.format;
5957
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
6058
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;
6159

@@ -282,7 +280,6 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile
282280
rstOnClose,
283281
connectFuture
284282
);
285-
addClosedExceptionLogger(nettyChannel);
286283
channel.attr(CHANNEL_KEY).set(nettyChannel);
287284

288285
return nettyChannel;
@@ -310,6 +307,14 @@ protected void stopInternal() {
310307
}, serverBootstraps::clear, () -> clientBootstrap = null);
311308
}
312309

310+
private Exception exceptionFromThrowable(Throwable cause) {
311+
if (cause instanceof Error) {
312+
return new Exception(cause);
313+
} else {
314+
return (Exception) cause;
315+
}
316+
}
317+
313318
protected class ClientChannelInitializer extends ChannelInitializer<Channel> {
314319

315320
@Override
@@ -322,11 +327,7 @@ protected void initChannel(Channel ch) throws Exception {
322327
@Override
323328
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
324329
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
325-
if (cause instanceof Error) {
326-
channel.onException(new Exception(cause));
327-
} else {
328-
channel.onException((Exception) cause);
329-
}
330+
channel.setCloseException(exceptionFromThrowable(cause));
330331
ExceptionsHelper.maybeDieOnAnotherThread(cause);
331332
super.exceptionCaught(ctx, cause);
332333
}
@@ -347,7 +348,6 @@ protected void initChannel(Channel ch) throws Exception {
347348
assert ch instanceof Netty4NioSocketChannel;
348349
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
349350
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture());
350-
addClosedExceptionLogger(nettyTcpChannel);
351351
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
352352
setupPipeline(ch, isRemoteClusterServerChannel);
353353
serverAcceptedChannel(nettyTcpChannel);
@@ -356,11 +356,7 @@ protected void initChannel(Channel ch) throws Exception {
356356
@Override
357357
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
358358
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
359-
if (cause instanceof Error) {
360-
channel.onException(new Exception(cause));
361-
} else {
362-
channel.onException((Exception) cause);
363-
}
359+
channel.setCloseException(exceptionFromThrowable(cause));
364360
ExceptionsHelper.maybeDieOnAnotherThread(cause);
365361
super.exceptionCaught(ctx, cause);
366362
}
@@ -396,14 +392,6 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
396392
);
397393
}
398394

399-
private static void addClosedExceptionLogger(Netty4TcpChannel channel) {
400-
channel.addCloseListener(ActionListener.wrap((ignored) -> {}, (e) -> {
401-
if (logger.isDebugEnabled()) {
402-
logger.debug(format("exception while closing channel: %s", channel), e);
403-
}
404-
}));
405-
}
406-
407395
@ChannelHandler.Sharable
408396
private static class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter {
409397

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message)
390390
() -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel",
391391
e
392392
);
393-
channel.onException(e);
393+
channel.setCloseException(e);
394394
channel.close();
395395
}
396396
}

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ void sendResponse(
168168
),
169169
ex
170170
);
171-
channel.onException(ex);
171+
channel.setCloseException(ex);
172172
channel.close();
173173
} else {
174174
sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex);
@@ -205,7 +205,7 @@ void sendErrorResponse(
205205
} catch (Exception sendException) {
206206
sendException.addSuppressed(error);
207207
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
208-
channel.onException(sendException);
208+
channel.setCloseException(sendException);
209209
channel.close();
210210
}
211211
}
@@ -433,7 +433,7 @@ private void maybeLogSlowMessage(boolean success) {
433433
}
434434
});
435435
} catch (RuntimeException ex) {
436-
channel.onException(ex);
436+
channel.setCloseException(ex);
437437
Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel));
438438
throw ex;
439439
}

server/src/main/java/org/elasticsearch/transport/TcpChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ public interface TcpChannel extends CloseableChannel {
6767
void addConnectListener(ActionListener<Void> listener);
6868

6969
/**
70-
* Report an exception on this channel
70+
* Report a close-causing exception on this channel
7171
*
7272
* @param e the exception
7373
*/
74-
void onException(Exception e);
74+
void setCloseException(Exception e);
7575

7676
/**
7777
* Returns stats about this channel

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle
773773
}
774774
} finally {
775775
if (closeChannel) {
776-
channel.onException(e);
776+
channel.setCloseException(e);
777777
CloseableChannel.closeChannel(channel);
778778
}
779779
}
@@ -1206,7 +1206,7 @@ public void onResponse(Void ignored) {
12061206
@Override
12071207
public void onFailure(Exception e) {
12081208
long closeTimeMillis = threadPool.relativeTimeInMillis();
1209-
logger.info(
1209+
logger.debug(
12101210
() -> format(
12111211
"closed transport connection [{}] to [{}] with age [{}ms], exception:",
12121212
connectionId,

server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import static org.hamcrest.Matchers.equalTo;
5656
import static org.hamcrest.Matchers.instanceOf;
5757
import static org.hamcrest.Matchers.notNullValue;
58+
import static org.hamcrest.Matchers.nullValue;
5859
import static org.mockito.ArgumentMatchers.any;
5960
import static org.mockito.ArgumentMatchers.eq;
6061
import static org.mockito.Mockito.doAnswer;
@@ -107,6 +108,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
107108

108109
DiscoveryNode node = DiscoveryNodeUtils.create("", new TransportAddress(InetAddress.getLoopbackAddress(), 0));
109110
Transport.Connection connection = new TestConnect(node);
111+
PlainActionFuture<Void> closeListener = new PlainActionFuture<>();
112+
connection.addCloseListener(closeListener);
110113
doAnswer(invocationOnMock -> {
111114
@SuppressWarnings("unchecked")
112115
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
@@ -137,6 +140,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
137140
connection.close();
138141
}
139142
assertTrue(connection.isClosed());
143+
assertThat(closeListener.actionGet(), nullValue());
140144
assertEquals(0, connectionManager.size());
141145
assertEquals(1, nodeConnectedCount.get());
142146
assertEquals(1, nodeDisconnectedCount.get());

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,7 @@ public void testClosesChannelOnErrorInHandshake() throws Exception {
260260
handler.inboundMessage(channel, requestMessage);
261261
assertTrue(isClosed.get());
262262
assertTrue(closeListener.isDone());
263-
try {
264-
closeListener.get();
265-
assert false : "channel should have an exception reported";
266-
} catch (Exception e) {}
263+
expectThrows(Exception.class, () -> closeListener.get());
267264
assertNull(channel.getMessageCaptor().get());
268265
mockLog.assertAllExpectationsMatched();
269266
}

server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -438,10 +438,7 @@ public void writeTo(StreamOutput out) {
438438
assertNull(channel.getListenerCaptor().get());
439439
assertFalse(channel.isOpen());
440440
assertTrue(closeListener.isDone());
441-
try {
442-
closeListener.get();
443-
assert false : "channel should have an exception reported";
444-
} catch (Exception e) {}
441+
expectThrows(Exception.class, () -> closeListener.get());
445442
}
446443

447444
public void testFailToSendHandshakeResponse() {
@@ -485,10 +482,7 @@ public void onResponseSent(long requestId, String action, Exception error) {
485482
assertTrue(response.released.get());
486483
assertFalse(channel.isOpen());
487484
assertTrue(closeListener.isDone());
488-
try {
489-
closeListener.get();
490-
assert false : "channel should have an exception reported";
491-
} catch (Exception e) {}
485+
expectThrows(Exception.class, () -> closeListener.get());
492486
}
493487

494488
public void testFailToSendErrorResponse() {
@@ -534,10 +528,7 @@ public void onResponseSent(long requestId, String action, Exception error) {
534528
assertNull(channel.getMessageCaptor().get());
535529
assertNull(channel.getListenerCaptor().get());
536530
assertTrue(closeListener.isDone());
537-
try {
538-
closeListener.get();
539-
assert false : "channel should have an exception reported";
540-
} catch (Exception e) {}
531+
expectThrows(Exception.class, () -> closeListener.get());
541532
}
542533

543534
/**

server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -605,10 +605,7 @@ private void testExceptionHandling(
605605

606606
if (expectClosed) {
607607
assertTrue(listener.isDone());
608-
try {
609-
listener.get();
610-
assert false : "channel should have an exception reported";
611-
} catch (Exception e) {}
608+
expectThrows(Exception.class, () -> listener.get());
612609
} else {
613610
assertFalse(listener.isDone());
614611
}

0 commit comments

Comments
 (0)