Skip to content

Commit 7df6eba

Browse files
authored
[SB] Fix Peek Last Sequence ID Capture (Azure#39513)
* set last sequence number for next call * live test * fix test * fix increment * pylint fixes * make change in mgmt common * fix test * fix test with async cred * additional testing * fix sync tests
1 parent df1106c commit 7df6eba

File tree

5 files changed

+243
-5
lines changed

5 files changed

+243
-5
lines changed

sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,14 @@ def peek_op( # pylint: disable=inconsistent-return-statements
5252
):
5353
condition = message.application_properties.get(MGMT_RESPONSE_MESSAGE_ERROR_CONDITION)
5454
if status_code == 200:
55-
return amqp_transport.parse_received_message(
55+
parsed_messages = amqp_transport.parse_received_message(
5656
message, message_type=ServiceBusReceivedMessage, receiver=receiver, is_peeked_message=True
5757
)
58+
if parsed_messages:
59+
receiver._last_received_sequenced_number = parsed_messages[-1].sequence_number # pylint: disable=protected-access
60+
return parsed_messages
5861
if status_code in [202, 204]:
5962
return []
60-
6163
amqp_transport.handle_amqp_mgmt_error(
6264
_LOGGER, "Message peek failed.", condition, description, status_code
6365
)

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
_LOGGER = logging.getLogger(__name__)
7373

7474

75-
class ServiceBusReceiver(BaseHandler, ReceiverMixin):
75+
class ServiceBusReceiver(BaseHandler, ReceiverMixin): # pylint: disable=too-many-instance-attributes
7676
"""The ServiceBusReceiver class defines a high level interface for
7777
receiving messages from the Azure Service Bus Queue or Topic Subscription.
7878
@@ -769,7 +769,11 @@ def peek_messages(
769769
if timeout is not None and timeout <= 0:
770770
raise ValueError("The timeout must be greater than 0.")
771771
if not sequence_number:
772-
sequence_number = self._last_received_sequenced_number or 1
772+
sequence_number = (
773+
self._last_received_sequenced_number + 1
774+
if self._last_received_sequenced_number
775+
else 1
776+
)
773777
if int(max_message_count) < 0:
774778
raise ValueError("max_message_count must be 1 or greater.")
775779

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,11 @@ async def peek_messages(
748748
if timeout is not None and timeout <= 0:
749749
raise ValueError("The timeout must be greater than 0.")
750750
if not sequence_number:
751-
sequence_number = self._last_received_sequenced_number or 1
751+
sequence_number = (
752+
self._last_received_sequenced_number + 1
753+
if self._last_received_sequenced_number
754+
else 1
755+
)
752756
if int(max_message_count) < 0:
753757
raise ValueError("max_message_count must be 1 or greater.")
754758

sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3682,3 +3682,117 @@ async def test_queue_complete_message_on_different_receiver_async(
36823682
messages_in_queue = await receiver1.peek_messages()
36833683

36843684
assert len(messages_in_queue) == 0
3685+
3686+
@pytest.mark.asyncio
3687+
@pytest.mark.liveTest
3688+
@pytest.mark.live_test_only
3689+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
3690+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
3691+
@ServiceBusQueuePreparer(name_prefix="servicebustest", dead_lettering_on_message_expiration=True)
3692+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
3693+
@ArgPasserAsync()
3694+
async def test_queue_async_by_queue_client_peek_auto_increment(
3695+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
3696+
):
3697+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
3698+
credential = get_credential(is_async=True)
3699+
async with ServiceBusClient(
3700+
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
3701+
) as sb_client:
3702+
3703+
sender = sb_client.get_queue_sender(servicebus_queue.name)
3704+
async with sender:
3705+
messages = []
3706+
for i in range(3):
3707+
message = ServiceBusMessage("Handler message no. {}".format(i), application_properties={"index": i})
3708+
messages.append(message)
3709+
await sender.send_messages(messages)
3710+
3711+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
3712+
async with receiver:
3713+
peek_message = await receiver.peek_messages()
3714+
assert peek_message[0].application_properties[b"index"] == 0
3715+
assert peek_message[0].sequence_number == 1
3716+
peek_message = await receiver.peek_messages()
3717+
assert peek_message[0].application_properties[b"index"] == 1
3718+
assert peek_message[0].sequence_number == 2
3719+
peek_message = await receiver.peek_messages()
3720+
assert peek_message[0].application_properties[b"index"] == 2
3721+
assert peek_message[0].sequence_number == 3
3722+
3723+
@pytest.mark.asyncio
3724+
@pytest.mark.liveTest
3725+
@pytest.mark.live_test_only
3726+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
3727+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
3728+
@ServiceBusQueuePreparer(name_prefix="servicebustest", dead_lettering_on_message_expiration=True)
3729+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
3730+
@ArgPasserAsync()
3731+
async def test_queue_async_by_queue_client_peek_auto_increment_multiple(
3732+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
3733+
):
3734+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
3735+
credential = get_credential(is_async=True)
3736+
async with ServiceBusClient(
3737+
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
3738+
) as sb_client:
3739+
3740+
sender = sb_client.get_queue_sender(servicebus_queue.name)
3741+
async with sender:
3742+
messages = []
3743+
for i in range(4):
3744+
message = ServiceBusMessage("Handler message no. {}".format(i), application_properties={"index": i})
3745+
messages.append(message)
3746+
await sender.send_messages(messages)
3747+
3748+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
3749+
async with receiver:
3750+
peek_message = await receiver.peek_messages(max_message_count=2)
3751+
assert len(peek_message) == 2
3752+
assert peek_message[0].application_properties[b"index"] == 0
3753+
assert peek_message[0].sequence_number == 1
3754+
assert peek_message[1].application_properties[b"index"] == 1
3755+
assert peek_message[1].sequence_number == 2
3756+
peek_message = await receiver.peek_messages(max_message_count=2)
3757+
assert len(peek_message) == 2
3758+
assert peek_message[0].application_properties[b"index"] == 2
3759+
assert peek_message[0].sequence_number == 3
3760+
assert peek_message[1].application_properties[b"index"] == 3
3761+
assert peek_message[1].sequence_number == 4
3762+
3763+
@pytest.mark.asyncio
3764+
@pytest.mark.liveTest
3765+
@pytest.mark.live_test_only
3766+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
3767+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
3768+
@ServiceBusQueuePreparer(name_prefix="servicebustest", dead_lettering_on_message_expiration=True)
3769+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
3770+
@ArgPasserAsync()
3771+
async def test_queue_async_by_queue_client_peek_and_receive(
3772+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
3773+
):
3774+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
3775+
credential = get_credential(is_async=True)
3776+
async with ServiceBusClient(
3777+
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
3778+
) as sb_client:
3779+
3780+
sender = sb_client.get_queue_sender(servicebus_queue.name)
3781+
async with sender:
3782+
messages = []
3783+
for i in range(4):
3784+
message = ServiceBusMessage("Handler message no. {}".format(i), application_properties={"index": i})
3785+
messages.append(message)
3786+
await sender.send_messages(messages)
3787+
3788+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
3789+
async with receiver:
3790+
peeked_messages = await receiver.peek_messages(max_message_count=2)
3791+
3792+
messages = await receiver.receive_messages(max_message_count=3)
3793+
last_received_sequnece_number = messages[-1].sequence_number
3794+
for message in messages:
3795+
await receiver.complete_message(message)
3796+
3797+
peeked_messages = await receiver.peek_messages(max_message_count=2)
3798+
assert peeked_messages[0].sequence_number == last_received_sequnece_number + 1

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3720,3 +3720,117 @@ def test_queue_complete_message_on_different_receiver(
37203720
messages_in_queue = receiver1.peek_messages()
37213721

37223722
assert len(messages_in_queue) == 0
3723+
3724+
@pytest.mark.liveTest
3725+
@pytest.mark.live_test_only
3726+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
3727+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
3728+
@ServiceBusQueuePreparer(name_prefix="servicebustest", dead_lettering_on_message_expiration=True)
3729+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
3730+
@ArgPasser()
3731+
def test_queue_by_queue_client_peek_auto_increment(
3732+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
3733+
):
3734+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
3735+
credential = get_credential()
3736+
with ServiceBusClient(
3737+
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
3738+
) as sb_client:
3739+
3740+
sender = sb_client.get_queue_sender(servicebus_queue.name)
3741+
with sender:
3742+
messages = []
3743+
for i in range(3):
3744+
message = ServiceBusMessage("Handler message no. {}".format(i), application_properties={"index": i})
3745+
messages.append(message)
3746+
sender.send_messages(messages)
3747+
3748+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
3749+
with receiver:
3750+
peek_message = receiver.peek_messages()
3751+
assert peek_message[0].application_properties[b"index"] == 0
3752+
assert peek_message[0].sequence_number == 1
3753+
peek_message = receiver.peek_messages()
3754+
assert peek_message[0].application_properties[b"index"] == 1
3755+
assert peek_message[0].sequence_number == 2
3756+
peek_message = receiver.peek_messages()
3757+
assert peek_message[0].application_properties[b"index"] == 2
3758+
assert peek_message[0].sequence_number == 3
3759+
3760+
@pytest.mark.liveTest
3761+
@pytest.mark.live_test_only
3762+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
3763+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
3764+
@ServiceBusQueuePreparer(name_prefix="servicebustest", dead_lettering_on_message_expiration=True)
3765+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
3766+
@ArgPasser()
3767+
def test_queue_by_queue_client_peek_auto_increment_multiple(
3768+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
3769+
):
3770+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
3771+
credential = get_credential()
3772+
with ServiceBusClient(
3773+
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
3774+
) as sb_client:
3775+
3776+
sender = sb_client.get_queue_sender(servicebus_queue.name)
3777+
with sender:
3778+
messages = []
3779+
for i in range(4):
3780+
message = ServiceBusMessage("Handler message no. {}".format(i), application_properties={"index": i})
3781+
messages.append(message)
3782+
sender.send_messages(messages)
3783+
3784+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
3785+
with receiver:
3786+
peek_message = receiver.peek_messages(max_message_count=2)
3787+
assert len(peek_message) == 2
3788+
assert peek_message[0].application_properties[b"index"] == 0
3789+
assert peek_message[0].sequence_number == 1
3790+
assert peek_message[1].application_properties[b"index"] == 1
3791+
assert peek_message[1].sequence_number == 2
3792+
peek_message = receiver.peek_messages(max_message_count=2)
3793+
assert len(peek_message) == 2
3794+
assert peek_message[0].application_properties[b"index"] == 2
3795+
assert peek_message[0].sequence_number == 3
3796+
assert peek_message[1].application_properties[b"index"] == 3
3797+
assert peek_message[1].sequence_number == 4
3798+
3799+
@pytest.mark.liveTest
3800+
@pytest.mark.live_test_only
3801+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
3802+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
3803+
@ServiceBusQueuePreparer(name_prefix="servicebustest", dead_lettering_on_message_expiration=True)
3804+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
3805+
@ArgPasser()
3806+
def test_queue_by_queue_client_peek_and_receive(
3807+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
3808+
):
3809+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
3810+
credential = get_credential()
3811+
with ServiceBusClient(
3812+
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
3813+
) as sb_client:
3814+
3815+
sender = sb_client.get_queue_sender(servicebus_queue.name)
3816+
with sender:
3817+
messages = []
3818+
for i in range(4):
3819+
message = ServiceBusMessage("Handler message no. {}".format(i), application_properties={"index": i})
3820+
messages.append(message)
3821+
sender.send_messages(messages)
3822+
3823+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
3824+
with receiver:
3825+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
3826+
peeked_messages = receiver.peek_messages(max_message_count=2)
3827+
3828+
messages = receiver.receive_messages(max_message_count=3)
3829+
last_received_sequnece_number = messages[-1].sequence_number
3830+
for message in messages:
3831+
receiver.complete_message(message)
3832+
3833+
peeked_messages = receiver.peek_messages(max_message_count=2)
3834+
assert peeked_messages[0].sequence_number == last_received_sequnece_number + 1
3835+
3836+

0 commit comments

Comments
 (0)