Skip to content

Commit 9847985

Browse files
committed
[ServiceBus] Delete batch messages (#33944)
* batch delete updates * edits * edits * updates * remove uneeded timestamp encode * add async side * update default * update msg * fix async side * uneeded code * return an int * timestamp amqp encoding * adding tests * more tests * remove logging from example * sample cleanup * updating tests * will support both modes * add peek/recieve to sample * uamqp conflict remove * add 10 hr buffer to time * add session samples/remove receivedeletemode * remove commented code * update params * complete msgs * pylint * uneeded if * make a batch/single send sample * rename to delete_messages() * add limit of 4000 on message count * fix async * docstring * pylint * add maximum tests * remove timeout kwarg * topic samples * datetime + 1 day * rename enqueued_time * spacing * comment * make it 2 optional params * if not none * kwarg * do best attempt * todo for next func * space * add purge_msgs * a * remove unused param * uncomment * update docs * add timeout kwarg * update samples * delete session samples unneeded * pylint * update tests * rename to purge * add purge * typo * update tests * remove tracing on num * missing async * topic preparer * await * typo * typo * unused * add tracing * update naming * add to changelog
1 parent 2ef297b commit 9847985

File tree

13 files changed

+738
-2
lines changed

13 files changed

+738
-2
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
### Features Added
66

7+
- `ServiceBusReceiver` now supports the ability to delete all messages from an entity using the `purge_messages` method. Callers may optionally request to limit the target messages to those earlier than a given date.
8+
9+
- `ServiceBusReceiver` now supports the ability to delete messages from an entity in batches using the `delete_messages` method. The messages selected for deletion will be the oldest in the entity, based on the enqueued date and callers may optionally request to limit them to only those earlier than a given date.
10+
711
### Breaking Changes
812

913
### Bugs Fixed

sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
VENDOR + b":cancel-scheduled-message"
4848
)
4949
REQUEST_RESPONSE_PEEK_OPERATION = VENDOR + b":peek-message"
50+
REQUEST_RESPONSE_DELETE_BATCH_OPERATION = VENDOR + b":batch-delete-messages"
5051
REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION = VENDOR + b":update-disposition"
5152
REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION = VENDOR + b":get-session-state"
5253
REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION = VENDOR + b":set-session-state"
@@ -75,6 +76,7 @@
7576
MGMT_REQUEST_SEQUENCE_NUMBERS = "sequence-numbers"
7677
MGMT_REQUEST_RECEIVER_SETTLE_MODE = "receiver-settle-mode"
7778
MGMT_REQUEST_FROM_SEQUENCE_NUMBER = "from-sequence-number"
79+
MGMT_REQUEST_ENQUEUED_TIME_UTC = "enqueued-time-utc"
7880
MGMT_REQUEST_MAX_MESSAGE_COUNT = "message-count"
7981
MGMT_REQUEST_MESSAGE = "message"
8082
MGMT_REQUEST_MESSAGES = "messages"

sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ def peek_op( # pylint: disable=inconsistent-return-statements
7575
_LOGGER, "Message peek failed.", condition, description, status_code
7676
)
7777

78+
def batch_delete_op( # pylint: disable=inconsistent-return-statements, unused-argument
79+
status_code, message, description, receiver, amqp_transport
80+
):
81+
condition = message.application_properties.get(
82+
MGMT_RESPONSE_MESSAGE_ERROR_CONDITION
83+
)
84+
85+
if status_code == 200:
86+
return message, message.value[b"message-count"]
87+
if status_code in [202, 204]:
88+
return message, 0
89+
90+
amqp_transport.handle_amqp_mgmt_error( # pylint: disable=protected-access
91+
_LOGGER, "Batch Delete failed.", condition, description, status_code
92+
)
93+
7894

7995
def list_sessions_op( # pylint: disable=inconsistent-return-statements
8096
status_code, message, description, amqp_transport

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
MGMT_REQUEST_DEAD_LETTER_REASON,
4848
MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION,
4949
MGMT_RESPONSE_MESSAGE_EXPIRATION,
50+
MGMT_REQUEST_ENQUEUED_TIME_UTC,
51+
REQUEST_RESPONSE_DELETE_BATCH_OPERATION
5052
)
5153
from ._common import mgmt_handlers
5254
from ._common.receiver_mixins import ReceiverMixin
@@ -854,6 +856,96 @@ def peek_messages(
854856
with receive_trace_context_manager(self, span_name=SPAN_NAME_PEEK, links=links, start_time=start_time):
855857
return messages
856858

859+
def delete_messages(
860+
self,
861+
*,
862+
max_message_count: Optional[int] = None,
863+
before_enqueued_time_utc: Optional[datetime.datetime] = None,
864+
timeout: Optional[float] = None,
865+
) -> int:
866+
"""
867+
This operation deletes messages in the queue that are older than the specified enqueued time.
868+
869+
:keyword int or None max_message_count: The maximum number of messages to delete. The default value is None,
870+
meaning it will attempt to delete up to 4,000 messages.
871+
:keyword datetime.datetime or None before_enqueued_time_utc: The UTC datetime value before which all messages
872+
should be deleted. The default value is None, meaning all messages in the queue will be considered.
873+
:keyword Optional[float] timeout: The total operation timeout in seconds including all the retries.
874+
The value must be greater than 0 if specified. The default value is None, meaning no timeout.
875+
:return: The number of messages deleted.
876+
:rtype: int
877+
878+
"""
879+
self._check_live()
880+
if timeout is not None and timeout <= 0:
881+
raise ValueError("The timeout must be greater than 0.")
882+
self._open()
883+
884+
message_count = max_message_count if max_message_count else 4000
885+
886+
message_send = {
887+
MGMT_REQUEST_ENQUEUED_TIME_UTC: before_enqueued_time_utc if before_enqueued_time_utc
888+
else datetime.datetime.now(datetime.timezone.utc),
889+
MGMT_REQUEST_MAX_MESSAGE_COUNT: message_count,
890+
}
891+
start_time = time.time_ns()
892+
893+
self._populate_message_properties(message_send)
894+
handler = functools.partial(mgmt_handlers.batch_delete_op, receiver=self, amqp_transport=self._amqp_transport)
895+
message, deleted = self._mgmt_request_response_with_retry(
896+
REQUEST_RESPONSE_DELETE_BATCH_OPERATION, message_send, handler, timeout=timeout
897+
)
898+
links = get_receive_links(message)
899+
with receive_trace_context_manager(self, span_name=SPAN_NAME_PEEK, links=links, start_time=start_time):
900+
return deleted
901+
902+
def purge_messages(
903+
self,
904+
*,
905+
before_enqueued_time_utc: Optional[datetime.datetime] = None,
906+
timeout: Optional[float] = None,
907+
) -> int:
908+
"""
909+
This operation purges as many messages as possible in the queue that are older than the specified enqueued time.
910+
911+
:keyword datetime.datetime or None before_enqueued_time_utc: The UTC datetime value before which all messages
912+
should be deleted. The default value is None, meaning all messages from the current time and before
913+
in the queue will be considered.
914+
:keyword Optional[float] timeout: The total operation timeout in seconds including all the retries.
915+
The value must be greater than 0 if specified. The default value is None, meaning no timeout.
916+
:return: The number of messages deleted.
917+
:rtype: int
918+
919+
"""
920+
self._check_live()
921+
if timeout is not None and timeout <= 0:
922+
raise ValueError("The timeout must be greater than 0.")
923+
self._open()
924+
925+
message_send = {
926+
MGMT_REQUEST_ENQUEUED_TIME_UTC: before_enqueued_time_utc if before_enqueued_time_utc
927+
else datetime.datetime.now(datetime.timezone.utc),
928+
MGMT_REQUEST_MAX_MESSAGE_COUNT: 4000,
929+
}
930+
start_time = time.time_ns()
931+
932+
self._populate_message_properties(message_send)
933+
handler = functools.partial(mgmt_handlers.batch_delete_op, receiver=self, amqp_transport=self._amqp_transport)
934+
935+
batch_count = 0
936+
deleted = None
937+
messages = []
938+
while deleted != 0:
939+
message_received, deleted = self._mgmt_request_response_with_retry(
940+
REQUEST_RESPONSE_DELETE_BATCH_OPERATION, message_send, handler, timeout=timeout
941+
)
942+
messages.append(message_received)
943+
batch_count += deleted
944+
945+
links = get_receive_links(messages)
946+
with receive_trace_context_manager(self, span_name=SPAN_NAME_PEEK, links=links, start_time=start_time):
947+
return batch_count
948+
857949
def complete_message(self, message: ServiceBusReceivedMessage) -> None:
858950
"""Complete the message.
859951

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
MGMT_REQUEST_DEAD_LETTER_REASON,
5050
MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION,
5151
MGMT_RESPONSE_MESSAGE_EXPIRATION,
52+
MGMT_REQUEST_ENQUEUED_TIME_UTC,
53+
REQUEST_RESPONSE_DELETE_BATCH_OPERATION
5254
)
5355
from .._common import mgmt_handlers
5456
from .._common.utils import utc_from_timestamp
@@ -827,6 +829,101 @@ async def peek_messages(
827829
):
828830
return messages
829831

832+
async def delete_messages(
833+
self,
834+
*,
835+
max_message_count: Optional[int] = None,
836+
before_enqueued_time_utc: Optional[datetime.datetime] = None,
837+
timeout: Optional[float] = None,
838+
) -> int:
839+
"""
840+
This operation deletes messages in the queue that are older than the specified enqueued time.
841+
842+
:keyword int or None max_message_count: The maximum number of messages to delete. The default value is None,
843+
meaning it will attempt to delete up to 4,000 messages.
844+
:keyword datetime.datetime or None before_enqueued_time_utc: The UTC datetime value before which all messages
845+
should be deleted. The default value is None, meaning all messages in the queue will be considered.
846+
:keyword Optional[float] timeout: The total operation timeout in seconds including all the retries.
847+
The value must be greater than 0 if specified. The default value is None, meaning no timeout.
848+
:return: The number of deleted messages.
849+
:rtype: int
850+
851+
"""
852+
self._check_live()
853+
if timeout is not None and timeout <= 0:
854+
raise ValueError("The timeout must be greater than 0.")
855+
await self._open()
856+
857+
message_count = max_message_count if max_message_count else 4000
858+
859+
message_send = {
860+
MGMT_REQUEST_ENQUEUED_TIME_UTC: before_enqueued_time_utc if before_enqueued_time_utc
861+
else datetime.datetime.now(datetime.timezone.utc),
862+
MGMT_REQUEST_MAX_MESSAGE_COUNT: message_count,
863+
}
864+
start_time = time.time_ns()
865+
866+
self._populate_message_properties(message_send)
867+
handler = functools.partial(mgmt_handlers.batch_delete_op, receiver=self, amqp_transport=self._amqp_transport)
868+
message, deleted = await self._mgmt_request_response_with_retry(
869+
REQUEST_RESPONSE_DELETE_BATCH_OPERATION, message_send, handler, timeout=timeout
870+
)
871+
872+
links = get_receive_links(message)
873+
with receive_trace_context_manager(
874+
self, span_name=SPAN_NAME_PEEK, links=links, start_time=start_time
875+
):
876+
return deleted
877+
878+
async def purge_messages(
879+
self,
880+
*,
881+
before_enqueued_time_utc: Optional[datetime.datetime] = None,
882+
timeout: Optional[float] = None,
883+
) -> int:
884+
"""
885+
This operation purges as many messages as possible in the queue that are older than the specified enqueued time.
886+
887+
:keyword datetime.datetime or None before_enqueued_time_utc: The UTC datetime value before which all messages
888+
should be deleted. The default value is None, meaning all messages from the current time and before
889+
in the queue will be considered.
890+
:keyword Optional[float] timeout: The total operation timeout in seconds including all the retries.
891+
The value must be greater than 0 if specified. The default value is None, meaning no timeout.
892+
:return: The number of deleted messages.
893+
:rtype: int
894+
895+
"""
896+
self._check_live()
897+
if timeout is not None and timeout <= 0:
898+
raise ValueError("The timeout must be greater than 0.")
899+
await self._open()
900+
901+
message_send = {
902+
MGMT_REQUEST_ENQUEUED_TIME_UTC: before_enqueued_time_utc if before_enqueued_time_utc
903+
else datetime.datetime.now(datetime.timezone.utc),
904+
MGMT_REQUEST_MAX_MESSAGE_COUNT: 4000,
905+
}
906+
start_time = time.time_ns()
907+
908+
self._populate_message_properties(message_send)
909+
handler = functools.partial(mgmt_handlers.batch_delete_op, receiver=self, amqp_transport=self._amqp_transport)
910+
911+
batch_count = 0
912+
deleted = None
913+
messages = []
914+
while deleted != 0:
915+
message_received, deleted = await self._mgmt_request_response_with_retry(
916+
REQUEST_RESPONSE_DELETE_BATCH_OPERATION, message_send, handler, timeout=timeout
917+
)
918+
messages.append(message_received)
919+
batch_count += deleted
920+
921+
links = get_receive_links(messages)
922+
with receive_trace_context_manager(
923+
self, span_name=SPAN_NAME_PEEK, links=links, start_time=start_time
924+
):
925+
return batch_count
926+
830927
async def complete_message(self, message: ServiceBusReceivedMessage) -> None:
831928
"""Complete the message.
832929
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/usr/bin/env python
2+
3+
# --------------------------------------------------------------------------------------------
4+
# Copyright (c) Microsoft Corporation. All rights reserved.
5+
# Licensed under the MIT License. See License.txt in the project root for license information.
6+
# --------------------------------------------------------------------------------------------
7+
8+
"""
9+
Example to show deleting message(s) from a Service Bus Queue.
10+
"""
11+
12+
import os
13+
import asyncio
14+
from datetime import datetime, timezone, timedelta
15+
from azure.servicebus.aio import ServiceBusClient
16+
from azure.servicebus import ServiceBusMessage
17+
18+
19+
CONNECTION_STR = os.environ['SERVICEBUS_CONNECTION_STR']
20+
QUEUE_NAME = os.environ["SERVICEBUS_QUEUE_NAME"]
21+
22+
async def send_single_message(sender, i):
23+
message = ServiceBusMessage(f"Single Message {i}")
24+
await sender.send_messages(message)
25+
26+
async def run():
27+
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
28+
async with servicebus_client:
29+
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
30+
async with sender:
31+
await send_single_message(sender, 1)
32+
await send_single_message(sender, 2)
33+
await send_single_message(sender, 3)
34+
print(f"All messages sent before {datetime.now(timezone.utc)}")
35+
36+
37+
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
38+
async with receiver:
39+
40+
# Deleting Messages
41+
new_time = datetime.now(timezone.utc) + timedelta(hours=10)
42+
43+
print(f"Deleting messages that are older than {new_time}")
44+
deleted_msgs = await receiver.delete_messages(
45+
max_message_count=10,
46+
before_enqueued_time_utc=new_time
47+
)
48+
print(f"{deleted_msgs} messages deleted.")
49+
50+
# Try to peek after deleting to see what is left
51+
peeked_msgs = await receiver.peek_messages(max_message_count=10)
52+
for msg in peeked_msgs:
53+
print(f"Message peeked has enqueued time of {msg.enqueued_time_utc}")
54+
55+
# Clear out the queue if any messages didn't delete
56+
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
57+
for msg in received_msgs:
58+
print(f"{msg} received.")
59+
await receiver.complete_message(msg)
60+
61+
print("Receive is done.")
62+
63+
asyncio.run(run())

0 commit comments

Comments
 (0)