Skip to content

Commit b437d50

Browse files
committed
Fix EventBus cluster manager node lookup failure.
Motivation: When the EventBus allocated a connection for a given node, it reserves a slot in a map to cumulate messages until the connection is established. When connection fails, the allocated connection slot should be removed to let further reconnect attempts work.
1 parent 950e7e8 commit b437d50

File tree

3 files changed

+54
-9
lines changed

3 files changed

+54
-9
lines changed

vertx-core/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,10 @@ private void connect(OutboundConnection conn) {
385385
});
386386
conn.connected(connection);
387387
} else {
388-
log.warn("Connecting to server " + conn.remoteNodeId() + " failed", ar.cause());
388+
if (log.isWarnEnabled()) {
389+
log.warn("Connecting to server " + conn.remoteNodeId() + " failed", ar.cause());
390+
}
391+
outboundConnections.remove(conn.remoteNodeId(), conn);
389392
conn.handleClose(ar.cause());
390393
}
391394
});

vertx-core/src/main/java/io/vertx/core/eventbus/impl/clustered/OutboundConnection.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.vertx.core.internal.VertxInternal;
2222
import io.vertx.core.internal.logging.Logger;
2323
import io.vertx.core.internal.logging.LoggerFactory;
24+
import io.vertx.core.internal.net.NetSocketInternal;
2425
import io.vertx.core.net.NetSocket;
2526
import io.vertx.core.spi.metrics.EventBusMetrics;
2627

@@ -49,6 +50,7 @@ final class OutboundConnection implements Handler<Buffer> {
4950
private boolean connected;
5051
private long pingReplyTimeoutID = -1;
5152
private long pingTimeoutID = -1;
53+
private boolean closed;
5254

5355
OutboundConnection(ClusteredEventBus eventBus, String remoteNodeId) {
5456
this.eventBus = eventBus;
@@ -62,17 +64,28 @@ String remoteNodeId() {
6264
}
6365

6466
synchronized void writeMessage(MessageImpl<?, ?> message, Promise<Void> writePromise) {
65-
if (connected) {
67+
Throwable failure;
68+
synchronized (this) {
69+
if (closed) {
70+
failure = NetSocketInternal.CLOSED_EXCEPTION;
71+
} else if (connected) {
72+
failure = null;
73+
} else {
74+
if (pendingWrites == null) {
75+
if (log.isDebugEnabled()) {
76+
log.debug("Not connected to server " + remoteNodeId + " - starting queuing");
77+
}
78+
pendingWrites = new ArrayDeque<>();
79+
}
80+
pendingWrites.add(new MessageWrite(message, writePromise));
81+
return;
82+
}
83+
}
84+
if (failure == null) {
6685
writeMessage(message)
6786
.onComplete(writePromise);
6887
} else {
69-
if (pendingWrites == null) {
70-
if (log.isDebugEnabled()) {
71-
log.debug("Not connected to server " + remoteNodeId + " - starting queuing");
72-
}
73-
pendingWrites = new ArrayDeque<>();
74-
}
75-
pendingWrites.add(new MessageWrite(message, writePromise));
88+
writePromise.tryFail(failure);
7689
}
7790
}
7891

@@ -91,6 +104,7 @@ void handleClose(Throwable cause) {
91104
vertx.cancelTimer(pingTimeoutID);
92105
}
93106
synchronized (this) {
107+
closed = true;
94108
MessageWrite msg;
95109
if (pendingWrites != null) {
96110
while ((msg = pendingWrites.poll()) != null) {

vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,4 +752,32 @@ public void disconnected(Object socketMetric, SocketAddress remoteAddress) {
752752
Future.future(eventBus::close).await();
753753
assertWaitUntil(() -> numberOfOutboundConnections.get() == 0 && numberOfInboundConnections.get() == 0);
754754
}
755+
756+
@Test
757+
public void testHandleCloseRemovesStaleOutboundConnectionOnConnectFailure() {
758+
AtomicInteger idx = new AtomicInteger();
759+
startNodes(2, () -> new WrappedClusterManager(getClusterManager()) {
760+
@Override
761+
public void getNodeInfo(String nodeId, Completable<io.vertx.core.spi.cluster.NodeInfo> promise) {
762+
if (idx.getAndIncrement() == 0) {
763+
promise.fail("induced failure");
764+
} else {
765+
super.getNodeInfo(nodeId, promise);
766+
}
767+
}
768+
});
769+
770+
vertices[1].eventBus().consumer(ADDRESS1, msg -> {
771+
testComplete();
772+
}).completion().await();
773+
774+
try {
775+
vertices[0].eventBus().sender(ADDRESS1).write("will fail").await();
776+
fail("Should have failed");
777+
} catch (Exception e) {
778+
vertices[0].eventBus().request(ADDRESS1, "will succeed");
779+
}
780+
781+
await();
782+
}
755783
}

0 commit comments

Comments
 (0)