Skip to content

Commit 3727b87

Browse files
authored
[SB] Sync Azure AMQP Typing changes (#35472)
* bring over typing update * fix type of node
1 parent f8e4e43 commit 3727b87

File tree

6 files changed

+112
-48
lines changed

6 files changed

+112
-48
lines changed

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_client_async.py

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
# license information.
55
#--------------------------------------------------------------------------
66
# TODO: Check types of kwargs (issue exists for this)
7+
# pylint: disable=too-many-lines
78
import asyncio
89
import logging
910
import time
1011
import queue
1112
from functools import partial
12-
from typing import Any, Dict, Optional, Tuple, Union, overload, cast
13+
from typing import Any, Callable, Coroutine, List, Dict, Optional, Tuple, Union, overload, cast
1314
from typing_extensions import Literal
1415
import certifi
1516

@@ -23,7 +24,7 @@
2324
SendClient as SendClientSync,
2425
Outcomes
2526
)
26-
from ..message import _MessageDelivery
27+
from ..message import _MessageDelivery, Message
2728
from ..constants import (
2829
MessageDeliveryState,
2930
SEND_DISPOSITION_ACCEPT,
@@ -350,7 +351,16 @@ async def do_work_async(self, **kwargs):
350351
return True
351352
return await self._client_run_async(**kwargs)
352353

353-
async def mgmt_request_async(self, message, **kwargs):
354+
async def mgmt_request_async(
355+
self,
356+
message,
357+
*,
358+
operation: Optional[Union[bytes, str]] = None,
359+
operation_type: Optional[Union[bytes, str]] = None,
360+
node: str = "$management",
361+
timeout: float = 0,
362+
**kwargs
363+
):
354364
"""
355365
:param message: The message to send in the management request.
356366
:type message: ~pyamqp.message.Message
@@ -370,10 +380,6 @@ async def mgmt_request_async(self, message, **kwargs):
370380
# The method also takes "status_code_field" and "status_description_field"
371381
# keyword arguments as alternate names for the status code and description
372382
# in the response body. Those two keyword arguments are used in Azure services only.
373-
operation = kwargs.pop("operation", None)
374-
operation_type = kwargs.pop("operation_type", None)
375-
node = kwargs.pop("node", "$management")
376-
timeout = kwargs.pop('timeout', 0)
377383
async with self._mgmt_link_lock_async:
378384
try:
379385
mgmt_link = self._mgmt_links[node]
@@ -569,8 +575,7 @@ async def _on_send_complete_async(self, message_delivery, reason, state):
569575
condition=ErrorCondition.UnknownError
570576
)
571577

572-
async def _send_message_impl_async(self, message, **kwargs):
573-
timeout = kwargs.pop("timeout", 0)
578+
async def _send_message_impl_async(self, message, *, timeout: float = 0):
574579
expire_time = (time.time() + timeout) if timeout else None
575580
await self.open_async()
576581
message_delivery = _MessageDelivery(
@@ -604,11 +609,19 @@ async def _send_message_impl_async(self, message, **kwargs):
604609
# This is a default handler
605610
raise MessageException(condition=ErrorCondition.UnknownError, description="Send failed.") from None
606611

607-
async def send_message_async(self, message, **kwargs):
612+
async def send_message_async(self, message, *, timeout: float = 0, **kwargs):
608613
"""
609614
:param ~pyamqp.message.Message message: The message to send.
615+
:keyword float timeout: timeout in seconds. If set to
616+
0, the client will continue to wait until the message is sent or error happens. The
617+
default is 0.
610618
"""
611-
await self._do_retryable_operation_async(self._send_message_impl_async, message=message, **kwargs)
619+
await self._do_retryable_operation_async(
620+
self._send_message_impl_async,
621+
message=message,
622+
timeout=timeout,
623+
**kwargs
624+
)
612625

613626

614627
class ReceiveClientAsync(ReceiveClientSync, AMQPClientAsync):
@@ -767,12 +780,17 @@ async def _message_received_async(self, frame, message):
767780
if not self._streaming_receive:
768781
self._received_messages.put((frame, message))
769782

770-
async def _receive_message_batch_impl_async(self, max_batch_size=None, on_message_received=None, timeout=0):
783+
async def _receive_message_batch_impl_async(
784+
self,
785+
max_batch_size: Optional[int] = None,
786+
on_message_received: Optional[Callable] = None,
787+
timeout: float = 0,
788+
):
771789
self._message_received_callback = on_message_received
772790
max_batch_size = max_batch_size or self._link_credit
773791
timeout_time = time.time() + timeout if timeout else 0
774792
receiving = True
775-
batch = []
793+
batch: List[Message] = []
776794
await self.open_async()
777795
while len(batch) < max_batch_size:
778796
try:
@@ -825,7 +843,14 @@ async def close_async(self):
825843
self._received_messages = queue.Queue()
826844
await super(ReceiveClientAsync, self).close_async()
827845

828-
async def receive_message_batch_async(self, **kwargs):
846+
async def receive_message_batch_async( # pylint: disable=unused-argument
847+
self,
848+
*,
849+
max_batch_size: Optional[int] = None,
850+
on_message_received: Optional[Callable] = None,
851+
timeout: float = 0,
852+
**kwargs
853+
) -> Coroutine[Any, Any, list]:
829854
"""Receive a batch of messages. Messages returned in the batch have already been
830855
accepted - if you wish to add logic to accept or reject messages based on custom
831856
criteria, pass in a callback. This method will return as soon as some messages are
@@ -844,12 +869,14 @@ async def receive_message_batch_async(self, **kwargs):
844869
0, the client will continue to wait until at least one message is received. The
845870
default is 0.
846871
:paramtype timeout: float
847-
:return: Retryable operation coroutine.
848-
:rtype: Coroutine[Any, Any, Any]
872+
:returns: Retryable operation coroutine.
873+
:rtype: Coroutine[Any, Any, list]
849874
"""
850875
return await self._do_retryable_operation_async(
851876
self._receive_message_batch_impl_async,
852-
**kwargs
877+
max_batch_size=max_batch_size,
878+
on_message_received=on_message_received,
879+
timeout=timeout,
853880
)
854881

855882
async def receive_messages_iter_async(self, timeout=None, on_message_received=None):

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_management_link_async.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import time
88
import logging
99
from functools import partial
10+
from typing import Optional, Union
1011

1112
from ..management_link import PendingManagementOperation
1213
from ._sender_async import SenderLink
@@ -189,7 +190,16 @@ async def open(self):
189190
await self._response_link.attach()
190191
await self._request_link.attach()
191192

192-
async def execute_operation(self, message, on_execute_operation_complete, **kwargs):
193+
async def execute_operation(
194+
self,
195+
message,
196+
on_execute_operation_complete,
197+
*,
198+
operation: Optional[Union[bytes, str]] = None,
199+
type: Optional[Union[bytes, str]] = None,
200+
locales: Optional[str] = None,
201+
timeout: Optional[float] = None
202+
):
193203
"""Execute a request and wait on a response.
194204
195205
:param message: The message to send in the management request.
@@ -212,11 +222,10 @@ async def execute_operation(self, message, on_execute_operation_complete, **kwar
212222
to the management request must be received.
213223
:rtype: None
214224
"""
215-
timeout = kwargs.get("timeout")
216-
message.application_properties["operation"] = kwargs.get("operation")
217-
message.application_properties["type"] = kwargs.get("type")
218-
if "locales" in kwargs:
219-
message.application_properties["locales"] = kwargs.get("locales")
225+
message.application_properties["operation"] = operation
226+
message.application_properties["type"] = type
227+
if locales:
228+
message.application_properties["locales"] = locales
220229
try:
221230
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
222231
new_properties = message.properties._replace(message_id=self.next_message_id)

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/client.py

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
import time
1616
import uuid
1717
from functools import partial
18-
from typing import Any, Dict, Optional, Tuple, Union, overload, cast
18+
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload, cast
1919
import certifi
2020
from typing_extensions import Literal
2121

2222
from ._connection import Connection
23-
from .message import _MessageDelivery
23+
from .message import _MessageDelivery, Message
2424
from .error import (
2525
AMQPException,
2626
ErrorCondition,
@@ -426,7 +426,16 @@ def do_work(self, **kwargs):
426426
return True
427427
return self._client_run(**kwargs)
428428

429-
def mgmt_request(self, message, **kwargs):
429+
def mgmt_request(
430+
self,
431+
message,
432+
*,
433+
operation: Optional[Union[str, bytes]] = None,
434+
operation_type: Optional[Union[str, bytes]] = None,
435+
node: str = "$management",
436+
timeout: float = 0,
437+
**kwargs
438+
):
430439
"""
431440
:param message: The message to send in the management request.
432441
:type message: ~pyamqp.message.Message
@@ -446,10 +455,6 @@ def mgmt_request(self, message, **kwargs):
446455
# The method also takes "status_code_field" and "status_description_field"
447456
# keyword arguments as alternate names for the status code and description
448457
# in the response body. Those two keyword arguments are used in Azure services only.
449-
operation = kwargs.pop("operation", None)
450-
operation_type = kwargs.pop("operation_type", None)
451-
node = kwargs.pop("node", "$management")
452-
timeout = kwargs.pop("timeout", 0)
453458
with self._mgmt_link_lock:
454459
try:
455460
mgmt_link = self._mgmt_links[node]
@@ -659,8 +664,7 @@ def _on_send_complete(self, message_delivery, reason, state):
659664
message_delivery, condition=ErrorCondition.UnknownError
660665
)
661666

662-
def _send_message_impl(self, message, **kwargs):
663-
timeout = kwargs.pop("timeout", 0)
667+
def _send_message_impl(self, message, *, timeout: float = 0):
664668
expire_time = (time.time() + timeout) if timeout else None
665669
self.open()
666670
message_delivery = _MessageDelivery(
@@ -692,14 +696,19 @@ def _send_message_impl(self, message, **kwargs):
692696
condition=ErrorCondition.UnknownError, description="Send failed."
693697
) from None
694698

695-
def send_message(self, message, **kwargs):
699+
def send_message(self, message, *, timeout: float = 0, **kwargs):
696700
"""
697701
:param ~pyamqp.message.Message message:
698702
:keyword float timeout: timeout in seconds. If set to
699703
0, the client will continue to wait until the message is sent or error happens. The
700704
default is 0.
701705
"""
702-
self._do_retryable_operation(self._send_message_impl, message=message, **kwargs)
706+
self._do_retryable_operation(
707+
self._send_message_impl,
708+
message=message,
709+
timeout=timeout,
710+
**kwargs
711+
)
703712

704713

705714
class ReceiveClient(AMQPClient): # pylint:disable=too-many-instance-attributes
@@ -877,13 +886,16 @@ def _message_received(self, frame, message):
877886
self._received_messages.put((frame, message))
878887

879888
def _receive_message_batch_impl(
880-
self, max_batch_size=None, on_message_received=None, timeout=0
889+
self,
890+
max_batch_size: Optional[int] = None,
891+
on_message_received: Optional[Callable] = None,
892+
timeout: float = 0,
881893
):
882894
self._message_received_callback = on_message_received
883895
max_batch_size = max_batch_size or self._link_credit
884896
timeout = time.time() + timeout if timeout else 0
885897
receiving = True
886-
batch = []
898+
batch: List[Message] = []
887899
self.open()
888900
while len(batch) < max_batch_size:
889901
try:
@@ -928,7 +940,14 @@ def close(self):
928940
self._received_messages = queue.Queue()
929941
super(ReceiveClient, self).close()
930942

931-
def receive_message_batch(self, **kwargs):
943+
def receive_message_batch( # pylint: disable=unused-argument
944+
self,
945+
*,
946+
max_batch_size: Optional[int] = None,
947+
on_message_received: Optional[Callable] = None,
948+
timeout: float = 0,
949+
**kwargs
950+
):
932951
"""Receive a batch of messages. Messages returned in the batch have already been
933952
accepted - if you wish to add logic to accept or reject messages based on custom
934953
criteria, pass in a callback. This method will return as soon as some messages are
@@ -950,7 +969,12 @@ def receive_message_batch(self, **kwargs):
950969
:return: A list of messages.
951970
:rtype: list[~pyamqp.message.Message]
952971
"""
953-
return self._do_retryable_operation(self._receive_message_batch_impl, **kwargs)
972+
return self._do_retryable_operation(
973+
self._receive_message_batch_impl,
974+
max_batch_size=max_batch_size,
975+
on_message_received=on_message_received,
976+
timeout=timeout
977+
)
954978

955979
def receive_messages_iter(self, timeout=None, on_message_received=None):
956980
"""Receive messages by generator. Messages returned in the generator have already been

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/management_link.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import logging
99
from functools import partial
1010
from collections import namedtuple
11+
from typing import Optional, Union
1112

1213
from .sender import SenderLink
1314
from .receiver import ReceiverLink
@@ -193,7 +194,11 @@ def execute_operation(
193194
self,
194195
message,
195196
on_execute_operation_complete,
196-
**kwargs
197+
*,
198+
operation: Optional[Union[bytes, str]] = None,
199+
type: Optional[Union[bytes, str]] = None,
200+
locales: Optional[str] = None,
201+
timeout: Optional[float] = None
197202
):
198203
"""Execute a request and wait on a response.
199204
@@ -217,11 +222,10 @@ def execute_operation(
217222
to the management request must be received.
218223
:rtype: None
219224
"""
220-
timeout = kwargs.get("timeout")
221-
message.application_properties["operation"] = kwargs.get("operation")
222-
message.application_properties["type"] = kwargs.get("type")
223-
if "locales" in kwargs:
224-
message.application_properties["locales"] = kwargs.get("locales")
225+
message.application_properties["operation"] = operation
226+
message.application_properties["type"] = type
227+
if locales:
228+
message.application_properties["locales"] = locales
225229
try:
226230
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
227231
new_properties = message.properties._replace(message_id=self.next_message_id)

sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,7 @@ def mgmt_client_request(
926926
*,
927927
operation: bytes,
928928
operation_type: bytes,
929-
node: bytes,
929+
node: str,
930930
timeout: int,
931931
callback: Callable
932932
) -> "ServiceBusReceivedMessage":
@@ -936,7 +936,7 @@ def mgmt_client_request(
936936
:param ~pyamqp.message.Message mgmt_msg: Message.
937937
:keyword bytes operation: Operation.
938938
:keyword bytes operation_type: Op type.
939-
:keyword bytes node: Mgmt target.
939+
:keyword str node: Mgmt target.
940940
:keyword int timeout: Timeout.
941941
:keyword callable callback: Callback to process request response.
942942
:return: ServiceBusReceivedMessage

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ async def mgmt_client_request_async(
418418
*,
419419
operation: bytes,
420420
operation_type: bytes,
421-
node: bytes,
421+
node: str,
422422
timeout: int,
423423
callback: Callable
424424
) -> "ServiceBusReceivedMessage":
@@ -428,7 +428,7 @@ async def mgmt_client_request_async(
428428
:param ~pyamqp.message.Message mgmt_msg: Message.
429429
:keyword bytes operation: Operation.
430430
:keyword bytes operation_type: Op type.
431-
:keyword bytes node: Mgmt target.
431+
:keyword str node: Mgmt target.
432432
:keyword int timeout: Timeout.
433433
:keyword callable callback: Callback to process request response.
434434
:return: The result returned by the mgmt request.

0 commit comments

Comments
 (0)