Skip to content

Commit d08221b

Browse files
authored
Merge pull request #905 from rabbitmq/deprecate-fixed-size-batching
Deprecated producer fixed-size batching
2 parents affedc4 + 8c8c8ab commit d08221b

File tree

8 files changed

+17
-60
lines changed

8 files changed

+17
-60
lines changed

.github/workflows/test-pr.yml

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,9 @@ jobs:
3030
cache: 'maven'
3131
- name: Start broker
3232
run: ci/start-broker.sh
33-
- name: Test (no dynamic-batch publishing)
34-
run: |
35-
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
36-
-Drabbitmq.stream.producer.dynamic.batch=false \
37-
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
38-
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
39-
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
40-
- name: Test (dynamic-batch publishing)
33+
- name: Test
4134
run: |
4235
./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \
43-
-Drabbitmq.stream.producer.dynamic.batch=true \
4436
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
4537
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4638
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem

.github/workflows/test-rabbitmq-alphas.yml

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,9 @@ jobs:
4040
run: ci/start-broker.sh
4141
env:
4242
RABBITMQ_IMAGE: ${{ matrix.rabbitmq-image }}
43-
- name: Test (no dynamic-batch publishing)
44-
run: |
45-
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
46-
-Drabbitmq.stream.producer.dynamic.batch=false \
47-
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
48-
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
49-
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
50-
- name: Test (dynamic-batch publishing)
43+
- name: Test
5144
run: |
5245
./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \
53-
-Drabbitmq.stream.producer.dynamic.batch=true \
5446
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
5547
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
5648
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem

.github/workflows/test-supported-java-versions.yml

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,9 @@ jobs:
3030
run: ci/start-broker.sh
3131
- name: Display Java version
3232
run: ./mvnw --version
33-
- name: Test (no dynamic-batch publishing)
34-
run: |
35-
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
36-
-Drabbitmq.stream.producer.dynamic.batch=false \
37-
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
38-
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
39-
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
40-
-Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
41-
- name: Test (dynamic-batch publishing)
33+
- name: Test
4234
run: |
4335
./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \
44-
-Drabbitmq.stream.producer.dynamic.batch=true \
4536
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
4637
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4738
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \

.github/workflows/test.yml

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,9 @@ jobs:
3636
gpg-passphrase: MAVEN_GPG_PASSPHRASE
3737
- name: Start broker
3838
run: ci/start-broker.sh
39-
- name: Test (no dynamic-batch publishing)
40-
run: |
41-
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
42-
-Drabbitmq.stream.producer.dynamic.batch=false \
43-
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
44-
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
45-
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
46-
- name: Test (dynamic-batch publishing)
39+
- name: Test
4740
run: |
4841
./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \
49-
-Drabbitmq.stream.producer.dynamic.batch=true \
5042
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
5143
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
5244
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,23 +110,18 @@ public interface ProducerBuilder {
110110
*
111111
* <p>Set this flag to <code>true</code> if you want as little delay as possible between calling
112112
* {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
113-
* Consumers should provide enough initial credits (between 5 and 10, depending on the workload),
114-
* see {@link ConsumerBuilder#flow()} and {@link
115-
* ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
116113
*
117114
* <p>Set this flag to <code>false</code> if latency is not critical for your use case and you
118-
* want the highest throughput possible for both publishing and consuming. Consumers can provide 1
119-
* initial credit (depending on the workload), see {@link ConsumerBuilder#flow()} and {@link
120-
* ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
115+
* want the highest throughput possible for both publishing and consuming.
121116
*
122117
* <p>Dynamic batch is activated by default (<code>dynamicBatch = true</code>).
123118
*
124119
* @param dynamicBatch
125120
* @return this builder instance
126121
* @since 0.20.0
127-
* @see ConsumerBuilder#flow()
128-
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
122+
* @deprecated dynamic/adaptive will be the default and only option in future releases
129123
*/
124+
@Deprecated
130125
ProducerBuilder dynamicBatch(boolean dynamicBatch);
131126

132127
/**

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131

3232
class StreamProducerBuilder implements ProducerBuilder {
3333

34-
static final boolean DEFAULT_DYNAMIC_BATCH =
35-
Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));
36-
3734
private final StreamEnvironment environment;
3835

3936
private String name;
@@ -60,7 +57,7 @@ class StreamProducerBuilder implements ProducerBuilder {
6057

6158
private Function<Message, String> filterValueExtractor;
6259

63-
private boolean dynamicBatch = DEFAULT_DYNAMIC_BATCH;
60+
private boolean dynamicBatch = true;
6461

6562
private final List<Resource.StateListener> listeners = new ArrayList<>();
6663

src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
231231
.mapToObj(
232232
i -> {
233233
String s = streams.get(i % streams.size());
234-
boolean dynamicBatch = i % 2 == 0;
235-
return new ProducerState(s, dynamicBatch, environment);
234+
return new ProducerState(s, environment);
236235
})
237236
.collect(toList());
238237
consumers =
@@ -369,7 +368,7 @@ void sacWithClusterRestart(boolean superStream) throws Exception {
369368
}
370369
sCreator.create();
371370

372-
pState = new ProducerState(s, true, superStream, environment);
371+
pState = new ProducerState(s, superStream, environment);
373372
pState.start();
374373

375374
Map<Integer, Boolean> consumerStatus = new ConcurrentHashMap<>();
@@ -476,14 +475,13 @@ private static class ProducerState implements AutoCloseable {
476475
final AtomicReference<Throwable> lastException = new AtomicReference<>();
477476
final AtomicReference<Instant> lastExceptionInstant = new AtomicReference<>();
478477

479-
private ProducerState(String stream, boolean dynamicBatch, Environment environment) {
480-
this(stream, dynamicBatch, false, environment);
478+
private ProducerState(String stream, Environment environment) {
479+
this(stream, false, environment);
481480
}
482481

483-
private ProducerState(
484-
String stream, boolean dynamicBatch, boolean superStream, Environment environment) {
482+
private ProducerState(String stream, boolean superStream, Environment environment) {
485483
this.stream = stream;
486-
ProducerBuilder builder = environment.producerBuilder().dynamicBatch(dynamicBatch);
484+
ProducerBuilder builder = environment.producerBuilder();
487485
if (superStream) {
488486
builder.superStream(stream).routing(m -> m.getProperties().getMessageIdAsString());
489487
} else {

src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
187187
"stream",
188188
subEntrySize,
189189
10,
190-
StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH,
190+
true,
191191
Compression.NONE,
192192
Duration.ofMillis(100),
193193
messageCount * 10,
@@ -239,7 +239,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
239239
"stream",
240240
subEntrySize,
241241
10,
242-
StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH,
242+
true,
243243
Compression.NONE,
244244
Duration.ZERO,
245245
2,
@@ -281,7 +281,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
281281
"stream",
282282
subEntrySize,
283283
10,
284-
StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH,
284+
true,
285285
Compression.NONE,
286286
Duration.ZERO,
287287
2,

0 commit comments

Comments
 (0)