Skip to content

Commit 00bf386

Browse files
authored
Throw ESRejected*Exception when too many pending connect listeners (#59036)
The backport in #56073 was supposed to change the max pending listeners to 1000 and throw ESRejectedExecutionException instead of RejectedExecutionException when reaching that limit. However, it missed the latter. Closes #53225
1 parent 7038525 commit 00bf386

File tree

2 files changed

+65
-2
lines changed

2 files changed

+65
-2
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.common.unit.TimeValue;
4848
import org.elasticsearch.common.util.CancellableThreads;
4949
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
50+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
5051
import org.elasticsearch.common.util.concurrent.ThreadContext;
5152
import org.elasticsearch.core.internal.io.IOUtils;
5253
import org.elasticsearch.threadpool.ThreadPool;
@@ -65,7 +66,6 @@
6566
import java.util.concurrent.ArrayBlockingQueue;
6667
import java.util.concurrent.BlockingQueue;
6768
import java.util.concurrent.ExecutorService;
68-
import java.util.concurrent.RejectedExecutionException;
6969
import java.util.concurrent.Semaphore;
7070
import java.util.concurrent.atomic.AtomicBoolean;
7171
import java.util.function.Consumer;
@@ -442,7 +442,7 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
442442
ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext());
443443
synchronized (queue) {
444444
if (listener != null && queue.offer(listener) == false) {
445-
listener.onFailure(new RejectedExecutionException("connect queue is full"));
445+
listener.onFailure(new EsRejectedExecutionException("connect queue is full"));
446446
return;
447447
}
448448
if (forceRun == false && queue.isEmpty()) {

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.common.transport.TransportAddress;
5050
import org.elasticsearch.common.unit.TimeValue;
5151
import org.elasticsearch.common.util.CancellableThreads;
52+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
5253
import org.elasticsearch.common.util.concurrent.ThreadContext;
5354
import org.elasticsearch.common.xcontent.XContentBuilder;
5455
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -97,6 +98,7 @@
9798
import static org.hamcrest.Matchers.iterableWithSize;
9899
import static org.hamcrest.Matchers.not;
99100
import static org.hamcrest.Matchers.notNullValue;
101+
import static org.hamcrest.Matchers.nullValue;
100102
import static org.hamcrest.Matchers.sameInstance;
101103
import static org.hamcrest.Matchers.startsWith;
102104

@@ -1204,6 +1206,67 @@ public void onFailure(Exception e) {
12041206
}
12051207
}
12061208

1209+
public void testPendingConnectListeners() throws IOException, InterruptedException {
1210+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
1211+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
1212+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
1213+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
1214+
knownNodes.add(seedTransport.getLocalDiscoNode());
1215+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
1216+
Collections.shuffle(knownNodes, random());
1217+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
1218+
service.start();
1219+
service.acceptIncomingRequests();
1220+
final Settings settings = Settings.builder()
1221+
.put(RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), 1).build();
1222+
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, "test-cluster",
1223+
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) {
1224+
ConnectionManager connectionManager = connection.getConnectionManager();
1225+
CountDownLatch connectionOpenedLatch = new CountDownLatch(1);
1226+
CountDownLatch connectionBlockedLatch = new CountDownLatch(1);
1227+
connectionManager.addListener(new TransportConnectionListener() {
1228+
@Override
1229+
public void onConnectionOpened(Transport.Connection connection) {
1230+
connectionOpenedLatch.countDown();
1231+
try {
1232+
connectionBlockedLatch.await();
1233+
} catch (InterruptedException e) {
1234+
throw new AssertionError(e);
1235+
}
1236+
}
1237+
});
1238+
1239+
Thread thread = new Thread(() -> connection.ensureConnected(ActionListener.wrap(() -> {})));
1240+
thread.start();
1241+
connectionOpenedLatch.await();
1242+
connection.ensureConnected(ActionListener.wrap(() -> {}));
1243+
try {
1244+
int pendingConnections = randomIntBetween(1, 5);
1245+
for (int i = 0; i < pendingConnections; i++) {
1246+
AtomicReference<Exception> error = new AtomicReference<>();
1247+
connection.ensureConnected(new ActionListener<Void>() {
1248+
@Override
1249+
public void onResponse(Void aVoid) {
1250+
1251+
}
1252+
1253+
@Override
1254+
public void onFailure(Exception e) {
1255+
error.set(e);
1256+
}
1257+
});
1258+
assertThat(error.get(), not(nullValue()));
1259+
assertThat(error.get(), instanceOf(EsRejectedExecutionException.class));
1260+
}
1261+
} finally {
1262+
connectionBlockedLatch.countDown();
1263+
thread.join();
1264+
}
1265+
}
1266+
}
1267+
}
1268+
}
1269+
12071270
public void testCollectNodes() throws Exception {
12081271
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
12091272
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) {

0 commit comments

Comments
 (0)