Skip to content

Commit 2b48d42

Browse files
authored
[pyAMQP] Align pyAMQP (Azure#37095)
* align * updates * pylint
1 parent 0abe197 commit 2b48d42

File tree

12 files changed

+58
-17
lines changed

12 files changed

+58
-17
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,9 @@ async def mgmt_request_async(
389389
self._mgmt_links[node] = mgmt_link
390390
await mgmt_link.open()
391391

392+
while not await self.client_ready_async():
393+
await asyncio.sleep(0.05)
394+
392395
while not await mgmt_link.ready():
393396
await self._connection.listen(wait=False)
394397

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_link_async.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def __init__(
100100
self._is_closed = False
101101
self._on_link_state_change = kwargs.get("on_link_state_change")
102102
self._on_attach = kwargs.get("on_attach")
103-
self._error = None
103+
self._error: Optional[AMQPLinkError] = None
104104

105105
async def __aenter__(self) -> "Link":
106106
await self.attach()
@@ -244,6 +244,11 @@ async def _incoming_detach(self, frame) -> None:
244244
self._error = error_cls(condition=frame[2][0], description=frame[2][1], info=frame[2][2])
245245
await self._set_state(LinkState.ERROR)
246246
else:
247+
if self.state != LinkState.DETACH_SENT:
248+
# Handle the case of when the remote side detaches without sending an error.
249+
# We should detach as per the spec but then retry connecting
250+
self._error = AMQPLinkError(condition=ErrorCondition.UnknownError,
251+
description="Link detached unexpectedly.", retryable=True)
247252
await self._set_state(LinkState.DETACHED)
248253

249254
async def attach(self) -> None:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_receiver_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
DispositionFrame,
1717
)
1818
from ..outcomes import Received, Accepted, Rejected, Released, Modified
19-
from ..error import AMQPLinkError, ErrorCondition
19+
from ..error import AMQPException, ErrorCondition
2020

2121

2222
_LOGGER = logging.getLogger(__name__)
@@ -113,7 +113,7 @@ async def _outgoing_disposition(
113113
role=self.role, first=first, last=last, settled=settled, state=state, batchable=batchable
114114
)
115115
if delivery_tag not in self._received_delivery_tags:
116-
raise AMQPLinkError(condition=ErrorCondition.InternalError, description = "Delivery tag not found.")
116+
raise AMQPException(condition=ErrorCondition.IllegalState, description = "Delivery tag not found.")
117117

118118
if self.network_trace:
119119
_LOGGER.debug("-> %r", DispositionFrame(*disposition_frame), extra=self.network_trace_params)

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,9 @@ def mgmt_request(
467467
self._mgmt_links[node] = mgmt_link
468468
mgmt_link.open()
469469

470+
while not self.client_ready():
471+
time.sleep(0.05)
472+
470473
while not mgmt_link.ready():
471474
self._connection.listen(wait=False)
472475
operation_type = operation_type or b"empty"

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/link.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def __init__(
9696
self._is_closed = False
9797
self._on_link_state_change = kwargs.get("on_link_state_change")
9898
self._on_attach = kwargs.get("on_attach")
99-
self._error = None
99+
self._error: Optional[AMQPLinkError] = None
100100

101101
def __enter__(self) -> "Link":
102102
self.attach()
@@ -239,6 +239,11 @@ def _incoming_detach(self, frame) -> None:
239239
self._error = error_cls(condition=frame[2][0], description=frame[2][1], info=frame[2][2])
240240
self._set_state(LinkState.ERROR)
241241
else:
242+
if self.state != LinkState.DETACH_SENT:
243+
# Handle the case of when the remote side detaches without sending an error.
244+
# We should detach as per the spec but then retry connecting
245+
self._error = AMQPLinkError(condition=ErrorCondition.UnknownError,
246+
description="Link detached unexpectedly.", retryable=True)
242247
self._set_state(LinkState.DETACHED)
243248

244249
def attach(self) -> None:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/receiver.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from .constants import LinkState, Role
1414
from .performatives import TransferFrame, DispositionFrame
1515
from .outcomes import Received, Accepted, Rejected, Released, Modified
16-
from .error import AMQPLinkError, ErrorCondition
16+
from .error import AMQPException, ErrorCondition
1717

1818

1919
_LOGGER = logging.getLogger(__name__)
@@ -109,7 +109,7 @@ def _outgoing_disposition(
109109
batchable: Optional[bool],
110110
):
111111
if delivery_tag not in self._received_delivery_tags:
112-
raise AMQPLinkError(condition=ErrorCondition.InternalError, description = "Delivery tag not found.")
112+
raise AMQPException(condition=ErrorCondition.IllegalState, description = "Delivery tag not found.")
113113

114114
disposition_frame = DispositionFrame(
115115
role=self.role, first=first, last=last, settled=settled, state=state, batchable=batchable

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
126126
self._port = PORT
127127
self.state: Optional[ConnectionState] = None
128128

129+
# Set the port for AmqpOverWebsocket
130+
if transport_type.value == TransportType.AmqpOverWebsocket.value:
131+
self._port = WEBSOCKET_PORT
132+
129133
# Custom Endpoint
130134
custom_endpoint_address = kwargs.get("custom_endpoint_address")
131135
custom_endpoint = None
@@ -157,6 +161,7 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
157161
self._transport = sasl_transport(
158162
host=endpoint,
159163
credential=kwargs["sasl_credential"],
164+
port=self._port,
160165
custom_endpoint=custom_endpoint,
161166
socket_timeout=self._socket_timeout,
162167
network_trace_params=self._network_trace_params,

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_transport.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ def __init__(
171171
socket_timeout=SOCKET_TIMEOUT,
172172
socket_settings=None,
173173
raise_on_initial_eintr=True,
174+
use_tls: bool =True,
174175
**kwargs
175176
):
176177
self._quick_recv = None
@@ -186,6 +187,8 @@ def __init__(
186187
self.socket_settings = socket_settings
187188
self.socket_lock = Lock()
188189

190+
self._use_tls = use_tls
191+
189192
def connect(self):
190193
try:
191194
# are we already connected?
@@ -509,7 +512,8 @@ def __init__(
509512

510513
def _setup_transport(self):
511514
"""Wrap the socket in an SSL object."""
512-
self.sock = self._wrap_socket(self.sock, **self.sslopts)
515+
if self._use_tls:
516+
self.sock = self._wrap_socket(self.sock, **self.sslopts)
513517
self._quick_recv = self.sock.recv
514518

515519
def _wrap_socket(self, sock, context=None, **sslopts):
@@ -599,7 +603,8 @@ def _shutdown_transport(self):
599603
"""Unwrap a SSL socket, so we can call shutdown()."""
600604
if self.sock is not None:
601605
try:
602-
self.sock = self.sock.unwrap()
606+
if self._use_tls:
607+
self.sock = self.sock.unwrap()
603608
except OSError:
604609
pass
605610

@@ -737,11 +742,12 @@ def connect(self):
737742
) from None
738743
try:
739744
self.sock = create_connection(
740-
url="wss://{}".format(self._custom_endpoint or self._host),
745+
url="wss://{}".format(self._custom_endpoint or self._host) if self._use_tls
746+
else "ws://{}".format(self._custom_endpoint or self._host),
741747
subprotocols=[AMQP_WS_SUBPROTOCOL],
742748
timeout=self.socket_timeout, # timeout for read/write operations
743749
skip_utf8_validation=True,
744-
sslopt=self.sslopts,
750+
sslopt=self.sslopts if self._use_tls else None,
745751
http_proxy_host=http_proxy_host,
746752
http_proxy_port=http_proxy_port,
747753
http_proxy_auth=http_proxy_auth,

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_client_async.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ async def open_async(self, connection=None):
244244
self._external_connection = True
245245
if not self._connection:
246246
self._connection = Connection(
247-
"amqps://" + self._hostname,
247+
"amqps://" + self._hostname if self._use_tls else "amqp://" + self._hostname,
248248
sasl_credential=self._auth.sasl,
249249
ssl_opts={'ca_certs': self._connection_verify or certifi.where()},
250250
container_id=self._name,
@@ -257,6 +257,7 @@ async def open_async(self, connection=None):
257257
http_proxy=self._http_proxy,
258258
custom_endpoint_address=self._custom_endpoint_address,
259259
socket_timeout=self._socket_timeout,
260+
use_tls=self._use_tls,
260261
)
261262
await self._connection.open()
262263
if not self._session:

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_connection_async.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ def __init__(# pylint:disable=too-many-locals,too-many-statements
107107
self._port = PORT
108108
self.state: Optional[ConnectionState] = None
109109

110+
# Set the port for AmqpOverWebsocket
111+
if transport_type.value == TransportType.AmqpOverWebsocket.value:
112+
self._port = WEBSOCKET_PORT
113+
110114
# Custom Endpoint
111115
custom_endpoint_address = kwargs.get("custom_endpoint_address")
112116
custom_endpoint = None
@@ -141,6 +145,7 @@ def __init__(# pylint:disable=too-many-locals,too-many-statements
141145
self._transport: Union[SASLTransport, SASLWithWebSocket, AsyncTransport] = sasl_transport(
142146
host=endpoint,
143147
credential=kwargs["sasl_credential"],
148+
port=self._port,
144149
custom_endpoint=custom_endpoint,
145150
socket_timeout=self._socket_timeout,
146151
network_trace_params=self._network_trace_params,

0 commit comments

Comments
 (0)