Skip to content

Commit ed95889

Browse files
authored
Fixing ordering problem when sending messages in a batch (Azure#27366)
* batch ordering fix
1 parent f106f3b commit ed95889

File tree

3 files changed

+48
-3
lines changed

3 files changed

+48
-3
lines changed

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
### Bugs Fixed
1010
- Removed the incorrect use of lock primitives from `ServiceBusMessageBatch.tryAddMessage()` implementation and documented that this API is not thread-safe. ([#25910](https://github.com/Azure/azure-sdk-for-java/issues/25910))
11+
- Fixed incorrect ordering of message when sent as a batch. ([#25112](https://github.com/Azure/azure-sdk-for-java/issues/25112), [#25599](https://github.com/Azure/azure-sdk-for-java/issues/25599))
1112

1213
### Other Changes
1314

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ private Mono<Void> sendIterable(Iterable<ServiceBusMessage> messages, ServiceBus
657657
}
658658

659659
return createMessageBatch().flatMap(messageBatch -> {
660-
StreamSupport.stream(messages.spliterator(), true)
660+
StreamSupport.stream(messages.spliterator(), false)
661661
.forEach(message -> messageBatch.tryAddMessage(message));
662662
return sendInternal(messageBatch, transaction);
663663
});
@@ -723,7 +723,7 @@ private Mono<Void> sendInternal(ServiceBusMessageBatch batch, ServiceBusTransact
723723

724724
AtomicReference<Context> sharedContext = new AtomicReference<>(Context.NONE);
725725
final List<org.apache.qpid.proton.message.Message> messages = Collections.synchronizedList(new ArrayList<>());
726-
batch.getMessages().parallelStream().forEach(serviceBusMessage -> {
726+
batch.getMessages().forEach(serviceBusMessage -> {
727727
if (isTracingEnabled) {
728728
parentContext.set(serviceBusMessage.getContext());
729729
if (sharedContext.get().equals(Context.NONE)) {

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,12 @@
5656
import java.util.ArrayList;
5757
import java.util.Arrays;
5858
import java.util.Collections;
59+
import java.util.Iterator;
5960
import java.util.List;
6061
import java.util.UUID;
6162
import java.util.concurrent.atomic.AtomicInteger;
63+
import java.util.regex.Matcher;
64+
import java.util.regex.Pattern;
6265
import java.util.stream.IntStream;
6366

6467
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
@@ -487,7 +490,6 @@ void sendMessagesListWithTransaction() {
487490
void sendMessagesList() {
488491
// Arrange
489492
final int count = 4;
490-
final byte[] contents = TEST_CONTENTS.toBytes();
491493
final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString());
492494

493495
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
@@ -701,6 +703,48 @@ void cancelScheduleMessages() {
701703
Assertions.assertEquals(sequenceNumbers.size(), actualTotal.get());
702704
}
703705

706+
/**
707+
* Verifies that sending multiple message will result in calling sender.send(Message...).
708+
*/
709+
@Test
710+
void verifyMessageOrdering() {
711+
// Arrange
712+
final ServiceBusMessage firstMessage = new ServiceBusMessage("First message " + UUID.randomUUID());
713+
final ServiceBusMessage secondMessage = new ServiceBusMessage("Second message " + UUID.randomUUID());
714+
final ServiceBusMessage thirdMessage = new ServiceBusMessage("Third message " + UUID.randomUUID());
715+
final ServiceBusMessage fourthMessage = new ServiceBusMessage("Fourth message " + UUID.randomUUID());
716+
final ServiceBusMessage fifthMessage = new ServiceBusMessage("Fifth message " + UUID.randomUUID());
717+
final List<ServiceBusMessage> messages = new ArrayList<>();
718+
messages.add(firstMessage);
719+
messages.add(secondMessage);
720+
messages.add(thirdMessage);
721+
messages.add(fourthMessage);
722+
messages.add(fifthMessage);
723+
724+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
725+
.thenReturn(Mono.just(sendLink));
726+
when(sendLink.send(anyList())).thenReturn(Mono.empty());
727+
728+
// Act
729+
StepVerifier.create(sender.sendMessages(messages))
730+
.verifyComplete();
731+
732+
// Assert
733+
verify(sendLink).send(messagesCaptor.capture());
734+
735+
final List<Message> messagesSent = messagesCaptor.getValue();
736+
Assertions.assertEquals(messages.size(), messagesSent.size());
737+
738+
Iterator<ServiceBusMessage> iterator = messages.iterator();
739+
Pattern regex = Pattern.compile("\\{(.*)\\}");
740+
for (Message message : messagesSent) {
741+
Matcher matcher = regex.matcher(message.getBody().toString());
742+
String content = matcher.find() ? matcher.group(1) : "";
743+
Assertions.assertEquals(content, iterator.next().getBody().toString());
744+
}
745+
messagesSent.forEach(message -> Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType()));
746+
}
747+
704748
/**
705749
* Verifies that the onClientClose is called.
706750
*/

0 commit comments

Comments
 (0)