Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

receiver.consumeManualAck does not seem compatible with take(int) reactor operatorΒ #176

@chibenwa

Description

@chibenwa

As part of the Apache James project, in some of our tests we want to dequeue only some items from a rabbitMQ queue while still keeping the others enqueue.

In order to achieve this, our first approach was to use the take(int) operator.

This proves to be unstable.

Expected Behavior

I expect to be able to use the take operator out of reactor-rabbitmq library primitives.

I expect violations of the TCK to be well documented when not achievable with a list of impcted oerators, if this issue cannot be fixed.

Actual Behavior

take operator is buggy, and this limitation is undocumented.

Steps to Reproduce

        @Test
        void consumingShouldSuccessWhenAckConcurrentWithFluxTakeAndFlatMap() throws Exception {
            ReceiverProvider receiverProvider = rabbitMQExtension.getReceiverProvider();
            int counter = 5;
            CountDownLatch countDownLatch = new CountDownLatch(counter);

            Flux.using(receiverProvider::createReceiver,
                    receiver -> receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions()),
                    Receiver::close)
                .filter(getResponse -> getResponse.getBody() != null)
                .take(counter)
                .flatMap(acknowledgableDelivery -> Mono.fromCallable(() -> {
                    acknowledgableDelivery.ack(true);
                    countDownLatch.countDown();
                    return acknowledgableDelivery;
                }).subscribeOn(Schedulers.elastic()))
                .subscribe();

            assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
        }

=> Sometime fails by ignoring some elements...

Also fails with contatMap.

        @Test
        void consumingShouldSuccessWhenAckConcurrentWithFluxTake() throws Exception {
            ReceiverProvider receiverProvider = rabbitMQExtension.getReceiverProvider();
            int counter = 5;
            CountDownLatch countDownLatch = new CountDownLatch(counter);

            Flux.using(receiverProvider::createReceiver,
                    receiver -> receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions()),
                    Receiver::close)
                .filter(getResponse -> getResponse.getBody() != null)
                .take(counter)
                .concatMap(acknowledgableDelivery -> Mono.fromCallable(() -> {
                    acknowledgableDelivery.ack(true);
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + ": " + countDownLatch.getCount());
                    return acknowledgableDelivery;
                }).subscribeOn(Schedulers.elastic()))
                .subscribe();

            assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
        }

=> Fails reliably with:

09:03:46.066 [ERROR] r.c.p.Operators - Operator called default onErrorDropped
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:439)
	at reactor.rabbitmq.AcknowledgableDelivery.basicAck(AcknowledgableDelivery.java:110)
	at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:62)
	... 10 common frames omitted
Wrapped by: reactor.rabbitmq.RabbitFluxException: Not retryable exception, cannot retry
	at reactor.rabbitmq.ExceptionHandlers$SimpleRetryTemplate.retry(ExceptionHandlers.java:125)
	at reactor.rabbitmq.ExceptionHandlers$RetryAcknowledgmentExceptionHandler.accept(ExceptionHandlers.java:143)
	at reactor.rabbitmq.ExceptionHandlers$RetryAcknowledgmentExceptionHandler.accept(ExceptionHandlers.java:130)
	at reactor.rabbitmq.AcknowledgableDelivery.retry(AcknowledgableDelivery.java:130)
	at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:64)
	at org.apache.james.backends.rabbitmq.RabbitMQTest$ConcurrencyTest.lambda$consumingShouldSuccessWhenAckConcurrentWithFluxTake$7(RabbitMQTest.java:665)
	at reactor.core.publisher.MonoCallable.call(MonoCallable.java:92)
	at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:227)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

Possible Solution

No idea here. We complexified our testing code to leverage this situation...

Your Environment

Reactor versions: (BOM) 2020.0.19
JVM 11
OS Ubuntu 2020.4

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions