1515import io .vertx .core .eventbus .*;
1616import io .vertx .core .impl .ContextInternal ;
1717import io .vertx .core .impl .VertxInternal ;
18+ import io .vertx .core .impl .logging .Logger ;
19+ import io .vertx .core .impl .logging .LoggerFactory ;
1820import io .vertx .core .impl .utils .ConcurrentCyclicSequence ;
1921import io .vertx .core .spi .metrics .EventBusMetrics ;
2022import io .vertx .core .spi .metrics .MetricsProvider ;
2123import io .vertx .core .spi .metrics .VertxMetrics ;
2224
2325import java .util .ArrayList ;
2426import java .util .Arrays ;
27+ import java .util .Collection ;
28+ import java .util .HashSet ;
29+ import java .util .Iterator ;
2530import java .util .List ;
2631import java .util .Objects ;
32+ import java .util .Set ;
2733import java .util .concurrent .ConcurrentHashMap ;
2834import java .util .concurrent .ConcurrentMap ;
35+ import java .util .concurrent .RejectedExecutionException ;
2936import java .util .concurrent .atomic .AtomicLong ;
3037import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
3138import java .util .function .Function ;
@@ -40,6 +47,7 @@ public class EventBusImpl implements EventBusInternal, MetricsProvider {
4047 private static final AtomicReferenceFieldUpdater <EventBusImpl , Handler []> OUTBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater .newUpdater (EventBusImpl .class , Handler [].class , "outboundInterceptors" );
4148 private static final AtomicReferenceFieldUpdater <EventBusImpl , Handler []> INBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater .newUpdater (EventBusImpl .class , Handler [].class , "inboundInterceptors" );
4249
50+ static final Logger logger = LoggerFactory .getLogger (EventBusImpl .class );
4351 private volatile Handler <DeliveryContext >[] outboundInterceptors = new Handler [0 ];
4452 private volatile Handler <DeliveryContext >[] inboundInterceptors = new Handler [0 ];
4553 private final AtomicLong replySequence = new AtomicLong (0 );
@@ -353,17 +361,46 @@ protected boolean isMessageLocal(MessageImpl msg) {
353361 protected ReplyException deliverMessageLocally (MessageImpl msg ) {
354362 ConcurrentCyclicSequence <HandlerHolder > handlers = handlerMap .get (msg .address ());
355363 boolean messageLocal = isMessageLocal (msg );
364+ boolean findingHandlerFailed = true ;
356365 if (handlers != null ) {
357366 if (msg .isSend ()) {
358367 //Choose one
359- HandlerHolder holder = nextHandler (handlers , messageLocal );
368+ HandlerHolder holder = nextHandler (handlers , messageLocal , null );
360369 if (metrics != null ) {
361370 metrics .messageReceived (msg .address (), !msg .isSend (), messageLocal , holder != null ? 1 : 0 );
362371 }
363- if (holder != null ) {
364- holder .handler .receive (msg .copyBeforeReceive ());
365- } else {
366- // RACY issue !!!!!
372+ /*
373+ In case the handler isn't able to enqueue the operation, we will try until we have exhausted all the handlers
374+ before failing hard.
375+ */
376+ Set <HandlerHolder > blacklistedHandlers = null ;
377+ while (true ) {
378+ if (holder != null ) {
379+ try {
380+ holder .handler .receive (msg .copyBeforeReceive ());
381+ findingHandlerFailed = false ;
382+ } catch (RejectedExecutionException e ) {
383+ if (blacklistedHandlers == null ) {
384+ blacklistedHandlers = new HashSet <>();
385+ }
386+ blacklistedHandlers .add (holder );
387+ holder = nextHandler (handlers , messageLocal , blacklistedHandlers );
388+ if (holder != null ) {
389+ if (logger .isDebugEnabled ()) {
390+ logger .debug (String .format ("Failed to enqueue message onto handler during send, will try another handler. Address: %s" , msg .address ()), e );
391+ }
392+ continue ;
393+ }
394+ else {
395+ if (logger .isDebugEnabled ()) {
396+ logger .debug (String .format ("Failed to enqueue message onto handler during send, no other handler found. Address: %s" , msg .address ()), e );
397+ }
398+ }
399+ }
400+ } else {
401+ // RACY issue !!!!!
402+ }
403+ break ;
367404 }
368405 } else {
369406 // Publish
@@ -372,21 +409,43 @@ protected ReplyException deliverMessageLocally(MessageImpl msg) {
372409 }
373410 for (HandlerHolder holder : handlers ) {
374411 if (messageLocal || !holder .isLocalOnly ()) {
375- holder .handler .receive (msg .copyBeforeReceive ());
412+ try {
413+ holder .handler .receive (msg .copyBeforeReceive ());
414+ findingHandlerFailed = false ;
415+ } catch (RejectedExecutionException e ) {
416+ if (logger .isDebugEnabled ()) {
417+ logger .debug (String .format ("Failed to enqueue message onto handler during publish. Address: %s" , msg .address ()), e );
418+ }
419+ }
376420 }
377421 }
378422 }
379- return null ;
380- } else {
423+ }
424+ if ( findingHandlerFailed ) {
381425 if (metrics != null ) {
382426 metrics .messageReceived (msg .address (), !msg .isSend (), messageLocal , 0 );
383427 }
384428 return new ReplyException (ReplyFailure .NO_HANDLERS , "No handlers for address " + msg .address );
385429 }
430+ return null ;
386431 }
387432
388- protected HandlerHolder nextHandler (ConcurrentCyclicSequence <HandlerHolder > handlers , boolean messageLocal ) {
389- return handlers .next ();
433+ protected HandlerHolder nextHandler (ConcurrentCyclicSequence <HandlerHolder > handlers , boolean messageLocal , Collection <HandlerHolder > blacklistedHandlers ) {
434+ return nextHandlerMessageLocal (handlers , blacklistedHandlers );
435+ }
436+
437+ protected static HandlerHolder nextHandlerMessageLocal (ConcurrentCyclicSequence <HandlerHolder > handlers , Collection <HandlerHolder > blacklistedHandlers ) {
438+ if (blacklistedHandlers == null ) {
439+ return handlers .next ();
440+ }
441+ final Iterator <HandlerHolder > iterator = handlers .iterator ();
442+ while (iterator .hasNext ()) {
443+ final HandlerHolder handlerHolder = iterator .next ();
444+ if (!blacklistedHandlers .contains (handlerHolder )) {
445+ return handlerHolder ;
446+ }
447+ }
448+ return null ;
390449 }
391450
392451 protected void checkStarted () {
0 commit comments