Skip to content

Commit d2cf143

Browse files
authored
[Azure AMQP] Handle Websocket Disconnect in Async (Azure#32061)
* handle websocket close * make sure buffer has left over bytes
1 parent aa591d4 commit d2cf143

File tree

2 files changed

+34
-24
lines changed

2 files changed

+34
-24
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -499,18 +499,23 @@ async def _read(self, toread, buffer=None, **kwargs): # pylint: disable=unused-
499499
length += nbytes
500500
toread -= nbytes
501501
try:
502-
while toread:
503-
data = await self.sock.receive_bytes()
504-
read_length = len(data)
505-
if read_length <= toread:
506-
view[length : length + read_length] = data
507-
toread -= read_length
508-
length += read_length
509-
else:
510-
view[length : length + toread] = data[0:toread]
511-
self._read_buffer = BytesIO(data[toread:])
512-
toread = 0
513-
return view
502+
try:
503+
while toread:
504+
data = await self.sock.receive_bytes()
505+
read_length = len(data)
506+
if read_length <= toread:
507+
view[length : length + read_length] = data
508+
toread -= read_length
509+
length += read_length
510+
else:
511+
view[length : length + toread] = data[0:toread]
512+
self._read_buffer = BytesIO(data[toread:])
513+
toread = 0
514+
return view
515+
except TypeError as te:
516+
# aiohttp websocket raises TypeError when a websocket disconnects, as it ends up
517+
# reading None over the wire and cant convert to bytes.
518+
raise ConnectionError("Websocket disconnected: %r" % te) from None
514519
except:
515520
self._read_buffer = BytesIO(view[:length])
516521
raise

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_transport_async.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -499,18 +499,23 @@ async def _read(self, toread, buffer=None, **kwargs): # pylint: disable=unused-
499499
length += nbytes
500500
toread -= nbytes
501501
try:
502-
while toread:
503-
data = await self.sock.receive_bytes()
504-
read_length = len(data)
505-
if read_length <= toread:
506-
view[length : length + read_length] = data
507-
toread -= read_length
508-
length += read_length
509-
else:
510-
view[length : length + toread] = data[0:toread]
511-
self._read_buffer = BytesIO(data[toread:])
512-
toread = 0
513-
return view
502+
try:
503+
while toread:
504+
data = await self.sock.receive_bytes()
505+
read_length = len(data)
506+
if read_length <= toread:
507+
view[length : length + read_length] = data
508+
toread -= read_length
509+
length += read_length
510+
else:
511+
view[length : length + toread] = data[0:toread]
512+
self._read_buffer = BytesIO(data[toread:])
513+
toread = 0
514+
return view
515+
except TypeError as te:
516+
# aiohttp websocket raises TypeError when a websocket disconnects, as it ends up
517+
# reading None over the wire and cant convert to bytes.
518+
raise ConnectionError("Websocket disconnected: %r" % te) from None
514519
except:
515520
self._read_buffer = BytesIO(view[:length])
516521
raise

0 commit comments

Comments
 (0)