Skip to content

Commit dcde006

Browse files
support monkey-patched gevent queue in the client (Fixes #403)
1 parent 7a41390 commit dcde006

File tree

5 files changed

+55
-37
lines changed

5 files changed

+55
-37
lines changed

src/engineio/async_client.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ async def connect(self, url, headers=None, transports=None,
126126
if not transports:
127127
raise ValueError('No valid transports provided')
128128
self.transports = transports or valid_transports
129-
self.queue = self.create_queue()
130129
return await getattr(self, '_connect_' + self.transports[0])(
131130
url, headers or {}, engineio_path)
132131

@@ -199,11 +198,15 @@ async def sleep(self, seconds=0):
199198
"""
200199
return await asyncio.sleep(seconds)
201200

202-
def create_queue(self):
201+
def create_queue(self, *args, **kwargs):
203202
"""Create a queue object."""
204-
q = asyncio.Queue()
205-
q.Empty = asyncio.QueueEmpty
206-
return q
203+
return asyncio.Queue(*args, **kwargs)
204+
205+
def get_queue_empty_exception(self):
206+
"""Return the queue empty exception raised by queues created by the
207+
``create_queue()`` method.
208+
"""
209+
return asyncio.QueueEmpty
207210

208211
def create_event(self):
209212
"""Create an event object."""
@@ -624,7 +627,7 @@ async def _write_loop(self):
624627
packets = None
625628
try:
626629
packets = [await asyncio.wait_for(self.queue.get(), timeout)]
627-
except (self.queue.Empty, asyncio.TimeoutError):
630+
except (self.queue_empty, asyncio.TimeoutError):
628631
self.logger.error('packet queue is empty, aborting')
629632
break
630633
except asyncio.CancelledError: # pragma: no cover
@@ -636,7 +639,7 @@ async def _write_loop(self):
636639
while True:
637640
try:
638641
packets.append(self.queue.get_nowait())
639-
except self.queue.Empty:
642+
except self.queue_empty:
640643
break
641644
if packets[-1] is None:
642645
packets = packets[:-1]

src/engineio/base_client.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ def __init__(self, logger=False, json=None, request_timeout=5,
6161
self.ws = None
6262
self.read_loop_task = None
6363
self.write_loop_task = None
64-
self.queue = None
64+
self.queue = self.create_queue()
65+
self.queue_empty = self.get_queue_empty_exception()
6566
self.state = 'disconnected'
6667
self.ssl_verify = ssl_verify
6768
self.websocket_extra_options = websocket_extra_options or {}
@@ -156,3 +157,13 @@ def _get_url_timestamp(self):
156157
if not self.timestamp_requests:
157158
return ''
158159
return '&t=' + str(time.time())
160+
161+
def create_queue(self, *args, **kwargs): # pragma: no cover
162+
"""Create a queue object."""
163+
raise NotImplementedError('must be implemented in a subclass')
164+
165+
def get_queue_empty_exception(self): # pragma: no cover
166+
"""Return the queue empty exception raised by queues created by the
167+
``create_queue()`` method.
168+
"""
169+
raise NotImplementedError('must be implemented in a subclass')

src/engineio/client.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ def connect(self, url, headers=None, transports=None,
9191
if not transports:
9292
raise ValueError('No valid transports provided')
9393
self.transports = transports or valid_transports
94-
self.queue = self.create_queue()
9594
return getattr(self, '_connect_' + self.transports[0])(
9695
url, headers or {}, engineio_path)
9796

@@ -162,9 +161,13 @@ def sleep(self, seconds=0):
162161

163162
def create_queue(self, *args, **kwargs):
164163
"""Create a queue object."""
165-
q = queue.Queue(*args, **kwargs)
166-
q.Empty = queue.Empty
167-
return q
164+
return queue.Queue(*args, **kwargs)
165+
166+
def get_queue_empty_exception(self):
167+
"""Return the queue empty exception raised by queues created by the
168+
``create_queue()`` method.
169+
"""
170+
return queue.Empty
168171

169172
def create_event(self, *args, **kwargs):
170173
"""Create an event object."""
@@ -566,7 +569,7 @@ def _write_loop(self):
566569
packets = None
567570
try:
568571
packets = [self.queue.get(timeout=timeout)]
569-
except self.queue.Empty:
572+
except self.queue_empty:
570573
self.logger.error('packet queue is empty, aborting')
571574
break
572575
if packets == [None]:
@@ -576,7 +579,7 @@ def _write_loop(self):
576579
while True:
577580
try:
578581
packets.append(self.queue.get(block=False))
579-
except self.queue.Empty:
582+
except self.queue_empty:
580583
break
581584
if packets[-1] is None:
582585
packets = packets[:-1]

tests/async/test_client.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ async def test_sleep(self):
225225
async def test_create_queue(self):
226226
c = async_client.AsyncClient()
227227
q = c.create_queue()
228-
with pytest.raises(q.Empty):
228+
with pytest.raises(c.queue_empty):
229229
q.get_nowait()
230230

231231
async def test_create_event(self):
@@ -1200,7 +1200,7 @@ async def test_write_loop_empty_queue(self):
12001200
c.ping_interval = 1
12011201
c.ping_timeout = 2
12021202
c.queue = mock.MagicMock()
1203-
c.queue.Empty = RuntimeError
1203+
c.queue_empty = RuntimeError
12041204
c.queue.get = mock.AsyncMock(side_effect=RuntimeError)
12051205
await c._write_loop()
12061206
c.queue.get.assert_awaited_once_with()
@@ -1213,7 +1213,7 @@ async def test_write_loop_polling_one_packet(self):
12131213
c.ping_timeout = 2
12141214
c.current_transport = 'polling'
12151215
c.queue = mock.MagicMock()
1216-
c.queue.Empty = RuntimeError
1216+
c.queue_empty = RuntimeError
12171217
c.queue.get = mock.AsyncMock(
12181218
side_effect=[
12191219
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
@@ -1244,7 +1244,7 @@ async def test_write_loop_polling_three_packets(self):
12441244
c.ping_timeout = 2
12451245
c.current_transport = 'polling'
12461246
c.queue = mock.MagicMock()
1247-
c.queue.Empty = RuntimeError
1247+
c.queue_empty = RuntimeError
12481248
c.queue.get = mock.AsyncMock(
12491249
side_effect=[
12501250
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
@@ -1285,7 +1285,7 @@ async def test_write_loop_polling_two_packets_done(self):
12851285
c.ping_timeout = 2
12861286
c.current_transport = 'polling'
12871287
c.queue = mock.MagicMock()
1288-
c.queue.Empty = RuntimeError
1288+
c.queue_empty = RuntimeError
12891289
c.queue.get = mock.AsyncMock(
12901290
side_effect=[
12911291
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
@@ -1322,7 +1322,7 @@ async def test_write_loop_polling_bad_connection(self):
13221322
c.ping_timeout = 2
13231323
c.current_transport = 'polling'
13241324
c.queue = mock.MagicMock()
1325-
c.queue.Empty = RuntimeError
1325+
c.queue_empty = RuntimeError
13261326
c.queue.get = mock.AsyncMock(
13271327
side_effect=[packet.Packet(packet.MESSAGE, {'foo': 'bar'})]
13281328
)
@@ -1350,7 +1350,7 @@ async def test_write_loop_polling_bad_status(self):
13501350
c.ping_timeout = 2
13511351
c.current_transport = 'polling'
13521352
c.queue = mock.MagicMock()
1353-
c.queue.Empty = RuntimeError
1353+
c.queue_empty = RuntimeError
13541354
c.queue.get = mock.AsyncMock(
13551355
side_effect=[packet.Packet(packet.MESSAGE, {'foo': 'bar'})]
13561356
)
@@ -1379,7 +1379,7 @@ async def test_write_loop_websocket_one_packet(self):
13791379
c.ping_timeout = 2
13801380
c.current_transport = 'websocket'
13811381
c.queue = mock.MagicMock()
1382-
c.queue.Empty = RuntimeError
1382+
c.queue_empty = RuntimeError
13831383
c.queue.get = mock.AsyncMock(
13841384
side_effect=[
13851385
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
@@ -1401,7 +1401,7 @@ async def test_write_loop_websocket_three_packets(self):
14011401
c.ping_timeout = 2
14021402
c.current_transport = 'websocket'
14031403
c.queue = mock.MagicMock()
1404-
c.queue.Empty = RuntimeError
1404+
c.queue_empty = RuntimeError
14051405
c.queue.get = mock.AsyncMock(
14061406
side_effect=[
14071407
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
@@ -1431,7 +1431,7 @@ async def test_write_loop_websocket_one_packet_binary(self):
14311431
c.ping_timeout = 2
14321432
c.current_transport = 'websocket'
14331433
c.queue = mock.MagicMock()
1434-
c.queue.Empty = RuntimeError
1434+
c.queue_empty = RuntimeError
14351435
c.queue.get = mock.AsyncMock(
14361436
side_effect=[packet.Packet(packet.MESSAGE, b'foo'), RuntimeError]
14371437
)
@@ -1450,7 +1450,7 @@ async def test_write_loop_websocket_bad_connection(self):
14501450
c.ping_timeout = 2
14511451
c.current_transport = 'websocket'
14521452
c.queue = mock.MagicMock()
1453-
c.queue.Empty = RuntimeError
1453+
c.queue_empty = RuntimeError
14541454
c.queue.get = mock.AsyncMock(
14551455
side_effect=[
14561456
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),

tests/common/test_client.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,11 @@ def test_create(self):
3333
'ws',
3434
'read_loop_task',
3535
'write_loop_task',
36-
'queue',
3736
]:
3837
assert getattr(c, attr) is None, attr + ' is not None'
3938
assert c.state == 'disconnected'
39+
assert c.queue.__class__.__name__ == 'Queue'
40+
assert c.queue_empty.__name__ == 'Empty'
4041

4142
def test_custom_json(self):
4243
client.Client()
@@ -289,7 +290,7 @@ def test_sleep(self):
289290
def test_create_queue(self):
290291
c = client.Client()
291292
q = c.create_queue()
292-
with pytest.raises(q.Empty):
293+
with pytest.raises(c.queue_empty):
293294
q.get(timeout=0.01)
294295

295296
def test_create_event(self):
@@ -1499,7 +1500,7 @@ def test_write_loop_empty_queue(self):
14991500
c.ping_interval = 1
15001501
c.ping_timeout = 2
15011502
c.queue = mock.MagicMock()
1502-
c.queue.Empty = RuntimeError
1503+
c.queue_empty = RuntimeError
15031504
c.queue.get.side_effect = RuntimeError
15041505
c._write_loop()
15051506
c.queue.get.assert_called_once_with(timeout=7)
@@ -1512,7 +1513,7 @@ def test_write_loop_polling_one_packet(self):
15121513
c.ping_timeout = 2
15131514
c.current_transport = 'polling'
15141515
c.queue = mock.MagicMock()
1515-
c.queue.Empty = RuntimeError
1516+
c.queue_empty = RuntimeError
15161517
c.queue.get.side_effect = [
15171518
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
15181519
RuntimeError,
@@ -1541,7 +1542,7 @@ def test_write_loop_polling_three_packets(self):
15411542
c.ping_timeout = 2
15421543
c.current_transport = 'polling'
15431544
c.queue = mock.MagicMock()
1544-
c.queue.Empty = RuntimeError
1545+
c.queue_empty = RuntimeError
15451546
c.queue.get.side_effect = [
15461547
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
15471548
packet.Packet(packet.PING),
@@ -1576,7 +1577,7 @@ def test_write_loop_polling_two_packets_done(self):
15761577
c.ping_timeout = 2
15771578
c.current_transport = 'polling'
15781579
c.queue = mock.MagicMock()
1579-
c.queue.Empty = RuntimeError
1580+
c.queue_empty = RuntimeError
15801581
c.queue.get.side_effect = [
15811582
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
15821583
packet.Packet(packet.PING),
@@ -1610,7 +1611,7 @@ def test_write_loop_polling_bad_connection(self):
16101611
c.ping_timeout = 2
16111612
c.current_transport = 'polling'
16121613
c.queue = mock.MagicMock()
1613-
c.queue.Empty = RuntimeError
1614+
c.queue_empty = RuntimeError
16141615
c.queue.get.side_effect = [
16151616
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
16161617
RuntimeError,
@@ -1639,7 +1640,7 @@ def test_write_loop_polling_bad_status(self):
16391640
c.ping_timeout = 2
16401641
c.current_transport = 'polling'
16411642
c.queue = mock.MagicMock()
1642-
c.queue.Empty = RuntimeError
1643+
c.queue_empty = RuntimeError
16431644
c.queue.get.side_effect = [
16441645
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
16451646
RuntimeError,
@@ -1668,7 +1669,7 @@ def test_write_loop_websocket_one_packet(self):
16681669
c.ping_timeout = 2
16691670
c.current_transport = 'websocket'
16701671
c.queue = mock.MagicMock()
1671-
c.queue.Empty = RuntimeError
1672+
c.queue_empty = RuntimeError
16721673
c.queue.get.side_effect = [
16731674
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
16741675
RuntimeError,
@@ -1688,7 +1689,7 @@ def test_write_loop_websocket_three_packets(self):
16881689
c.ping_timeout = 2
16891690
c.current_transport = 'websocket'
16901691
c.queue = mock.MagicMock()
1691-
c.queue.Empty = RuntimeError
1692+
c.queue_empty = RuntimeError
16921693
c.queue.get.side_effect = [
16931694
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
16941695
packet.Packet(packet.PING),
@@ -1712,7 +1713,7 @@ def test_write_loop_websocket_one_packet_binary(self):
17121713
c.ping_timeout = 2
17131714
c.current_transport = 'websocket'
17141715
c.queue = mock.MagicMock()
1715-
c.queue.Empty = RuntimeError
1716+
c.queue_empty = RuntimeError
17161717
c.queue.get.side_effect = [
17171718
packet.Packet(packet.MESSAGE, b'foo'),
17181719
RuntimeError,
@@ -1732,7 +1733,7 @@ def test_write_loop_websocket_bad_connection(self):
17321733
c.ping_timeout = 2
17331734
c.current_transport = 'websocket'
17341735
c.queue = mock.MagicMock()
1735-
c.queue.Empty = RuntimeError
1736+
c.queue_empty = RuntimeError
17361737
c.queue.get.side_effect = [
17371738
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
17381739
RuntimeError,

0 commit comments

Comments
 (0)