Skip to content

Commit 4a0dd37

Browse files
authored
[Event Hub] Improve flush functionality of buffered partition producer. (Azure#44904)
* Improve flush functionality of buffered parition producer. * Remove EventDataBatchCarrier and use EventDataBatch.EMPTY instead * address review feedback
1 parent a07de80 commit 4a0dd37

File tree

7 files changed

+297
-90
lines changed

7 files changed

+297
-90
lines changed

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@
88

99
### Bugs Fixed
1010

11-
- Fixes issue where `EventHubBufferedProducerClient` and `EventHubBufferedProducerAsyncClient` are unable to enqueue events when `SendOptions.getPartitionId()` is set. ([#44392](https://github.com/Azure/azure-sdk-for-java/pull/44392))
12-
- Fixes issue where `EventHubBufferedProducerClient` and `EventHubBufferedProducerAsyncClient` returns 0 after enqueueing events or calling `getBufferedEventCount()`. ([#44392](https://github.com/Azure/azure-sdk-for-java/pull/44392))
11+
- Fixes issue where `EventHubBufferedProducerClient` and `EventHubBufferedProducerAsyncClient` are unable to `flush`. ([#44904](https://github.com/Azure/azure-sdk-for-java/pull/44904))
1312

1413
## 5.20.2 (2025-03-24)
1514

15+
### Bugs Fixed
16+
17+
- Fixes issue where `EventHubBufferedProducerClient` and `EventHubBufferedProducerAsyncClient` are unable to enqueue events when `SendOptions.getPartitionId()` is set. ([#44392](https://github.com/Azure/azure-sdk-for-java/pull/44392))
18+
- Fixes issue where `EventHubBufferedProducerClient` and `EventHubBufferedProducerAsyncClient` returns 0 after enqueueing events or calling `getBufferedEventCount()`. ([#44392](https://github.com/Azure/azure-sdk-for-java/pull/44392))
19+
1620
### Other Changes
1721

1822
#### Dependency Updates

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,12 @@ public void onComplete() {
239239
* @param alwaysPublish {@code true} to always push batch downstream. {@code false}, otherwise.
240240
*/
241241
private void updateOrPublishBatch(EventData eventData, boolean alwaysPublish) {
242-
if (alwaysPublish) {
243-
publishDownstream();
242+
final boolean isFlush = isFlushSignal(eventData);
243+
if (alwaysPublish || isFlush) {
244+
publishDownstream(isFlush);
244245
return;
245-
} else if (eventData == null) {
246+
}
247+
if (eventData == null) {
246248
// EventData will be null in the case when options.maxWaitTime() has elapsed and we want to push the
247249
// batch downstream.
248250
return;
@@ -256,7 +258,7 @@ private void updateOrPublishBatch(EventData eventData, boolean alwaysPublish) {
256258
return;
257259
}
258260

259-
publishDownstream();
261+
publishDownstream(false);
260262
added = currentBatch.tryAdd(eventData);
261263
}
262264

@@ -271,19 +273,24 @@ private void updateOrPublishBatch(EventData eventData, boolean alwaysPublish) {
271273
/**
272274
* Publishes batch downstream if there are events in the batch and updates it.
273275
*/
274-
private void publishDownstream() {
276+
private void publishDownstream(boolean isFlush) {
275277
EventDataBatch previous = null;
276-
277278
try {
278279
synchronized (lock) {
279280
previous = this.currentBatch;
280-
281281
if (previous == null) {
282282
logger.warning("Batch should not be null, setting a new batch.");
283-
284283
this.currentBatch = batchSupplier.get();
284+
if (isFlush) {
285+
downstream.onNext(EventDataBatch.EMPTY);
286+
}
285287
return;
286288
} else if (previous.getEvents().isEmpty()) {
289+
if (isFlush) {
290+
// Even if the batch is empty, we'll push EMPTY on a flush signal.
291+
// This ensures any flush related flags set at the call site are reset.
292+
downstream.onNext(EventDataBatch.EMPTY);
293+
}
287294
return;
288295
}
289296

@@ -332,5 +339,9 @@ private void publishDownstream() {
332339
}
333340
}
334341
}
342+
343+
private static boolean isFlushSignal(EventData eventData) {
344+
return eventData instanceof FlushSignal;
345+
}
335346
}
336347
}

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
* producer.
3636
*/
3737
public final class EventDataBatch {
38+
static final EventDataBatch EMPTY
39+
= new EventDataBatch(0, null, null, null, EventHubsProducerInstrumentation.NOOP_INSTANCE);
3840
private static final ClientLogger LOGGER = new ClientLogger(EventDataBatch.class);
3941
private final int maxMessageSize;
4042
private final String partitionKey;

0 commit comments

Comments
 (0)