@@ -496,36 +496,44 @@ private byte[] internalRetrieve(EntityDescriptor entityDescriptor) throws Entity
496
496
497
497
private Invocation .Task queueInFlightMessage (EntityID eid , Supplier <NetworkVoltronEntityMessage > message , InvocationCallback <byte []> callback ) {
498
498
boolean queued ;
499
- InFlightMessage inFlight = new InFlightMessage (eid , message , callback );
500
499
try {
501
- msgCount .increment ();
502
- inflights .add (inFlightMessages .size ());
503
- // NOTE: If we are already stop, the handler in outbound will fail this message for us.
504
- queued = enqueueMessage (inFlight );
505
- } catch (Throwable t ) {
506
- transactionSource .retire (inFlight .getTransactionID ());
507
- throw t ;
508
- }
500
+ InFlightMessage inFlight = new InFlightMessage (eid , message , callback );
501
+ try {
502
+ msgCount .increment ();
503
+ inflights .add (inFlightMessages .size ());
504
+ // NOTE: If we are already stop, the handler in outbound will fail this message for us.
505
+ queued = enqueueMessage (inFlight );
506
+ } catch (Throwable t ) {
507
+ transactionSource .retire (inFlight .getTransactionID ());
508
+ throw t ;
509
+ }
509
510
510
- if (queued && !stateManager .isShutdown ()) {
511
- inFlight .sent ();
512
- if (!inFlight .send ()) {
513
- logger .debug ("message not sent. Make sure resend happens " + inFlight );
514
- if (channel ().map (c -> !c .getProductID ().isReconnectEnabled ()).orElse (false )) {
515
- throwClosedExceptionOnMessage (inFlight , "connection not capable of resend" );
511
+ if (queued && !stateManager .isShutdown ()) {
512
+ inFlight .sent ();
513
+ if (!inFlight .send ()) {
514
+ logger .debug ("message not sent. Make sure resend happens " + inFlight );
515
+ if (channel ().map (c -> !c .getProductID ().isReconnectEnabled ()).orElse (false )) {
516
+ throwClosedExceptionOnMessage (inFlight , "connection not capable of resend" );
517
+ }
516
518
}
517
- }
518
- } else {
519
- throwClosedExceptionOnMessage (inFlight , "Connection closed before sending message" );
520
- }
521
- return () -> {
522
- if (inFlight .cancel ()) {
523
- inFlightMessages .remove (inFlight .getTransactionID (), inFlight );
524
- return true ;
525
519
} else {
526
- return false ;
520
+ throwClosedExceptionOnMessage ( inFlight , "Connection closed before sending message" ) ;
527
521
}
528
- };
522
+ return () -> {
523
+ if (inFlight .cancel ()) {
524
+ inFlightMessages .remove (inFlight .getTransactionID (), inFlight );
525
+ return true ;
526
+ } else {
527
+ return false ;
528
+ }
529
+ };
530
+ } catch (ConnectionClosedException e ) {
531
+ callback .sent ();
532
+ callback .failure (e );
533
+ callback .complete ();
534
+ callback .retired ();
535
+ return () -> false ;
536
+ }
529
537
}
530
538
531
539
private NetworkVoltronEntityMessage createMessageWithoutClientInstance (EntityID entityID , long version , boolean requiresReplication , byte [] config , VoltronEntityMessage .Type type , Set <VoltronEntityMessage .Acks > acks ) {
0 commit comments