@@ -53,7 +53,6 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5353
5454 private volatile ClientReceiver nativeReceiver ;
5555 private final AtomicBoolean closed = new AtomicBoolean (false );
56- private volatile Future <?> receiveLoop ;
5756 private final int initialCredits ;
5857 private final MessageHandler messageHandler ;
5958 private final Long id ;
@@ -70,6 +69,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7069 private final SessionHandler sessionHandler ;
7170 private final AtomicLong unsettledMessageCount = new AtomicLong (0 );
7271 private final Runnable replenishCreditOperation = this ::replenishCreditIfNeeded ;
72+ private final ExecutorService dispatchingExecutorService ;
73+ private final java .util .function .Consumer <Delivery > nativeHandler ;
74+ private final java .util .function .Consumer <ClientException > nativeReceiverCloseHandler ;
7375 // native receiver internal state, accessed only in the native executor/scheduler
7476 private ProtonReceiver protonReceiver ;
7577 private volatile Scheduler protonExecutor ;
@@ -96,16 +98,34 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
9698 ofNullable (builder .subscriptionListener ()).orElse (NO_OP_SUBSCRIPTION_LISTENER );
9799 this .connection = builder .connection ();
98100 this .sessionHandler = this .connection .createSessionHandler ();
101+
102+ this .dispatchingExecutorService = connection .dispatchingExecutorService ();
103+ this .nativeHandler = createNativeHandler (messageHandler );
104+ this .nativeReceiverCloseHandler =
105+ e ->
106+ this .dispatchingExecutorService .submit (
107+ () -> {
108+ // get result to make spotbugs happy
109+ boolean ignored = maybeCloseConsumerOnException (this , e );
110+ });
99111 this .nativeReceiver =
100112 this .createNativeReceiver (
101113 this .sessionHandler .session (),
102114 this .address ,
103115 this .linkProperties ,
104116 this .filters ,
105- this .subscriptionListener );
117+ this .subscriptionListener ,
118+ this .nativeHandler ,
119+ this .nativeReceiverCloseHandler );
106120 this .initStateFromNativeReceiver (this .nativeReceiver );
107121 this .metricsCollector = this .connection .metricsCollector ();
108- this .startReceivingLoop ();
122+ try {
123+ this .nativeReceiver .addCredit (this .initialCredits );
124+ } catch (ClientException e ) {
125+ AmqpException ex = ExceptionUtils .convert (e );
126+ this .close (ex );
127+ throw ex ;
128+ }
109129 this .state (OPEN );
110130 this .metricsCollector .openConsumer ();
111131 }
@@ -163,7 +183,9 @@ private ClientReceiver createNativeReceiver(
163183 String address ,
164184 Map <String , Object > properties ,
165185 Map <String , DescribedType > filters ,
166- SubscriptionListener subscriptionListener ) {
186+ SubscriptionListener subscriptionListener ,
187+ java .util .function .Consumer <Delivery > nativeHandler ,
188+ java .util .function .Consumer <ClientException > closeHandler ) {
167189 try {
168190 filters = new LinkedHashMap <>(filters );
169191 StreamOptions streamOptions = AmqpConsumerBuilder .streamOptions (filters );
@@ -173,6 +195,8 @@ private ClientReceiver createNativeReceiver(
173195 .deliveryMode (DeliveryMode .AT_LEAST_ONCE )
174196 .autoAccept (false )
175197 .autoSettle (false )
198+ .handler (nativeHandler )
199+ .closeHandler (closeHandler )
176200 .creditWindow (0 )
177201 .properties (properties );
178202 Map <String , Object > localSourceFilters = Collections .emptyMap ();
@@ -201,6 +225,37 @@ private ClientReceiver createNativeReceiver(
201225 }
202226 }
203227
228+ private java .util .function .Consumer <Delivery > createNativeHandler (MessageHandler handler ) {
229+ return delivery -> {
230+ this .unsettledMessageCount .incrementAndGet ();
231+ this .metricsCollector .consume ();
232+ this .dispatchingExecutorService .submit (
233+ () -> {
234+ AmqpMessage message ;
235+ try {
236+ message = new AmqpMessage (delivery .message ());
237+ } catch (ClientException e ) {
238+ LOGGER .warn ("Error while decoding message: {}" , e .getMessage ());
239+ try {
240+ delivery .disposition (DeliveryState .rejected ("" , "" ), true );
241+ } catch (ClientException ex ) {
242+ LOGGER .warn ("Error while rejecting non-decoded message: {}" , ex .getMessage ());
243+ }
244+ return ;
245+ }
246+ Consumer .Context context =
247+ new DeliveryContext (
248+ delivery ,
249+ this .protonExecutor ,
250+ this .metricsCollector ,
251+ this .unsettledMessageCount ,
252+ this .replenishCreditOperation ,
253+ this );
254+ handler .handle (context , message );
255+ });
256+ };
257+ }
258+
204259 private Runnable createReceiveTask (Receiver receiver , MessageHandler messageHandler ) {
205260 return () -> {
206261 try {
@@ -217,7 +272,8 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
217272 this .protonExecutor ,
218273 this .metricsCollector ,
219274 this .unsettledMessageCount ,
220- this .replenishCreditOperation );
275+ this .replenishCreditOperation ,
276+ this );
221277 messageHandler .handle (context , message );
222278 }
223279 }
@@ -237,11 +293,6 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
237293 };
238294 }
239295
240- private void startReceivingLoop () {
241- Runnable receiveTask = createReceiveTask (nativeReceiver , messageHandler );
242- this .receiveLoop = this .connection .environment ().consumerExecutorService ().submit (receiveTask );
243- }
244-
245296 void recoverAfterConnectionFailure () {
246297 this .nativeReceiver =
247298 RetryUtils .callAndMaybeRetry (
@@ -251,7 +302,9 @@ void recoverAfterConnectionFailure() {
251302 this .address ,
252303 this .linkProperties ,
253304 this .filters ,
254- this .subscriptionListener ),
305+ this .subscriptionListener ,
306+ this .nativeHandler ,
307+ this .nativeReceiverCloseHandler ),
255308 e -> {
256309 boolean shouldRetry =
257310 e instanceof AmqpException .AmqpResourceClosedException
@@ -267,22 +320,24 @@ void recoverAfterConnectionFailure() {
267320 this .initStateFromNativeReceiver (this .nativeReceiver );
268321 this .pauseStatus .set (PauseStatus .UNPAUSED );
269322 this .unsettledMessageCount .set (0 );
270- startReceivingLoop ();
323+ try {
324+ this .nativeReceiver .addCredit (this .initialCredits );
325+ } catch (ClientException e ) {
326+ throw ExceptionUtils .convert (e );
327+ }
271328 }
272329
273330 private void close (Throwable cause ) {
274331 if (this .closed .compareAndSet (false , true )) {
275332 this .state (CLOSING , cause );
276333 this .connection .removeConsumer (this );
277- if (this .receiveLoop != null ) {
278- this .receiveLoop .cancel (true );
279- }
280334 try {
281335 this .nativeReceiver .close ();
282336 this .sessionHandler .close ();
283337 } catch (Exception e ) {
284338 LOGGER .warn ("Error while closing receiver" , e );
285339 }
340+
286341 this .state (CLOSED , cause );
287342 this .metricsCollector .closeConsumer ();
288343 }
@@ -372,18 +427,21 @@ private static class DeliveryContext implements Consumer.Context {
372427 private final MetricsCollector metricsCollector ;
373428 private final AtomicLong unsettledMessageCount ;
374429 private final Runnable replenishCreditOperation ;
430+ private final AmqpConsumer consumer ;
375431
376432 private DeliveryContext (
377433 Delivery delivery ,
378434 Scheduler protonExecutor ,
379435 MetricsCollector metricsCollector ,
380436 AtomicLong unsettledMessageCount ,
381- Runnable replenishCreditOperation ) {
437+ Runnable replenishCreditOperation ,
438+ AmqpConsumer consumer ) {
382439 this .delivery = delivery ;
383440 this .protonExecutor = protonExecutor ;
384441 this .metricsCollector = metricsCollector ;
385442 this .unsettledMessageCount = unsettledMessageCount ;
386443 this .replenishCreditOperation = replenishCreditOperation ;
444+ this .consumer = consumer ;
387445 }
388446
389447 @ Override
@@ -394,10 +452,8 @@ public void accept() {
394452 delivery .disposition (DeliveryState .accepted (), true );
395453 unsettledMessageCount .decrementAndGet ();
396454 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .ACCEPTED );
397- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
398- LOGGER .debug ("message accept failed: {}" , e .getMessage ());
399- } catch (ClientException e ) {
400- throw ExceptionUtils .convert (e );
455+ } catch (Exception e ) {
456+ handleException (e , "accept" );
401457 }
402458 }
403459 }
@@ -410,10 +466,8 @@ public void discard() {
410466 delivery .disposition (DeliveryState .rejected ("" , "" ), true );
411467 unsettledMessageCount .decrementAndGet ();
412468 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .DISCARDED );
413- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
414- LOGGER .debug ("message discard failed: {}" , e .getMessage ());
415- } catch (ClientException e ) {
416- throw ExceptionUtils .convert (e );
469+ } catch (Exception e ) {
470+ handleException (e , "discard" );
417471 }
418472 }
419473 }
@@ -428,10 +482,8 @@ public void discard(Map<String, Object> annotations) {
428482 delivery .disposition (DeliveryState .modified (true , true , annotations ), true );
429483 unsettledMessageCount .decrementAndGet ();
430484 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .DISCARDED );
431- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
432- LOGGER .debug ("message discard (modified) failed: {}" , e .getMessage ());
433- } catch (ClientException e ) {
434- throw ExceptionUtils .convert (e );
485+ } catch (Exception e ) {
486+ handleException (e , "discard (modified)" );
435487 }
436488 }
437489 }
@@ -444,10 +496,8 @@ public void requeue() {
444496 delivery .disposition (DeliveryState .released (), true );
445497 unsettledMessageCount .decrementAndGet ();
446498 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
447- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
448- LOGGER .debug ("message requeue failed: {}" , e .getMessage ());
449- } catch (ClientException e ) {
450- throw ExceptionUtils .convert (e );
499+ } catch (Exception e ) {
500+ handleException (e , "requeue" );
451501 }
452502 }
453503 }
@@ -462,12 +512,34 @@ public void requeue(Map<String, Object> annotations) {
462512 delivery .disposition (DeliveryState .modified (false , false , annotations ), true );
463513 unsettledMessageCount .decrementAndGet ();
464514 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
465- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
466- LOGGER .debug ("message requeue (modified) failed: {}" , e .getMessage ());
467- } catch (ClientException e ) {
468- throw ExceptionUtils .convert (e );
515+ } catch (Exception e ) {
516+ handleException (e , "requeue (modified)" );
469517 }
470518 }
471519 }
520+
521+ private void handleException (Exception ex , String operation ) {
522+ if (maybeCloseConsumerOnException (this .consumer , ex )) {
523+ return ;
524+ }
525+ if (ex instanceof ClientIllegalStateException
526+ || ex instanceof RejectedExecutionException
527+ || ex instanceof ClientIOException ) {
528+ LOGGER .debug ("message {} failed: {}" , operation , ex .getMessage ());
529+ } else if (ex instanceof ClientException ) {
530+ throw ExceptionUtils .convert ((ClientException ) ex );
531+ }
532+ }
533+ }
534+
535+ private static boolean maybeCloseConsumerOnException (AmqpConsumer consumer , Exception ex ) {
536+ if (ex instanceof ClientLinkRemotelyClosedException ) {
537+ ClientLinkRemotelyClosedException e = (ClientLinkRemotelyClosedException ) ex ;
538+ if (ExceptionUtils .notFound (e ) || ExceptionUtils .resourceDeleted (e )) {
539+ consumer .close (ExceptionUtils .convert (e ));
540+ return true ;
541+ }
542+ }
543+ return false ;
472544 }
473545}
0 commit comments