Skip to content

Commit 3b2dfa4

Browse files
authored
[EventHub] Raise error when primary key is rotated (#36245)
* sync side fix for rotate primary key issue in hub * sync uamqp transport * add to async * remove print/cleanup * mypy/pylint * changelog * fix logging formatting * no retry on all auth exceptions * remove import error in uamqp recv, b/c no external dependency * mypy
1 parent 43e0189 commit 3b2dfa4

File tree

9 files changed

+25
-2
lines changed

9 files changed

+25
-2
lines changed

sdk/eventhub/azure-eventhub/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Fixed a bug where the consumer waited indefinitely when the primary key was rotated while receiving, rather than raising an authentication error. ([#33926](https://github.com/Azure/azure-sdk-for-python/issues/33926))
12+
1113
### Other Changes
1214

1315
## 5.12.1 (2024-06-11)

sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,11 @@ def receive(self, batch=False, max_batch_size=300, max_wait_time=None):
236236
# If optional dependency is not installed, do not retry.
237237
if isinstance(exception, ImportError):
238238
raise exception
239+
240+
# If authentication exception, do not retry.
241+
if isinstance(exception, self._amqp_transport.AUTHENTICATION_EXCEPTION):
242+
raise self._handle_exception(exception, is_consumer=True)
243+
239244
self._amqp_transport.check_link_stolen(self, exception)
240245
# TODO: below block hangs when retry_total > 0
241246
# need to remove/refactor, issue #27137

sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
ReceiveClient as uamqp_ReceiveClient,
3131
)
3232
from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth
33+
from uamqp.errors import AuthenticationException as uamqp_AuthenticationException
3334

3435
except ImportError:
3536
pass
@@ -48,6 +49,7 @@
4849
from .._pyamqp.constants import (
4950
ConnectionState as pyamqp_ConnectionState
5051
)
52+
from .._pyamqp.error import AuthenticationException as pyamqp_AuthenticationException
5153

5254
class AmqpTransport(ABC): # pylint: disable=too-many-public-methods
5355
"""
@@ -76,6 +78,9 @@ class AmqpTransport(ABC): # pylint: disable=too-many-public-methods
7678
USER_AGENT_SYMBOL: Union[uamqp_Types_AMQPSymbol, str, bytes]
7779
PROP_PARTITION_KEY_AMQP_SYMBOL: Union[uamqp_Types_AMQPSymbol, str, bytes]
7880

81+
# exceptions
82+
AUTHENTICATION_EXCEPTION: Union["uamqp_AuthenticationException", "pyamqp_AuthenticationException"]
83+
7984
@staticmethod
8085
@abstractmethod
8186
def build_message(**kwargs: Any) -> Union["uamqp_Message", "pyamqp_Message"]:

sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ class PyamqpTransport(AmqpTransport): # pylint: disable=too-many-public-method
7272

7373
ERROR_CONDITIONS = [condition.value for condition in errors.ErrorCondition]
7474

75+
# define exceptions
76+
AUTHENTICATION_EXCEPTION = errors.AuthenticationException
77+
7578
@staticmethod
7679
def build_message(**kwargs):
7780
"""

sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ class UamqpTransport(AmqpTransport): # pylint: disable=too-many-public-method
122122
USER_AGENT_SYMBOL = types.AMQPSymbol("user-agent")
123123
PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(PROP_PARTITION_KEY)
124124

125+
# define exceptions
126+
AUTHENTICATION_EXCEPTION = errors.AuthenticationException
127+
125128
@staticmethod
126129
def build_message(**kwargs):
127130
"""

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_connection_manager_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def __init__(
6262
**kwargs: Any
6363
) -> None:
6464
self._loop = kwargs.get("loop")
65-
self._lock = Lock(loop=self._loop)
65+
self._lock = Lock(loop=self._loop) # pylint: disable=unexpected-keyword-arg
6666
self._conn: Optional[Union[uamqp_ConnectionAsync, ConnectionAsync]] = None
6767

6868
self._container_id = container_id

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ class AmqpTransportAsync(ABC): # pylint: disable=too-many-public-methods
7979
USER_AGENT_SYMBOL: Union[uamqp_Types_AMQPSymbol, Literal["user-agent"]]
8080
PROP_PARTITION_KEY_AMQP_SYMBOL: Union[uamqp_Types_AMQPSymbol, Literal[b'x-opt-partition-key']]
8181

82-
8382
@staticmethod
8483
@abstractmethod
8584
def build_message(**kwargs: Any) -> Union["uamqp_Message", "pyamqp_Message"]:

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ async def _receive_task(consumer):
269269
# If optional dependency is not installed, do not retry.
270270
if isinstance(exception, ImportError):
271271
raise exception
272+
# If authentication exception, do not retry.
273+
if isinstance(exception, errors.AuthenticationException):
274+
raise await consumer._handle_exception(exception)
272275
if (
273276
isinstance(exception, errors.AMQPLinkError)
274277
and exception.condition == errors.ErrorCondition.LinkStolen # pylint: disable=no-member

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,9 @@ async def receive_messages_async(
283283
except asyncio.CancelledError: # pylint: disable=try-except-raise
284284
raise
285285
except Exception as exception: # pylint: disable=broad-except
286+
# If authentication exception, do not retry.
287+
if isinstance(exception, errors.AuthenticationException):
288+
raise await consumer._handle_exception(exception)
286289
if (
287290
isinstance(exception, errors.LinkDetach)
288291
and exception.condition == constants.ErrorCodes.LinkStolen # pylint: disable=no-member

0 commit comments

Comments
 (0)