Skip to content

Commit 81b6a38

Browse files
l0lawrencekashifkhanswathipil
authored
[ServiceBus] set timeout on link properties (#30832)
* attempt to set timeout on link properties * jitter math * update receiver logic * update encode * move logic to pyamqp_transport * update docstring * add tests * update if statement * update if statements * whitespace * trailing whitespace * move logic into pyamqp_transport * pylint * update kwargs * Update sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py Co-authored-by: Kashif Khan <[email protected]> * update async docstring too * update docstrings * add back in jitter logic to align with other lang * update client wording * update other receiver docstring * remove spacing * pylint * add comment * update client * fix pylint * update max_wait_time docstring * align iterator and receive_messages() behavior * docstring updates * asyn align iterator and receive_messages() * add deprecation * pylint fixes * add timeout kwarg * update docstring * client docstring * add to OperationTimeoutError logic * remove timeout from mock * nit * nit * revert doc * update error msg * nit * base handler async handling * doc * pylint * pylint * conn str doc update * remove ==True Co-authored-by: swathipil <[email protected]> * add >65 to test * add warning * move note to docstring --------- Co-authored-by: Kashif Khan <[email protected]> Co-authored-by: swathipil <[email protected]>
1 parent e588b24 commit 81b6a38

File tree

11 files changed

+218
-29
lines changed

11 files changed

+218
-29
lines changed

sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,14 @@ def _do_retryable_operation(
425425
self._container_id,
426426
last_exception,
427427
)
428+
if isinstance(last_exception, OperationTimeoutError):
429+
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
430+
"use max_wait_time on the ServiceBusReceiver to control the"\
431+
" timeout."
432+
error = OperationTimeoutError(
433+
message=description,
434+
)
435+
raise error from last_exception
428436
raise last_exception from None
429437
self._backoff(
430438
retried_times=retried_times,
@@ -461,6 +469,15 @@ def _backoff(
461469
entity_name,
462470
last_exception,
463471
)
472+
if isinstance(last_exception, OperationTimeoutError):
473+
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
474+
"use max_wait_time on the ServiceBusReceiver to control the"\
475+
" timeout."
476+
error = OperationTimeoutError(
477+
message=description,
478+
)
479+
480+
raise error from last_exception
464481
raise last_exception
465482

466483
def _mgmt_request_response(

sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
CONSUMER_IDENTIFIER = VENDOR + b":receiver-name"
6262
UAMQP_LIBRARY = "uamqp"
6363
PYAMQP_LIBRARY = "pyamqp"
64+
OPERATION_TIMEOUT = VENDOR + b":timeout"
6465

6566
MANAGEMENT_PATH_SUFFIX = "/$management"
6667

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -356,10 +356,12 @@ def get_queue_receiver(
356356
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
357357
the client fails to process the message. The default receive_mode is PEEK_LOCK.
358358
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
359-
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
360-
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
361-
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
362-
the `socket_timeout` optional parameter for more details.
359+
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
360+
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
361+
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
362+
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
363+
If connection errors are occurring due to write timing out,the connection timeout
364+
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
363365
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
364366
can be provided such that messages are automatically registered on receipt. If the receiver is a session
365367
receiver, it will apply to the session instead.
@@ -546,10 +548,12 @@ def get_subscription_receiver(
546548
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
547549
the client fails to process the message. The default receive_mode is PEEK_LOCK.
548550
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
549-
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
550-
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
551-
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
552-
the `socket_timeout` optional parameter for more details.
551+
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
552+
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
553+
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
554+
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
555+
If connection errors are occurring due to write timing out,the connection timeout
556+
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
553557
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
554558
can be provided such that messages are automatically registered on receipt. If the receiver is a session
555559
receiver, it will apply to the session instead.

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,12 @@ class ServiceBusReceiver(
105105
the client connects to.
106106
:keyword str subscription_name: The path of specific Service Bus Subscription under the
107107
specified Topic the client connects to.
108-
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
109-
receiver will automatically stop receiving. The default value is None, meaning no timeout.
108+
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
109+
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
110+
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
111+
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
112+
to a session. If connection errors are occurring due to write timing out,the connection timeout
113+
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
110114
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
111115
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
112116
lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
@@ -296,8 +300,12 @@ def _from_connection_string(
296300
if the client fails to process the message.
297301
The default mode is PEEK_LOCK.
298302
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
299-
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
300-
receiver will automatically stop receiving. The default value is None, meaning no timeout.
303+
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
304+
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
305+
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
306+
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
307+
to a session. If connection errors are occurring due to write timing out,the connection timeout
308+
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
301309
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
302310
:keyword transport_type: The type of transport protocol that will be used for communicating with
303311
the Service Bus service. Default is `TransportType.Amqp`.
@@ -668,8 +676,10 @@ def receive_messages(
668676
Setting to None will fully depend on the prefetch config. The default value is 1.
669677
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
670678
If no messages arrive, and no timeout is specified, this call will not return
671-
until the connection is closed. If specified, an no messages arrive within the
672-
timeout period, an empty list will be returned.
679+
until the connection is closed. If specified, and no messages arrive within the
680+
timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages
681+
when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session.
682+
Please use max_wait_time on the constructor to set the timeout for connecting to a session.
673683
:return: A list of messages received. If no messages are available, this will be an empty list.
674684
:rtype: List[~azure.servicebus.ServiceBusReceivedMessage]
675685

sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
# pylint: disable=too-many-lines
66
import functools
77
import time
8+
import math
9+
import random
810
import datetime
911
from datetime import timezone
1012
from typing import Optional, Tuple, cast, List, TYPE_CHECKING, Any, Callable, Dict, Union, Iterator, Type
@@ -66,6 +68,8 @@
6668
ERROR_CODE_ENTITY_ALREADY_EXISTS,
6769
ERROR_CODE_PRECONDITION_FAILED,
6870
ServiceBusReceiveMode,
71+
OPERATION_TIMEOUT,
72+
NEXT_AVAILABLE_SESSION,
6973
)
7074

7175
from ..exceptions import (
@@ -576,6 +580,28 @@ def create_receive_client(
576580
config = receiver._config # pylint: disable=protected-access
577581
source = kwargs.pop("source")
578582
receive_mode = kwargs.pop("receive_mode")
583+
link_properties = kwargs.pop("link_properties")
584+
585+
# When NEXT_AVAILABLE_SESSION is set, the default time to wait to connect to a session is 65 seconds.
586+
# If there are no messages in the topic/queue the client will wait for 65 seconds for an AttachFrame
587+
# frame from the service before raising an OperationTimeoutError due to failure to connect.
588+
# max_wait_time, if specified, will allow the user to wait for fewer or more than 65 seconds to
589+
# connect to a session.
590+
if receiver._session_id == NEXT_AVAILABLE_SESSION and receiver._max_wait_time: # pylint: disable=protected-access
591+
timeout_in_ms = receiver._max_wait_time * 1000 # pylint: disable=protected-access
592+
open_receive_link_base_jitter_in_ms = 100
593+
open_recieve_link_buffer_in_ms = 20
594+
open_receive_link_buffer_threshold_in_ms = 1000
595+
jitter_base_in_ms = min(timeout_in_ms * 0.01, open_receive_link_base_jitter_in_ms)
596+
timeout_in_ms = math.floor(timeout_in_ms - jitter_base_in_ms * random.random())
597+
if timeout_in_ms >= open_receive_link_buffer_threshold_in_ms:
598+
timeout_in_ms -= open_recieve_link_buffer_in_ms
599+
600+
# If we have specified a client-side timeout, assure that it is encoded as an uint
601+
link_properties[OPERATION_TIMEOUT] = amqp_uint_value(timeout_in_ms)
602+
603+
kwargs["link_properties"] = link_properties
604+
579605
return ReceiveClient(
580606
config.hostname,
581607
source,

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,14 @@ async def _do_retryable_operation(
274274
self._container_id,
275275
last_exception,
276276
)
277+
if isinstance(last_exception, OperationTimeoutError):
278+
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
279+
"use max_wait_time on the ServiceBusReceiver to control the"\
280+
" timeout."
281+
error = OperationTimeoutError(
282+
message=description,
283+
)
284+
raise error from last_exception
277285
raise last_exception from None
278286
await self._backoff(
279287
retried_times=retried_times,
@@ -306,6 +314,14 @@ async def _backoff(
306314
entity_name,
307315
last_exception,
308316
)
317+
if isinstance(last_exception, OperationTimeoutError):
318+
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
319+
"use max_wait_time on the ServiceBusReceiver to control the"\
320+
" timeout."
321+
error = OperationTimeoutError(
322+
message=description,
323+
)
324+
raise error from last_exception
309325
raise last_exception
310326

311327
async def _mgmt_request_response(

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -341,10 +341,12 @@ def get_queue_receiver(
341341
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
342342
the client fails to process the message. The default mode is PEEK_LOCK.
343343
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
344-
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
345-
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
346-
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
347-
the `socket_timeout` optional parameter for more details.
344+
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
345+
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
346+
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
347+
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
348+
If connection errors are occurring due to write timing out,the connection timeout
349+
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
348350
:keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An
349351
~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on
350352
receipt. If the receiver is a session receiver, it will apply to the session instead.
@@ -520,10 +522,12 @@ def get_subscription_receiver(
520522
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
521523
the client fails to process the message. The default mode is PEEK_LOCK.
522524
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
523-
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
524-
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
525-
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
526-
the `socket_timeout` optional parameter for more details.
525+
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
526+
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
527+
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
528+
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
529+
If connection errors are occurring due to write timing out,the connection timeout
530+
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
527531
:keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An
528532
~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on
529533
receipt. If the receiver is a session receiver, it will apply to the session instead.

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,12 @@ class ServiceBusReceiver(AsyncIterator, BaseHandler, ReceiverMixin):
117117
if the client fails to process the message.
118118
The default mode is PEEK_LOCK.
119119
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
120-
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the receiver
121-
will automatically stop receiving. The default value is None, meaning no timeout.
120+
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
121+
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
122+
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
123+
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
124+
to a session. If connection errors are occurring due to write timing out,the connection timeout
125+
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
122126
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
123127
:keyword transport_type: The type of transport protocol that will be used for communicating with
124128
the Service Bus service. Default is `TransportType.Amqp`.
@@ -294,8 +298,12 @@ def _from_connection_string(
294298
if the client fails to process the message.
295299
The default mode is PEEK_LOCK.
296300
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
297-
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
298-
receiver will automatically stop receiving. The default value is None, meaning no timeout.
301+
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
302+
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
303+
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
304+
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
305+
to a session. If connection errors are occurring due to write timing out,the connection timeout
306+
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
299307
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
300308
:keyword transport_type: The type of transport protocol that will be used for communicating with
301309
the Service Bus service. Default is `TransportType.Amqp`.
@@ -649,7 +657,9 @@ async def receive_messages(
649657
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
650658
If no messages arrive, and no timeout is specified, this call will not return
651659
until the connection is closed. If specified, and no messages arrive within the
652-
timeout period, an empty list will be returned.
660+
timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages
661+
when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session.
662+
Please use max_wait_time on the constructor to set the timeout for connecting to a session.
653663
:return: A list of messages received. If no messages are available, this will be an empty list.
654664
:rtype: list[~azure.servicebus.aio.ServiceBusReceivedMessage]
655665

0 commit comments

Comments
 (0)