Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@

import org.apache.logging.log4j.Level;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportLogger;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@ESIntegTestCase.ClusterScope(numDataNodes = 2, scope = ESIntegTestCase.Scope.TEST)
public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
Expand Down Expand Up @@ -96,7 +103,7 @@ public void testConnectionLogging() throws IOException {
"close connection log",
TcpTransport.class.getCanonicalName(),
Level.DEBUG,
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\].*"
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\]$"
)
);

Expand All @@ -105,4 +112,46 @@ public void testConnectionLogging() throws IOException {

mockLog.assertAllExpectationsMatched();
}

@TestLogging(
value = "org.elasticsearch.transport.TcpTransport:DEBUG",
reason = "to ensure we log exception disconnect events on DEBUG level"
)
public void testExceptionalDisconnectLogging() throws Exception {
mockLog.addExpectation(
new MockLog.PatternSeenEventExpectation(
"exceptional close connection log",
TcpTransport.class.getCanonicalName(),
Level.DEBUG,
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\], exception:.*"
)
);

final String nodeName = internalCluster().startNode();

final CountDownLatch latch = new CountDownLatch(1);
String masterNode = internalCluster().getMasterName();
ConnectionManager connManager = internalCluster().getInstance(TransportService.class, masterNode).getConnectionManager();
connManager.addListener(new TransportConnectionListener() {
@Override
public void onConnectionClosed(Transport.Connection conn) {
conn.addCloseListener(new ActionListener<>() {
@Override
public void onResponse(Void ignored) {}

@Override
public void onFailure(Exception e) {
latch.countDown();
}
});
}
});

int failAttempts = 0;
do {
internalCluster().restartNode(nodeName);
} while (latch.await(500, TimeUnit.MILLISECONDS) == false && failAttempts++ < 10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you encounter cases where we don't get an exceptional close on the first try?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi David -- great question. This turned out to not be needed, so I've removed it. I think this grew out of an earlier version that was broken, and got lost in some debugging and breakpoints.


mockLog.assertAllExpectationsMatched();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public class Netty4TcpChannel implements TcpChannel {
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final ChannelStats stats = new ChannelStats();
private final boolean rstOnClose;
// Exception causing a close, reported to the closeContext listener
/**
* Exception causing a close, reported to the {@link #closeContext} listener
*/
private volatile Exception closeException = null;

Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ protected void stopInternal() {
}, serverBootstraps::clear, () -> clientBootstrap = null);
}

private Exception exceptionFromThrowable(Throwable cause) {
static Exception exceptionFromThrowable(Throwable cause) {
if (cause instanceof Error) {
return new Exception(cause);
} else {
Expand Down Expand Up @@ -399,11 +399,7 @@ private static class ServerChannelExceptionHandler extends ChannelInboundHandler
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
onServerException(serverChannel, new Exception(cause));
} else {
onServerException(serverChannel, (Exception) cause);
}
onServerException(serverChannel, exceptionFromThrowable(cause));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ public void onFailure(Exception e) {
long closeTimeMillis = threadPool.relativeTimeInMillis();
logger.debug(
() -> format(
"closed transport connection [{}] to [{}] with age [{}ms], exception:",
"closed transport connection [%d] to [%s] with age [%dms], exception:",
connectionId,
node,
closeTimeMillis - openTimeMillis
Expand Down