Skip to content

Commit 5e091af

Browse files
authored
Merge pull request #906 from rabbitmq/improve-dynamic-batch
Improve producer adaptative batching
2 parents 53ac877 + 4bc58b2 commit 5e091af

File tree

3 files changed

+293
-31
lines changed

3 files changed

+293
-31
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

Lines changed: 74 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,39 +30,54 @@ final class DynamicBatch<T> implements AutoCloseable {
3030
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
3131
private static final int MIN_BATCH_SIZE = 16;
3232

33-
private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
33+
// Additive Increase, Multiplicative Decrease (AIMD) parameters
34+
private static final double MULTIPLICATIVE_DECREASE_FACTOR = 0.5;
35+
private static final int ADDITIVE_INCREASE_STEP_DIVISOR = 10; // configuredBatchSize / 10
36+
37+
private static final long RETRY_DELAY_MS = 50;
38+
39+
private final BlockingQueue<T> requests;
3440
private final BatchConsumer<T> consumer;
3541
private final int configuredBatchSize, minBatchSize, maxBatchSize;
42+
private final int additiveIncreaseStep;
3643
private final Thread thread;
44+
private volatile boolean running = true;
3745

3846
DynamicBatch(BatchConsumer<T> consumer, int batchSize, int maxUnconfirmed, String id) {
3947
this.consumer = consumer;
48+
this.requests = new LinkedBlockingQueue<>(max(1, maxUnconfirmed));
4049
if (batchSize < maxUnconfirmed) {
4150
this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2);
4251
} else {
43-
this.minBatchSize = min(1, maxUnconfirmed / 2);
52+
this.minBatchSize = max(1, maxUnconfirmed / 2);
4453
}
4554
this.configuredBatchSize = batchSize;
4655
this.maxBatchSize = batchSize * 2;
56+
57+
// Calculate additive increase step: 10% of configured size, minimum 1
58+
this.additiveIncreaseStep = max(1, batchSize / ADDITIVE_INCREASE_STEP_DIVISOR);
59+
4760
this.thread = ThreadUtils.newInternalThread(id, this::loop);
61+
this.thread.setDaemon(true);
4862
this.thread.start();
4963
}
5064

5165
void add(T item) {
5266
try {
5367
requests.put(item);
5468
} catch (InterruptedException e) {
69+
Thread.currentThread().interrupt();
5570
throw new RuntimeException(e);
5671
}
5772
}
5873

5974
private void loop() {
60-
State<T> state = new State<>();
75+
// Initial allocation based on maxBatchSize to avoid resizing if it grows
76+
State<T> state = new State<>(new ArrayList<>(this.maxBatchSize));
6177
state.batchSize = this.configuredBatchSize;
62-
state.items = new ArrayList<>(state.batchSize);
6378
Thread currentThread = Thread.currentThread();
6479
T item;
65-
while (!currentThread.isInterrupted()) {
80+
while (running && !currentThread.isInterrupted()) {
6681
try {
6782
item = this.requests.poll(100, TimeUnit.MILLISECONDS);
6883
} catch (InterruptedException e) {
@@ -71,59 +86,87 @@ private void loop() {
7186
}
7287
if (item != null) {
7388
state.items.add(item);
74-
if (state.items.size() >= state.batchSize) {
75-
this.maybeCompleteBatch(state, true);
76-
} else {
77-
pump(state, 2);
89+
int remaining = state.batchSize - state.items.size();
90+
if (remaining > 0) {
91+
this.requests.drainTo(state.items, remaining);
7892
}
93+
this.maybeCompleteBatch(state, state.items.size() >= state.batchSize);
7994
} else {
8095
this.maybeCompleteBatch(state, false);
8196
}
8297
}
8398
}
8499

85-
private void pump(State<T> state, int pumpCount) {
86-
if (pumpCount <= 0) {
87-
return;
88-
}
89-
T item = this.requests.poll();
90-
if (item == null) {
91-
this.maybeCompleteBatch(state, false);
92-
} else {
93-
state.items.add(item);
94-
if (state.items.size() >= state.batchSize) {
95-
this.maybeCompleteBatch(state, true);
96-
}
97-
this.pump(state, pumpCount - 1);
98-
}
99-
}
100-
101100
private static final class State<T> {
102101

103102
int batchSize;
104-
List<T> items;
103+
final ArrayList<T> items;
104+
105+
private State(ArrayList<T> items) {
106+
this.items = items;
107+
}
105108
}
106109

107110
private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
111+
if (state.items.isEmpty()) {
112+
return;
113+
}
114+
108115
try {
109116
boolean completed = this.consumer.process(state.items);
110117
if (completed) {
111118
if (increaseIfCompleted) {
112-
state.batchSize = min(state.batchSize * 2, this.maxBatchSize);
119+
// AIMD: Additive Increase
120+
// Grow slowly and linearly when batch is full
121+
state.batchSize = min(state.batchSize + this.additiveIncreaseStep, this.maxBatchSize);
113122
} else {
114-
state.batchSize = max(state.batchSize / 2, this.minBatchSize);
123+
// AIMD: Multiplicative Decrease
124+
// React quickly to low utilization by cutting back
125+
state.batchSize =
126+
max((int) (state.batchSize * MULTIPLICATIVE_DECREASE_FACTOR), this.minBatchSize);
115127
}
116-
state.items = new ArrayList<>(state.batchSize);
128+
state.items.clear();
129+
return;
117130
}
118131
} catch (Exception e) {
119-
// e.printStackTrace();
120-
LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
132+
LOGGER.warn(
133+
"Error during dynamic batch completion, batch size: {}, items: {}",
134+
state.batchSize,
135+
state.items.size(),
136+
e);
137+
}
138+
try {
139+
Thread.sleep(RETRY_DELAY_MS);
140+
} catch (InterruptedException e) {
141+
Thread.currentThread().interrupt();
121142
}
122143
}
123144

124145
@Override
125146
public void close() {
147+
this.running = false;
126148
this.thread.interrupt();
149+
try {
150+
this.thread.join(TimeUnit.SECONDS.toMillis(5));
151+
if (this.thread.isAlive()) {
152+
LOGGER.warn("Dynamic batch thread did not terminate within timeout");
153+
}
154+
} catch (InterruptedException e) {
155+
Thread.currentThread().interrupt();
156+
LOGGER.warn("Interrupted while waiting for dynamic batch thread to terminate");
157+
}
158+
// Process any remaining items in the queue
159+
if (!this.requests.isEmpty()) {
160+
List<T> remaining = new ArrayList<>();
161+
this.requests.drainTo(remaining);
162+
if (!remaining.isEmpty()) {
163+
try {
164+
this.consumer.process(remaining);
165+
} catch (Exception e) {
166+
LOGGER.warn("Error processing remaining {} items during shutdown", remaining.size(), e);
167+
}
168+
}
169+
}
127170
}
128171

129172
@FunctionalInterface

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public StreamProducerBuilder batchPublishingDelay(Duration batchPublishingDelay)
112112
}
113113

114114
@Override
115+
@SuppressWarnings("deprecation")
115116
public ProducerBuilder dynamicBatch(boolean dynamicBatch) {
116117
this.dynamicBatch = dynamicBatch;
117118
return this;

0 commit comments

Comments
 (0)