Skip to content

Commit c4beba5

Browse files
[ServiceBus] Track2 - Dead Letter Queue Receiver Implementation and Dead Letter Fix (Azure#11535)
* dead letter fix * fix mypy and update dependency * draft implementation of dead letter queue receiver, re-enable dead letter test * helpify entity name generation in dead letter queue * tweak test * merge kieran commit * complete dlq feature * fix pylint * add dlq samples * fix pylint * update according to review comment * move DLQ path generation outside * tweak change log * fix pylint error * fix pylint * move idletimeout and prefetch out of config * fix typo Co-authored-by: Kieran Brantner-Magee <[email protected]>
1 parent 5e48415 commit c4beba5

26 files changed

+648
-183
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
## 7.0.0b3 (Unreleased)
44

5+
**New Features**
6+
7+
* Added methods `get_queue_deadletter_receiver` and `get_subscription_deadletter_receiver` in `ServiceBusClient` to get a `ServiceBusReceiver` for the dead-letter sub-queue of the target entity.
8+
9+
**BugFixes**
10+
11+
* Updated uAMQP dependency to 1.2.8.
12+
* Fixed bug where reason and description were not being set when dead-lettering messages.
513

614
## 7.0.0b2 (2020-05-04)
715

@@ -23,7 +31,7 @@
2331

2432
**BugFixes**
2533

26-
* Fig bug where http_proxy and transport_type in ServiceBusClient are not propagated into Sender/Receiver creation properly.
34+
* Fixed bug where http_proxy and transport_type in ServiceBusClient are not propagated into Sender/Receiver creation properly.
2735
* Updated uAMQP dependency to 1.2.7.
2836
* Fixed bug in setting certificate of tlsio on MacOS. #7201
2937
* Fixed bug that caused segmentation fault in network tracing on MacOS when setting `logging_enable` to `True` in `ServiceBusClient`.

sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,3 @@ def __init__(self, **kwargs):
2323
self.auth_timeout = kwargs.get("auth_timeout", 60) # type: int
2424
self.encoding = kwargs.get("encoding", "UTF-8")
2525
self.auto_reconnect = kwargs.get("auto_reconnect", True)
26-
self.idle_timeout = kwargs.get("idle_timeout", None)
27-
prefetch = kwargs.get("prefetch", 0)
28-
if int(prefetch) < 0 or int(prefetch) > 50000:
29-
raise ValueError("Prefetch must be an integer between 0 and 50000 inclusive.")
30-
self.prefetch = prefetch + 1

sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@
7171
MGMT_REQUEST_VIA_PARTITION_KEY = 'via-partition-key'
7272
MGMT_REQUEST_DEAD_LETTER_REASON = 'deadletter-reason'
7373
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION = 'deadletter-description'
74+
RECEIVER_LINK_DEAD_LETTER_REASON = 'DeadLetterReason'
75+
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION = 'DeadLetterErrorDescription'
7476
MGMT_REQUEST_OP_TYPE_ENTITY_MGMT = b"entity-mgmt"
7577

7678
MESSAGE_COMPLETE = 'complete'
@@ -97,6 +99,10 @@
9799
_X_OPT_SCHEDULED_ENQUEUE_TIME = b'x-opt-scheduled-enqueue-time'
98100

99101

102+
DEAD_LETTER_QUEUE_SUFFIX = '/$DeadLetterQueue'
103+
TRANSFER_DEAD_LETTER_QUEUE_SUFFIX = '/$Transfer' + DEAD_LETTER_QUEUE_SUFFIX
104+
105+
100106
class ReceiveSettleMode(Enum):
101107
PeekLock = constants.ReceiverSettleMode.PeekLock
102108
ReceiveAndDelete = constants.ReceiverSettleMode.ReceiveAndDelete

sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
MGMT_RESPONSE_MESSAGE_EXPIRATION,
3333
MGMT_REQUEST_DEAD_LETTER_REASON,
3434
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION,
35+
RECEIVER_LINK_DEAD_LETTER_REASON,
36+
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION,
3537
MESSAGE_COMPLETE,
3638
MESSAGE_DEAD_LETTER,
3739
MESSAGE_ABANDON,
@@ -539,8 +541,8 @@ def _check_live(self, action):
539541
except AttributeError:
540542
pass
541543

542-
def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None):
543-
# type: (str, Dict[str, Any]) -> Callable
544+
def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None):
545+
# type: (str, Optional[str], Optional[str]) -> Callable
544546
# pylint: disable=protected-access
545547
if settle_operation == MESSAGE_COMPLETE:
546548
return functools.partial(
@@ -559,7 +561,10 @@ def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None):
559561
self._receiver._settle_message,
560562
SETTLEMENT_DEADLETTER,
561563
[self.lock_token],
562-
dead_letter_details=dead_letter_details
564+
dead_letter_details={
565+
MGMT_REQUEST_DEAD_LETTER_REASON: dead_letter_reason or "",
566+
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: dead_letter_description or ""
567+
}
563568
)
564569
if settle_operation == MESSAGE_DEFER:
565570
return functools.partial(
@@ -569,34 +574,39 @@ def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None):
569574
)
570575
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))
571576

572-
def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None): # pylint: disable=unused-argument
573-
# type: (str, Dict[str, Any]) -> Callable
574-
# dead_letter_detail is not used because of uamqp receiver link doesn't accept it while it
575-
# should be accepted. Will revisit this later.
576-
# uamqp management link accepts dead_letter_details. Refer to method _settle_via_mgmt_link
577-
# TODO: to make dead_letter_details useful
577+
def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None):
578+
# type: (str, Optional[str], Optional[str]) -> Callable
578579
if settle_operation == MESSAGE_COMPLETE:
579580
return functools.partial(self.message.accept)
580581
if settle_operation == MESSAGE_ABANDON:
581582
return functools.partial(self.message.modify, True, False)
582583
if settle_operation == MESSAGE_DEAD_LETTER:
583-
# note: message.reject() can not set reason and description properly due to the issue
584-
# https://github.com/Azure/azure-uamqp-python/issues/155
585-
return functools.partial(self.message.reject, condition=DEADLETTERNAME)
584+
return functools.partial(
585+
self.message.reject,
586+
condition=DEADLETTERNAME,
587+
description=dead_letter_description,
588+
info={
589+
RECEIVER_LINK_DEAD_LETTER_REASON: dead_letter_reason,
590+
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION: dead_letter_description
591+
}
592+
)
586593
if settle_operation == MESSAGE_DEFER:
587594
return functools.partial(self.message.modify, True, True)
588595
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))
589596

590597
def _settle_message(
591598
self,
592599
settle_operation,
593-
dead_letter_details=None
600+
dead_letter_reason=None,
601+
dead_letter_description=None,
594602
):
595-
# type: (str, Dict[str, Any]) -> None
603+
# type: (str, Optional[str], Optional[str]) -> None
596604
try:
597605
if not self._is_deferred_message:
598606
try:
599-
self._settle_via_receiver_link(settle_operation, dead_letter_details)()
607+
self._settle_via_receiver_link(settle_operation,
608+
dead_letter_reason=dead_letter_reason,
609+
dead_letter_description=dead_letter_description)()
600610
return
601611
except RuntimeError as exception:
602612
_LOGGER.info(
@@ -605,7 +615,9 @@ def _settle_message(
605615
settle_operation,
606616
exception
607617
)
608-
self._settle_via_mgmt_link(settle_operation, dead_letter_details)()
618+
self._settle_via_mgmt_link(settle_operation,
619+
dead_letter_reason=dead_letter_reason,
620+
dead_letter_description=dead_letter_description)()
609621
except Exception as e:
610622
raise MessageSettleFailed(settle_operation, e)
611623

@@ -644,12 +656,7 @@ def dead_letter(self, reason=None, description=None):
644656
"""
645657
# pylint: disable=protected-access
646658
self._check_live(MESSAGE_DEAD_LETTER)
647-
648-
details = {
649-
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
650-
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""}
651-
652-
self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_details=details)
659+
self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_reason=reason, dead_letter_description=description)
653660
self._settled = True
654661

655662
def abandon(self):

sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323

2424
class ReceiverMixin(object): # pylint: disable=too-many-instance-attributes
25-
def _create_attribute(self, **kwargs):
25+
def _populate_attributes(self, **kwargs):
2626
if kwargs.get("subscription_name"):
2727
self._subscription_name = kwargs.get("subscription_name")
2828
self._is_subscription = True
@@ -38,6 +38,13 @@ def _create_attribute(self, **kwargs):
3838
)
3939
self._name = "SBReceiver-{}".format(uuid.uuid4())
4040
self._last_received_sequenced_number = None
41+
self._message_iter = None
42+
self._connection = kwargs.get("connection")
43+
prefetch = kwargs.get("prefetch", 0)
44+
if int(prefetch) < 0 or int(prefetch) > 50000:
45+
raise ValueError("Prefetch must be an integer between 0 and 50000 inclusive.")
46+
self._prefetch = prefetch + 1
47+
self._idle_timeout = kwargs.get("idle_timeout", None)
4148

4249
def _build_message(self, received, message_type=ReceivedMessage):
4350
message = message_type(message=received, mode=self._mode)
@@ -82,7 +89,7 @@ def _check_live(self):
8289
if self._session and self._session.expired:
8390
raise SessionLockExpired(inner_exception=self._session.auto_renew_error)
8491

85-
def _create_session_attributes(self, **kwargs):
92+
def _populate_session_attributes(self, **kwargs):
8693
self._session_id = kwargs.get("session_id") or NEXT_AVAILABLE
8794
self._error_policy = _ServiceBusErrorPolicy(
8895
max_retries=self._config.retry_total,

sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
from .constants import (
2424
JWT_TOKEN_SCOPE,
2525
TOKEN_TYPE_JWT,
26-
TOKEN_TYPE_SASTOKEN
26+
TOKEN_TYPE_SASTOKEN,
27+
DEAD_LETTER_QUEUE_SUFFIX,
28+
TRANSFER_DEAD_LETTER_QUEUE_SUFFIX
2729
)
2830

2931
_log = logging.getLogger(__name__)
@@ -141,6 +143,21 @@ def create_authentication(client):
141143
)
142144

143145

146+
def generate_dead_letter_entity_name(
147+
queue_name=None,
148+
topic_name=None,
149+
subscription_name=None,
150+
transfer_deadletter=False
151+
):
152+
entity_name = queue_name if queue_name else (topic_name + "/Subscriptions/" + subscription_name)
153+
entity_name = "{}{}".format(
154+
entity_name,
155+
TRANSFER_DEAD_LETTER_QUEUE_SUFFIX if transfer_deadletter else DEAD_LETTER_QUEUE_SUFFIX
156+
)
157+
158+
return entity_name
159+
160+
144161
class AutoLockRenew(object):
145162
"""Auto renew locks for messages and sessions using a background thread pool.
146163

0 commit comments

Comments
 (0)