Skip to content

Commit 7e0fc0b

Browse files
authored
Merge pull request #843 from rabbitmq/credit-every-nth-chunk-strategy
Add flow strategy to credit every n chunks
2 parents 77fc9b0 + 52889ae commit 7e0fc0b

File tree

3 files changed

+156
-8
lines changed

3 files changed

+156
-8
lines changed

src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ static ConsumerFlowStrategy creditOnChunkArrival() {
118118
*
119119
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
120120
*
121+
* <p>Consider using {@link #creditEveryNthChunk(int, int)} instead as it generates less network
122+
* traffic.
123+
*
121124
* @param initialCredits number of initial credits
122125
* @return flow strategy
123126
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
@@ -168,12 +171,79 @@ static ConsumerFlowStrategy creditOnProcessedMessageCount(int initialCredits, do
168171
return new MessageCountConsumerFlowStrategy(initialCredits, ratio);
169172
}
170173

174+
/**
175+
* Strategy that provides the specified number of initial credits and <code>n</code> credits every
176+
* <code>n</code> chunks.
177+
*
178+
* <p>This strategy generates less network traffic than {@link
179+
* com.rabbitmq.stream.ConsumerFlowStrategy.CreditOnChunkArrivalConsumerFlowStrategy} and should
180+
* be used instead, unless <code>n</code> is equal to 1.
181+
*
182+
* <p>A rule of thumb is to set <code>n</code> to half the value of initial credits.
183+
*
184+
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
185+
*
186+
* @param initialCredits number of initial credits
187+
* @param n number of chunks and number of credits
188+
* @return flow strategy
189+
*/
190+
static ConsumerFlowStrategy creditEveryNthChunk(int initialCredits, int n) {
191+
return new CreditEveryNthChunkConsumerFlowStrategy(initialCredits, n);
192+
}
193+
194+
/**
195+
* Strategy that provides the specified number of initial credits and <code>n</code> credits every
196+
* <code>n</code> chunks.
197+
*
198+
* <p>This strategy generates less network traffic than {@link
199+
* com.rabbitmq.stream.ConsumerFlowStrategy.CreditOnChunkArrivalConsumerFlowStrategy} and should
200+
* be used instead, unless <code>n</code> is equal to 1.
201+
*
202+
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
203+
*/
204+
final class CreditEveryNthChunkConsumerFlowStrategy implements ConsumerFlowStrategy {
205+
206+
private static final MessageProcessedCallback CALLBACK = v -> {};
207+
208+
private final int initialCredits;
209+
private final AtomicLong chunkCount = new AtomicLong(0);
210+
private final int n;
211+
212+
private CreditEveryNthChunkConsumerFlowStrategy(int initialCredits, int n) {
213+
if (n <= 0) {
214+
throw new IllegalArgumentException("The n argument must be greater than 0");
215+
}
216+
if (initialCredits <= n) {
217+
throw new IllegalArgumentException(
218+
"The number of initial credits must be greater than the limit");
219+
}
220+
this.initialCredits = initialCredits;
221+
this.n = n;
222+
}
223+
224+
@Override
225+
public int initialCredits() {
226+
this.chunkCount.set(0);
227+
return this.initialCredits;
228+
}
229+
230+
@Override
231+
public MessageProcessedCallback start(Context context) {
232+
if (chunkCount.incrementAndGet() % n == 0) {
233+
context.credits(n);
234+
}
235+
return CALLBACK;
236+
}
237+
}
238+
171239
/**
172240
* Strategy that provides the specified number of initial credits and a credit on each new chunk.
173241
*
174242
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
175243
*/
176-
class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy {
244+
final class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy {
245+
246+
private static final MessageProcessedCallback CALLBACK = v -> {};
177247

178248
private final int initialCredits;
179249

@@ -189,7 +259,7 @@ public int initialCredits() {
189259
@Override
190260
public MessageProcessedCallback start(Context context) {
191261
context.credits(1);
192-
return value -> {};
262+
return CALLBACK;
193263
}
194264
}
195265

@@ -200,7 +270,7 @@ public MessageProcessedCallback start(Context context) {
200270
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
201271
* this strategy, otherwise the broker may stop sending messages to the consumer.
202272
*/
203-
class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy {
273+
final class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy {
204274

205275
private final int initialCredits;
206276
private final double ratio;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import static com.rabbitmq.stream.ConsumerFlowStrategy.creditEveryNthChunk;
18+
import static java.util.stream.IntStream.range;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
22+
import com.rabbitmq.stream.ConsumerFlowStrategy;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.CsvSource;
27+
28+
public class CreditEveryNthChunkConsumerFlowStrategyTest {
29+
30+
AtomicInteger requestedCredits = new AtomicInteger();
31+
32+
@Test
33+
void invalidArguments() {
34+
assertThatThrownBy(() -> build(1, 1)).isInstanceOf(IllegalArgumentException.class);
35+
assertThatThrownBy(() -> build(1, 0)).isInstanceOf(IllegalArgumentException.class);
36+
assertThatThrownBy(() -> build(10, 0)).isInstanceOf(IllegalArgumentException.class);
37+
}
38+
39+
@ParameterizedTest
40+
@CsvSource({"10,5", "5,2", "2,1"})
41+
void test(int initialCredits, int limit) {
42+
ConsumerFlowStrategy strategy = build(initialCredits, limit);
43+
44+
range(0, limit - 1)
45+
.forEach(
46+
ignored -> {
47+
strategy.start(context());
48+
assertThat(requestedCredits).hasValue(0);
49+
});
50+
strategy.start(context());
51+
assertThat(requestedCredits).hasValue(limit);
52+
}
53+
54+
ConsumerFlowStrategy build(int initial, int limit) {
55+
return creditEveryNthChunk(initial, limit);
56+
}
57+
58+
ConsumerFlowStrategy.Context context() {
59+
requestedCredits.set(0);
60+
return new ConsumerFlowStrategy.Context() {
61+
@Override
62+
public void credits(int credits) {
63+
requestedCredits.addAndGet(credits);
64+
}
65+
66+
@Override
67+
public long messageCount() {
68+
throw new UnsupportedOperationException();
69+
}
70+
};
71+
}
72+
}

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.ConsumerFlowStrategy.creditEveryNthChunk;
1718
import static com.rabbitmq.stream.ConsumerFlowStrategy.creditWhenHalfMessagesProcessed;
1819
import static com.rabbitmq.stream.impl.Assertions.assertThat;
1920
import static com.rabbitmq.stream.impl.TestUtils.*;
@@ -53,6 +54,7 @@
5354
import org.junit.jupiter.api.extension.ExtendWith;
5455
import org.junit.jupiter.params.ParameterizedTest;
5556
import org.junit.jupiter.params.provider.MethodSource;
57+
import org.junit.jupiter.params.provider.ValueSource;
5658
import org.slf4j.Logger;
5759
import org.slf4j.LoggerFactory;
5860

@@ -160,8 +162,9 @@ void committedOffsetShouldBeSet() throws Exception {
160162
consumer.close();
161163
}
162164

163-
@Test
164-
void consume() throws Exception {
165+
@ParameterizedTest
166+
@ValueSource(booleans = {true, false})
167+
void consume(boolean creditEveryNthChunk) throws Exception {
165168
int messageCount = 100_000;
166169
CountDownLatch publishLatch = new CountDownLatch(messageCount);
167170
Client client =
@@ -183,15 +186,18 @@ void consume() throws Exception {
183186
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
184187

185188
AtomicLong chunkTimestamp = new AtomicLong();
186-
Consumer consumer =
189+
ConsumerBuilder builder =
187190
environment.consumerBuilder().stream(stream)
188191
.offset(OffsetSpecification.first())
189192
.messageHandler(
190193
(context, message) -> {
191194
chunkTimestamp.set(context.timestamp());
192195
consumeLatch.countDown();
193-
})
194-
.build();
196+
});
197+
if (creditEveryNthChunk) {
198+
builder.flow().strategy(creditEveryNthChunk(10, 5));
199+
}
200+
Consumer consumer = builder.build();
195201

196202
org.assertj.core.api.Assertions.assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
197203
org.assertj.core.api.Assertions.assertThat(chunkTimestamp.get()).isNotZero();

0 commit comments

Comments
 (0)