@@ -53,7 +53,6 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5353
5454 private volatile ClientReceiver nativeReceiver ;
5555 private final AtomicBoolean closed = new AtomicBoolean (false );
56- private volatile Future <?> receiveLoop ;
5756 private final int initialCredits ;
5857 private final MessageHandler messageHandler ;
5958 private final Long id ;
@@ -70,6 +69,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7069 private final SessionHandler sessionHandler ;
7170 private final AtomicLong unsettledMessageCount = new AtomicLong (0 );
7271 private final Runnable replenishCreditOperation = this ::replenishCreditIfNeeded ;
72+ private final ExecutorService executorService = Executors .newSingleThreadExecutor ();
73+ private final java .util .function .Consumer <Delivery > nativeHandler ;
74+ private final java .util .function .Consumer <ClientException > nativeReceiverCloseHandler ;
7375 // native receiver internal state, accessed only in the native executor/scheduler
7476 private ProtonReceiver protonReceiver ;
7577 private volatile Scheduler protonExecutor ;
@@ -96,16 +98,33 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
9698 ofNullable (builder .subscriptionListener ()).orElse (NO_OP_SUBSCRIPTION_LISTENER );
9799 this .connection = builder .connection ();
98100 this .sessionHandler = this .connection .createSessionHandler ();
101+
102+ this .nativeHandler = createNativeHandler (messageHandler );
103+ this .nativeReceiverCloseHandler =
104+ e ->
105+ this .executorService .submit (
106+ () -> {
107+ // get result to make spotbugs happy
108+ boolean ignored = maybeCloseConsumerOnException (this , e );
109+ });
99110 this .nativeReceiver =
100111 this .createNativeReceiver (
101112 this .sessionHandler .session (),
102113 this .address ,
103114 this .linkProperties ,
104115 this .filters ,
105- this .subscriptionListener );
116+ this .subscriptionListener ,
117+ this .nativeHandler ,
118+ this .nativeReceiverCloseHandler );
106119 this .initStateFromNativeReceiver (this .nativeReceiver );
107120 this .metricsCollector = this .connection .metricsCollector ();
108- this .startReceivingLoop ();
121+ try {
122+ this .nativeReceiver .addCredit (this .initialCredits );
123+ } catch (ClientException e ) {
124+ AmqpException ex = ExceptionUtils .convert (e );
125+ this .close (ex );
126+ throw ex ;
127+ }
109128 this .state (OPEN );
110129 this .metricsCollector .openConsumer ();
111130 }
@@ -163,7 +182,9 @@ private ClientReceiver createNativeReceiver(
163182 String address ,
164183 Map <String , Object > properties ,
165184 Map <String , DescribedType > filters ,
166- SubscriptionListener subscriptionListener ) {
185+ SubscriptionListener subscriptionListener ,
186+ java .util .function .Consumer <Delivery > nativeHandler ,
187+ java .util .function .Consumer <ClientException > closeHandler ) {
167188 try {
168189 filters = new LinkedHashMap <>(filters );
169190 StreamOptions streamOptions = AmqpConsumerBuilder .streamOptions (filters );
@@ -173,6 +194,8 @@ private ClientReceiver createNativeReceiver(
173194 .deliveryMode (DeliveryMode .AT_LEAST_ONCE )
174195 .autoAccept (false )
175196 .autoSettle (false )
197+ .handler (nativeHandler )
198+ .closeHandler (closeHandler )
176199 .creditWindow (0 )
177200 .properties (properties );
178201 Map <String , Object > localSourceFilters = Collections .emptyMap ();
@@ -201,6 +224,37 @@ private ClientReceiver createNativeReceiver(
201224 }
202225 }
203226
227+ private java .util .function .Consumer <Delivery > createNativeHandler (MessageHandler handler ) {
228+ return delivery -> {
229+ this .unsettledMessageCount .incrementAndGet ();
230+ this .metricsCollector .consume ();
231+ this .executorService .submit (
232+ () -> {
233+ AmqpMessage message ;
234+ try {
235+ message = new AmqpMessage (delivery .message ());
236+ } catch (ClientException e ) {
237+ LOGGER .warn ("Error while decoding message: {}" , e .getMessage ());
238+ try {
239+ delivery .disposition (DeliveryState .rejected ("" , "" ), true );
240+ } catch (ClientException ex ) {
241+ LOGGER .warn ("Error while rejecting non-decoded message: {}" , ex .getMessage ());
242+ }
243+ return ;
244+ }
245+ Consumer .Context context =
246+ new DeliveryContext (
247+ delivery ,
248+ this .protonExecutor ,
249+ this .metricsCollector ,
250+ this .unsettledMessageCount ,
251+ this .replenishCreditOperation ,
252+ this );
253+ handler .handle (context , message );
254+ });
255+ };
256+ }
257+
204258 private Runnable createReceiveTask (Receiver receiver , MessageHandler messageHandler ) {
205259 return () -> {
206260 try {
@@ -217,7 +271,8 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
217271 this .protonExecutor ,
218272 this .metricsCollector ,
219273 this .unsettledMessageCount ,
220- this .replenishCreditOperation );
274+ this .replenishCreditOperation ,
275+ this );
221276 messageHandler .handle (context , message );
222277 }
223278 }
@@ -237,11 +292,6 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
237292 };
238293 }
239294
240- private void startReceivingLoop () {
241- Runnable receiveTask = createReceiveTask (nativeReceiver , messageHandler );
242- this .receiveLoop = this .connection .environment ().consumerExecutorService ().submit (receiveTask );
243- }
244-
245295 void recoverAfterConnectionFailure () {
246296 this .nativeReceiver =
247297 RetryUtils .callAndMaybeRetry (
@@ -251,7 +301,9 @@ void recoverAfterConnectionFailure() {
251301 this .address ,
252302 this .linkProperties ,
253303 this .filters ,
254- this .subscriptionListener ),
304+ this .subscriptionListener ,
305+ this .nativeHandler ,
306+ this .nativeReceiverCloseHandler ),
255307 e -> {
256308 boolean shouldRetry =
257309 e instanceof AmqpException .AmqpResourceClosedException
@@ -267,22 +319,29 @@ void recoverAfterConnectionFailure() {
267319 this .initStateFromNativeReceiver (this .nativeReceiver );
268320 this .pauseStatus .set (PauseStatus .UNPAUSED );
269321 this .unsettledMessageCount .set (0 );
270- startReceivingLoop ();
322+ try {
323+ this .nativeReceiver .addCredit (this .initialCredits );
324+ } catch (ClientException e ) {
325+ throw ExceptionUtils .convert (e );
326+ }
271327 }
272328
273329 private void close (Throwable cause ) {
274330 if (this .closed .compareAndSet (false , true )) {
275331 this .state (CLOSING , cause );
276332 this .connection .removeConsumer (this );
277- if (this .receiveLoop != null ) {
278- this .receiveLoop .cancel (true );
333+ try {
334+ this .executorService .shutdownNow ();
335+ } catch (Exception e ) {
336+ LOGGER .warn ("Error while closing consumer executor service" );
279337 }
280338 try {
281339 this .nativeReceiver .close ();
282340 this .sessionHandler .close ();
283341 } catch (Exception e ) {
284342 LOGGER .warn ("Error while closing receiver" , e );
285343 }
344+
286345 this .state (CLOSED , cause );
287346 this .metricsCollector .closeConsumer ();
288347 }
@@ -372,18 +431,21 @@ private static class DeliveryContext implements Consumer.Context {
372431 private final MetricsCollector metricsCollector ;
373432 private final AtomicLong unsettledMessageCount ;
374433 private final Runnable replenishCreditOperation ;
434+ private final AmqpConsumer consumer ;
375435
376436 private DeliveryContext (
377437 Delivery delivery ,
378438 Scheduler protonExecutor ,
379439 MetricsCollector metricsCollector ,
380440 AtomicLong unsettledMessageCount ,
381- Runnable replenishCreditOperation ) {
441+ Runnable replenishCreditOperation ,
442+ AmqpConsumer consumer ) {
382443 this .delivery = delivery ;
383444 this .protonExecutor = protonExecutor ;
384445 this .metricsCollector = metricsCollector ;
385446 this .unsettledMessageCount = unsettledMessageCount ;
386447 this .replenishCreditOperation = replenishCreditOperation ;
448+ this .consumer = consumer ;
387449 }
388450
389451 @ Override
@@ -394,10 +456,8 @@ public void accept() {
394456 delivery .disposition (DeliveryState .accepted (), true );
395457 unsettledMessageCount .decrementAndGet ();
396458 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .ACCEPTED );
397- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
398- LOGGER .debug ("message accept failed: {}" , e .getMessage ());
399- } catch (ClientException e ) {
400- throw ExceptionUtils .convert (e );
459+ } catch (Exception e ) {
460+ handleException (e , "accept" );
401461 }
402462 }
403463 }
@@ -410,10 +470,8 @@ public void discard() {
410470 delivery .disposition (DeliveryState .rejected ("" , "" ), true );
411471 unsettledMessageCount .decrementAndGet ();
412472 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .DISCARDED );
413- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
414- LOGGER .debug ("message discard failed: {}" , e .getMessage ());
415- } catch (ClientException e ) {
416- throw ExceptionUtils .convert (e );
473+ } catch (Exception e ) {
474+ handleException (e , "discard" );
417475 }
418476 }
419477 }
@@ -428,10 +486,8 @@ public void discard(Map<String, Object> annotations) {
428486 delivery .disposition (DeliveryState .modified (true , true , annotations ), true );
429487 unsettledMessageCount .decrementAndGet ();
430488 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .DISCARDED );
431- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
432- LOGGER .debug ("message discard (modified) failed: {}" , e .getMessage ());
433- } catch (ClientException e ) {
434- throw ExceptionUtils .convert (e );
489+ } catch (Exception e ) {
490+ handleException (e , "discard (modified)" );
435491 }
436492 }
437493 }
@@ -444,10 +500,8 @@ public void requeue() {
444500 delivery .disposition (DeliveryState .released (), true );
445501 unsettledMessageCount .decrementAndGet ();
446502 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
447- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
448- LOGGER .debug ("message requeue failed: {}" , e .getMessage ());
449- } catch (ClientException e ) {
450- throw ExceptionUtils .convert (e );
503+ } catch (Exception e ) {
504+ handleException (e , "requeue" );
451505 }
452506 }
453507 }
@@ -462,12 +516,34 @@ public void requeue(Map<String, Object> annotations) {
462516 delivery .disposition (DeliveryState .modified (false , false , annotations ), true );
463517 unsettledMessageCount .decrementAndGet ();
464518 metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
465- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
466- LOGGER .debug ("message requeue (modified) failed: {}" , e .getMessage ());
467- } catch (ClientException e ) {
468- throw ExceptionUtils .convert (e );
519+ } catch (Exception e ) {
520+ handleException (e , "requeue (modified)" );
469521 }
470522 }
471523 }
524+
525+ private void handleException (Exception ex , String operation ) {
526+ if (maybeCloseConsumerOnException (this .consumer , ex )) {
527+ return ;
528+ }
529+ if (ex instanceof ClientIllegalStateException
530+ || ex instanceof RejectedExecutionException
531+ || ex instanceof ClientIOException ) {
532+ LOGGER .debug ("message {} failed: {}" , operation , ex .getMessage ());
533+ } else if (ex instanceof ClientException ) {
534+ throw ExceptionUtils .convert ((ClientException ) ex );
535+ }
536+ }
537+ }
538+
539+ private static boolean maybeCloseConsumerOnException (AmqpConsumer consumer , Exception ex ) {
540+ if (ex instanceof ClientLinkRemotelyClosedException ) {
541+ ClientLinkRemotelyClosedException e = (ClientLinkRemotelyClosedException ) ex ;
542+ if (ExceptionUtils .notFound (e ) || ExceptionUtils .resourceDeleted (e )) {
543+ consumer .close (ExceptionUtils .convert (e ));
544+ return true ;
545+ }
546+ }
547+ return false ;
472548 }
473549}
0 commit comments