annotations);
+
+ /**
+ * Create a batch context to accumulate message contexts and settle them at once.
+ *
+ * The message context the batch context is created from is not added to the batch
+ * context.
+ *
+ * @return the created batch context
+ */
+ BatchContext batch(int batchSizeHint);
+ }
+
+ /**
+ * Context to accumulate message contexts and settle them at once.
+ *
+ *
A {@link BatchContext} is also a {@link Context}: the same methods are available to settle
+ * the messages.
+ *
+ *
Only "simple" (not batch) message contexts can be added to a batch context. Calling {@link
+ * Context#batch()} on a batch context returns the instance itself.
+ *
+ * @see AMQP
+ * 1.0 Disposition performative
+ */
+ interface BatchContext extends Context {
+
+ /**
+ * Add a message context to the batch context.
+ *
+ * @param context the message context to add
+ */
+ void add(Context context);
+
+ /**
+ * Get the current number of message contexts in the batch context.
+ *
+ * @return current number of message contexts in the batch
+ */
+ int size();
}
}
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
index e805fb494..5b1b73425 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
@@ -19,7 +19,7 @@
import static com.rabbitmq.client.amqp.Resource.State.*;
import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
-import static com.rabbitmq.client.amqp.impl.ExceptionUtils.*;
+import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.*;
import static java.time.Duration.ofSeconds;
import static java.util.Optional.ofNullable;
@@ -33,8 +33,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
import org.apache.qpid.protonj2.client.*;
import org.apache.qpid.protonj2.client.exceptions.*;
+import org.apache.qpid.protonj2.client.impl.ClientConversionSupport;
import org.apache.qpid.protonj2.client.impl.ClientReceiver;
import org.apache.qpid.protonj2.client.util.DeliveryQueue;
import org.apache.qpid.protonj2.engine.EventHandler;
@@ -43,6 +45,10 @@
import org.apache.qpid.protonj2.engine.impl.ProtonReceiver;
import org.apache.qpid.protonj2.engine.impl.ProtonSessionIncomingWindow;
import org.apache.qpid.protonj2.types.DescribedType;
+import org.apache.qpid.protonj2.types.messaging.Accepted;
+import org.apache.qpid.protonj2.types.messaging.Modified;
+import org.apache.qpid.protonj2.types.messaging.Rejected;
+import org.apache.qpid.protonj2.types.messaging.Released;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -247,6 +253,7 @@ private java.util.function.Consumer createNativeHandler(MessageHandler
new DeliveryContext(
delivery,
this.protonExecutor,
+ this.protonReceiver,
this.metricsCollector,
this.unsettledMessageCount,
this.replenishCreditOperation,
@@ -382,9 +389,11 @@ enum PauseStatus {
private static class DeliveryContext implements Consumer.Context {
+ private static final DeliveryState REJECTED = DeliveryState.rejected(null, null);
private final AtomicBoolean settled = new AtomicBoolean(false);
private final Delivery delivery;
private final Scheduler protonExecutor;
+ private final ProtonReceiver protonReceiver;
private final MetricsCollector metricsCollector;
private final AtomicLong unsettledMessageCount;
private final Runnable replenishCreditOperation;
@@ -393,12 +402,14 @@ private static class DeliveryContext implements Consumer.Context {
private DeliveryContext(
Delivery delivery,
Scheduler protonExecutor,
+ ProtonReceiver protonReceiver,
MetricsCollector metricsCollector,
AtomicLong unsettledMessageCount,
Runnable replenishCreditOperation,
AmqpConsumer consumer) {
this.delivery = delivery;
this.protonExecutor = protonExecutor;
+ this.protonReceiver = protonReceiver;
this.metricsCollector = metricsCollector;
this.unsettledMessageCount = unsettledMessageCount;
this.replenishCreditOperation = replenishCreditOperation;
@@ -407,95 +418,199 @@ private DeliveryContext(
@Override
public void accept() {
- if (settled.compareAndSet(false, true)) {
- try {
- protonExecutor.execute(replenishCreditOperation);
- delivery.disposition(DeliveryState.accepted(), true);
- unsettledMessageCount.decrementAndGet();
- metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.ACCEPTED);
- } catch (Exception e) {
- handleException(e, "accept");
- }
- }
+ this.settle(DeliveryState.accepted(), ACCEPTED, "accept");
}
@Override
public void discard() {
- if (settled.compareAndSet(false, true)) {
- try {
- protonExecutor.execute(replenishCreditOperation);
- delivery.disposition(DeliveryState.rejected("", ""), true);
- unsettledMessageCount.decrementAndGet();
- metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
- } catch (Exception e) {
- handleException(e, "discard");
- }
- }
+ settle(REJECTED, DISCARDED, "discard");
}
@Override
public void discard(Map annotations) {
+ annotations = annotations == null ? Collections.emptyMap() : annotations;
+ Utils.checkMessageAnnotations(annotations);
+ this.settle(DeliveryState.modified(true, true, annotations), DISCARDED, "discard (modified)");
+ }
+
+ @Override
+ public void requeue() {
+ settle(DeliveryState.released(), REQUEUED, "requeue");
+ }
+
+ @Override
+ public void requeue(Map annotations) {
+ annotations = annotations == null ? Collections.emptyMap() : annotations;
+ Utils.checkMessageAnnotations(annotations);
+ this.settle(
+ DeliveryState.modified(false, false, annotations), REQUEUED, "requeue (modified)");
+ }
+
+ @Override
+ public BatchContext batch(int batchSizeHint) {
+ return new BatchDeliveryContext(
+ batchSizeHint,
+ protonExecutor,
+ protonReceiver,
+ metricsCollector,
+ unsettledMessageCount,
+ replenishCreditOperation,
+ consumer);
+ }
+
+ private void settle(
+ DeliveryState state, MetricsCollector.ConsumeDisposition disposition, String label) {
if (settled.compareAndSet(false, true)) {
try {
- annotations = annotations == null ? Collections.emptyMap() : annotations;
- Utils.checkMessageAnnotations(annotations);
protonExecutor.execute(replenishCreditOperation);
- delivery.disposition(DeliveryState.modified(true, true, annotations), true);
+ delivery.disposition(state, true);
unsettledMessageCount.decrementAndGet();
- metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
+ metricsCollector.consumeDisposition(disposition);
} catch (Exception e) {
- handleException(e, "discard (modified)");
+ handleContextException(this.consumer, e, label);
}
}
}
+ }
+
+ @Override
+ public String toString() {
+ return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}';
+ }
+
+ private static final class BatchDeliveryContext implements BatchContext {
+
+ private static final org.apache.qpid.protonj2.types.transport.DeliveryState REJECTED =
+ new Rejected();
+ private final List contexts;
+ private final AtomicBoolean settled = new AtomicBoolean(false);
+ private final Scheduler protonExecutor;
+ private final ProtonReceiver protonReceiver;
+ private final MetricsCollector metricsCollector;
+ private final AtomicLong unsettledMessageCount;
+ private final Runnable replenishCreditOperation;
+ private final AmqpConsumer consumer;
+
+ private BatchDeliveryContext(
+ int batchSizeHint,
+ Scheduler protonExecutor,
+ ProtonReceiver protonReceiver,
+ MetricsCollector metricsCollector,
+ AtomicLong unsettledMessageCount,
+ Runnable replenishCreditOperation,
+ AmqpConsumer consumer) {
+ this.contexts = new ArrayList<>(batchSizeHint);
+ this.protonExecutor = protonExecutor;
+ this.protonReceiver = protonReceiver;
+ this.metricsCollector = metricsCollector;
+ this.unsettledMessageCount = unsettledMessageCount;
+ this.replenishCreditOperation = replenishCreditOperation;
+ this.consumer = consumer;
+ }
@Override
- public void requeue() {
- if (settled.compareAndSet(false, true)) {
- try {
- protonExecutor.execute(replenishCreditOperation);
- delivery.disposition(DeliveryState.released(), true);
- unsettledMessageCount.decrementAndGet();
- metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
- } catch (Exception e) {
- handleException(e, "requeue");
+ public void add(Consumer.Context context) {
+ if (this.settled.get()) {
+ throw new IllegalStateException("Batch is closed");
+ } else {
+ if (context instanceof DeliveryContext) {
+ DeliveryContext dctx = (DeliveryContext) context;
+ // marking the context as settled avoids operation on it and deduplicates as well
+ if (dctx.settled.compareAndSet(false, true)) {
+ this.contexts.add(dctx);
+ } else {
+ throw new IllegalStateException("Message already settled");
+ }
+ } else {
+ throw new IllegalArgumentException("Context type not supported: " + context);
}
}
}
+ @Override
+ public int size() {
+ return this.contexts.size();
+ }
+
+ @Override
+ public void accept() {
+ this.settle(Accepted.getInstance(), ACCEPTED, "accept");
+ }
+
+ @Override
+ public void discard() {
+ this.settle(REJECTED, DISCARDED, "discard");
+ }
+
+ @Override
+ public void discard(Map annotations) {
+ annotations = annotations == null ? Collections.emptyMap() : annotations;
+ Utils.checkMessageAnnotations(annotations);
+ Modified state =
+ new Modified(false, true, ClientConversionSupport.toSymbolKeyedMap(annotations));
+ this.settle(state, DISCARDED, "discard (modified)");
+ }
+
+ @Override
+ public void requeue() {
+ this.settle(Released.getInstance(), REQUEUED, "requeue");
+ }
+
@Override
public void requeue(Map annotations) {
+ annotations = annotations == null ? Collections.emptyMap() : annotations;
+ Utils.checkMessageAnnotations(annotations);
+ Modified state =
+ new Modified(false, false, ClientConversionSupport.toSymbolKeyedMap(annotations));
+ this.settle(state, REQUEUED, "requeue (modified)");
+ }
+
+ @Override
+ public BatchContext batch(int batchSizeHint) {
+ return this;
+ }
+
+ private void settle(
+ org.apache.qpid.protonj2.types.transport.DeliveryState state,
+ MetricsCollector.ConsumeDisposition disposition,
+ String label) {
if (settled.compareAndSet(false, true)) {
+ int batchSize = this.contexts.size();
try {
- annotations = annotations == null ? Collections.emptyMap() : annotations;
- Utils.checkMessageAnnotations(annotations);
protonExecutor.execute(replenishCreditOperation);
- delivery.disposition(DeliveryState.modified(false, false, annotations), true);
- unsettledMessageCount.decrementAndGet();
- metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
+ long[][] ranges =
+ SerialNumberUtils.ranges(this.contexts, ctx -> ctx.delivery.getDeliveryId());
+ this.protonExecutor.execute(
+ () -> {
+ for (long[] range : ranges) {
+ this.protonReceiver.disposition(state, range);
+ }
+ });
+ unsettledMessageCount.addAndGet(-batchSize);
+ IntStream.range(0, batchSize)
+ .forEach(
+ ignored -> {
+ metricsCollector.consumeDisposition(disposition);
+ });
} catch (Exception e) {
- handleException(e, "requeue (modified)");
+ handleContextException(this.consumer, e, label);
}
}
}
-
- private void handleException(Exception ex, String operation) {
- if (maybeCloseConsumerOnException(this.consumer, ex)) {
- return;
- }
- if (ex instanceof ClientIllegalStateException
- || ex instanceof RejectedExecutionException
- || ex instanceof ClientIOException) {
- LOGGER.debug("message {} failed: {}", operation, ex.getMessage());
- } else if (ex instanceof ClientException) {
- throw ExceptionUtils.convert((ClientException) ex);
- }
- }
}
- @Override
- public String toString() {
- return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}';
+ private static void handleContextException(
+ AmqpConsumer consumer, Exception ex, String operation) {
+ if (maybeCloseConsumerOnException(consumer, ex)) {
+ return;
+ }
+ if (ex instanceof ClientIllegalStateException
+ || ex instanceof RejectedExecutionException
+ || ex instanceof ClientIOException) {
+ LOGGER.debug("message {} failed: {}", operation, ex.getMessage());
+ } else if (ex instanceof ClientException) {
+ throw ExceptionUtils.convert((ClientException) ex);
+ }
}
private static boolean maybeCloseConsumerOnException(AmqpConsumer consumer, Exception ex) {
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilArrays.java b/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilArrays.java
new file mode 100644
index 000000000..fb968b957
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilArrays.java
@@ -0,0 +1,472 @@
+/*
+ * Copyright (C) 2002-2024 Sebastiano Vigna
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.rabbitmq.client.amqp.impl;
+
+import java.util.ArrayList;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveAction;
+
+
+/** A class providing static methods and objects that do useful things with arrays.
+ *
+ * In addition to commodity methods, this class contains {@link FastUtilSwapper}-based implementations
+ * of {@linkplain #quickSort(int, int, FastUtilIntComparator, FastUtilSwapper) quicksort} and of
+ * a stable, in-place {@linkplain #mergeSort(int, int, FastUtilIntComparator, FastUtilSwapper) mergesort}. These
+ * generic sorting methods can be used to sort any kind of list, but they find their natural
+ * usage, for instance, in sorting arrays in parallel.
+ *
+ *
Some algorithms provide a parallel version that will by default use the
+ * {@linkplain ForkJoinPool#commonPool() common pool}, but this can be overridden by calling the
+ * function in a task already in the {@link ForkJoinPool} that the operation should run in. For example,
+ * something along the lines of "{@code poolToParallelSortIn.invoke(() -> parallelQuickSort(arrayToSort))}"
+ * will run the parallel sort in {@code poolToParallelSortIn} instead of the default pool.
+ *
+ * @see Arrays
+ */
+
+final class FastUtilArrays {
+
+ private FastUtilArrays() {}
+
+ /** This is a safe value used by {@link ArrayList} (as of Java 7) to avoid
+ * throwing {@link OutOfMemoryError} on some JVMs. We adopt the same value. */
+ public static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ /**
+ * Ensures that a range given by its first (inclusive) and last (exclusive) elements fits an array
+ * of given length.
+ *
+ *
+ * This method may be used whenever an array range check is needed.
+ *
+ *
+ * In Java 9 and up, this method should be considered deprecated in favor of the
+ * {@link java.util.Objects#checkFromToIndex(int, int, int)} method, which may be intrinsified in
+ * recent JVMs.
+ *
+ * @param arrayLength an array length (must be nonnegative).
+ * @param from a start index (inclusive).
+ * @param to an end index (exclusive).
+ * @throws IllegalArgumentException if {@code from} is greater than {@code to}.
+ * @throws ArrayIndexOutOfBoundsException if {@code from} or {@code to} are greater than
+ * {@code arrayLength} or negative.
+ *
+ * @implNote An {@code assert} checks whether {@code arrayLength} is nonnegative.
+ */
+ public static void ensureFromTo(final int arrayLength, final int from, final int to) {
+ assert arrayLength >= 0;
+ // When Java 9 becomes the minimum, use Objects#checkFromToIndex, as that can be an intrinsic
+ if (from < 0) throw new ArrayIndexOutOfBoundsException("Start index (" + from + ") is negative");
+ if (from > to) throw new IllegalArgumentException("Start index (" + from + ") is greater than end index (" + to + ")");
+ if (to > arrayLength) throw new ArrayIndexOutOfBoundsException("End index (" + to + ") is greater than array length (" + arrayLength + ")");
+ }
+
+ /**
+ * Ensures that a range given by an offset and a length fits an array of given length.
+ *
+ *
+ * This method may be used whenever an array range check is needed.
+ *
+ *
+ * In Java 9 and up, this method should be considered deprecated in favor of the
+ * {@link java.util.Objects#checkFromIndexSize(int, int, int)} method, which may be intrinsified in
+ * recent JVMs.
+ *
+ * @param arrayLength an array length (must be nonnegative).
+ * @param offset a start index for the fragment
+ * @param length a length (the number of elements in the fragment).
+ * @throws IllegalArgumentException if {@code length} is negative.
+ * @throws ArrayIndexOutOfBoundsException if {@code offset} is negative or
+ * {@code offset}+{@code length} is greater than {@code arrayLength}.
+ *
+ * @implNote An {@code assert} checks whether {@code arrayLength} is nonnegative.
+ */
+ public static void ensureOffsetLength(final int arrayLength, final int offset, final int length) {
+ assert arrayLength >= 0;
+ // When Java 9 becomes the minimum, use Objects#checkFromIndexSize, as that can be an intrinsic
+ if (offset < 0) throw new ArrayIndexOutOfBoundsException("Offset (" + offset + ") is negative");
+ if (length < 0) throw new IllegalArgumentException("Length (" + length + ") is negative");
+ if (length > arrayLength - offset) throw new ArrayIndexOutOfBoundsException("Last index (" + ((long)offset + length) + ") is greater than array length (" + arrayLength + ")");
+ }
+
+ /**
+ * Transforms two consecutive sorted ranges into a single sorted range. The initial ranges are
+ * {@code [first..middle)} and {@code [middle..last)}, and the resulting range is
+ * {@code [first..last)}. Elements in the first input range will precede equal elements in
+ * the second.
+ */
+ private static void inPlaceMerge(final int from, int mid, final int to, final FastUtilIntComparator comp, final FastUtilSwapper swapper) {
+ if (from >= mid || mid >= to) return;
+ if (to - from == 2) {
+ if (comp.compare(mid, from) < 0) swapper.swap(from, mid);
+ return;
+ }
+
+ int firstCut;
+ int secondCut;
+
+ if (mid - from > to - mid) {
+ firstCut = from + (mid - from) / 2;
+ secondCut = lowerBound(mid, to, firstCut, comp);
+ }
+ else {
+ secondCut = mid + (to - mid) / 2;
+ firstCut = upperBound(from, mid, secondCut, comp);
+ }
+
+ final int first2 = firstCut;
+ final int middle2 = mid;
+ final int last2 = secondCut;
+ if (middle2 != first2 && middle2 != last2) {
+ int first1 = first2;
+ int last1 = middle2;
+ while (first1 < --last1)
+ swapper.swap(first1++, last1);
+ first1 = middle2;
+ last1 = last2;
+ while (first1 < --last1)
+ swapper.swap(first1++, last1);
+ first1 = first2;
+ last1 = last2;
+ while (first1 < --last1)
+ swapper.swap(first1++, last1);
+ }
+
+ mid = firstCut + (secondCut - mid);
+ inPlaceMerge(from, firstCut, mid, comp, swapper);
+ inPlaceMerge(mid, secondCut, to, comp, swapper);
+ }
+
+ /**
+ * Performs a binary search on an already-sorted range: finds the first position where an
+ * element can be inserted without violating the ordering. Sorting is by a user-supplied
+ * comparison function.
+ *
+ * @param from the index of the first element (inclusive) to be included in the binary search.
+ * @param to the index of the last element (exclusive) to be included in the binary search.
+ * @param pos the position of the element to be searched for.
+ * @param comp the comparison function.
+ * @return the largest index i such that, for every j in the range {@code [first..i)},
+ * {@code comp.compare(j, pos)} is {@code true}.
+ */
+ private static int lowerBound(int from, final int to, final int pos, final FastUtilIntComparator comp) {
+ // if (comp==null) throw new NullPointerException();
+ int len = to - from;
+ while (len > 0) {
+ final int half = len / 2;
+ final int middle = from + half;
+ if (comp.compare(middle, pos) < 0) {
+ from = middle + 1;
+ len -= half + 1;
+ }
+ else {
+ len = half;
+ }
+ }
+ return from;
+ }
+
+
+ /**
+ * Performs a binary search on an already sorted range: finds the last position where an element
+ * can be inserted without violating the ordering. Sorting is by a user-supplied comparison
+ * function.
+ *
+ * @param from the index of the first element (inclusive) to be included in the binary search.
+ * @param to the index of the last element (exclusive) to be included in the binary search.
+ * @param pos the position of the element to be searched for.
+ * @param comp the comparison function.
+ * @return The largest index i such that, for every j in the range {@code [first..i)},
+ * {@code comp.compare(pos, j)} is {@code false}.
+ */
+ private static int upperBound(int from, final int mid, final int pos, final FastUtilIntComparator comp) {
+ // if (comp==null) throw new NullPointerException();
+ int len = mid - from;
+ while (len > 0) {
+ final int half = len / 2;
+ final int middle = from + half;
+ if (comp.compare(pos, middle) < 0) {
+ len = half;
+ }
+ else {
+ from = middle + 1;
+ len -= half + 1;
+ }
+ }
+ return from;
+ }
+
+ /**
+ * Returns the index of the median of the three indexed chars.
+ */
+ private static int med3(final int a, final int b, final int c, final FastUtilIntComparator comp) {
+ final int ab = comp.compare(a, b);
+ final int ac = comp.compare(a, c);
+ final int bc = comp.compare(b, c);
+ return (ab < 0 ?
+ (bc < 0 ? b : ac < 0 ? c : a) :
+ (bc > 0 ? b : ac > 0 ? c : a));
+ }
+
+ private static final int MERGESORT_NO_REC = 16;
+
+ private static ForkJoinPool getPool() {
+ // Make sure to update Arrays.drv, BigArrays.drv, and src/it/unimi/dsi/fastutil/Arrays.java as well
+ final ForkJoinPool current = ForkJoinTask.getPool();
+ return current == null ? ForkJoinPool.commonPool() : current;
+ }
+
+ /** Sorts the specified range of elements using the specified swapper and according to the order induced by the specified
+ * comparator using mergesort.
+ *
+ *
This sort is guaranteed to be stable: equal elements will not be reordered as a result
+ * of the sort. The sorting algorithm is an in-place mergesort that is significantly slower than a
+ * standard mergesort, as its running time is O(n (log n)2), but it does not allocate additional memory; as a result, it can be
+ * used as a generic sorting algorithm.
+ *
+ * @param from the index of the first element (inclusive) to be sorted.
+ * @param to the index of the last element (exclusive) to be sorted.
+ * @param c the comparator to determine the order of the generic data (arguments are positions).
+ * @param swapper an object that knows how to swap the elements at any two positions.
+ */
+ public static void mergeSort(final int from, final int to, final FastUtilIntComparator c, final FastUtilSwapper swapper) {
+ /*
+ * We retain the same method signature as quickSort. Given only a comparator and swapper we
+ * do not know how to copy and move elements from/to temporary arrays. Hence, in contrast to
+ * the JDK mergesorts this is an "in-place" mergesort, i.e. does not allocate any temporary
+ * arrays. A non-inplace mergesort would perhaps be faster in most cases, but would require
+ * non-intuitive delegate objects...
+ */
+ final int length = to - from;
+
+ // Insertion sort on smallest arrays
+ if (length < MERGESORT_NO_REC) {
+ for (int i = from; i < to; i++) {
+ for (int j = i; j > from && (c.compare(j - 1, j) > 0); j--) {
+ swapper.swap(j, j - 1);
+ }
+ }
+ return;
+ }
+
+ // Recursively sort halves
+ final int mid = (from + to) >>> 1;
+ mergeSort(from, mid, c, swapper);
+ mergeSort(mid, to, c, swapper);
+
+ // If list is already sorted, nothing left to do. This is an
+ // optimization that results in faster sorts for nearly ordered lists.
+ if (c.compare(mid - 1, mid) <= 0) return;
+
+ // Merge sorted halves
+ inPlaceMerge(from, mid, to, c, swapper);
+ }
+
+ /** Swaps two sequences of elements using a provided swapper.
+ *
+ * @param swapper the swapper.
+ * @param a a position in {@code x}.
+ * @param b another position in {@code x}.
+ * @param n the number of elements to exchange starting at {@code a} and {@code b}.
+ */
+ protected static void swap(final FastUtilSwapper swapper, int a, int b, final int n) {
+ for (int i = 0; i < n; i++, a++, b++) swapper.swap(a, b);
+ }
+
+ private static final int QUICKSORT_NO_REC = 16;
+ private static final int PARALLEL_QUICKSORT_NO_FORK = 8192;
+ private static final int QUICKSORT_MEDIAN_OF_9 = 128;
+
+ protected static class ForkJoinGenericQuickSort extends RecursiveAction {
+ private static final long serialVersionUID = 1L;
+ private final int from;
+ private final int to;
+ private final FastUtilIntComparator comp;
+ private final FastUtilSwapper swapper;
+
+ public ForkJoinGenericQuickSort(final int from, final int to, final FastUtilIntComparator comp, final FastUtilSwapper swapper) {
+ this.from = from;
+ this.to = to;
+ this.comp = comp;
+ this.swapper = swapper;
+ }
+
+ @Override
+ protected void compute() {
+ final int len = to - from;
+ if (len < PARALLEL_QUICKSORT_NO_FORK) {
+ quickSort(from, to, comp, swapper);
+ return;
+ }
+ // Choose a partition element, v
+ int m = from + len / 2;
+ int l = from;
+ int n = to - 1;
+ int s = len / 8;
+ l = med3(l, l + s, l + 2 * s, comp);
+ m = med3(m - s, m, m + s, comp);
+ n = med3(n - 2 * s, n - s, n, comp);
+ m = med3(l, m, n, comp);
+ // Establish Invariant: v* (v)* v*
+ int a = from, b = a, c = to - 1, d = c;
+ while (true) {
+ int comparison;
+ while (b <= c && ((comparison = comp.compare(b, m)) <= 0)) {
+ if (comparison == 0) {
+ // Fix reference to pivot if necessary
+ if (a == m) m = b;
+ else if (b == m) m = a;
+ swapper.swap(a++, b);
+ }
+ b++;
+ }
+ while (c >= b && ((comparison = comp.compare(c, m)) >= 0)) {
+ if (comparison == 0) {
+ // Fix reference to pivot if necessary
+ if (c == m) m = d;
+ else if (d == m) m = c;
+ swapper.swap(c, d--);
+ }
+ c--;
+ }
+ if (b > c) break;
+ // Fix reference to pivot if necessary
+ if (b == m) m = d;
+ else if (c == m) m = c;
+ swapper.swap(b++, c--);
+ }
+
+ // Swap partition elements back to middle
+ s = Math.min(a - from, b - a);
+ swap(swapper, from, b - s, s);
+ s = Math.min(d - c, to - d - 1);
+ swap(swapper, b, to - s, s);
+
+ // Recursively sort non-partition-elements
+ int t;
+ s = b - a;
+ t = d - c;
+ if (s > 1 && t > 1) invokeAll(new ForkJoinGenericQuickSort(from, from + s, comp, swapper), new ForkJoinGenericQuickSort(to - t, to, comp, swapper));
+ else if (s > 1) invokeAll(new ForkJoinGenericQuickSort(from, from + s, comp, swapper));
+ else invokeAll(new ForkJoinGenericQuickSort(to - t, to, comp, swapper));
+ }
+ }
+
+ /** Sorts the specified range of elements using the specified swapper and according to the order induced by the specified
+ * comparator using a parallel quicksort.
+ *
+ * The sorting algorithm is a tuned quicksort adapted from Jon L. Bentley and M. Douglas
+ * McIlroy, “Engineering a Sort Function”, Software: Practice and Experience, 23(11), pages
+ * 1249−1265, 1993.
+ *
+ * @param from the index of the first element (inclusive) to be sorted.
+ * @param to the index of the last element (exclusive) to be sorted.
+ * @param comp the comparator to determine the order of the generic data.
+ * @param swapper an object that knows how to swap the elements at any two positions.
+ *
+ */
+ public static void parallelQuickSort(final int from, final int to, final FastUtilIntComparator comp, final FastUtilSwapper swapper) {
+ final ForkJoinPool pool = getPool();
+ if (to - from < PARALLEL_QUICKSORT_NO_FORK || pool.getParallelism() == 1) quickSort(from, to, comp, swapper);
+ else {
+ pool.invoke(new ForkJoinGenericQuickSort(from, to, comp, swapper));
+ }
+ }
+
+
+ /** Sorts the specified range of elements using the specified swapper and according to the order induced by the specified
+ * comparator using parallel quicksort.
+ *
+ *
The sorting algorithm is a tuned quicksort adapted from Jon L. Bentley and M. Douglas
+ * McIlroy, “Engineering a Sort Function”, Software: Practice and Experience, 23(11), pages
+ * 1249−1265, 1993.
+ *
+ * @param from the index of the first element (inclusive) to be sorted.
+ * @param to the index of the last element (exclusive) to be sorted.
+ * @param comp the comparator to determine the order of the generic data.
+ * @param swapper an object that knows how to swap the elements at any two positions.
+ *
+ */
+ public static void quickSort(final int from, final int to, final FastUtilIntComparator comp, final FastUtilSwapper swapper) {
+ final int len = to - from;
+ // Insertion sort on smallest arrays
+ if (len < QUICKSORT_NO_REC) {
+ for (int i = from; i < to; i++)
+ for (int j = i; j > from && (comp.compare(j - 1, j) > 0); j--) {
+ swapper.swap(j, j - 1);
+ }
+ return;
+ }
+
+ // Choose a partition element, v
+ int m = from + len / 2; // Small arrays, middle element
+ int l = from;
+ int n = to - 1;
+ if (len > QUICKSORT_MEDIAN_OF_9) { // Big arrays, pseudomedian of 9
+ final int s = len / 8;
+ l = med3(l, l + s, l + 2 * s, comp);
+ m = med3(m - s, m, m + s, comp);
+ n = med3(n - 2 * s, n - s, n, comp);
+ }
+ m = med3(l, m, n, comp); // Mid-size, med of 3
+ // int v = x[m];
+
+ int a = from;
+ int b = a;
+ int c = to - 1;
+ // Establish Invariant: v* (v)* v*
+ int d = c;
+ while (true) {
+ int comparison;
+ while (b <= c && ((comparison = comp.compare(b, m)) <= 0)) {
+ if (comparison == 0) {
+ // Fix reference to pivot if necessary
+ if (a == m) m = b;
+ else if (b == m) m = a;
+ swapper.swap(a++, b);
+ }
+ b++;
+ }
+ while (c >= b && ((comparison = comp.compare(c, m)) >= 0)) {
+ if (comparison == 0) {
+ // Fix reference to pivot if necessary
+ if (c == m) m = d;
+ else if (d == m) m = c;
+ swapper.swap(c, d--);
+ }
+ c--;
+ }
+ if (b > c) break;
+ // Fix reference to pivot if necessary
+ if (b == m) m = d;
+ else if (c == m) m = c;
+ swapper.swap(b++, c--);
+ }
+
+ // Swap partition elements back to middle
+ int s;
+ s = Math.min(a - from, b - a);
+ swap(swapper, from, b - s, s);
+ s = Math.min(d - c, to - d - 1);
+ swap(swapper, b, to - s, s);
+
+ // Recursively sort non-partition-elements
+ if ((s = b - a) > 1) quickSort(from, from + s, comp, swapper);
+ if ((s = d - c) > 1) quickSort(to - s, to, comp, swapper);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilIntComparator.java b/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilIntComparator.java
new file mode 100644
index 000000000..26be4e2f2
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilIntComparator.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2002-2024 Sebastiano Vigna
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.rabbitmq.client.amqp.impl;
+
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * A type-specific {@link Comparator}; provides methods to compare two primitive types both as
+ * objects and as primitive types.
+ *
+ *
+ * Note that {@code fastutil} provides a corresponding abstract class that can be used to implement
+ * this interface just by specifying the type-specific comparator.
+ *
+ * @see Comparator
+ */
+@FunctionalInterface
+interface FastUtilIntComparator extends Serializable {
+ /**
+ * Compares its two primitive-type arguments for order. Returns a negative integer, zero, or a
+ * positive integer as the first argument is less than, equal to, or greater than the second.
+ *
+ * @see java.util.Comparator
+ * @return a negative integer, zero, or a positive integer as the first argument is less than, equal
+ * to, or greater than the second.
+ */
+ int compare(int k1, int k2);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilSwapper.java b/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilSwapper.java
new file mode 100644
index 000000000..22feff204
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/FastUtilSwapper.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2010-2024 Sebastiano Vigna
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.rabbitmq.client.amqp.impl;
+
+import java.io.Serializable;
+
+/** An object that can swap elements whose position is specified by integers.
+ *
+ * @see Arrays#quickSort(int, int, it.unimi.dsi.fastutil.ints.IntComparator, Swapper)
+ */
+@FunctionalInterface
+interface FastUtilSwapper extends Serializable {
+
+ /** Swaps the data at the given positions.
+ *
+ * @param a the first position to swap.
+ * @param b the second position to swap.
+ */
+ void swap(int a, int b);
+}
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/SerialNumberUtils.java b/src/main/java/com/rabbitmq/client/amqp/impl/SerialNumberUtils.java
new file mode 100644
index 000000000..17cc6a70e
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/SerialNumberUtils.java
@@ -0,0 +1,128 @@
+// Copyright (c) 2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.client.amqp.impl;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.*;
+import java.util.function.ToLongFunction;
+
+final class SerialNumberUtils {
+
+ // https://www.ietf.org/rfc/rfc1982.txt
+ // SERIAL_BITS = 32
+ // 2 ^ SERIAL_BITS
+ static final long SERIAL_SPACE = 0x100000000L;
+ // 2 ^ (SERIAL_BITS - 1)
+ private static final long COMPARE = 2_147_483_648L;
+
+ private SerialNumberUtils() {}
+
+ static long inc(long s) {
+ return (s + 1) % SERIAL_SPACE;
+ }
+
+ static List sort(List list, ToLongFunction serialNumberExtractor) {
+ FastUtilArrays.quickSort(
+ 0,
+ list.size(),
+ new SerialNumberComparator<>(list, serialNumberExtractor),
+ new ListSwapper<>(list));
+ return list;
+ }
+
+ /**
+ * Compute contiguous ranges of serial numbers.
+ *
+ * The list is sorted but the method assumes it contains no duplicates.
+ *
+ * @param list
+ * @param serialNumberExtractor
+ * @return
+ * @param
+ */
+ static long[][] ranges(List list, ToLongFunction serialNumberExtractor) {
+ if (list.isEmpty()) {
+ return new long[0][0];
+ }
+ sort(list, serialNumberExtractor);
+ long s1 = serialNumberExtractor.applyAsLong(list.get(0));
+ long[] range = new long[] {s1, s1};
+ List ranges = new ArrayList<>();
+ ranges.add(range);
+ for (int i = 1; i < list.size(); i++) {
+ long v = serialNumberExtractor.applyAsLong(list.get(i));
+ if (v == inc(range[1])) {
+ range[1] = v;
+ } else {
+ range = new long[] {v, v};
+ ranges.add(range);
+ }
+ }
+ return ranges.toArray(new long[][] {});
+ }
+
+ static int compare(long s1, long s2) {
+ if (s1 == s2) {
+ return 0;
+ } else if (((s1 < s2) && (s2 - s1) < COMPARE) || ((s1 > s2) && (s1 - s2) > COMPARE)) {
+ return -1;
+ } else if (((s1 < s2) && (s2 - s1) > COMPARE) || ((s1 > s2) && (s1 - s2) < COMPARE)) {
+ return 1;
+ }
+ throw new IllegalArgumentException("Cannot compare serial numbers " + s1 + " and " + s2);
+ }
+
+ private static class SerialNumberComparator implements FastUtilIntComparator {
+
+ private static final long serialVersionUID = -1979133464402603205L;
+
+ private final List list;
+
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ private final ToLongFunction serialNumberExtractor;
+
+ private SerialNumberComparator(List list, ToLongFunction serialNumberExtractor) {
+ this.list = list;
+ this.serialNumberExtractor = serialNumberExtractor;
+ }
+
+ @Override
+ public int compare(int k1, int k2) {
+ return SerialNumberUtils.compare(
+ serialNumberExtractor.applyAsLong(list.get(k1)),
+ serialNumberExtractor.applyAsLong(list.get(k2)));
+ }
+ }
+
+ private static final class ListSwapper implements FastUtilSwapper {
+
+ private static final long serialVersionUID = -4992779583870196665L;
+ private final List list;
+
+ private ListSwapper(List list) {
+ this.list = list;
+ }
+
+ @Override
+ public void swap(int a, int b) {
+ T t = list.get(a);
+ list.set(a, list.get(b));
+ list.set(b, t);
+ }
+ }
+}
diff --git a/src/main/qpid/org/apache/qpid/protonj2/client/Delivery.java b/src/main/qpid/org/apache/qpid/protonj2/client/Delivery.java
index ee2c78709..25aef422c 100644
--- a/src/main/qpid/org/apache/qpid/protonj2/client/Delivery.java
+++ b/src/main/qpid/org/apache/qpid/protonj2/client/Delivery.java
@@ -200,4 +200,6 @@ public interface Delivery {
*/
int messageFormat() throws ClientException;
+ long getDeliveryId();
+
}
\ No newline at end of file
diff --git a/src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientConversionSupport.java b/src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientConversionSupport.java
index 7e284f77a..59257db54 100644
--- a/src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientConversionSupport.java
+++ b/src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientConversionSupport.java
@@ -37,7 +37,7 @@
/**
* Utilities used by various classes in the Client core
*/
-abstract class ClientConversionSupport {
+public abstract class ClientConversionSupport {
public static Symbol[] toSymbolArray(String[] stringArray) {
Symbol[] result = null;
diff --git a/src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientDelivery.java b/src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
index dcdabf458..f59cae13a 100644
--- a/src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
+++ b/src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
@@ -109,6 +109,10 @@ public Map annotations() throws ClientException {
}
}
+ public long getDeliveryId() {
+ return this.delivery.getDeliveryId();
+ }
+
//----- Internal API not meant to be used from outside the client package.
void deliveryAnnotations(DeliveryAnnotations deliveryAnnotations) {
diff --git a/src/main/qpid/org/apache/qpid/protonj2/engine/IncomingDelivery.java b/src/main/qpid/org/apache/qpid/protonj2/engine/IncomingDelivery.java
index 3ac0c10ac..7e17a1752 100644
--- a/src/main/qpid/org/apache/qpid/protonj2/engine/IncomingDelivery.java
+++ b/src/main/qpid/org/apache/qpid/protonj2/engine/IncomingDelivery.java
@@ -155,6 +155,8 @@ public interface IncomingDelivery {
*/
DeliveryTag getTag();
+ long getDeliveryId();
+
/**
* @return the {@link DeliveryState} at the local side of this Delivery.
*/
diff --git a/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java b/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
index 4152c0420..cdf70c534 100644
--- a/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
+++ b/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
@@ -297,7 +297,8 @@ boolean isFirstTransfer() {
return transferCount <= 1;
}
- long getDeliveryId() {
+ @Override
+ public long getDeliveryId() {
return deliveryId;
}
diff --git a/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java b/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
index d6db559d1..dee258281 100644
--- a/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
+++ b/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
@@ -46,6 +47,12 @@
*/
public class ProtonReceiver extends ProtonLink implements Receiver {
+ private static final Consumer RELEASE_INCOMING_DELIVERY_TAG_CALLBACK = d -> {
+ if (d.getTag() != null) {
+ d.getTag().release();
+ }
+ };
+
private EventHandler deliveryReadEventHandler = null;
private EventHandler deliveryAbortedEventHandler = null;
private EventHandler deliveryUpdatedEventHandler = null;
@@ -239,6 +246,14 @@ void disposition(ProtonIncomingDelivery delivery) {
}
}
+ public void disposition(DeliveryState state, long[] range) {
+ try {
+ sessionWindow.processDisposition(state, range);
+ } finally {
+ unsettled.removeEach((int) range[0], (int) range[1], RELEASE_INCOMING_DELIVERY_TAG_CALLBACK);
+ }
+ }
+
void deliveryRead(ProtonIncomingDelivery delivery, int bytesRead) {
if (areDeliveriesStillActive()) {
sessionWindow.deliveryRead(delivery, bytesRead);
diff --git a/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java b/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
index 9187b8a6c..fb0b87b31 100644
--- a/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
+++ b/src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
@@ -21,11 +21,7 @@
import org.apache.qpid.protonj2.engine.util.SequenceNumber;
import org.apache.qpid.protonj2.engine.util.UnsettledMap;
import org.apache.qpid.protonj2.types.UnsignedInteger;
-import org.apache.qpid.protonj2.types.transport.Begin;
-import org.apache.qpid.protonj2.types.transport.Disposition;
-import org.apache.qpid.protonj2.types.transport.Flow;
-import org.apache.qpid.protonj2.types.transport.Role;
-import org.apache.qpid.protonj2.types.transport.Transfer;
+import org.apache.qpid.protonj2.types.transport.*;
/**
* Tracks the incoming window and provides management of that window in relation to receiver links.
@@ -235,6 +231,18 @@ void processDisposition(ProtonReceiver receiver, ProtonIncomingDelivery delivery
}
}
+ void processDisposition(DeliveryState state, long [] range) {
+ unsettled.removeEach((int) range[0], (int) range[1], d -> { });
+ cachedDisposition.reset();
+ cachedDisposition.setFirst(range[0]);
+ cachedDisposition.setLast(range[1]);
+ cachedDisposition.setRole(Role.RECEIVER);
+ cachedDisposition.setSettled(true);
+ cachedDisposition.setState(state);
+
+ engine.fireWrite(cachedDisposition, session.getLocalChannel());
+ }
+
void deliveryRead(ProtonIncomingDelivery delivery, int bytesRead) {
this.incomingBytes -= bytesRead;
if (incomingWindow == 0) {
diff --git a/src/test/java/com/rabbitmq/client/amqp/docs/Api.java b/src/test/java/com/rabbitmq/client/amqp/docs/Api.java
index b167b0461..6cfce9e6d 100644
--- a/src/test/java/com/rabbitmq/client/amqp/docs/Api.java
+++ b/src/test/java/com/rabbitmq/client/amqp/docs/Api.java
@@ -86,6 +86,41 @@ void metricsCollectorMicrometerPrometheus() {
// end::metrics-micrometer-prometheus[]
}
+ void settlingMessagesInBatch() {
+ Connection connection = null;
+
+ // tag::settling-message-in-batch[]
+ int batchSize = 10;
+ Consumer.MessageHandler handler = new Consumer.MessageHandler() {
+ volatile Consumer.BatchContext batch = null; // <1>
+ @Override
+ public void handle(Consumer.Context context, Message message) {
+ if (batch == null) {
+ batch = context.batch(batchSize); // <2>
+ }
+ boolean success = process(message);
+ if (success) {
+ batch.add(context); // <3>
+ if (batch.size() == batchSize) {
+ batch.accept(); // <4>
+ batch = null; // <5>
+ }
+ } else {
+ context.discard(); // <6>
+ }
+ }
+ };
+ Consumer consumer = connection.consumerBuilder()
+ .queue("some-queue")
+ .messageHandler(handler)
+ .build();
+ // end::settling-message-in-batch[]
+ }
+
+ boolean process(Message message) {
+ return true;
+ }
+
void micrometerObservation() {
ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
// tag::micrometer-observation[]
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
index ad6599b33..74d86ecb9 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
@@ -511,7 +511,7 @@ void consumerPauseThenClose() {
@Test
void consumerGracefulShutdownExample() {
connection.management().queue(name).exclusive(true).declare();
- int messageCount = 100;
+ int messageCount = 1000;
int initialCredits = messageCount / 10;
Publisher publisher = connection.publisherBuilder().queue(name).build();
Sync publishSync = sync(messageCount);
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/BatchDispositionTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/BatchDispositionTest.java
new file mode 100644
index 000000000..2b9e1043b
--- /dev/null
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/BatchDispositionTest.java
@@ -0,0 +1,234 @@
+// Copyright (c) 2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.client.amqp.impl;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.rabbitmq.client.amqp.Consumer;
+import com.rabbitmq.client.amqp.Message;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+public class BatchDispositionTest {
+
+ static final int failureLimit = 1;
+ static int batchSize = 100;
+ static int messageCount = 100_000;
+ static final Random random = new Random();
+ static final MetricRegistry metrics = new MetricRegistry();
+ static final Counter successfulCount = metrics.counter("successful");
+ static final Counter failedCount = metrics.counter("failed");
+ static final Counter dispositionFrameCount = metrics.counter("disposition");
+ static final Counter dispositionFirstOnlyFrameCount = metrics.counter("disposition.first.only");
+ static final Counter dispositionFirstLastFrameCount = metrics.counter("disposition.first.last");
+ static final Histogram dispositionRangeSize = metrics.histogram("disposition.range.size");
+ static final int DISPOSITION_FIRST_ONLY_SIZE = 94;
+ static final int DISPOSITION_FIRST_LAST_SIZE = 98;
+
+ @Test
+ @Disabled
+ void test() {
+ AtomicReference batchReference = new AtomicReference<>();
+ Consumer.MessageHandler handler =
+ new Consumer.MessageHandler() {
+ Consumer.BatchContext batch;
+
+ @Override
+ public void handle(Consumer.Context context, Message message) {
+ if (batch == null) {
+ batch = context.batch(batchSize);
+ }
+
+ boolean success = processMessage(message);
+ if (success) {
+ successfulCount.inc();
+ batch.add(context);
+ } else {
+ failedCount.inc();
+ context.discard();
+ }
+
+ if (batch.size() == batchSize) {
+ batch.accept();
+ batch = null;
+ }
+ batchReference.set(batch);
+ }
+ };
+
+ IntStream.range(0, messageCount)
+ .forEach(
+ i -> {
+ handler.handle(new TestContext(i), null);
+ });
+
+ if (batchReference.get() == null) {
+ batchReference.get().accept();
+ }
+
+ final ConsoleReporter reporter =
+ ConsoleReporter.forRegistry(metrics)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+
+ reporter.start(1, TimeUnit.SECONDS);
+ reporter.report();
+
+ long firstOnlyBytes = dispositionFirstOnlyFrameCount.getCount() * DISPOSITION_FIRST_ONLY_SIZE;
+ long firstLastBytes = dispositionFirstLastFrameCount.getCount() * DISPOSITION_FIRST_LAST_SIZE;
+ System.out.printf(
+ "Number of first-only disposition frames: %d (bytes: %d)%n",
+ dispositionFirstOnlyFrameCount.getCount(), firstOnlyBytes);
+ System.out.printf(
+ "Number of first-last disposition frames: %d (bytes: %d)%n",
+ dispositionFirstLastFrameCount.getCount(), firstLastBytes);
+
+ long monoAckBytes = (long) messageCount * DISPOSITION_FIRST_ONLY_SIZE;
+ System.out.printf("Traffic with message-by-message disposition: %d%n", monoAckBytes);
+ System.out.printf(
+ "Gain: %s%n",
+ (double) (monoAckBytes - (firstOnlyBytes + firstLastBytes)) / (double) monoAckBytes);
+ }
+
+ static class TestContext implements Consumer.Context {
+
+ private final long deliveryId;
+
+ TestContext(long deliveryId) {
+ this.deliveryId = deliveryId;
+ }
+
+ @Override
+ public void accept() {
+ dispose();
+ }
+
+ @Override
+ public void discard() {
+ dispose();
+ }
+
+ @Override
+ public void discard(Map annotations) {
+ dispose();
+ }
+
+ @Override
+ public void requeue() {
+ dispose();
+ }
+
+ @Override
+ public void requeue(Map annotations) {
+ dispose();
+ }
+
+ private void dispose() {
+ dispositionFrameCount.inc();
+ dispositionRangeSize.update(1);
+ dispositionFirstOnlyFrameCount.inc();
+ }
+
+ @Override
+ public Consumer.BatchContext batch(int batchSizeHint) {
+ return new TestBatchContext(batchSizeHint);
+ }
+ }
+
+ static class TestBatchContext implements Consumer.BatchContext {
+
+ private final List contexts;
+ private final AtomicBoolean disposed = new AtomicBoolean(false);
+
+ public TestBatchContext(int batchSizeHint) {
+ this.contexts = new ArrayList<>(batchSizeHint);
+ }
+
+ @Override
+ public void add(Consumer.Context context) {
+ this.contexts.add((TestContext) context);
+ }
+
+ @Override
+ public int size() {
+ return this.contexts.size();
+ }
+
+ @Override
+ public void accept() {
+ dispose();
+ }
+
+ @Override
+ public void discard() {
+ dispose();
+ }
+
+ @Override
+ public void discard(Map annotations) {
+ dispose();
+ }
+
+ @Override
+ public void requeue() {
+ dispose();
+ }
+
+ @Override
+ public void requeue(Map annotations) {
+ dispose();
+ }
+
+ private void dispose() {
+ if (this.disposed.compareAndSet(false, true)) {
+ List deliveryIds =
+ this.contexts.stream().map(c -> c.deliveryId).collect(Collectors.toList());
+ long[][] ranges = SerialNumberUtils.ranges(deliveryIds, Long::longValue);
+ for (long[] range : ranges) {
+ dispositionFrameCount.inc();
+ long rangeSize = range[1] - range[0] + 1;
+ dispositionRangeSize.update(rangeSize);
+ if (rangeSize > 1) {
+ dispositionFirstLastFrameCount.inc();
+ } else {
+ dispositionFirstOnlyFrameCount.inc();
+ }
+ }
+ }
+ }
+
+ @Override
+ public Consumer.BatchContext batch(int batchSizeHint) {
+ return this;
+ }
+ }
+
+ static boolean processMessage(Message message) {
+ int v = random.nextInt(100);
+ return v >= failureLimit;
+ }
+}
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/ConsumerOutcomeTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/ConsumerOutcomeTest.java
index 20f284a7d..7fbcdefdc 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/ConsumerOutcomeTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/ConsumerOutcomeTest.java
@@ -20,24 +20,22 @@
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
+import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
import static org.assertj.core.api.Assertions.assertThat;
-import com.rabbitmq.client.amqp.Connection;
-import com.rabbitmq.client.amqp.Management;
-import com.rabbitmq.client.amqp.Message;
-import com.rabbitmq.client.amqp.Publisher;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import com.rabbitmq.client.amqp.*;
+import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
@AmqpTestInfrastructure
public class ConsumerOutcomeTest {
@@ -112,8 +110,9 @@ void requeuedMessageShouldBeRequeued() {
waitAtMost(() -> management.queueInfo(q).messageCount() == 0);
}
- @Test
- void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery(boolean batch) {
this.management.queue().name(q).type(QUORUM).declare();
Publisher publisher = this.connection.publisherBuilder().queue(q).build();
@@ -124,13 +123,18 @@ void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery() {
.consumerBuilder()
.queue(q)
.messageHandler(
- (context, message) -> {
+ (ctx, message) -> {
deliveryCount.incrementAndGet();
messages.offer(message);
if (deliveryCount.get() == 1) {
- context.requeue(ANNOTATIONS);
+ if (batch) {
+ Consumer.BatchContext bc = ctx.batch(1);
+ bc.add(ctx);
+ ctx = bc;
+ }
+ ctx.requeue(ANNOTATIONS);
} else {
- context.accept();
+ ctx.accept();
redeliveredSync.down();
}
})
@@ -179,15 +183,24 @@ void discardedMessageShouldBeDeadLeadLetteredWhenConfigured() {
waitAtMost(() -> management.queueInfo(dlq).messageCount() == 0);
}
- @Test
- void
- discardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void discardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured(
+ boolean batch) {
declareDeadLetterTopology();
Publisher publisher = this.connection.publisherBuilder().queue(q).build();
this.connection
.consumerBuilder()
.queue(q)
- .messageHandler((ctx, msg) -> ctx.discard(ANNOTATIONS))
+ .messageHandler(
+ (ctx, msg) -> {
+ if (batch) {
+ Consumer.BatchContext bc = ctx.batch(1);
+ bc.add(ctx);
+ ctx = bc;
+ }
+ ctx.discard(ANNOTATIONS);
+ })
.build();
TestUtils.Sync deadLetteredSync = TestUtils.sync();
@@ -217,6 +230,61 @@ void discardedMessageShouldBeDeadLeadLetteredWhenConfigured() {
waitAtMost(() -> management.queueInfo(dlq).messageCount() == 0);
}
+ @Test
+ void batchAcceptShouldSettleMessages() {
+ declareDeadLetterTopology();
+ Management management = connection.management();
+ int messageCount = 1000;
+ TestUtils.Sync confirmSync = sync(messageCount);
+ Publisher.Callback callback =
+ context -> {
+ if (context.status() == Publisher.Status.ACCEPTED) {
+ confirmSync.down();
+ }
+ };
+ Publisher publisher = connection.publisherBuilder().queue(q).build();
+ IntStream.range(0, messageCount)
+ .forEach(ignored -> publisher.publish(publisher.message(), callback));
+ Assertions.assertThat(confirmSync).completes();
+ publisher.close();
+
+ int batchSize = messageCount / 100;
+ AtomicReference batchContext =
+ new AtomicReference<>();
+ Random random = new Random();
+ TestUtils.Sync receivedSync = sync(messageCount);
+ AtomicInteger discardedCount = new AtomicInteger();
+ connection
+ .consumerBuilder()
+ .queue(q)
+ .messageHandler(
+ (ctx, msg) -> {
+ if (batchContext.get() == null) {
+ batchContext.set(ctx.batch(batchSize));
+ }
+ if (random.nextInt(10) == 0) {
+ ctx.discard();
+ discardedCount.incrementAndGet();
+ } else {
+ batchContext.get().add(ctx);
+ if (batchContext.get().size() == batchSize) {
+ batchContext.get().accept();
+ batchContext.set(null);
+ }
+ }
+ receivedSync.down();
+ })
+ .build();
+
+ Assertions.assertThat(receivedSync).completes();
+ Consumer.BatchContext bctx = batchContext.get();
+ if (bctx != null && bctx.size() != 0 && bctx.size() < batchSize) {
+ bctx.accept();
+ }
+ waitAtMost(() -> management.queueInfo(q).messageCount() == 0);
+ waitAtMost(() -> management.queueInfo(dlq).messageCount() == discardedCount.get());
+ }
+
private void declareDeadLetterTopology() {
this.management.exchange(dlx).type(FANOUT).autoDelete(true).declare();
this.management.queue(dlq).exclusive(true).declare();
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/SerialNumberUtilsTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/SerialNumberUtilsTest.java
new file mode 100644
index 000000000..e8bfa7369
--- /dev/null
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/SerialNumberUtilsTest.java
@@ -0,0 +1,127 @@
+// Copyright (c) 2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.client.amqp.impl;
+
+import static com.rabbitmq.client.amqp.impl.SerialNumberUtils.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.ToLongFunction;
+import org.junit.jupiter.api.Test;
+
+public class SerialNumberUtilsTest {
+
+ @Test
+ void testInc() {
+ assertThat(inc(SERIAL_SPACE - 1)).isZero();
+ assertThat(inc(inc(SERIAL_SPACE - 1))).isEqualTo(1);
+ }
+
+ @Test
+ void testCompare() {
+ assertThat(compare(0, 0)).isZero();
+ assertThat(compare(SERIAL_SPACE - 1, SERIAL_SPACE - 1)).isZero();
+ assertThat(compare(0, 1)).isNegative();
+ assertThat(compare(1, 0)).isPositive();
+ assertThat(compare(0, 2)).isNegative();
+ long maxAddend = (long) (Math.pow(2, 32 - 1) - 1);
+ assertThat(compare(0, maxAddend)).isNegative();
+ assertThatThrownBy(() -> compare(0, maxAddend + 1))
+ .isInstanceOf(IllegalArgumentException.class);
+ assertThat(compare(SERIAL_SPACE - 5, 30_000)).isNegative();
+ assertThat(compare(1, 0)).isPositive();
+ assertThat(compare(maxAddend, 0)).isPositive();
+ assertThatThrownBy(() -> compare(maxAddend + 1, 0))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testSort() {
+ assertThat(sort(sns(), toLongPrimitive())).isEmpty();
+ assertThat(sort(sns(3L), toLongPrimitive())).containsExactly(3L);
+ assertThat(sort(sns(3L), toLongPrimitive())).containsExactly(3L);
+ assertThat(
+ sort(
+ sns(4294967000L, 4294967293L, 4294967294L, 4294967295L, 0, 3, 4),
+ toLongPrimitive()))
+ .containsExactly(4294967000L, 4294967293L, 4294967294L, 4294967295L, 0L, 3L, 4L);
+ }
+
+ @Test
+ void testRanges() {
+ checkRanges(sns(), rgs());
+ checkRanges(sns(0), rgs(0, 0));
+ checkRanges(sns(0, 1), rgs(0, 1));
+ checkRanges(sns(1, 0), rgs(0, 1));
+ checkRanges(sns(0, 2), rgs(0, 0, 2, 2));
+ checkRanges(sns(2, 0), rgs(0, 0, 2, 2));
+
+ // SPACE - 1 = (2 ^ 32) - 1 = 4294967295
+ checkRanges(
+ sns(4294967290L, 4294967295L), rgs(4294967290L, 4294967290L, 4294967295L, 4294967295L));
+ checkRanges(
+ sns(4294967295L, 4294967290L), rgs(4294967290L, 4294967290L, 4294967295L, 4294967295L));
+ checkRanges(sns(0, 1, 3, 4, 5, 10, 18, 19), rgs(0, 1, 3, 5, 10, 10, 18, 19));
+ checkRanges(sns(1, 10, 0, 3, 4, 5, 19, 18), rgs(0, 1, 3, 5, 10, 10, 18, 19));
+
+ checkRanges(sns(4294967294L, 0), rgs(4294967294L, 4294967294L, 0, 0));
+ checkRanges(sns(0, 4294967294L), rgs(4294967294L, 4294967294L, 0, 0));
+
+ checkRanges(sns(4294967295L, 0), rgs(4294967295L, 0));
+
+ checkRanges(
+ sns(4294967294L, 4294967295L, 0, 1, 3, 4, 5, 10, 18, 19),
+ rgs(4294967294L, 1, 3, 5, 10, 10, 18, 19));
+ checkRanges(
+ sns(1, 10, 4294967294L, 0, 3, 4, 5, 19, 18, 4294967295L),
+ rgs(4294967294L, 1, 3, 5, 10, 10, 18, 19));
+ }
+
+ private static void checkRanges(List serialNumbers, long[][] ranges) {
+ assertThat(ranges(serialNumbers, toLongPrimitive())).isDeepEqualTo(ranges);
+ }
+
+ private static List sns(long... sns) {
+ List l = new ArrayList<>();
+ for (long sn : sns) {
+ l.add(sn);
+ }
+ return l;
+ }
+
+ long[][] rgs(long... flatRanges) {
+ if (flatRanges.length == 0) {
+ return new long[][] {};
+ }
+ if (flatRanges.length % 2 != 0) {
+ throw new IllegalArgumentException();
+ }
+ int rangeCount = flatRanges.length / 2;
+ long[][] ranges = new long[rangeCount][];
+ for (int i = 0; i < rangeCount; i++) {
+ ranges[i] = new long[] {flatRanges[i * 2], flatRanges[i * 2 + 1]};
+ }
+ return ranges;
+ }
+
+ static ToLongFunction toLongPrimitive() {
+ return Long::longValue;
+ }
+}
diff --git a/src/test/java/com/rabbitmq/client/amqp/perf/AmqpPerfTest.java b/src/test/java/com/rabbitmq/client/amqp/perf/AmqpPerfTest.java
index 80adb2050..3335ace49 100644
--- a/src/test/java/com/rabbitmq/client/amqp/perf/AmqpPerfTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/perf/AmqpPerfTest.java
@@ -35,6 +35,7 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,6 +89,43 @@ public static void main(String[] args) throws IOException {
management.queue().name(q).type(QUORUM).declare();
management.binding().sourceExchange(e).destinationQueue(q).key(rk).bind();
+ java.util.function.Consumer recordMessage =
+ msg -> {
+ try {
+ long time = readLong(msg.body());
+ metrics.latency(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
+ } catch (Exception ex) {
+ // not able to read the body, maybe not a message from the
+ // tool
+ }
+ };
+
+ int initialCredits = 1000;
+ Consumer.MessageHandler handler;
+ int disposeEvery = 1;
+
+ if (disposeEvery <= 1) {
+ handler =
+ (context, message) -> {
+ recordMessage.accept(message);
+ context.accept();
+ };
+ } else {
+ AtomicReference batch = new AtomicReference<>();
+ handler =
+ (context, message) -> {
+ recordMessage.accept(message);
+ if (batch.get() == null) {
+ batch.set(context.batch(disposeEvery));
+ }
+ batch.get().add(context);
+ if (batch.get().size() == disposeEvery) {
+ batch.get().accept();
+ batch.set(null);
+ }
+ };
+ }
+
connection
.consumerBuilder()
.listeners(
@@ -97,18 +135,8 @@ public static void main(String[] args) throws IOException {
}
})
.queue(q)
- .initialCredits(1000)
- .messageHandler(
- (context, message) -> {
- context.accept();
- try {
- long time = readLong(message.body());
- metrics.latency(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
- } catch (Exception ex) {
- // not able to read the body, maybe not a message from the
- // tool
- }
- })
+ .initialCredits(initialCredits)
+ .messageHandler(handler)
.build();
executorService.submit(