Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand Down Expand Up @@ -188,6 +192,7 @@ public String toString() {

@Override
protected void doStart() {
transportService.addConnectionListener(new ConnectionChangeListener());
final ConnectionChecker connectionChecker = new ConnectionChecker();
this.connectionChecker = connectionChecker;
connectionChecker.scheduleNextCheck();
Expand All @@ -209,12 +214,36 @@ public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletio
});
}

private class ConnectionTarget {
// exposed for testing
protected ConnectionTarget connectionTargetForNode(DiscoveryNode node) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to expose the whole ConnectionTarget out to tests - we could just allow access to the DisconnectionHistory for a node and keep the ConnectionTarget class private.

synchronized (mutex) {
return targetsByNode.get(node);
}
}

/**
* Time of disconnect in absolute time ({@link ThreadPool#absoluteTimeInMillis()}),
* and disconnect-causing exception, if any
*/
record DisconnectionHistory(long disconnectTimeMillis, @Nullable Exception disconnectCause) {
public long getDisconnectTimeMillis() {
return disconnectTimeMillis;
}

public Exception getDisconnectCause() {
return disconnectCause;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: These methods do not seem necessary for a record class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thanks for this -- I did look up making record fields public after accessing them from the test, but for some reason came away thinking I needed to write my own accessors. This helped me realize that I can just do disconnectTimeMillis() instead...

}

protected class ConnectionTarget {
private final DiscoveryNode discoveryNode;

private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
private final AtomicReference<Releasable> connectionRef = new AtomicReference<>();

// access is synchronized by the service mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

protected DisconnectionHistory disconnectionHistory = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: suggest marking this nullable (and describing what the null value means)

Suggested change
protected DisconnectionHistory disconnectionHistory = null;
@Nullable // if node is connected
protected DisconnectionHistory disconnectionHistory = null;


// all access to these fields is synchronized
private List<Releasable> pendingRefs;
private boolean connectionInProgress;
Expand Down Expand Up @@ -347,4 +376,67 @@ public String toString() {
}
}
}

/**
* Receives connection/disconnection events from the transport, and records them in per-node DisconnectionHistory
* structures for logging network issues. DisconnectionHistory records are stored their node's ConnectionTarget.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: DisconnectionHistory records are stored their node's ConnectionTarget, should it be "... stored in their node's ..."

*
* Network issues (that this listener monitors for) occur whenever a reconnection to a node succeeds,
* and it has the same ephemeral ID as it did during the last connection; this happens when a connection event
* occurs, and its ConnectionTarget entry has a previous DisconnectionHistory stored.
*/
private class ConnectionChangeListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
DisconnectionHistory disconnectionHistory = null;
synchronized (mutex) {
ConnectionTarget connectionTarget = targetsByNode.get(node);
if (connectionTarget != null) {
disconnectionHistory = connectionTarget.disconnectionHistory;
connectionTarget.disconnectionHistory = null;
}
}

if (disconnectionHistory != null) {
long millisSinceDisconnect = threadPool.absoluteTimeInMillis() - disconnectionHistory.disconnectTimeMillis;
if (disconnectionHistory.disconnectCause != null) {
logger.warn(
() -> format(
"""
reopened transport connection to node [%s] \
which disconnected exceptionally [%dms] ago but did not \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a slight preference for including both the number of milliseconds and a human-readable representation of the time, see e.g. org.elasticsearch.action.support.SubscribableListener#scheduleTimeout. Sometimes these things may be minutes/hours long and it's hard to eyeball such large timespans in terms of milliseconds.

Suggested change
which disconnected exceptionally [%dms] ago but did not \
which disconnected exceptionally [%s/%dms] ago but did not \

restart, so the disconnection is unexpected; \
see [%s] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
millisSinceDisconnect,
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
),
disconnectionHistory.disconnectCause
);
} else {
logger.warn(
"""
reopened transport connection to node [{}] \
which disconnected gracefully [{}ms] ago but did not \
restart, so the disconnection is unexpected; \
see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
millisSinceDisconnect,
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
}
}
}

@Override
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
DisconnectionHistory disconnectionHistory = new DisconnectionHistory(threadPool.absoluteTimeInMillis(), closeException);
synchronized (mutex) {
ConnectionTarget connectionTarget = targetsByNode.get(node);
if (connectionTarget != null) {
connectionTarget.disconnectionHistory = disconnectionHistory;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -137,7 +137,7 @@ public FollowersChecker(
);
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
handleDisconnectedNode(node);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -124,7 +123,7 @@ public class LeaderChecker {

transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
handleDisconnectedNode(node);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,23 @@ private void connectToNodeOrRetry(
try {
connectionListener.onNodeConnected(node, conn);
} finally {
conn.addCloseListener(ActionListener.running(() -> {
connectedNodes.remove(node, conn);
connectionListener.onNodeDisconnected(node, conn);
managerRefs.decRef();
}));
conn.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
handleClose(null);
}

@Override
public void onFailure(Exception e) {
handleClose(e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to store the connection history even when conn.hasReferences() == false ? I'm not 100% familiar with this code, but I wonder if we might get the occasional ungraceful disconnect after we've released all our references?

I guess in that case we would eventually discard the entry via retainConnectionHistory anyway.

Do we need to be careful with the timing of calls to retainConnectionHistory versus the these close handlers firing? I guess any entries that are added after a purge would not survive subsequent purges.


void handleClose(@Nullable Exception e) {
connectedNodes.remove(node, conn);
connectionListener.onNodeDisconnected(node, e);
managerRefs.decRef();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was like this already, but it strikes me that this decRef should be in a finally just in case some future onNodeDisconnected implementation throws an exception (can be done in a follow-up)

}
});

conn.addCloseListener(ActionListener.running(() -> {
if (connectingRefCounter.hasReferences() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ final class DelegatingNodeConnectionListener implements TransportConnectionListe
private final CopyOnWriteArrayList<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void onNodeDisconnected(DiscoveryNode key, Transport.Connection connection) {
public void onNodeDisconnected(DiscoveryNode key, @Nullable Exception closeException) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key, connection);
listener.onNodeDisconnected(key, closeException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
}

@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
removeConnectedNode(node);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -339,7 +340,7 @@ boolean shouldRebuildConnection(Settings newSettings) {
protected abstract ConnectionStrategy strategyType();

@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
if (shouldOpenMoreConnections()) {
// try to reconnect and fill up the slot of the disconnected node
connect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Nullable;

/**
* A listener interface that allows to react on transport events. All methods may be
Expand Down Expand Up @@ -38,5 +39,5 @@ default void onNodeConnected(DiscoveryNode node, Transport.Connection connection
/**
* Called once a node connection is closed and unregistered.
*/
default void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {}
default void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mentioned earlier that we could consider pulling this API change out to a separate PR. As things stand I now think we should definitely do that - it's a simple refactoring (needs no test changes) and will make this change much more focussed.

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -831,7 +832,7 @@ public void testCCSRemoteReduceWithDisconnectedRemoteClusters() throws Exception
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
Expand Down Expand Up @@ -1134,7 +1135,7 @@ public void testCollectSearchShards() throws Exception {
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
Expand Down
Loading