Skip to content

Commit fa064d6

Browse files
[Storage] Added max_messages kwarg to receive_messages() (Azure#23654)
1 parent 0d22cc2 commit fa064d6

17 files changed

+4419
-12
lines changed

sdk/storage/azure-storage-queue/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44

55
### Features Added
66
- Added support for `create_queue_if_not_exists()` for `QueueClient`
7+
- Added support for `max_messages` in `receive_messages()` to specify the maximum number of messages to receive from the queue.
8+
9+
### Other Changes
10+
- Updated documentation for `receive_messages()` to explain iterator behavior and life-cycle.
11+
- Added a sample to `queue_samples_message.py` (and async-equivalent) showcasing the use of `max_messages` in `receive_messages()`.
712

813
## 12.2.0 (2022-03-08)
914

sdk/storage/azure-storage-queue/azure/storage/queue/_models.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,10 @@ class MessagesPaged(PageIterator):
264264
:param callable command: Function to retrieve the next page of items.
265265
:param int results_per_page: The maximum number of messages to retrieve per
266266
call.
267+
:param int max_messages: The maximum number of messages to retrieve from
268+
the queue.
267269
"""
268-
def __init__(self, command, results_per_page=None, continuation_token=None):
270+
def __init__(self, command, results_per_page=None, continuation_token=None, max_messages=None):
269271
if continuation_token is not None:
270272
raise ValueError("This operation does not support continuation token")
271273

@@ -275,9 +277,16 @@ def __init__(self, command, results_per_page=None, continuation_token=None):
275277
)
276278
self._command = command
277279
self.results_per_page = results_per_page
280+
self._max_messages = max_messages
278281

279282
def _get_next_cb(self, continuation_token):
280283
try:
284+
if self._max_messages is not None:
285+
if self.results_per_page is None:
286+
self.results_per_page = 1
287+
if self._max_messages < 1:
288+
raise StopIteration("End of paging")
289+
self.results_per_page = min(self.results_per_page, self._max_messages)
281290
return self._command(number_of_messages=self.results_per_page)
282291
except HttpResponseError as error:
283292
process_storage_error(error)
@@ -286,6 +295,8 @@ def _extract_data_cb(self, messages): # pylint: disable=no-self-use
286295
# There is no concept of continuation token, so raising on my own condition
287296
if not messages:
288297
raise StopIteration("End of paging")
298+
if self._max_messages is not None:
299+
self._max_messages = self._max_messages - len(messages)
289300
return "TOKEN_IGNORED", [QueueMessage._from_generated(q) for q in messages] # pylint: disable=protected-access
290301

291302

sdk/storage/azure-storage-queue/azure/storage/queue/_queue_client.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,9 @@ def receive_messages(self, **kwargs):
588588
content and a pop_receipt value, which is required to delete the message.
589589
The message is not automatically deleted from the queue, but after it has
590590
been retrieved, it is not visible to other clients for the time interval
591-
specified by the visibility_timeout parameter.
591+
specified by the visibility_timeout parameter. The iterator will continuously
592+
fetch messages until the queue is empty or max_messages is reached (if max_messages
593+
is set).
592594
593595
If the key-encryption-key or resolver field is set on the local service object, the messages will be
594596
decrypted before being returned.
@@ -618,6 +620,8 @@ def receive_messages(self, **kwargs):
618620
should be set to a value smaller than the time-to-live value.
619621
:keyword int timeout:
620622
The server timeout, expressed in seconds.
623+
:keyword int max_messages:
624+
An integer that specifies the maximum number of messages to retrieve from the queue.
621625
:return:
622626
Returns a message iterator of dict-like Message objects.
623627
:rtype: ~azure.core.paging.ItemPaged[~azure.storage.queue.QueueMessage]
@@ -634,6 +638,7 @@ def receive_messages(self, **kwargs):
634638
messages_per_page = kwargs.pop('messages_per_page', None)
635639
visibility_timeout = kwargs.pop('visibility_timeout', None)
636640
timeout = kwargs.pop('timeout', None)
641+
max_messages = kwargs.pop('max_messages', None)
637642
self._config.message_decode_policy.configure(
638643
require_encryption=self.require_encryption,
639644
key_encryption_key=self.key_encryption_key,
@@ -646,7 +651,11 @@ def receive_messages(self, **kwargs):
646651
cls=self._config.message_decode_policy,
647652
**kwargs
648653
)
649-
return ItemPaged(command, results_per_page=messages_per_page, page_iterator_class=MessagesPaged)
654+
if max_messages is not None and messages_per_page is not None:
655+
if max_messages < messages_per_page:
656+
raise ValueError("max_messages must be greater or equal to messages_per_page")
657+
return ItemPaged(command, results_per_page=messages_per_page,
658+
page_iterator_class=MessagesPaged, max_messages=max_messages)
650659
except HttpResponseError as error:
651660
process_storage_error(error)
652661

sdk/storage/azure-storage-queue/azure/storage/queue/aio/_models.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ class MessagesPaged(AsyncPageIterator):
2121
:param callable command: Function to retrieve the next page of items.
2222
:param int results_per_page: The maximum number of messages to retrieve per
2323
call.
24+
:param int max_messages: The maximum number of messages to retrieve from
25+
the queue.
2426
"""
25-
def __init__(self, command, results_per_page=None, continuation_token=None):
27+
def __init__(self, command, results_per_page=None, continuation_token=None, max_messages=None):
2628
if continuation_token is not None:
2729
raise ValueError("This operation does not support continuation token")
2830

@@ -32,9 +34,16 @@ def __init__(self, command, results_per_page=None, continuation_token=None):
3234
)
3335
self._command = command
3436
self.results_per_page = results_per_page
37+
self._max_messages = max_messages
3538

3639
async def _get_next_cb(self, continuation_token):
3740
try:
41+
if self._max_messages is not None:
42+
if self.results_per_page is None:
43+
self.results_per_page = 1
44+
if self._max_messages < 1:
45+
raise StopAsyncIteration("End of paging")
46+
self.results_per_page = min(self.results_per_page, self._max_messages)
3847
return await self._command(number_of_messages=self.results_per_page)
3948
except HttpResponseError as error:
4049
process_storage_error(error)
@@ -43,6 +52,8 @@ async def _extract_data_cb(self, messages):
4352
# There is no concept of continuation token, so raising on my own condition
4453
if not messages:
4554
raise StopAsyncIteration("End of paging")
55+
if self._max_messages is not None:
56+
self._max_messages = self._max_messages - len(messages)
4657
return "TOKEN_IGNORED", [QueueMessage._from_generated(q) for q in messages] # pylint: disable=protected-access
4758

4859

sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,9 @@ def receive_messages(self, **kwargs):
490490
content and a pop_receipt value, which is required to delete the message.
491491
The message is not automatically deleted from the queue, but after it has
492492
been retrieved, it is not visible to other clients for the time interval
493-
specified by the visibility_timeout parameter.
493+
specified by the visibility_timeout parameter. The iterator will continuously
494+
fetch messages until the queue is empty or max_messages is reached (if max_messages
495+
is set).
494496
495497
If the key-encryption-key or resolver field is set on the local service object, the messages will be
496498
decrypted before being returned.
@@ -511,6 +513,8 @@ def receive_messages(self, **kwargs):
511513
should be set to a value smaller than the time-to-live value.
512514
:keyword int timeout:
513515
The server timeout, expressed in seconds.
516+
:keyword int max_messages:
517+
An integer that specifies the maximum number of messages to retrieve from the queue.
514518
:return:
515519
Returns a message iterator of dict-like Message objects.
516520
:rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.storage.queue.QueueMessage]
@@ -527,6 +531,7 @@ def receive_messages(self, **kwargs):
527531
messages_per_page = kwargs.pop('messages_per_page', None)
528532
visibility_timeout = kwargs.pop('visibility_timeout', None)
529533
timeout = kwargs.pop('timeout', None)
534+
max_messages = kwargs.pop('max_messages', None)
530535
self._config.message_decode_policy.configure(
531536
require_encryption=self.require_encryption,
532537
key_encryption_key=self.key_encryption_key,
@@ -540,7 +545,11 @@ def receive_messages(self, **kwargs):
540545
cls=self._config.message_decode_policy,
541546
**kwargs
542547
)
543-
return AsyncItemPaged(command, results_per_page=messages_per_page, page_iterator_class=MessagesPaged)
548+
if max_messages is not None and messages_per_page is not None:
549+
if max_messages < messages_per_page:
550+
raise ValueError("max_messages must be greater or equal to messages_per_page")
551+
return AsyncItemPaged(command, results_per_page=messages_per_page,
552+
page_iterator_class=MessagesPaged, max_messages=max_messages)
544553
except HttpResponseError as error:
545554
process_storage_error(error)
546555

sdk/storage/azure-storage-queue/samples/queue_samples_message.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,43 @@ def update_message(self):
300300
# Delete the queue
301301
queue.delete_queue()
302302

303+
def receive_messages_with_max_messages(self):
304+
# Instantiate a queue client
305+
from azure.storage.queue import QueueClient
306+
queue = QueueClient.from_connection_string(self.connection_string, "myqueue9")
307+
308+
# Create the queue
309+
queue.create_queue()
310+
311+
try:
312+
queue.send_message(u"message1")
313+
queue.send_message(u"message2")
314+
queue.send_message(u"message3")
315+
queue.send_message(u"message4")
316+
queue.send_message(u"message5")
317+
queue.send_message(u"message6")
318+
queue.send_message(u"message7")
319+
queue.send_message(u"message8")
320+
queue.send_message(u"message9")
321+
queue.send_message(u"message10")
322+
323+
# Receive messages one-by-one
324+
messages = queue.receive_messages(max_messages=5)
325+
for msg in messages:
326+
print(msg.content)
327+
queue.delete_message(msg)
328+
329+
# Only prints 5 messages because 'max_messages'=5
330+
# >>message1
331+
# >>message2
332+
# >>message3
333+
# >>message4
334+
# >>message5
335+
336+
finally:
337+
# Delete the queue
338+
queue.delete_queue()
339+
303340

304341
if __name__ == '__main__':
305342
sample = QueueMessageSamples()
@@ -311,3 +348,4 @@ def update_message(self):
311348
sample.delete_and_clear_messages()
312349
sample.peek_messages()
313350
sample.update_message()
351+
sample.receive_messages_with_max_messages()

sdk/storage/azure-storage-queue/samples/queue_samples_message_async.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,44 @@ async def update_message_async(self):
279279
finally:
280280
# Delete the queue
281281
await queue.delete_queue()
282+
283+
async def receive_messages_with_max_messages(self):
284+
# Instantiate a queue client
285+
from azure.storage.queue.aio import QueueClient
286+
queue = QueueClient.from_connection_string(self.connection_string, "myqueue7")
287+
288+
# Create the queue
289+
async with queue:
290+
await queue.create_queue()
291+
292+
try:
293+
await queue.send_message(u"message1")
294+
await queue.send_message(u"message2")
295+
await queue.send_message(u"message3")
296+
await queue.send_message(u"message4")
297+
await queue.send_message(u"message5")
298+
await queue.send_message(u"message6")
299+
await queue.send_message(u"message7")
300+
await queue.send_message(u"message8")
301+
await queue.send_message(u"message9")
302+
await queue.send_message(u"message10")
303+
304+
# Receive messages one-by-one
305+
messages = queue.receive_messages(max_messages=5)
306+
async for msg in messages:
307+
print(msg.content)
308+
await queue.delete_message(msg)
309+
310+
# Only prints 5 messages because 'max_messages'=5
311+
# >>message1
312+
# >>message2
313+
# >>message3
314+
# >>message4
315+
# >>message5
316+
317+
finally:
318+
# Delete the queue
319+
await queue.delete_queue()
282320

283321

284322
async def main():
@@ -290,7 +328,8 @@ async def main():
290328
await sample.delete_and_clear_messages_async()
291329
await sample.peek_messages_async()
292330
await sample.update_message_async()
331+
await sample.receive_messages_with_max_messages()
293332

294333
if __name__ == '__main__':
295334
loop = asyncio.get_event_loop()
296-
loop.run_until_complete(main())
335+
loop.run_until_complete(main())

0 commit comments

Comments
 (0)