Skip to content

Commit 924a4e7

Browse files
authored
Update the synchronous work from queue when current work is completed (Azure#27573)
* Queue work when current work is not terminated * Update work when request is 0 * Add syncSubscribed to sync subscribe * Update CHANGELOG * Add UT to test updating work when current work completed
1 parent 6b5238a commit 924a4e7

File tree

4 files changed

+116
-56
lines changed

4 files changed

+116
-56
lines changed

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Breaking Changes
88

99
### Bugs Fixed
10+
- Fixed a bug that when the current `SynchronousReceiveWork` is completed, the queued `SynchronousReceiveWork` is not updated. ([#27578](https://github.com/Azure/azure-sdk-for-java/issues/27578))
1011

1112
### Other Changes
1213

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.time.Duration;
1919
import java.time.OffsetDateTime;
2020
import java.util.Objects;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.concurrent.atomic.AtomicInteger;
2223
import java.util.concurrent.atomic.AtomicReference;
2324
import java.util.function.Consumer;
@@ -60,6 +61,8 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
6061

6162
/* To hold each receive work item to be processed.*/
6263
private final AtomicReference<SynchronousMessageSubscriber> synchronousMessageSubscriber = new AtomicReference<>();
64+
/* To ensure synchronousMessageSubscriber is subscribed only once. */
65+
private final AtomicBoolean syncSubscribed = new AtomicBoolean(false);
6366

6467
/**
6568
* Creates a synchronous receiver given its asynchronous counterpart.
@@ -719,28 +722,36 @@ private void queueWork(int maximumMessageCount, Duration maxWaitTime,
719722

720723
final long id = idGenerator.getAndIncrement();
721724
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter);
722-
final SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
725+
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
723726

724727
if (messageSubscriber != null) {
725728
messageSubscriber.queueWork(work);
726729
return;
727730
}
728731

729-
final SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(asyncClient,
730-
work,
731-
isPrefetchDisabled,
732-
operationTimeout);
732+
messageSubscriber = synchronousMessageSubscriber.updateAndGet(subscriber -> {
733+
// Ensuring we create SynchronousMessageSubscriber only once.
734+
if (subscriber == null) {
735+
return new SynchronousMessageSubscriber(asyncClient,
736+
work,
737+
isPrefetchDisabled,
738+
operationTimeout);
739+
} else {
740+
return subscriber;
741+
}
742+
});
733743

734744
// NOTE: We asynchronously send the credit to the service as soon as receiveMessage() API is called (for first
735745
// time).
736746
// This means that there may be messages internally buffered before users start iterating the IterableStream.
737747
// If users do not iterate through the stream and their lock duration expires, it is possible that the
738748
// Service Bus message's delivery count will be incremented.
739-
if (synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) {
740-
asyncClient.receiveMessagesNoBackPressure().subscribeWith(newSubscriber);
749+
if (!syncSubscribed.getAndSet(true)) {
750+
// The 'subscribeWith' has side effects hence must not be called from
751+
// the above updateFunction of AtomicReference::updateAndGet.
752+
asyncClient.receiveMessagesNoBackPressure().subscribeWith(messageSubscriber);
741753
} else {
742-
newSubscriber.dispose();
743-
synchronousMessageSubscriber.get().queueWork(work);
754+
messageSubscriber.queueWork(work);
744755
}
745756

746757
logger.atVerbose()

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ void queueWork(SynchronousReceiveWork work) {
136136
.addKeyValue("numberOfEvents", work.getNumberOfEvents())
137137
.addKeyValue("timeout", work.getTimeout());
138138

139-
// If previous work items were completed, the message queue is empty and currentWork == null. Update the
140-
// current work and request items upstream if we need to.
141-
if (workQueue.peek() == work) {
139+
// If previous work items were completed, the message queue is empty, and currentWork is null or terminal,
140+
// Update the current work and request items upstream if we need to.
141+
if (workQueue.peek() == work && (currentWork == null || currentWork.isTerminal())) {
142142
logBuilder.log("First work in queue. Requesting upstream if needed.");
143143
getOrUpdateCurrentWork();
144144
} else {
@@ -236,6 +236,11 @@ private void drainQueue() {
236236
numberRequested = REQUESTED.addAndGet(this, -numberConsumed);
237237
}
238238
}
239+
if (numberRequested == 0L) {
240+
logger.atVerbose()
241+
.log("Current work is completed. Schedule next work.");
242+
getOrUpdateCurrentWork();
243+
}
239244
}
240245

241246
/**
@@ -272,35 +277,17 @@ private SynchronousReceiveWork getOrUpdateCurrentWork() {
272277
}
273278

274279
currentWork = workQueue.poll();
275-
while (currentWork != null) {
276-
// For the terminal work, subtract the remaining number of messages from our current request
277-
// count. This is so we don't keep adding credits for work that was expired, but we never
278-
// received messages for.
279-
if (currentWork.isTerminal()) {
280-
REQUESTED.updateAndGet(this, currentRequest -> {
281-
final int remainingEvents = currentWork.getRemainingEvents();
282-
283-
// The work had probably emitted all its messages and then terminated.
284-
// The currentRequest is fine.
285-
if (remainingEvents < 1) {
286-
return currentRequest;
287-
}
288-
289-
final long difference = currentRequest - remainingEvents;
290-
291-
logger.atVerbose()
292-
.addKeyValue(NUMBER_OF_REQUESTED_MESSAGES_KEY, currentRequest)
293-
.addKeyValue("remainingEvents", remainingEvents)
294-
.addKeyValue("difference", difference)
295-
.log("Updating REQUESTED because current work item is terminal.");
296-
297-
return difference < 0 ? 0 : difference;
298-
});
299-
300-
currentWork = workQueue.poll();
301-
continue;
302-
}
280+
//The work in queue will not be terminal, here is double check
281+
while (currentWork != null && currentWork.isTerminal()) {
282+
logger.atVerbose()
283+
.addKeyValue(WORK_ID_KEY, currentWork.getId())
284+
.addKeyValue("numberOfEvents", currentWork.getNumberOfEvents())
285+
.log("This work from queue is terminal. Skip it.");
303286

287+
currentWork = workQueue.poll();
288+
}
289+
290+
if (currentWork != null) {
304291
final SynchronousReceiveWork work = currentWork;
305292
logger.atVerbose()
306293
.addKeyValue(WORK_ID_KEY, work.getId())
@@ -312,8 +299,6 @@ private SynchronousReceiveWork getOrUpdateCurrentWork() {
312299
// Now that we updated REQUESTED to account for credits already on the line, we're good to
313300
// place any credits for this new work item.
314301
requestUpstream(work.getNumberOfEvents());
315-
316-
return work;
317302
}
318303

319304
return currentWork;

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SynchronousMessageSubscriberTest.java

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,12 @@
1717
import reactor.core.publisher.Mono;
1818

1919
import java.time.Duration;
20-
import java.util.HashSet;
2120
import java.util.List;
22-
import java.util.Set;
2321
import java.util.concurrent.atomic.AtomicBoolean;
2422
import java.util.concurrent.atomic.AtomicInteger;
2523

2624
import static org.junit.jupiter.api.Assertions.assertEquals;
2725
import static org.junit.jupiter.api.Assertions.assertFalse;
28-
import static org.junit.jupiter.api.Assertions.assertTrue;
2926
import static org.mockito.ArgumentMatchers.any;
3027
import static org.mockito.ArgumentMatchers.eq;
3128
import static org.mockito.ArgumentMatchers.isNull;
@@ -124,15 +121,11 @@ public void queueWorkTest() {
124121
}
125122

126123
/**
127-
* Verifies that this processes multiple work items.
124+
* Verifies that this processes multiple work items and current work encounter timeout
128125
*/
129126
@Test
130-
public void processesMultipleWorkItems() {
127+
public void processesMultipleWorkItemsAndCurrentWorkTimeout() {
131128
// Arrange
132-
final SynchronousReceiveWork work3 = mock(SynchronousReceiveWork.class);
133-
when(work3.getId()).thenReturn(3L);
134-
when(work3.getNumberOfEvents()).thenReturn(1);
135-
136129
final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class);
137130
final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class);
138131
final ServiceBusReceivedMessage message3 = mock(ServiceBusReceivedMessage.class);
@@ -154,9 +147,15 @@ public void processesMultipleWorkItems() {
154147
doAnswer(invocation -> isTerminal.get()).when(work1).isTerminal();
155148
doAnswer(invocation -> remaining.get()).when(work1).getRemainingEvents();
156149

150+
// WORK 2 is update to current work after the work1 is terminal and successfully emits message3
157151
when(work2.emitNext(message3)).thenReturn(true);
158152
when(work2.isTerminal()).thenReturn(false);
159153

154+
// WORK 3 is placed in queue
155+
final SynchronousReceiveWork work3 = mock(SynchronousReceiveWork.class);
156+
when(work3.getId()).thenReturn(3L);
157+
when(work3.getNumberOfEvents()).thenReturn(1);
158+
160159
syncSubscriber = new SynchronousMessageSubscriber(asyncClient, work1, false, operationTimeout);
161160
syncSubscriber.queueWork(work2);
162161
syncSubscriber.queueWork(work3);
@@ -188,15 +187,79 @@ public void processesMultipleWorkItems() {
188187
// - work2.getNumberOfEvents() - (REQUESTED - work1.getRemainingItems());
189188
verify(subscription, times(2)).request(subscriptionArgumentCaptor.capture());
190189
final List<Long> allRequests = subscriptionArgumentCaptor.getAllValues();
191-
final Set<Long> expected = new HashSet<>();
192-
expected.add((long) work1.getNumberOfEvents());
190+
assertEquals(NUMBER_OF_WORK_ITEMS, allRequests.get(0));
193191

194192
final long requestedAfterWork1 = NUMBER_OF_WORK_ITEMS - remaining.get();
195193
final long expectedDifference = work2.getNumberOfEvents() - requestedAfterWork1;
196-
expected.add(expectedDifference);
194+
assertEquals(expectedDifference, allRequests.get(1));
195+
}
196+
197+
/**
198+
* Verifies that this processes multiple work items and current work can emit all messages successfully
199+
*/
200+
@Test
201+
public void processesMultipleWorkItemsAndCurrentWorkEmitAllMessages() {
202+
// Arrange
203+
final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class);
204+
final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class);
205+
final ServiceBusReceivedMessage message3 = mock(ServiceBusReceivedMessage.class);
206+
final ServiceBusReceivedMessage message4 = mock(ServiceBusReceivedMessage.class);
207+
final ServiceBusReceivedMessage message5 = mock(ServiceBusReceivedMessage.class);
208+
209+
// WORK 1 successfully emits all messages and is terminal after emit message4.
210+
final AtomicBoolean isTerminal = new AtomicBoolean(false);
211+
final AtomicInteger remaining = new AtomicInteger(NUMBER_OF_WORK_ITEMS);
212+
doAnswer(invocation -> {
213+
ServiceBusReceivedMessage arg = invocation.getArgument(0);
214+
remaining.decrementAndGet();
215+
if (arg == message4) {
216+
isTerminal.set(true);
217+
}
218+
return true;
219+
}).when(work1).emitNext(any(ServiceBusReceivedMessage.class));
220+
doAnswer(invocation -> isTerminal.get()).when(work1).isTerminal();
221+
doAnswer(invocation -> remaining.get()).when(work1).getRemainingEvents();
222+
223+
// WORK 2 is updated to current work after work1 completed and successfully emit message5
224+
when(work2.isTerminal()).thenReturn(false);
225+
when(work2.emitNext(message5)).thenReturn(true);
197226

198-
assertEquals(expected.size(), allRequests.size());
199-
allRequests.forEach(r -> assertTrue(expected.contains(r)));
227+
// WORK 3 is placed in queue
228+
final SynchronousReceiveWork work3 = mock(SynchronousReceiveWork.class);
229+
when(work3.getId()).thenReturn(3L);
230+
when(work3.getNumberOfEvents()).thenReturn(1);
231+
232+
syncSubscriber = new SynchronousMessageSubscriber(asyncClient, work1, false, operationTimeout);
233+
syncSubscriber.queueWork(work2);
234+
syncSubscriber.queueWork(work3);
235+
236+
syncSubscriber.hookOnSubscribe(subscription);
237+
238+
assertEquals(2, syncSubscriber.getWorkQueueSize());
239+
240+
// Act
241+
syncSubscriber.hookOnNext(message1);
242+
syncSubscriber.hookOnNext(message2);
243+
syncSubscriber.hookOnNext(message3);
244+
syncSubscriber.hookOnNext(message4);
245+
246+
// Assert
247+
verify(work2).start();
248+
249+
// work2 emits message5
250+
syncSubscriber.hookOnNext(message5);
251+
252+
verify(work2).emitNext(message5);
253+
254+
assertEquals(1, syncSubscriber.getWorkQueueSize());
255+
256+
// Verify that we requested:
257+
// 1st time: hookOnSubscribe(work1.getNumberOfEvents())
258+
// 2nd time: requestUpstream(work2.getNumberOfEvents()) and REQUESTED = 0
259+
verify(subscription, times(2)).request(subscriptionArgumentCaptor.capture());
260+
final List<Long> allRequests = subscriptionArgumentCaptor.getAllValues();
261+
assertEquals(NUMBER_OF_WORK_ITEMS, allRequests.get(0));
262+
assertEquals(NUMBER_OF_WORK_ITEMS_2, allRequests.get(1));
200263
}
201264

202265
/**

0 commit comments

Comments
 (0)