1919
2020import static com .rabbitmq .client .amqp .Resource .State .*;
2121import static com .rabbitmq .client .amqp .impl .AmqpConsumerBuilder .*;
22- import static com .rabbitmq .client .amqp .impl . ExceptionUtils .*;
22+ import static com .rabbitmq .client .amqp .metrics . MetricsCollector . ConsumeDisposition .*;
2323import static java .time .Duration .ofSeconds ;
2424import static java .util .Optional .ofNullable ;
2525
3333import java .util .concurrent .atomic .AtomicBoolean ;
3434import java .util .concurrent .atomic .AtomicLong ;
3535import java .util .concurrent .atomic .AtomicReference ;
36+ import java .util .stream .IntStream ;
3637import org .apache .qpid .protonj2 .client .*;
3738import org .apache .qpid .protonj2 .client .exceptions .*;
39+ import org .apache .qpid .protonj2 .client .impl .ClientConversionSupport ;
3840import org .apache .qpid .protonj2 .client .impl .ClientReceiver ;
3941import org .apache .qpid .protonj2 .client .util .DeliveryQueue ;
4042import org .apache .qpid .protonj2 .engine .EventHandler ;
4345import org .apache .qpid .protonj2 .engine .impl .ProtonReceiver ;
4446import org .apache .qpid .protonj2 .engine .impl .ProtonSessionIncomingWindow ;
4547import org .apache .qpid .protonj2 .types .DescribedType ;
48+ import org .apache .qpid .protonj2 .types .messaging .Accepted ;
49+ import org .apache .qpid .protonj2 .types .messaging .Modified ;
50+ import org .apache .qpid .protonj2 .types .messaging .Rejected ;
51+ import org .apache .qpid .protonj2 .types .messaging .Released ;
4652import org .slf4j .Logger ;
4753import org .slf4j .LoggerFactory ;
4854
@@ -247,6 +253,7 @@ private java.util.function.Consumer<Delivery> createNativeHandler(MessageHandler
247253 new DeliveryContext (
248254 delivery ,
249255 this .protonExecutor ,
256+ this .protonReceiver ,
250257 this .metricsCollector ,
251258 this .unsettledMessageCount ,
252259 this .replenishCreditOperation ,
@@ -382,9 +389,11 @@ enum PauseStatus {
382389
383390 private static class DeliveryContext implements Consumer .Context {
384391
392+ private static final DeliveryState REJECTED = DeliveryState .rejected (null , null );
385393 private final AtomicBoolean settled = new AtomicBoolean (false );
386394 private final Delivery delivery ;
387395 private final Scheduler protonExecutor ;
396+ private final ProtonReceiver protonReceiver ;
388397 private final MetricsCollector metricsCollector ;
389398 private final AtomicLong unsettledMessageCount ;
390399 private final Runnable replenishCreditOperation ;
@@ -393,12 +402,14 @@ private static class DeliveryContext implements Consumer.Context {
393402 private DeliveryContext (
394403 Delivery delivery ,
395404 Scheduler protonExecutor ,
405+ ProtonReceiver protonReceiver ,
396406 MetricsCollector metricsCollector ,
397407 AtomicLong unsettledMessageCount ,
398408 Runnable replenishCreditOperation ,
399409 AmqpConsumer consumer ) {
400410 this .delivery = delivery ;
401411 this .protonExecutor = protonExecutor ;
412+ this .protonReceiver = protonReceiver ;
402413 this .metricsCollector = metricsCollector ;
403414 this .unsettledMessageCount = unsettledMessageCount ;
404415 this .replenishCreditOperation = replenishCreditOperation ;
@@ -407,95 +418,196 @@ private DeliveryContext(
407418
408419 @ Override
409420 public void accept () {
410- if (settled .compareAndSet (false , true )) {
411- try {
412- protonExecutor .execute (replenishCreditOperation );
413- delivery .disposition (DeliveryState .accepted (), true );
414- unsettledMessageCount .decrementAndGet ();
415- metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .ACCEPTED );
416- } catch (Exception e ) {
417- handleException (e , "accept" );
418- }
419- }
421+ this .settle (DeliveryState .accepted (), ACCEPTED , "accept" );
420422 }
421423
422424 @ Override
423425 public void discard () {
424- if (settled .compareAndSet (false , true )) {
425- try {
426- protonExecutor .execute (replenishCreditOperation );
427- delivery .disposition (DeliveryState .rejected ("" , "" ), true );
428- unsettledMessageCount .decrementAndGet ();
429- metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .DISCARDED );
430- } catch (Exception e ) {
431- handleException (e , "discard" );
432- }
433- }
426+ settle (REJECTED , DISCARDED , "discard" );
434427 }
435428
436429 @ Override
437430 public void discard (Map <String , Object > annotations ) {
431+ annotations = annotations == null ? Collections .emptyMap () : annotations ;
432+ Utils .checkMessageAnnotations (annotations );
433+ this .settle (DeliveryState .modified (true , true , annotations ), DISCARDED , "discard (modified)" );
434+ }
435+
436+ @ Override
437+ public void requeue () {
438+ settle (DeliveryState .released (), REQUEUED , "requeue" );
439+ }
440+
441+ @ Override
442+ public void requeue (Map <String , Object > annotations ) {
443+ annotations = annotations == null ? Collections .emptyMap () : annotations ;
444+ Utils .checkMessageAnnotations (annotations );
445+ this .settle (
446+ DeliveryState .modified (false , false , annotations ), REQUEUED , "requeue (modified)" );
447+ }
448+
449+ @ Override
450+ public BatchContext batch () {
451+ return new BatchDeliveryContext (
452+ protonExecutor ,
453+ protonReceiver ,
454+ metricsCollector ,
455+ unsettledMessageCount ,
456+ replenishCreditOperation ,
457+ consumer );
458+ }
459+
460+ private void settle (
461+ DeliveryState state , MetricsCollector .ConsumeDisposition disposition , String label ) {
438462 if (settled .compareAndSet (false , true )) {
439463 try {
440- annotations = annotations == null ? Collections .emptyMap () : annotations ;
441- Utils .checkMessageAnnotations (annotations );
442464 protonExecutor .execute (replenishCreditOperation );
443- delivery .disposition (DeliveryState . modified ( true , true , annotations ) , true );
465+ delivery .disposition (state , true );
444466 unsettledMessageCount .decrementAndGet ();
445- metricsCollector .consumeDisposition (MetricsCollector . ConsumeDisposition . DISCARDED );
467+ metricsCollector .consumeDisposition (disposition );
446468 } catch (Exception e ) {
447- handleException ( e , "discard (modified)" );
469+ handleContextException ( this . consumer , e , label );
448470 }
449471 }
450472 }
473+ }
474+
475+ @ Override
476+ public String toString () {
477+ return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}' ;
478+ }
479+
480+ private static final class BatchDeliveryContext implements BatchContext {
481+
482+ private static final org .apache .qpid .protonj2 .types .transport .DeliveryState REJECTED =
483+ new Rejected ();
484+ private final List <DeliveryContext > contexts = new ArrayList <>();
485+ private final AtomicBoolean settled = new AtomicBoolean (false );
486+ private final Scheduler protonExecutor ;
487+ private final ProtonReceiver protonReceiver ;
488+ private final MetricsCollector metricsCollector ;
489+ private final AtomicLong unsettledMessageCount ;
490+ private final Runnable replenishCreditOperation ;
491+ private final AmqpConsumer consumer ;
492+
493+ private BatchDeliveryContext (
494+ Scheduler protonExecutor ,
495+ ProtonReceiver protonReceiver ,
496+ MetricsCollector metricsCollector ,
497+ AtomicLong unsettledMessageCount ,
498+ Runnable replenishCreditOperation ,
499+ AmqpConsumer consumer ) {
500+ this .protonExecutor = protonExecutor ;
501+ this .protonReceiver = protonReceiver ;
502+ this .metricsCollector = metricsCollector ;
503+ this .unsettledMessageCount = unsettledMessageCount ;
504+ this .replenishCreditOperation = replenishCreditOperation ;
505+ this .consumer = consumer ;
506+ }
451507
452508 @ Override
453- public void requeue () {
454- if (settled .compareAndSet (false , true )) {
455- try {
456- protonExecutor .execute (replenishCreditOperation );
457- delivery .disposition (DeliveryState .released (), true );
458- unsettledMessageCount .decrementAndGet ();
459- metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
460- } catch (Exception e ) {
461- handleException (e , "requeue" );
509+ public void add (Consumer .Context context ) {
510+ if (this .settled .get ()) {
511+ throw new IllegalStateException ("Batch is closed" );
512+ } else {
513+ if (context instanceof DeliveryContext ) {
514+ DeliveryContext dctx = (DeliveryContext ) context ;
515+ // marking the context as settled avoids operation on it and deduplicates as well
516+ if (dctx .settled .compareAndSet (false , true )) {
517+ this .contexts .add (dctx );
518+ } else {
519+ throw new IllegalStateException ("Message already settled" );
520+ }
521+ } else {
522+ throw new IllegalArgumentException ("Context type not supported: " + context );
462523 }
463524 }
464525 }
465526
527+ @ Override
528+ public int size () {
529+ return this .contexts .size ();
530+ }
531+
532+ @ Override
533+ public void accept () {
534+ this .settle (Accepted .getInstance (), ACCEPTED , "accept" );
535+ }
536+
537+ @ Override
538+ public void discard () {
539+ this .settle (REJECTED , DISCARDED , "discard" );
540+ }
541+
542+ @ Override
543+ public void discard (Map <String , Object > annotations ) {
544+ annotations = annotations == null ? Collections .emptyMap () : annotations ;
545+ Utils .checkMessageAnnotations (annotations );
546+ Modified state =
547+ new Modified (false , true , ClientConversionSupport .toSymbolKeyedMap (annotations ));
548+ this .settle (state , DISCARDED , "discard (modified)" );
549+ }
550+
551+ @ Override
552+ public void requeue () {
553+ this .settle (Released .getInstance (), REQUEUED , "requeue" );
554+ }
555+
466556 @ Override
467557 public void requeue (Map <String , Object > annotations ) {
558+ annotations = annotations == null ? Collections .emptyMap () : annotations ;
559+ Utils .checkMessageAnnotations (annotations );
560+ Modified state =
561+ new Modified (false , false , ClientConversionSupport .toSymbolKeyedMap (annotations ));
562+ this .settle (state , REQUEUED , "requeue (modified)" );
563+ }
564+
565+ @ Override
566+ public BatchContext batch () {
567+ return this ;
568+ }
569+
570+ private void settle (
571+ org .apache .qpid .protonj2 .types .transport .DeliveryState state ,
572+ MetricsCollector .ConsumeDisposition disposition ,
573+ String label ) {
468574 if (settled .compareAndSet (false , true )) {
575+ int batchSize = this .contexts .size ();
469576 try {
470- annotations = annotations == null ? Collections .emptyMap () : annotations ;
471- Utils .checkMessageAnnotations (annotations );
472577 protonExecutor .execute (replenishCreditOperation );
473- delivery .disposition (DeliveryState .modified (false , false , annotations ), true );
474- unsettledMessageCount .decrementAndGet ();
475- metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
578+ long [][] ranges =
579+ SerialNumberUtils .ranges (this .contexts , ctx -> ctx .delivery .getDeliveryId ());
580+ this .protonExecutor .execute (
581+ () -> {
582+ for (long [] range : ranges ) {
583+ this .protonReceiver .disposition (state , range );
584+ }
585+ });
586+ unsettledMessageCount .addAndGet (-batchSize );
587+ IntStream .range (0 , batchSize )
588+ .forEach (
589+ ignored -> {
590+ metricsCollector .consumeDisposition (disposition );
591+ });
476592 } catch (Exception e ) {
477- handleException ( e , "requeue (modified)" );
593+ handleContextException ( this . consumer , e , label );
478594 }
479595 }
480596 }
481-
482- private void handleException (Exception ex , String operation ) {
483- if (maybeCloseConsumerOnException (this .consumer , ex )) {
484- return ;
485- }
486- if (ex instanceof ClientIllegalStateException
487- || ex instanceof RejectedExecutionException
488- || ex instanceof ClientIOException ) {
489- LOGGER .debug ("message {} failed: {}" , operation , ex .getMessage ());
490- } else if (ex instanceof ClientException ) {
491- throw ExceptionUtils .convert ((ClientException ) ex );
492- }
493- }
494597 }
495598
496- @ Override
497- public String toString () {
498- return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}' ;
599+ private static void handleContextException (
600+ AmqpConsumer consumer , Exception ex , String operation ) {
601+ if (maybeCloseConsumerOnException (consumer , ex )) {
602+ return ;
603+ }
604+ if (ex instanceof ClientIllegalStateException
605+ || ex instanceof RejectedExecutionException
606+ || ex instanceof ClientIOException ) {
607+ LOGGER .debug ("message {} failed: {}" , operation , ex .getMessage ());
608+ } else if (ex instanceof ClientException ) {
609+ throw ExceptionUtils .convert ((ClientException ) ex );
610+ }
499611 }
500612
501613 private static boolean maybeCloseConsumerOnException (AmqpConsumer consumer , Exception ex ) {
0 commit comments