Skip to content

Commit 842e4e8

Browse files
committed
Add --credits option to grant credits every n chunks
References rabbitmq/rabbitmq-stream-java-client#843
1 parent ee77ac7 commit 842e4e8

File tree

3 files changed

+85
-7
lines changed

3 files changed

+85
-7
lines changed

src/main/java/com/rabbitmq/stream/perf/Converters.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ static class NotNegativeIntegerTypeConverter implements CommandLine.ITypeConvert
142142
@Override
143143
public Integer convert(String input) {
144144
try {
145-
Integer value = Integer.valueOf(input);
145+
int value = Integer.parseInt(input);
146146
if (value < 0) {
147147
throw new IllegalArgumentException();
148148
}
@@ -153,6 +153,35 @@ public Integer convert(String input) {
153153
}
154154
}
155155

156+
static class CreditsTypeConverter implements CommandLine.ITypeConverter<Credits> {
157+
158+
@Override
159+
public Credits convert(String input) {
160+
try {
161+
int[] credits;
162+
if (input.contains("-")) {
163+
String[] creditsStr = input.split("-");
164+
int initialCredits = Integer.parseInt(creditsStr[0]);
165+
int n = Integer.parseInt(creditsStr[1]);
166+
if (initialCredits < 0 || n < 0) {
167+
throw new IllegalArgumentException();
168+
}
169+
credits = new int[] {initialCredits, n};
170+
} else {
171+
int initialCredits = Integer.parseInt(input);
172+
if (initialCredits < 0) {
173+
throw new IllegalArgumentException();
174+
}
175+
credits = new int[] {initialCredits, initialCredits / 2};
176+
}
177+
return new Credits(credits[0], credits[1]);
178+
} catch (Exception e) {
179+
throw new CommandLine.TypeConversionException(
180+
"'" + input + "' is not valid, valid example values: 10-5, 5-2");
181+
}
182+
}
183+
}
184+
156185
static class ByteCapacityTypeConverter implements CommandLine.ITypeConverter<ByteCapacity> {
157186

158187
@Override
@@ -451,4 +480,23 @@ public Boolean convert(String value) {
451480
private static void throwConversionException(String format, String... arguments) {
452481
throw new CommandLine.TypeConversionException(String.format(format, (Object[]) arguments));
453482
}
483+
484+
static class Credits {
485+
486+
private final int initialCredits;
487+
private final int n;
488+
489+
Credits(int initialCredits, int n) {
490+
this.initialCredits = initialCredits;
491+
this.n = n;
492+
}
493+
494+
int initialCredits() {
495+
return this.initialCredits;
496+
}
497+
498+
int n() {
499+
return this.n;
500+
}
501+
}
454502
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.rabbitmq.stream.impl.Client;
3232
import com.rabbitmq.stream.impl.ClientProperties;
3333
import com.rabbitmq.stream.metrics.MetricsCollector;
34+
import com.rabbitmq.stream.perf.Converters.Credits;
3435
import com.rabbitmq.stream.perf.ShutdownService.CloseCallback;
3536
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
3637
import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector;
@@ -648,6 +649,13 @@ static class InstanceSyncOptions {
648649
converter = Converters.NotNegativeIntegerTypeConverter.class)
649650
private int initialCredits;
650651

652+
@CommandLine.Option(
653+
names = {"--credits"},
654+
description = "initial credits and credits to grant every n chunks (e.g. 10-5)",
655+
defaultValue = "0",
656+
converter = Converters.CreditsTypeConverter.class)
657+
private Credits credits;
658+
651659
@CommandLine.Option(
652660
names = {"--heartbeat", "-b"},
653661
description = "requested heartbeat in seconds",
@@ -1309,12 +1317,17 @@ public Integer call() throws Exception {
13091317

13101318
String stream = stream(streams, i);
13111319
ConsumerBuilder consumerBuilder =
1312-
environment
1313-
.consumerBuilder()
1314-
.offset(this.offset)
1315-
.flow()
1316-
.initialCredits(this.initialCredits)
1317-
.builder();
1320+
environment.consumerBuilder().offset(this.offset);
1321+
1322+
if (this.credits.initialCredits() > 0) {
1323+
consumerBuilder
1324+
.flow()
1325+
.strategy(
1326+
ConsumerFlowStrategy.creditEveryNthChunk(
1327+
this.credits.initialCredits(), this.credits.n()));
1328+
} else {
1329+
consumerBuilder.flow().initialCredits(this.initialCredits);
1330+
}
13181331

13191332
if (this.superStreams) {
13201333
consumerBuilder.superStream(stream);

src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,23 @@ void durationTypeConverterKo(String value) {
225225
.hasMessageContaining("valid");
226226
}
227227

228+
@ParameterizedTest
229+
@CsvSource({"10-5,10,5", "5-2,5,2", "10,10,5", "5,5,2", "0,0,0"})
230+
void creditsTypeConverterOk(String value, int initialCredits, int n) {
231+
Converters.CreditsTypeConverter converter = new Converters.CreditsTypeConverter();
232+
Converters.Credits credits = converter.convert(value);
233+
assertThat(credits.initialCredits()).isEqualTo(initialCredits);
234+
assertThat(credits.n()).isEqualTo(n);
235+
}
236+
237+
@ParameterizedTest
238+
@ValueSource(strings = {"a", "1-", ""})
239+
void creditsTypeConverterKo(String value) {
240+
Converters.CreditsTypeConverter converter = new Converters.CreditsTypeConverter();
241+
assertThatThrownBy(() -> converter.convert(value))
242+
.isInstanceOfAny(IllegalArgumentException.class, CommandLine.TypeConversionException.class);
243+
}
244+
228245
private static Tag tag(String key, String value) {
229246
return Tag.of(key, value);
230247
}

0 commit comments

Comments
 (0)