Skip to content

Commit 62e1b79

Browse files
authored
[EventHubs] sync with SB pyamqp (#34407)
* sync pyamqp * update credential used for tests * apply cr changes * change reconnect test back * copy over sb changes
1 parent 65a743b commit 62e1b79

File tree

10 files changed

+34
-29
lines changed

10 files changed

+34
-29
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -337,13 +337,13 @@ def _get_next_outgoing_channel(self) -> int:
337337

338338
def _outgoing_empty(self) -> None:
339339
"""Send an empty frame to prevent the connection from reaching an idle timeout."""
340-
if self._network_trace:
341-
_LOGGER.debug("-> EmptyFrame()", extra=self._network_trace_params)
342340
if self._error:
343341
raise self._error
344342

345343
try:
346344
if self._can_write():
345+
if self._network_trace:
346+
_LOGGER.debug("-> EmptyFrame()", extra=self._network_trace_params)
347347
self._transport.write(EMPTY_FRAME)
348348
self._last_frame_sent_time = time.time()
349349
except (OSError, IOError, SSLError, socket.error) as exc:
@@ -516,7 +516,7 @@ def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
516516
self._error = AMQPConnectionError(
517517
condition=frame[0][0], description=frame[0][1], info=frame[0][2]
518518
)
519-
_LOGGER.error(
519+
_LOGGER.warning(
520520
"Connection closed with error: %r", frame[0],
521521
extra=self._network_trace_params
522522
)
@@ -667,7 +667,10 @@ def _process_outgoing_frame(self, channel: int, frame) -> None:
667667
ConnectionState.OPEN_SENT,
668668
ConnectionState.OPENED,
669669
]:
670-
raise ValueError("Connection not open.")
670+
raise AMQPConnectionError(
671+
ErrorCondition.SocketError,
672+
description="Connection not open."
673+
)
671674
now = time.time()
672675
if get_local_timeout(
673676
now,

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,11 @@ def _write(self, s):
648648
"""Write a string out to the SSL socket fully.
649649
:param str s: The string to write.
650650
"""
651-
write = self.sock.send
651+
try:
652+
write = self.sock.send
653+
except AttributeError:
654+
raise IOError("Socket has already been closed.") from None
655+
652656
while s:
653657
try:
654658
n = write(s)
@@ -659,7 +663,7 @@ def _write(self, s):
659663
# None.
660664
n = 0
661665
if not n:
662-
raise IOError("Socket closed")
666+
raise IOError("Socket closed.")
663667
s = s[n:]
664668

665669
def negotiate(self):

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_cbs_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ async def _on_execute_operation_complete(
169169
async def _update_status(self):
170170
if self.auth_state in (CbsAuthState.OK, CbsAuthState.REFRESH_REQUIRED):
171171
is_expired, is_refresh_required = check_expiration_and_refresh_status(
172-
self._expires_on, self._refresh_window
172+
self._expires_on, self._refresh_window # type: ignore
173173
)
174174
_LOGGER.debug(
175175
"CBS status check: state == %r, expired == %r, refresh required == %r",
@@ -235,13 +235,13 @@ async def update_token(self) -> None:
235235
elif isinstance(access_token.token, str):
236236
self._token = access_token.token
237237
else:
238-
raise ValueError("Token must be either bytes or string.")
238+
raise ValueError("Token must be a string or bytes.")
239239
if isinstance(self._auth.token_type, bytes):
240240
token_type = self._auth.token_type.decode()
241241
elif isinstance(self._auth.token_type, str):
242242
token_type = self._auth.token_type
243243
else:
244-
raise ValueError("Token type must be either bytes or string.")
244+
raise ValueError("Token type must be a string or bytes.")
245245

246246
self._token_put_time = int(utc_now().timestamp())
247247
if self._token and token_type:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,6 @@ async def _keep_alive_async(self):
150150
current_time = time.time()
151151
elapsed_time = current_time - start_time
152152
if elapsed_time >= self._keep_alive_interval:
153-
_logger.debug(
154-
"Keeping %r connection alive.",
155-
self.__class__.__name__,
156-
extra=self._network_trace_params
157-
)
158153
await asyncio.shield(self._connection.listen(wait=self._socket_timeout,
159154
batch=self._link.current_link_credit))
160155
start_time = current_time
@@ -723,7 +718,7 @@ async def _client_ready_async(self):
723718
if not self._link:
724719
self._link = self._session.create_receiver_link(
725720
source_address=self.source,
726-
link_credit=self._link_credit,
721+
link_credit=0, # link_credit=0 on flow frame sent before client is ready
727722
send_settle_mode=self._send_settle_mode,
728723
rcv_settle_mode=self._receive_settle_mode,
729724
max_message_size=self._max_message_size,
@@ -748,7 +743,7 @@ async def _client_run_async(self, **kwargs):
748743
"""
749744
try:
750745
if self._link.current_link_credit <= 0:
751-
await self._link.flow()
746+
await self._link.flow(link_credit=self._link_credit)
752747
await self._connection.listen(wait=self._socket_timeout, **kwargs)
753748
except ValueError:
754749
_logger.info("Timeout reached, closing receiver.", extra=self._network_trace_params)

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -345,14 +345,13 @@ def _get_next_outgoing_channel(self) -> int:
345345

346346
async def _outgoing_empty(self) -> None:
347347
"""Send an empty frame to prevent the connection from reaching an idle timeout."""
348-
if self._network_trace:
349-
_LOGGER.debug("-> EmptyFrame()", extra=self._network_trace_params)
350-
351348
if self._error:
352349
raise self._error
353350

354351
try:
355352
if self._can_write():
353+
if self._network_trace:
354+
_LOGGER.debug("-> EmptyFrame()", extra=self._network_trace_params)
356355
await self._transport.write(EMPTY_FRAME)
357356
self._last_frame_sent_time = time.time()
358357
except (OSError, IOError, SSLError, socket.error) as exc:
@@ -533,7 +532,7 @@ async def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
533532
self._error = AMQPConnectionError(
534533
condition=frame[0][0], description=frame[0][1], info=frame[0][2]
535534
)
536-
_LOGGER.error(
535+
_LOGGER.warning(
537536
"Connection closed with error: %r", frame[0],
538537
extra=self._network_trace_params
539538
)
@@ -682,7 +681,10 @@ async def _process_outgoing_frame(self, channel: int, frame) -> None:
682681
ConnectionState.OPEN_SENT,
683682
ConnectionState.OPENED,
684683
]:
685-
raise ValueError("Connection not open.")
684+
raise AMQPConnectionError(
685+
ErrorCondition.SocketError,
686+
description="Connection not open."
687+
)
686688
now = time.time()
687689
if get_local_timeout(
688690
now,

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/cbs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ def update_token(self) -> None:
280280
utc_from_timestamp(self._expires_on),
281281
)
282282

283-
def handle_token(self) -> bool: # pylint: disable=inconsistent-return-statements
283+
def handle_token(self) -> bool: # pylint: disable=inconsistent-return-statements
284284
if not self._cbs_link_ready():
285285
return False
286286
self._update_status()

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ def _keep_alive(self):
236236
current_time = time.time()
237237
elapsed_time = current_time - start_time
238238
if elapsed_time >= self._keep_alive_interval:
239-
_logger.debug("Keeping %r connection alive.", self.__class__.__name__)
240239
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
241240
start_time = current_time
242241
time.sleep(1)
@@ -827,7 +826,7 @@ def _client_ready(self):
827826
if not self._link:
828827
self._link = self._session.create_receiver_link(
829828
source_address=self.source,
830-
link_credit=self._link_credit, # link_credit=0 on flow frame sent before client is ready
829+
link_credit=0, # link_credit=0 on flow frame sent before client is ready
831830
send_settle_mode=self._send_settle_mode,
832831
rcv_settle_mode=self._receive_settle_mode,
833832
max_message_size=self._max_message_size,
@@ -852,7 +851,7 @@ def _client_run(self, **kwargs):
852851
"""
853852
try:
854853
if self._link.current_link_credit <= 0:
855-
self._link.flow()
854+
self._link.flow(link_credit=self._link_credit)
856855
self._connection.listen(wait=self._socket_timeout, **kwargs)
857856
except ValueError:
858857
_logger.info("Timeout reached, closing receiver.", extra=self._network_trace_params)

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class Header(NamedTuple):
6262
This field contains the relative Message priority. Higher numbers indicate higher priority Messages.
6363
Messages with higher priorities MAY be delivered before those with lower priorities. An AMQP intermediary
6464
implementing distinct priority levels MUST do so in the following manner:
65-
65+
6666
- If n distince priorities are implemented and n is less than 10 - priorities 0 to (5 - ceiling(n/2))
6767
MUST be treated equivalently and MUST be the lowest effective priority. The priorities (4 + fioor(n/2))
6868
and above MUST be treated equivalently and MUST be the highest effective priority. The priorities
@@ -184,7 +184,7 @@ class Message(NamedTuple):
184184
delivery_annotations: Optional[Dict[Union[str, bytes], Any]] = None
185185
message_annotations: Optional[Dict[Union[str, bytes], Any]] = None
186186
properties: Optional[Properties] = None
187-
application_properties: Optional[Dict[Union[str, bytes], Any]] = None # TODO: make not read-only
187+
application_properties: Optional[Dict[Union[str, bytes], Any]] = None
188188
data: Optional[bytes] = None
189189
sequence: Optional[List[Any]] = None
190190
value: Optional[Any] = None

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_reconnect_async.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ async def on_event_received(event):
169169
assert consumer._handler._connection._state == uamqp.c_uamqp.ConnectionState.DISCARDING
170170
await consumer.receive(batch=False, max_batch_size=1, max_wait_time=10)
171171
else:
172-
await consumer._handler.do_work_async()
172+
with pytest.raises(error.AMQPConnectionError):
173+
await consumer._handler.do_work_async()
173174
assert consumer._handler._connection.state == constants.ConnectionState.END
174175
try:
175176
await asyncio.wait_for(consumer.receive(), timeout=10)

sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_reconnect.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ def on_event_received(event):
164164
consumer._handler.do_work()
165165
assert consumer._handler._connection._state == uamqp.c_uamqp.ConnectionState.DISCARDING
166166
else:
167-
consumer._handler.do_work()
167+
with pytest.raises(error.AMQPConnectionError):
168+
consumer._handler.do_work()
168169
assert consumer._handler._connection.state == constants.ConnectionState.END
169170

170171
duration = 10

0 commit comments

Comments
 (0)