Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit d96a93f

Browse files
authored
Merge pull request #67 from pmackowski/63-sender-confirm-backpressure
Add maxInFlight in sender options and back pressure in publish confirms
2 parents ca61772 + e328fdc commit d96a93f

File tree

4 files changed

+282
-38
lines changed

4 files changed

+282
-38
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import com.rabbitmq.client.Connection;
20+
import org.openjdk.jmh.annotations.*;
21+
import org.openjdk.jmh.infra.Blackhole;
22+
import reactor.core.publisher.Flux;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
@BenchmarkMode(Mode.Throughput)
27+
@OutputTimeUnit(TimeUnit.SECONDS)
28+
@State(Scope.Benchmark)
29+
@Warmup(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS)
30+
@Measurement(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS)
31+
@Fork(1)
32+
@Threads(2)
33+
public class SenderMaxInFlightBenchmark {
34+
35+
Connection connection;
36+
Sender sender;
37+
String queue;
38+
Flux<OutboundMessage> msgFlux;
39+
40+
@Param({"1", "10", "100", "1000"})
41+
public int nbMessages;
42+
43+
@Param({"1", "256", Integer.MAX_VALUE + ""})
44+
public int maxInFlight;
45+
46+
@Setup
47+
public void setupConnection() throws Exception {
48+
connection = SenderBenchmarkUtils.newConnection();
49+
}
50+
51+
@TearDown
52+
public void closeConnection() throws Exception {
53+
connection.close();
54+
}
55+
56+
@Setup(Level.Iteration)
57+
public void setupSender() throws Exception {
58+
queue = SenderBenchmarkUtils.declareQueue(connection);
59+
sender = RabbitFlux.createSender();
60+
msgFlux = SenderBenchmarkUtils.outboundMessageFlux(queue, nbMessages);
61+
}
62+
63+
@TearDown(Level.Iteration)
64+
public void tearDownSender() throws Exception {
65+
SenderBenchmarkUtils.deleteQueue(connection, queue);
66+
if (sender != null) {
67+
sender.close();
68+
}
69+
}
70+
71+
@Benchmark
72+
public void sendWithPublishConfirmsMaxInFlight(Blackhole blackhole) {
73+
blackhole.consume(sender.sendWithPublishConfirms(msgFlux, new SendOptions().maxInFlight(maxInFlight)).blockLast());
74+
}
75+
76+
}

src/main/java/reactor/rabbitmq/SendOptions.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.rabbitmq.client.Channel;
2020
import reactor.core.publisher.Mono;
2121
import reactor.core.publisher.SignalType;
22+
import reactor.core.scheduler.Scheduler;
23+
import reactor.core.scheduler.Schedulers;
2224

2325
import java.time.Duration;
2426
import java.util.function.BiConsumer;
@@ -32,6 +34,21 @@ public class SendOptions {
3234
Duration.ofSeconds(10), Duration.ofMillis(200), ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
3335
);
3436

37+
/**
38+
* The maximum number of in-flight records that are fetched
39+
* from the outbound record publisher while publisher confirms are pending.
40+
*
41+
* @since 1.1.1
42+
*/
43+
private Integer maxInFlight;
44+
45+
/**
46+
* The scheduler used for publishing send results.
47+
*
48+
* @since 1.1.1
49+
*/
50+
private Scheduler scheduler = Schedulers.immediate();
51+
3552
/**
3653
* Channel mono to use to send messages.
3754
*
@@ -46,6 +63,56 @@ public class SendOptions {
4663
*/
4764
private BiConsumer<SignalType, Channel> channelCloseHandler;
4865

66+
/**
67+
* Returns the maximum number of in-flight records that are fetched
68+
* from the outbound record publisher while publisher confirms are pending.
69+
*
70+
* @return maximum number of in-flight records
71+
* @since 1.1.1
72+
*/
73+
public Integer getMaxInFlight() {
74+
return maxInFlight;
75+
}
76+
77+
/**
78+
* Set the maximum number of in-flight records that are fetched
79+
* from the outbound record publisher while publisher confirms are pending.
80+
*
81+
* @param maxInFlight
82+
* @return this {@link SendOptions} instance
83+
* @since 1.1.1
84+
*/
85+
public SendOptions maxInFlight(int maxInFlight) {
86+
this.maxInFlight = maxInFlight;
87+
return this;
88+
}
89+
90+
/**
91+
* Set the maximum number of in-flight records that are fetched
92+
* from the outbound record publisher while publisher confirms are pending.
93+
* Results are run on a supplied {@link Scheduler}
94+
*
95+
* @param maxInFlight
96+
* @param scheduler
97+
* @return this {@link SendOptions} instance
98+
* @since 1.1.1
99+
*/
100+
public SendOptions maxInFlight(int maxInFlight, Scheduler scheduler) {
101+
this.maxInFlight = maxInFlight;
102+
this.scheduler = scheduler;
103+
return this;
104+
}
105+
106+
/**
107+
* The scheduler used for publishing send results.
108+
*
109+
* @return scheduler used for publishing send results
110+
* @since 1.1.1
111+
*/
112+
public Scheduler getScheduler() {
113+
return scheduler;
114+
}
115+
49116
public BiConsumer<Sender.SendContext, Exception> getExceptionHandler() {
50117
return exceptionHandler;
51118
}

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
143143
final Mono<? extends Channel> currentChannelMono = getChannelMono(options);
144144
final BiConsumer<SignalType, Channel> channelCloseHandler = getChannelCloseHandler(options);
145145

146-
return currentChannelMono.map(channel -> {
146+
Flux<OutboundMessageResult> result = currentChannelMono.map(channel -> {
147147
try {
148148
channel.confirmSelect();
149149
} catch (IOException e) {
@@ -162,6 +162,11 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
162162
channelCloseThreadPool.execute(() -> channelCloseHandler.accept(signalType, channel));
163163
}
164164
}));
165+
166+
if (sendOptions.getMaxInFlight() != null) {
167+
result = result.publishOn(sendOptions.getScheduler(), sendOptions.getMaxInFlight());
168+
}
169+
return result;
165170
}
166171

167172
// package-protected for testing
@@ -520,11 +525,11 @@ public void subscribe(CoreSubscriber<? super OutboundMessageResult> actual) {
520525
}
521526

522527
private static class PublishConfirmSubscriber implements
523-
CoreSubscriber<OutboundMessage> {
528+
CoreSubscriber<OutboundMessage>, Subscription {
524529

525530
private final AtomicReference<SubscriberState> state = new AtomicReference<>(SubscriberState.INIT);
526531

527-
private final AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
532+
private final AtomicReference<Throwable> firstException = new AtomicReference<>();
528533

529534
private final ConcurrentNavigableMap<Long, OutboundMessage> unconfirmed = new ConcurrentSkipListMap<>();
530535

@@ -534,54 +539,69 @@ private static class PublishConfirmSubscriber implements
534539

535540
private final BiConsumer<SendContext, Exception> exceptionHandler;
536541

542+
private Subscription subscription;
543+
537544
private PublishConfirmSubscriber(Channel channel, Subscriber<? super OutboundMessageResult> subscriber, SendOptions options) {
538545
this.channel = channel;
539546
this.subscriber = subscriber;
540547
this.exceptionHandler = options.getExceptionHandler();
541548
}
542549

550+
@Override
551+
public void request(long n) {
552+
subscription.request(n);
553+
}
554+
555+
@Override
556+
public void cancel() {
557+
subscription.cancel();
558+
}
559+
543560
@Override
544561
public void onSubscribe(Subscription subscription) {
545-
channel.addConfirmListener(new ConfirmListener() {
562+
if (Operators.validate(this.subscription, subscription)) {
563+
channel.addConfirmListener(new ConfirmListener() {
546564

547-
@Override
548-
public void handleAck(long deliveryTag, boolean multiple) {
549-
handleAckNack(deliveryTag, multiple, true);
550-
}
565+
@Override
566+
public void handleAck(long deliveryTag, boolean multiple) {
567+
handleAckNack(deliveryTag, multiple, true);
568+
}
551569

552-
@Override
553-
public void handleNack(long deliveryTag, boolean multiple) {
554-
handleAckNack(deliveryTag, multiple, false);
555-
}
570+
@Override
571+
public void handleNack(long deliveryTag, boolean multiple) {
572+
handleAckNack(deliveryTag, multiple, false);
573+
}
556574

557-
private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
558-
if (multiple) {
559-
try {
560-
ConcurrentNavigableMap<Long, OutboundMessage> unconfirmedToSend = unconfirmed.headMap(deliveryTag, true);
561-
Iterator<Map.Entry<Long, OutboundMessage>> iterator = unconfirmedToSend.entrySet().iterator();
562-
while (iterator.hasNext()) {
563-
subscriber.onNext(new OutboundMessageResult(iterator.next().getValue(), ack));
564-
iterator.remove();
575+
private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
576+
if (multiple) {
577+
try {
578+
ConcurrentNavigableMap<Long, OutboundMessage> unconfirmedToSend = unconfirmed.headMap(deliveryTag, true);
579+
Iterator<Map.Entry<Long, OutboundMessage>> iterator = unconfirmedToSend.entrySet().iterator();
580+
while (iterator.hasNext()) {
581+
subscriber.onNext(new OutboundMessageResult(iterator.next().getValue(), ack));
582+
iterator.remove();
583+
}
584+
} catch (Exception e) {
585+
handleError(e, null);
586+
}
587+
} else {
588+
OutboundMessage outboundMessage = unconfirmed.get(deliveryTag);
589+
try {
590+
unconfirmed.remove(deliveryTag);
591+
subscriber.onNext(new OutboundMessageResult(outboundMessage, ack));
592+
} catch (Exception e) {
593+
handleError(e, new OutboundMessageResult(outboundMessage, ack));
565594
}
566-
} catch (Exception e) {
567-
handleError(e, null);
568595
}
569-
} else {
570-
OutboundMessage outboundMessage = unconfirmed.get(deliveryTag);
571-
try {
572-
unconfirmed.remove(deliveryTag);
573-
subscriber.onNext(new OutboundMessageResult(outboundMessage, ack));
574-
} catch (Exception e) {
575-
handleError(e, new OutboundMessageResult(outboundMessage, ack));
596+
if (unconfirmed.size() == 0) {
597+
maybeComplete();
576598
}
577599
}
578-
if (unconfirmed.size() == 0) {
579-
maybeComplete();
580-
}
581-
}
582-
});
583-
state.set(SubscriberState.ACTIVE);
584-
subscriber.onSubscribe(subscription);
600+
});
601+
state.set(SubscriberState.ACTIVE);
602+
this.subscription = subscription;
603+
subscriber.onSubscribe(this);
604+
}
585605
}
586606

587607
@Override

0 commit comments

Comments
 (0)