File tree Expand file tree Collapse file tree 3 files changed +33
-0
lines changed
main/java/com/rabbitmq/stream Expand file tree Collapse file tree 3 files changed +33
-0
lines changed Original file line number Diff line number Diff line change @@ -455,6 +455,10 @@ blocking when the limit is reached.
455455|Period to send a batch of messages.
456456|100 ms
457457
458+ |`dynamicBatch`
459+ |Adapt batch size depending on ingress rate.
460+ |false
461+
458462|`confirmTimeout`
459463|[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal
460464outstanding unconfirmed messages timed out.
Original file line number Diff line number Diff line change @@ -97,6 +97,28 @@ public interface ProducerBuilder {
9797 */
9898 ProducerBuilder batchPublishingDelay (Duration batchPublishingDelay );
9999
100+ /**
101+ * Adapt batch size depending on ingress rate.
102+ *
103+ * <p>A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive
104+ * for sustained high ingress rates.
105+ *
106+ * <p>Set this flag to <code>true</code> if you want as little delay as possible before calling
107+ * {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
108+ *
109+ * <p>Set this flag to <code>false</code> if latency is not critical for your use case and you
110+ * want the highest throughput possible for both publishing and consuming.
111+ *
112+ * <p>Dynamic batch is not activated by default (<code>dynamicBatch = false</code>).
113+ *
114+ * <p>Dynamic batch is experimental.
115+ *
116+ * @param dynamicBatch
117+ * @return this builder instance
118+ * @since 0.20.0
119+ */
120+ ProducerBuilder dynamicBatch (boolean dynamicBatch );
121+
100122 /**
101123 * The maximum number of unconfirmed outbound messages.
102124 *
Original file line number Diff line number Diff line change @@ -102,11 +102,18 @@ public ProducerBuilder compression(Compression compression) {
102102 return this ;
103103 }
104104
105+ @ Override
105106 public StreamProducerBuilder batchPublishingDelay (Duration batchPublishingDelay ) {
106107 this .batchPublishingDelay = batchPublishingDelay ;
107108 return this ;
108109 }
109110
111+ @ Override
112+ public ProducerBuilder dynamicBatch (boolean dynamicBatch ) {
113+ this .dynamicBatch = dynamicBatch ;
114+ return this ;
115+ }
116+
110117 @ Override
111118 public ProducerBuilder maxUnconfirmedMessages (int maxUnconfirmedMessages ) {
112119 if (maxUnconfirmedMessages <= 0 ) {
You can’t perform that action at this time.
0 commit comments