Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 8209180

Browse files
committed
Polishing
1 parent 91a96f6 commit 8209180

File tree

2 files changed

+33
-25
lines changed

2 files changed

+33
-25
lines changed

src/main/java/reactor/rabbitmq/SendOptions.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,20 @@ public class SendOptions {
3737
/**
3838
* The maximum number of in-flight records that are fetched
3939
* from the outbound record publisher while publisher confirms are pending.
40+
* <p>
41+
* The number of in-flight records is not limited by default.
4042
*
41-
* @since 1.1.1
43+
* @since 1.2.0
4244
*/
4345
private Integer maxInFlight;
4446

4547
/**
46-
* The scheduler used for publishing send results.
48+
* The scheduler used for publishing publisher confirms when in-flight records are limited.
49+
* <p>
50+
* The default is {@link Schedulers#immediate()}, so the caller's thread.
4751
*
48-
* @since 1.1.1
52+
* @see #maxInFlight
53+
* @since 1.2.0
4954
*/
5055
private Scheduler scheduler = Schedulers.immediate();
5156

@@ -68,7 +73,7 @@ public class SendOptions {
6873
* from the outbound record publisher while publisher confirms are pending.
6974
*
7075
* @return maximum number of in-flight records
71-
* @since 1.1.1
76+
* @since 1.2.0
7277
*/
7378
public Integer getMaxInFlight() {
7479
return maxInFlight;
@@ -77,10 +82,12 @@ public Integer getMaxInFlight() {
7782
/**
7883
* Set the maximum number of in-flight records that are fetched
7984
* from the outbound record publisher while publisher confirms are pending.
85+
* <p>
86+
* The number of in-flight records is not limited by default.
8087
*
8188
* @param maxInFlight
8289
* @return this {@link SendOptions} instance
83-
* @since 1.1.1
90+
* @since 1.2.0
8491
*/
8592
public SendOptions maxInFlight(int maxInFlight) {
8693
this.maxInFlight = maxInFlight;
@@ -95,7 +102,7 @@ public SendOptions maxInFlight(int maxInFlight) {
95102
* @param maxInFlight
96103
* @param scheduler
97104
* @return this {@link SendOptions} instance
98-
* @since 1.1.1
105+
* @since 1.2.0
99106
*/
100107
public SendOptions maxInFlight(int maxInFlight, Scheduler scheduler) {
101108
this.maxInFlight = maxInFlight;
@@ -107,7 +114,7 @@ public SendOptions maxInFlight(int maxInFlight, Scheduler scheduler) {
107114
* The scheduler used for publishing send results.
108115
*
109116
* @return scheduler used for publishing send results
110-
* @since 1.1.1
117+
* @since 1.2.0
111118
*/
112119
public Scheduler getScheduler() {
113120
return scheduler;

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -726,17 +726,17 @@ public void publishConfirmsBackpressure() throws Exception {
726726

727727
sender = createSender();
728728
sender.sendWithPublishConfirms(msgFlux).subscribe(new BaseSubscriber<OutboundMessageResult>() {
729-
@Override
730-
protected void hookOnSubscribe(Subscription subscription) {
731-
subscription.request(subscriberRequest);
732-
}
729+
@Override
730+
protected void hookOnSubscribe(Subscription subscription) {
731+
subscription.request(subscriberRequest);
732+
}
733733

734-
@Override
735-
protected void hookOnNext(OutboundMessageResult outboundMessageResult) {
736-
if (outboundMessageResult.getOutboundMessage() != null) {
737-
confirmedLatch.countDown();
738-
}
734+
@Override
735+
protected void hookOnNext(OutboundMessageResult outboundMessageResult) {
736+
if (outboundMessageResult.getOutboundMessage() != null) {
737+
confirmedLatch.countDown();
739738
}
739+
}
740740
});
741741

742742
assertTrue(consumedLatch.await(1, TimeUnit.SECONDS));
@@ -768,20 +768,21 @@ void publishConfirmsMaxInFlight() throws InterruptedException {
768768

769769
Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> {
770770
int current = inflight.incrementAndGet();
771-
if (current > maxInflight.get())
771+
if (current > maxInflight.get()) {
772772
maxInflight.set(current);
773+
}
773774
return new OutboundMessage("", queue, "".getBytes());
774775
});
775776

776777
sender = createSender();
777778
sender
778-
.sendWithPublishConfirms(msgFlux, new SendOptions().maxInFlight(maxConcurrency))
779-
.subscribe(outboundMessageResult -> {
780-
inflight.decrementAndGet();
781-
if (outboundMessageResult.isAck() && outboundMessageResult.getOutboundMessage() != null) {
782-
confirmedLatch.countDown();
783-
}
784-
});
779+
.sendWithPublishConfirms(msgFlux, new SendOptions().maxInFlight(maxConcurrency))
780+
.subscribe(outboundMessageResult -> {
781+
inflight.decrementAndGet();
782+
if (outboundMessageResult.isAck() && outboundMessageResult.getOutboundMessage() != null) {
783+
confirmedLatch.countDown();
784+
}
785+
});
785786

786787
assertTrue(confirmedLatch.await(1, TimeUnit.SECONDS));
787788
assertThat(maxInflight.get()).isLessThanOrEqualTo(maxConcurrency);
@@ -814,7 +815,7 @@ public void publishConfirmsErrorWhilePublishing() throws Exception {
814815
sender.sendWithPublishConfirms(msgFlux, new SendOptions().exceptionHandler((ctx, e) -> {
815816
throw new RabbitFluxException(e);
816817
})) // Before change: (onNext -> onError -> onNext )
817-
// After change (maxInFlight): (onNext -> onError)
818+
// After change (maxInFlight): (onNext -> onError)
818819
.subscribe(outboundMessageResult -> {
819820
if (outboundMessageResult.getOutboundMessage() != null) {
820821
confirmLatch.countDown();

0 commit comments

Comments
 (0)