Skip to content

Commit b192a21

Browse files
author
Oskar Johansson
committed
#5337: EventBusImpl.deliverMessageLocally did not properly handle if the receiving context's event loop has been shutdown
1 parent 0b41047 commit b192a21

File tree

4 files changed

+283
-18
lines changed

4 files changed

+283
-18
lines changed

src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,24 @@
1515
import io.vertx.core.eventbus.*;
1616
import io.vertx.core.impl.ContextInternal;
1717
import io.vertx.core.impl.VertxInternal;
18+
import io.vertx.core.impl.logging.Logger;
19+
import io.vertx.core.impl.logging.LoggerFactory;
1820
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
1921
import io.vertx.core.spi.metrics.EventBusMetrics;
2022
import io.vertx.core.spi.metrics.MetricsProvider;
2123
import io.vertx.core.spi.metrics.VertxMetrics;
2224

2325
import java.util.ArrayList;
2426
import java.util.Arrays;
27+
import java.util.Collection;
28+
import java.util.HashSet;
29+
import java.util.Iterator;
2530
import java.util.List;
2631
import java.util.Objects;
32+
import java.util.Set;
2733
import java.util.concurrent.ConcurrentHashMap;
2834
import java.util.concurrent.ConcurrentMap;
35+
import java.util.concurrent.RejectedExecutionException;
2936
import java.util.concurrent.atomic.AtomicLong;
3037
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3138
import java.util.function.Function;
@@ -40,6 +47,7 @@ public class EventBusImpl implements EventBusInternal, MetricsProvider {
4047
private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> OUTBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "outboundInterceptors");
4148
private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> INBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "inboundInterceptors");
4249

50+
static final Logger logger = LoggerFactory.getLogger(EventBusImpl.class);
4351
private volatile Handler<DeliveryContext>[] outboundInterceptors = new Handler[0];
4452
private volatile Handler<DeliveryContext>[] inboundInterceptors = new Handler[0];
4553
private final AtomicLong replySequence = new AtomicLong(0);
@@ -353,17 +361,46 @@ protected boolean isMessageLocal(MessageImpl msg) {
353361
protected ReplyException deliverMessageLocally(MessageImpl msg) {
354362
ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());
355363
boolean messageLocal = isMessageLocal(msg);
364+
boolean findingHandlerFailed = true;
356365
if (handlers != null) {
357366
if (msg.isSend()) {
358367
//Choose one
359-
HandlerHolder holder = nextHandler(handlers, messageLocal);
368+
HandlerHolder holder = nextHandler(handlers, messageLocal, null);
360369
if (metrics != null) {
361370
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0);
362371
}
363-
if (holder != null) {
364-
holder.handler.receive(msg.copyBeforeReceive());
365-
} else {
366-
// RACY issue !!!!!
372+
/*
373+
In case the handler isn't able to enqueue the operation, we will try until we have exhausted all the handlers
374+
before failing hard.
375+
*/
376+
Set<HandlerHolder> blacklistedHandlers = null;
377+
while(true) {
378+
if (holder != null) {
379+
try {
380+
holder.handler.receive(msg.copyBeforeReceive());
381+
findingHandlerFailed = false;
382+
} catch (RejectedExecutionException e) {
383+
if(blacklistedHandlers == null) {
384+
blacklistedHandlers = new HashSet<>();
385+
}
386+
blacklistedHandlers.add(holder);
387+
holder = nextHandler(handlers, messageLocal, blacklistedHandlers);
388+
if(holder != null) {
389+
if(logger.isDebugEnabled()) {
390+
logger.debug(String.format("Failed to enqueue message onto handler during send, will try another handler. Address: %s", msg.address()), e);
391+
}
392+
continue;
393+
}
394+
else {
395+
if(logger.isDebugEnabled()) {
396+
logger.debug(String.format("Failed to enqueue message onto handler during send, no other handler found. Address: %s", msg.address()), e);
397+
}
398+
}
399+
}
400+
} else {
401+
// RACY issue !!!!!
402+
}
403+
break;
367404
}
368405
} else {
369406
// Publish
@@ -372,21 +409,43 @@ protected ReplyException deliverMessageLocally(MessageImpl msg) {
372409
}
373410
for (HandlerHolder holder: handlers) {
374411
if (messageLocal || !holder.isLocalOnly()) {
375-
holder.handler.receive(msg.copyBeforeReceive());
412+
try {
413+
holder.handler.receive(msg.copyBeforeReceive());
414+
findingHandlerFailed = false;
415+
} catch (RejectedExecutionException e) {
416+
if(logger.isDebugEnabled()) {
417+
logger.debug(String.format("Failed to enqueue message onto handler during publish. Address: %s", msg.address()), e);
418+
}
419+
}
376420
}
377421
}
378422
}
379-
return null;
380-
} else {
423+
}
424+
if (findingHandlerFailed) {
381425
if (metrics != null) {
382426
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0);
383427
}
384428
return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
385429
}
430+
return null;
386431
}
387432

388-
protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal) {
389-
return handlers.next();
433+
protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal, Collection<HandlerHolder> blacklistedHandlers) {
434+
return nextHandlerMessageLocal(handlers, blacklistedHandlers);
435+
}
436+
437+
protected static HandlerHolder nextHandlerMessageLocal(ConcurrentCyclicSequence<HandlerHolder> handlers, Collection<HandlerHolder> blacklistedHandlers) {
438+
if(blacklistedHandlers == null) {
439+
return handlers.next();
440+
}
441+
final Iterator<HandlerHolder> iterator = handlers.iterator();
442+
while (iterator.hasNext()) {
443+
final HandlerHolder handlerHolder = iterator.next();
444+
if(!blacklistedHandlers.contains(handlerHolder)) {
445+
return handlerHolder;
446+
}
447+
}
448+
return null;
390449
}
391450

392451
protected void checkStarted() {

src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.vertx.core.spi.cluster.RegistrationInfo;
4545
import io.vertx.core.spi.metrics.VertxMetrics;
4646

47+
import java.util.Collection;
4748
import java.util.Iterator;
4849
import java.util.Objects;
4950
import java.util.UUID;
@@ -242,17 +243,19 @@ protected boolean isMessageLocal(MessageImpl msg) {
242243
}
243244

244245
@Override
245-
protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal) {
246+
protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal, Collection<HandlerHolder> blacklistedHandlers) {
246247
HandlerHolder handlerHolder = null;
247248
if (messageLocal) {
248-
handlerHolder = handlers.next();
249+
handlerHolder = nextHandlerMessageLocal(handlers, blacklistedHandlers);
249250
} else {
250251
Iterator<HandlerHolder> iterator = handlers.iterator(false);
251252
while (iterator.hasNext()) {
252253
HandlerHolder next = iterator.next();
253254
if (next.isReplyHandler() || !next.isLocalOnly()) {
254-
handlerHolder = next;
255-
break;
255+
if(blacklistedHandlers == null || !blacklistedHandlers.contains(next)) {
256+
handlerHolder = next;
257+
break;
258+
}
256259
}
257260
}
258261
}

src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
package io.vertx.core.eventbus;
1313

1414
import io.vertx.core.*;
15+
import io.vertx.core.eventbus.impl.HandlerHolder;
1516
import io.vertx.core.impl.VertxInternal;
17+
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
1618
import io.vertx.core.shareddata.AsyncMapTest.SomeClusterSerializableImplObject;
1719
import io.vertx.core.shareddata.AsyncMapTest.SomeClusterSerializableObject;
1820
import io.vertx.core.shareddata.AsyncMapTest.SomeSerializableObject;
@@ -25,10 +27,9 @@
2527
import org.junit.Test;
2628

2729
import java.io.InvalidClassException;
28-
import java.util.ArrayList;
29-
import java.util.Arrays;
30-
import java.util.Collections;
31-
import java.util.List;
30+
import java.lang.reflect.InvocationTargetException;
31+
import java.lang.reflect.Method;
32+
import java.util.*;
3233
import java.util.concurrent.ConcurrentLinkedDeque;
3334
import java.util.concurrent.CountDownLatch;
3435
import java.util.concurrent.atomic.AtomicBoolean;
@@ -706,4 +707,77 @@ public void testMultiHeaders() {
706707
await();
707708

708709
}
710+
711+
@Test
712+
public void testNextHandlerForNonLocalMessageEmptyBlacklist() throws Throwable {
713+
testNextHandlerInternal(0, 10, false);
714+
}
715+
716+
@Test
717+
public void testNextHandlerForNonLocalMessageNullBlacklist() throws Throwable {
718+
testNextHandlerInternal(-1, 10, false);
719+
}
720+
721+
@Test
722+
public void testNextHandlerForNonLocalMessageHalfBlacklisted() throws Throwable {
723+
testNextHandlerInternal(5, 10, false);
724+
}
725+
726+
@Test
727+
public void testNextHandlerForNonLocalMessageAllBlacklisted() throws Throwable {
728+
testNextHandlerInternal(10, 10, false);
729+
}
730+
731+
@Test
732+
public void testNextHandlerForLocalMessageEmptyBlacklist() throws Throwable {
733+
testNextHandlerInternal(0, 10, true);
734+
}
735+
736+
@Test
737+
public void testNextHandlerForLocalMessageNullBlacklist() throws Throwable {
738+
testNextHandlerInternal(-1, 10, true);
739+
}
740+
741+
@Test
742+
public void testNextHandlerForLocalMessageHalfBlacklisted() throws Throwable {
743+
testNextHandlerInternal(5, 10, true);
744+
}
745+
746+
@Test
747+
public void testNextHandlerForLocalMessageAllBlacklisted() throws Throwable {
748+
testNextHandlerInternal(10, 10, true);
749+
}
750+
751+
private void testNextHandlerInternal(int numberOfEntriesToBlacklist, int totalNumberOfEntries, boolean localMessage) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
752+
int expectedIndex = numberOfEntriesToBlacklist >= 0 ? (numberOfEntriesToBlacklist >= totalNumberOfEntries ? -1 : numberOfEntriesToBlacklist) : 0;
753+
startNodes(1);
754+
waitFor(1);
755+
final EventBus eventBus = vertices[0].eventBus();
756+
List<HandlerHolder> handlerHolders = new ArrayList<>();
757+
for(int i = 0; i < totalNumberOfEntries; i++) {
758+
final int handlerIndex = i;
759+
handlerHolders.add(new HandlerHolder(null, false, false, null) {
760+
@Override
761+
public String toString() {
762+
return super.toString() + " Index: " + handlerIndex;
763+
}
764+
765+
@Override
766+
public boolean equals(Object o) {
767+
return this == o;
768+
}
769+
770+
@Override
771+
public int hashCode() {
772+
return handlerIndex;
773+
}
774+
});
775+
}
776+
List<HandlerHolder> blacklist = numberOfEntriesToBlacklist >= 0 ? handlerHolders.stream().limit(numberOfEntriesToBlacklist).collect(Collectors.toList()) : null;
777+
final ConcurrentCyclicSequence<HandlerHolder> concurrentCyclicSequence = new ConcurrentCyclicSequence<>(handlerHolders.toArray(new HandlerHolder[0]));
778+
final Method methodNextHandler = eventBus.getClass().getDeclaredMethod("nextHandler", new Class<?>[]{ConcurrentCyclicSequence.class, Boolean.TYPE, Collection.class});
779+
methodNextHandler.setAccessible(true);
780+
final HandlerHolder selectedHandleHolder = (HandlerHolder) methodNextHandler.invoke(eventBus, concurrentCyclicSequence, localMessage, blacklist);
781+
assertSame(expectedIndex >= 0 ? handlerHolders.get(expectedIndex) : null, selectedHandleHolder);
782+
}
709783
}

0 commit comments

Comments
 (0)