File tree Expand file tree Collapse file tree 6 files changed +38
-4
lines changed
main/java/com/rabbitmq/stream/impl
test/java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 6 files changed +38
-4
lines changed Original file line number Diff line number Diff line change @@ -32,11 +32,19 @@ jobs:
3232 run : ci/start-broker.sh
3333 env :
3434 RABBITMQ_IMAGE : ${{ matrix.rabbitmq-image }}
35- - name : Test
35+ - name : Test (no dynamic-batch publishing)
3636 run : |
3737 ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
38+ -Drabbitmq.stream.producer.dynamic.batch=false \
3839 -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
3940 -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4041 -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
42+ - name : Test (dynamic-batch publishing)
43+ run : |
44+ ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
45+ -Drabbitmq.stream.producer.dynamic.batch=true \
46+ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
47+ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
48+ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
4149 - name : Stop broker
4250 run : docker stop rabbitmq && docker rm rabbitmq
Original file line number Diff line number Diff line change @@ -33,12 +33,21 @@ jobs:
3333 run : ci/start-broker.sh
3434 - name : Display Java version
3535 run : ./mvnw --version
36- - name : Test
36+ - name : Test (no dynamic-batch publishing)
3737 run : |
3838 ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
39+ -Drabbitmq.stream.producer.dynamic.batch=false \
3940 -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
4041 -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4142 -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
4243 -Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
44+ - name : Test (dynamic-batch publishing)
45+ run : |
46+ ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
47+ -Drabbitmq.stream.producer.dynamic.batch=true \
48+ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
49+ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
50+ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
51+ -Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
4352 - name : Stop broker
4453 run : docker stop rabbitmq && docker rm rabbitmq
Original file line number Diff line number Diff line change @@ -33,12 +33,20 @@ jobs:
3333 gpg-passphrase : MAVEN_GPG_PASSPHRASE
3434 - name : Start broker
3535 run : ci/start-broker.sh
36- - name : Test
36+ - name : Test (no dynamic-batch publishing)
3737 run : |
3838 ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
39+ -Drabbitmq.stream.producer.dynamic.batch=false \
3940 -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
4041 -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4142 -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
43+ - name : Test (dynamic-batch publishing)
44+ run : |
45+ ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
46+ -Drabbitmq.stream.producer.dynamic.batch=true \
47+ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
48+ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
49+ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
4250 - name : Stop broker
4351 run : docker stop rabbitmq && docker rm rabbitmq
4452 - name : Upload Codecov report
Original file line number Diff line number Diff line change @@ -86,6 +86,7 @@ class StreamProducer implements Producer {
8686 String stream ,
8787 int subEntrySize ,
8888 int batchSize ,
89+ boolean dynamicBatch ,
8990 Compression compression ,
9091 Duration batchPublishingDelay ,
9192 int maxUnconfirmedMessages ,
@@ -172,7 +173,6 @@ public int fragmentLength(Object entity) {
172173 if (compression != null ) {
173174 compressionCodec = environment .compressionCodecFactory ().get (compression );
174175 }
175- boolean dynamicBatch = true ;
176176 this .accumulator =
177177 ProducerUtils .createMessageAccumulator (
178178 dynamicBatch ,
Original file line number Diff line number Diff line change 2727
2828class StreamProducerBuilder implements ProducerBuilder {
2929
30+ static final boolean DEFAULT_DYNAMIC_BATCH =
31+ Boolean .parseBoolean (System .getProperty ("rabbitmq.stream.producer.dynamic.batch" , "false" ));
32+
3033 private final StreamEnvironment environment ;
3134
3235 private String name ;
@@ -53,6 +56,8 @@ class StreamProducerBuilder implements ProducerBuilder {
5356
5457 private Function <Message , String > filterValueExtractor ;
5558
59+ private boolean dynamicBatch = DEFAULT_DYNAMIC_BATCH ;
60+
5661 StreamProducerBuilder (StreamEnvironment environment ) {
5762 this .environment = environment ;
5863 }
@@ -198,6 +203,7 @@ public Producer build() {
198203 stream ,
199204 subEntrySize ,
200205 batchSize ,
206+ dynamicBatch ,
201207 compression ,
202208 batchPublishingDelay ,
203209 maxUnconfirmedMessages ,
Original file line number Diff line number Diff line change @@ -176,6 +176,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
176176 "stream" ,
177177 subEntrySize ,
178178 10 ,
179+ StreamProducerBuilder .DEFAULT_DYNAMIC_BATCH ,
179180 Compression .NONE ,
180181 Duration .ofMillis (100 ),
181182 messageCount * 10 ,
@@ -226,6 +227,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
226227 "stream" ,
227228 subEntrySize ,
228229 10 ,
230+ StreamProducerBuilder .DEFAULT_DYNAMIC_BATCH ,
229231 Compression .NONE ,
230232 Duration .ZERO ,
231233 2 ,
@@ -266,6 +268,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
266268 "stream" ,
267269 subEntrySize ,
268270 10 ,
271+ StreamProducerBuilder .DEFAULT_DYNAMIC_BATCH ,
269272 Compression .NONE ,
270273 Duration .ZERO ,
271274 2 ,
You can’t perform that action at this time.
0 commit comments