Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 8e5d955

Browse files
committed
Sender publish confirm backpressure
1 parent d1c8f44 commit 8e5d955

File tree

4 files changed

+156
-24
lines changed

4 files changed

+156
-24
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import com.rabbitmq.client.Connection;
20+
import org.openjdk.jmh.annotations.*;
21+
import org.openjdk.jmh.infra.Blackhole;
22+
import reactor.core.publisher.Flux;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
@BenchmarkMode(Mode.Throughput)
27+
@OutputTimeUnit(TimeUnit.SECONDS)
28+
@State(Scope.Benchmark)
29+
@Warmup(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS)
30+
@Measurement(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS)
31+
@Fork(1)
32+
@Threads(2)
33+
public class SenderMaxInFlightBenchmark {
34+
35+
Connection connection;
36+
Sender sender;
37+
String queue;
38+
Flux<OutboundMessage> msgFlux;
39+
40+
@Param({"1", "10", "100", "1000"})
41+
public int nbMessages;
42+
43+
@Param({"1", "256", Integer.MAX_VALUE + ""})
44+
public int maxInFlight;
45+
46+
@Setup
47+
public void setupConnection() throws Exception {
48+
connection = SenderBenchmarkUtils.newConnection();
49+
}
50+
51+
@TearDown
52+
public void closeConnection() throws Exception {
53+
connection.close();
54+
}
55+
56+
@Setup(Level.Iteration)
57+
public void setupSender() throws Exception {
58+
queue = SenderBenchmarkUtils.declareQueue(connection);
59+
sender = RabbitFlux.createSender();
60+
msgFlux = SenderBenchmarkUtils.outboundMessageFlux(queue, nbMessages);
61+
}
62+
63+
@TearDown(Level.Iteration)
64+
public void tearDownSender() throws Exception {
65+
SenderBenchmarkUtils.deleteQueue(connection, queue);
66+
if (sender != null) {
67+
sender.close();
68+
}
69+
}
70+
71+
@Benchmark
72+
public void sendWithPublishConfirmsMaxInFlight(Blackhole blackhole) {
73+
blackhole.consume(sender.sendWithPublishConfirms(msgFlux, new SendOptions().maxInFlight(maxInFlight)).blockLast());
74+
}
75+
76+
}

src/main/java/reactor/rabbitmq/SendOptions.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import reactor.core.publisher.SignalType;
2222
import reactor.core.scheduler.Scheduler;
2323
import reactor.core.scheduler.Schedulers;
24-
import reactor.util.concurrent.Queues;
2524

2625
import java.time.Duration;
2726
import java.util.function.BiConsumer;
@@ -41,14 +40,14 @@ public class SendOptions {
4140
*
4241
* @since 1.1.1
4342
*/
44-
private int maxInFlight = Queues.SMALL_BUFFER_SIZE;
43+
private Integer maxInFlight;
4544

4645
/**
4746
* The scheduler used for publishing send results.
4847
*
4948
* @since 1.1.1
5049
*/
51-
private Scheduler scheduler = Schedulers.newSingle("rabbitmq-sender-publishing");
50+
private Scheduler scheduler = Schedulers.immediate();
5251

5352
/**
5453
* Channel mono to use to send messages.
@@ -69,9 +68,9 @@ public class SendOptions {
6968
* from the outbound record publisher while publisher confirms are pending.
7069
*
7170
* @return maximum number of in-flight records
72-
* @since 1.1.18
71+
* @since 1.1.1
7372
*/
74-
public int getMaxInFlight() {
73+
public Integer getMaxInFlight() {
7574
return maxInFlight;
7675
}
7776

@@ -89,25 +88,29 @@ public SendOptions maxInFlight(int maxInFlight) {
8988
}
9089

9190
/**
92-
* The scheduler used for publishing send results.
91+
* Set the maximum number of in-flight records that are fetched
92+
* from the outbound record publisher while publisher confirms are pending.
93+
* Results are run on a supplied {@link Scheduler}
9394
*
94-
* @return scheduler used for publishing send results
95+
* @param maxInFlight
96+
* @param scheduler
97+
* @return this {@link SendOptions} instance
9598
* @since 1.1.1
9699
*/
97-
public Scheduler getScheduler() {
98-
return scheduler;
100+
public SendOptions maxInFlight(int maxInFlight, Scheduler scheduler) {
101+
this.maxInFlight = maxInFlight;
102+
this.scheduler = scheduler;
103+
return this;
99104
}
100105

101106
/**
102-
* Sets the scheduler used for publishing send results.
107+
* The scheduler used for publishing send results.
103108
*
104-
* @param scheduler
105-
* @return this {@link SendOptions} instance
109+
* @return scheduler used for publishing send results
106110
* @since 1.1.1
107111
*/
108-
public SendOptions scheduler(Scheduler scheduler) {
109-
this.scheduler = scheduler;
110-
return this;
112+
public Scheduler getScheduler() {
113+
return scheduler;
111114
}
112115

113116
public BiConsumer<Sender.SendContext, Exception> getExceptionHandler() {

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
143143
final Mono<? extends Channel> currentChannelMono = getChannelMono(options);
144144
final BiConsumer<SignalType, Channel> channelCloseHandler = getChannelCloseHandler(options);
145145

146-
return currentChannelMono.map(channel -> {
146+
Flux<OutboundMessageResult> result = currentChannelMono.map(channel -> {
147147
try {
148148
channel.confirmSelect();
149149
} catch (IOException e) {
@@ -161,8 +161,12 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
161161
// so we need to complete in another thread
162162
channelCloseThreadPool.execute(() -> channelCloseHandler.accept(signalType, channel));
163163
}
164-
}))
165-
.publishOn(sendOptions.getScheduler(), sendOptions.getMaxInFlight());
164+
}));
165+
166+
if (sendOptions.getMaxInFlight() != null) {
167+
result = result.publishOn(sendOptions.getScheduler(), sendOptions.getMaxInFlight());
168+
}
169+
return result;
166170
}
167171

168172
// package-protected for testing
@@ -525,7 +529,7 @@ private static class PublishConfirmSubscriber implements
525529

526530
private final AtomicReference<SubscriberState> state = new AtomicReference<>(SubscriberState.INIT);
527531

528-
private final AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
532+
private final AtomicReference<Throwable> firstException = new AtomicReference<>();
529533

530534
private final ConcurrentNavigableMap<Long, OutboundMessage> unconfirmed = new ConcurrentSkipListMap<>();
531535

@@ -535,7 +539,7 @@ private static class PublishConfirmSubscriber implements
535539

536540
private final BiConsumer<SendContext, Exception> exceptionHandler;
537541

538-
Subscription s;
542+
private Subscription subscription;
539543

540544
private PublishConfirmSubscriber(Channel channel, Subscriber<? super OutboundMessageResult> subscriber, SendOptions options) {
541545
this.channel = channel;
@@ -545,12 +549,12 @@ private PublishConfirmSubscriber(Channel channel, Subscriber<? super OutboundMes
545549

546550
@Override
547551
public void request(long n) {
548-
s.request(n);
552+
subscription.request(n);
549553
}
550554

551555
@Override
552556
public void cancel() {
553-
s.cancel();
557+
subscription.cancel();
554558
}
555559

556560
@Override
@@ -594,8 +598,8 @@ private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
594598
}
595599
});
596600
state.set(SubscriberState.ACTIVE);
597-
if (Operators.validate(this.s, subscription)) {
598-
this.s = subscription;
601+
if (Operators.validate(this.subscription, subscription)) {
602+
this.subscription = subscription;
599603
subscriber.onSubscribe(this);
600604
}
601605
}

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,55 @@ public void publishConfirms() throws Exception {
708708
assertEquals(nbMessages, counter.get());
709709
}
710710

711+
@Test
712+
public void publishConfirmsBackpressure() throws Exception {
713+
int nbMessages = 10;
714+
int subscriberRequest = 3;
715+
CountDownLatch consumedLatch = new CountDownLatch(subscriberRequest);
716+
CountDownLatch confirmedLatch = new CountDownLatch(subscriberRequest);
717+
AtomicInteger counter = new AtomicInteger();
718+
Channel channel = connection.createChannel();
719+
channel.basicConsume(queue, true, (consumerTag, delivery) -> {
720+
counter.incrementAndGet();
721+
consumedLatch.countDown();
722+
}, consumerTag -> {
723+
});
724+
725+
Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));
726+
727+
sender = createSender();
728+
sender.sendWithPublishConfirms(msgFlux).subscribe(new BaseSubscriber<OutboundMessageResult>() {
729+
@Override
730+
protected void hookOnSubscribe(Subscription subscription) {
731+
subscription.request(subscriberRequest);
732+
}
733+
734+
@Override
735+
protected void hookOnNext(OutboundMessageResult outboundMessageResult) {
736+
if (outboundMessageResult.getOutboundMessage() != null) {
737+
confirmedLatch.countDown();
738+
}
739+
}
740+
});
741+
742+
assertTrue(consumedLatch.await(1, TimeUnit.SECONDS));
743+
assertTrue(confirmedLatch.await(1, TimeUnit.SECONDS));
744+
assertEquals(subscriberRequest, counter.get());
745+
}
746+
747+
@Test
748+
public void publishConfirmsEmptyPublisher() throws Exception {
749+
CountDownLatch finallyLatch = new CountDownLatch(1);
750+
Flux<OutboundMessage> msgFlux = Flux.empty();
751+
752+
sender = createSender();
753+
sender.sendWithPublishConfirms(msgFlux)
754+
.doFinally(signalType -> finallyLatch.countDown())
755+
.subscribe();
756+
757+
assertTrue(finallyLatch.await(1, TimeUnit.SECONDS));
758+
}
759+
711760
@Test
712761
void publishConfirmsMaxInFlight() throws InterruptedException {
713762
int maxConcurrency = 4;

0 commit comments

Comments
 (0)