Skip to content

Commit 233fbea

Browse files
authored
[ServiceBus] release message in background (Azure#23285)
* release message in background * decouple keep receiving and fix async * small fix * fix mypy and pylint * review feedback * fix pylint and update changelog * update test to reflect the latest change * add tests and replace uamqp keep alive * fix impl bug * add test non-releasing behavior * update release date
1 parent 5a89420 commit 233fbea

File tree

6 files changed

+441
-158
lines changed

6 files changed

+441
-158
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
# Release History
22

3-
## 7.6.1 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
8-
9-
### Bugs Fixed
3+
## 7.6.1 (2022-04-06)
104

115
### Other Changes
126

7+
- Improved receiving by releasing messages from internal buffer when the `prefetch_count` of `ServiceBusReceiver` is set 0 and there is no active receive call, this helps avoid receiving expired messages and incrementing delivery count of a message.
8+
139
## 7.6.0 (2022-02-10)
1410

1511
### Features Added

sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,11 @@ def _on_attach(self, source, target, properties, error):
176176
def _populate_message_properties(self, message):
177177
if self._session:
178178
message[MGMT_REQUEST_SESSION_ID] = self._session_id
179+
180+
def _enhanced_message_received(self, message):
181+
# pylint: disable=protected-access
182+
self._handler._was_message_received = True
183+
if self._receive_context.is_set():
184+
self._handler._received_messages.put(message)
185+
else:
186+
message.release()

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 90 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
5+
# pylint:disable=too-many-lines
6+
import threading
57
import time
68
import logging
79
import functools
@@ -197,6 +199,7 @@ def __init__(
197199
self._session = (
198200
None if self._session_id is None else ServiceBusSession(self._session_id, self)
199201
)
202+
self._receive_context = threading.Event()
200203

201204
def __iter__(self):
202205
return self._iter_contextual_wrapper()
@@ -238,26 +241,34 @@ def _inner_next(self):
238241

239242
def __next__(self):
240243
# Normally this would wrap the yield of the iter, but for a direct next call we just trace imperitively.
241-
message = self._inner_next()
242-
links = get_receive_links(message)
243-
with receive_trace_context_manager(self, links=links):
244-
return message
244+
try:
245+
self._receive_context.set()
246+
message = self._inner_next()
247+
links = get_receive_links(message)
248+
with receive_trace_context_manager(self, links=links):
249+
return message
250+
finally:
251+
self._receive_context.clear()
245252

246253
next = __next__ # for python2.7
247254

248255
def _iter_next(self):
249-
self._open()
250-
if not self._message_iter:
251-
self._message_iter = self._handler.receive_messages_iter()
252-
uamqp_message = next(self._message_iter)
253-
message = self._build_message(uamqp_message)
254-
if (
255-
self._auto_lock_renewer
256-
and not self._session
257-
and self._receive_mode != ServiceBusReceiveMode.RECEIVE_AND_DELETE
258-
):
259-
self._auto_lock_renewer.register(self, message)
260-
return message
256+
try:
257+
self._receive_context.set()
258+
self._open()
259+
if not self._message_iter:
260+
self._message_iter = self._handler.receive_messages_iter()
261+
uamqp_message = next(self._message_iter)
262+
message = self._build_message(uamqp_message)
263+
if (
264+
self._auto_lock_renewer
265+
and not self._session
266+
and self._receive_mode != ServiceBusReceiveMode.RECEIVE_AND_DELETE
267+
):
268+
self._auto_lock_renewer.register(self, message)
269+
return message
270+
finally:
271+
self._receive_context.clear()
261272

262273
@classmethod
263274
def _from_connection_string(cls, conn_str, **kwargs):
@@ -338,9 +349,12 @@ def _create_handler(self, auth):
338349
else None,
339350
timeout=self._max_wait_time * 1000 if self._max_wait_time else 0,
340351
prefetch=self._prefetch_count,
341-
keep_alive_interval=self._config.keep_alive,
352+
# If prefetch is 1, then keep_alive coroutine serves as keep receiving for releasing messages
353+
keep_alive_interval=self._config.keep_alive if self._prefetch_count != 1 else 5,
342354
shutdown_after_timeout=False,
343355
)
356+
if self._prefetch_count == 1:
357+
self._handler._message_received = self._enhanced_message_received # pylint: disable=protected-access
344358

345359
def _open(self):
346360
# pylint: disable=protected-access
@@ -366,65 +380,68 @@ def _open(self):
366380
def _receive(self, max_message_count=None, timeout=None):
367381
# type: (Optional[int], Optional[float]) -> List[ServiceBusReceivedMessage]
368382
# pylint: disable=protected-access
369-
self._open()
370-
371-
amqp_receive_client = self._handler
372-
received_messages_queue = amqp_receive_client._received_messages
373-
max_message_count = max_message_count or self._prefetch_count
374-
timeout_ms = (
375-
1000 * (timeout or self._max_wait_time)
376-
if (timeout or self._max_wait_time)
377-
else 0
378-
)
379-
abs_timeout_ms = (
380-
amqp_receive_client._counter.get_current_ms() + timeout_ms
381-
if timeout_ms
382-
else 0
383-
)
384-
385-
batch = [] # type: List[Message]
386-
while not received_messages_queue.empty() and len(batch) < max_message_count:
387-
batch.append(received_messages_queue.get())
388-
received_messages_queue.task_done()
389-
if len(batch) >= max_message_count:
390-
return [self._build_message(message) for message in batch]
391-
392-
# Dynamically issue link credit if max_message_count > 1 when the prefetch_count is the default value 1
393-
if max_message_count and self._prefetch_count == 1 and max_message_count > 1:
394-
link_credit_needed = max_message_count - len(batch)
395-
amqp_receive_client.message_handler.reset_link_credit(link_credit_needed)
396-
397-
first_message_received = expired = False
398-
receiving = True
399-
while receiving and not expired and len(batch) < max_message_count:
400-
while receiving and received_messages_queue.qsize() < max_message_count:
401-
if (
402-
abs_timeout_ms
403-
and amqp_receive_client._counter.get_current_ms() > abs_timeout_ms
404-
):
405-
expired = True
406-
break
407-
before = received_messages_queue.qsize()
408-
receiving = amqp_receive_client.do_work()
409-
received = received_messages_queue.qsize() - before
410-
if (
411-
not first_message_received
412-
and received_messages_queue.qsize() > 0
413-
and received > 0
414-
):
415-
# first message(s) received, continue receiving for some time
416-
first_message_received = True
417-
abs_timeout_ms = (
418-
amqp_receive_client._counter.get_current_ms()
419-
+ self._further_pull_receive_timeout_ms
420-
)
421-
while (
422-
not received_messages_queue.empty() and len(batch) < max_message_count
423-
):
383+
try:
384+
self._receive_context.set()
385+
self._open()
386+
387+
amqp_receive_client = self._handler
388+
received_messages_queue = amqp_receive_client._received_messages
389+
max_message_count = max_message_count or self._prefetch_count
390+
timeout_ms = (
391+
1000 * (timeout or self._max_wait_time)
392+
if (timeout or self._max_wait_time)
393+
else 0
394+
)
395+
abs_timeout_ms = (
396+
amqp_receive_client._counter.get_current_ms() + timeout_ms
397+
if timeout_ms
398+
else 0
399+
)
400+
batch = [] # type: List[Message]
401+
while not received_messages_queue.empty() and len(batch) < max_message_count:
424402
batch.append(received_messages_queue.get())
425403
received_messages_queue.task_done()
404+
if len(batch) >= max_message_count:
405+
return [self._build_message(message) for message in batch]
406+
407+
# Dynamically issue link credit if max_message_count > 1 when the prefetch_count is the default value 1
408+
if max_message_count and self._prefetch_count == 1 and max_message_count > 1:
409+
link_credit_needed = max_message_count - len(batch)
410+
amqp_receive_client.message_handler.reset_link_credit(link_credit_needed)
411+
412+
first_message_received = expired = False
413+
receiving = True
414+
while receiving and not expired and len(batch) < max_message_count:
415+
while receiving and received_messages_queue.qsize() < max_message_count:
416+
if (
417+
abs_timeout_ms
418+
and amqp_receive_client._counter.get_current_ms() > abs_timeout_ms
419+
):
420+
expired = True
421+
break
422+
before = received_messages_queue.qsize()
423+
receiving = amqp_receive_client.do_work()
424+
received = received_messages_queue.qsize() - before
425+
if (
426+
not first_message_received
427+
and received_messages_queue.qsize() > 0
428+
and received > 0
429+
):
430+
# first message(s) received, continue receiving for some time
431+
first_message_received = True
432+
abs_timeout_ms = (
433+
amqp_receive_client._counter.get_current_ms()
434+
+ self._further_pull_receive_timeout_ms
435+
)
436+
while (
437+
not received_messages_queue.empty() and len(batch) < max_message_count
438+
):
439+
batch.append(received_messages_queue.get())
440+
received_messages_queue.task_done()
426441

427-
return [self._build_message(message) for message in batch]
442+
return [self._build_message(message) for message in batch]
443+
finally:
444+
self._receive_context.clear()
428445

429446
def _settle_message_with_retry(
430447
self,

0 commit comments

Comments
 (0)