Skip to content

Commit 44d0f24

Browse files
authored
Merge branch 'main' into ivf_hkmeans
2 parents e5746a1 + ee716f1 commit 44d0f24

File tree

11 files changed

+42
-23
lines changed

11 files changed

+42
-23
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@
2626
import org.elasticsearch.common.util.concurrent.EsExecutors;
2727
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2828
import org.elasticsearch.core.CheckedRunnable;
29+
import org.elasticsearch.core.Nullable;
2930
import org.elasticsearch.core.TimeValue;
3031
import org.elasticsearch.monitor.NodeHealthService;
3132
import org.elasticsearch.monitor.StatusInfo;
3233
import org.elasticsearch.threadpool.ThreadPool.Names;
3334
import org.elasticsearch.transport.AbstractTransportRequest;
3435
import org.elasticsearch.transport.ConnectTransportException;
3536
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
36-
import org.elasticsearch.transport.Transport;
3737
import org.elasticsearch.transport.TransportConnectionListener;
3838
import org.elasticsearch.transport.TransportException;
3939
import org.elasticsearch.transport.TransportRequestOptions;
@@ -137,7 +137,7 @@ public FollowersChecker(
137137
);
138138
transportService.addConnectionListener(new TransportConnectionListener() {
139139
@Override
140-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
140+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
141141
handleDisconnectedNode(node);
142142
}
143143
});

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.transport.ConnectTransportException;
3333
import org.elasticsearch.transport.NodeDisconnectedException;
3434
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
35-
import org.elasticsearch.transport.Transport;
3635
import org.elasticsearch.transport.TransportConnectionListener;
3736
import org.elasticsearch.transport.TransportException;
3837
import org.elasticsearch.transport.TransportRequestOptions;
@@ -124,7 +123,7 @@ public class LeaderChecker {
124123

125124
transportService.addConnectionListener(new TransportConnectionListener() {
126125
@Override
127-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
126+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
128127
handleDisconnectedNode(node);
129128
}
130129
});

server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,26 @@ private void connectToNodeOrRetry(
229229
try {
230230
connectionListener.onNodeConnected(node, conn);
231231
} finally {
232-
conn.addCloseListener(ActionListener.running(() -> {
233-
connectedNodes.remove(node, conn);
234-
connectionListener.onNodeDisconnected(node, conn);
235-
managerRefs.decRef();
236-
}));
232+
conn.addCloseListener(new ActionListener<Void>() {
233+
@Override
234+
public void onResponse(Void ignored) {
235+
handleClose(null);
236+
}
237+
238+
@Override
239+
public void onFailure(Exception e) {
240+
handleClose(e);
241+
}
242+
243+
void handleClose(@Nullable Exception e) {
244+
connectedNodes.remove(node, conn);
245+
try {
246+
connectionListener.onNodeDisconnected(node, e);
247+
} finally {
248+
managerRefs.decRef();
249+
}
250+
}
251+
});
237252

238253
conn.addCloseListener(ActionListener.running(() -> {
239254
if (connectingRefCounter.hasReferences() == false) {

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ final class DelegatingNodeConnectionListener implements TransportConnectionListe
6060
private final CopyOnWriteArrayList<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
6161

6262
@Override
63-
public void onNodeDisconnected(DiscoveryNode key, Transport.Connection connection) {
63+
public void onNodeDisconnected(DiscoveryNode key, @Nullable Exception closeException) {
6464
for (TransportConnectionListener listener : listeners) {
65-
listener.onNodeDisconnected(key, connection);
65+
listener.onNodeDisconnected(key, closeException);
6666
}
6767
}
6868

server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
6161
}
6262

6363
@Override
64-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
64+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
6565
removeConnectedNode(node);
6666
}
6767
});

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2323
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
24+
import org.elasticsearch.core.Nullable;
2425
import org.elasticsearch.core.TimeValue;
2526
import org.elasticsearch.threadpool.ThreadPool;
2627

@@ -339,7 +340,7 @@ boolean shouldRebuildConnection(Settings newSettings) {
339340
protected abstract ConnectionStrategy strategyType();
340341

341342
@Override
342-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
343+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
343344
if (shouldOpenMoreConnections()) {
344345
// try to reconnect and fill up the slot of the disconnected node
345346
connect(

server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.transport;
1111

1212
import org.elasticsearch.cluster.node.DiscoveryNode;
13+
import org.elasticsearch.core.Nullable;
1314

1415
/**
1516
* A listener interface that allows to react on transport events. All methods may be
@@ -38,5 +39,5 @@ default void onNodeConnected(DiscoveryNode node, Transport.Connection connection
3839
/**
3940
* Called once a node connection is closed and unregistered.
4041
*/
41-
default void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {}
42+
default void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {}
4243
}

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.common.settings.Settings;
5353
import org.elasticsearch.common.transport.TransportAddress;
5454
import org.elasticsearch.common.util.set.Sets;
55+
import org.elasticsearch.core.Nullable;
5556
import org.elasticsearch.core.TimeValue;
5657
import org.elasticsearch.core.Tuple;
5758
import org.elasticsearch.index.Index;
@@ -831,7 +832,7 @@ public void testCCSRemoteReduceWithDisconnectedRemoteClusters() throws Exception
831832
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
832833
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
833834
@Override
834-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
835+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
835836
if (disconnectedNodes.remove(node)) {
836837
disconnectedLatch.countDown();
837838
}
@@ -1134,7 +1135,7 @@ public void testCollectSearchShards() throws Exception {
11341135
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
11351136
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
11361137
@Override
1137-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
1138+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
11381139
if (disconnectedNodes.remove(node)) {
11391140
disconnectedLatch.countDown();
11401141
}

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
3131
import org.elasticsearch.core.AbstractRefCounted;
3232
import org.elasticsearch.core.CheckedRunnable;
33+
import org.elasticsearch.core.Nullable;
3334
import org.elasticsearch.core.RefCounted;
3435
import org.elasticsearch.core.TimeValue;
3536
import org.elasticsearch.test.ESTestCase;
@@ -251,7 +252,7 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
251252
final AtomicReference<ActionListener<DiscoveryNode>> disconnectListenerRef = new AtomicReference<>();
252253
transportService.addConnectionListener(new TransportConnectionListener() {
253254
@Override
254-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
255+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
255256
final ActionListener<DiscoveryNode> disconnectListener = disconnectListenerRef.getAndSet(null);
256257
if (disconnectListener != null) {
257258
disconnectListener.onResponse(node);

server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.util.concurrent.RunOnce;
2424
import org.elasticsearch.common.util.concurrent.ThreadContext;
2525
import org.elasticsearch.core.AbstractRefCounted;
26+
import org.elasticsearch.core.Nullable;
2627
import org.elasticsearch.core.Releasable;
2728
import org.elasticsearch.core.Releasables;
2829
import org.elasticsearch.core.TimeValue;
@@ -101,7 +102,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
101102
}
102103

103104
@Override
104-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
105+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
105106
nodeDisconnectedCount.incrementAndGet();
106107
}
107108
});
@@ -658,7 +659,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
658659
}
659660

660661
@Override
661-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
662+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
662663
nodeDisconnectedCount.incrementAndGet();
663664
}
664665
});
@@ -698,7 +699,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
698699
}
699700

700701
@Override
701-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
702+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
702703
nodeDisconnectedCount.incrementAndGet();
703704
}
704705
});

0 commit comments

Comments
 (0)