Skip to content

Commit 0bcd06e

Browse files
Merge pull request #35 from scivisum/bug/RD-41528_dont_wait_for_messages
Bug/rd 41528 dont wait for messages
2 parents 65cade6 + 6cc4387 commit 0bcd06e

File tree

3 files changed

+29
-48
lines changed

3 files changed

+29
-48
lines changed

browserdebuggertools/wssessionmanager.py

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import socket
55
import time
66
import collections
7-
from threading import Thread, Event, Lock
7+
from threading import Thread, Lock
88

99
from typing import Dict
1010

@@ -22,43 +22,20 @@
2222
)
2323

2424

25-
class NotifiableDeque(collections.deque):
26-
""" A Queue with the benefits of deque speed
27-
It can wait until there are new messages or the timeout is met
28-
"""
29-
_POLL_INTERVAL = 1
30-
31-
def __init__(self):
32-
super(NotifiableDeque, self).__init__()
33-
self._poll_signal = Event()
34-
35-
def append(self, message):
36-
""" Appends to the queue and allows any waiting threads to start popping from it
37-
"""
38-
super(NotifiableDeque, self).append(message)
39-
self._poll_signal.set()
40-
41-
def wait_for_messages(self):
42-
""" Waits until there are messages or the poll interval time
43-
"""
44-
self._poll_signal.wait(self._POLL_INTERVAL)
45-
if self._poll_signal.is_set():
46-
self._poll_signal.clear()
47-
48-
4925
class _WSMessageProducer(Thread):
5026
""" Interfaces with the websocket to send messages from the send queue
5127
or put messages from the websocket into recv queue
5228
"""
5329
_CONN_TIMEOUT = 15
5430
_BLOCKED_TIMEOUT = 5
31+
_POLL_INTERVAL = 0.01 # How long to wait for new ws messages
5532

5633
def __init__(self, port, send_queue, on_message):
5734
super(_WSMessageProducer, self).__init__()
5835
self._port = port
5936
self._send_queue = send_queue
6037
self._on_message = on_message
61-
self._last_poll = None
38+
self._last_ws_attempt = None
6239
self._continue = True
6340

6441
self.exception = None
@@ -141,14 +118,12 @@ def run(self):
141118

142119
with self._ws_io():
143120

144-
self._last_poll = time.time()
145121
while self._continue:
146-
122+
self._last_ws_attempt = time.time()
147123
self._empty_send_queue()
148124
self._empty_websocket()
149125

150-
self._send_queue.wait_for_messages()
151-
self._last_poll = time.time()
126+
time.sleep(self._POLL_INTERVAL)
152127

153128
@property
154129
def blocked(self):
@@ -161,12 +136,13 @@ def blocked(self):
161136
some messages could allow us to reduce the load on the websocket
162137
so raising an exception in this case allows us to empty the send queue and try again.
163138
164-
** This could be solved by having a separate thread to handle sending messages,
165-
assuming ws.send() doesn't hang and is also thread safe.
166-
Then we could update self._last_poll after every successful ws.send()/ws.recv()
139+
** This could be solved by not handling sending messages in the thread,
140+
assuming ws.send() doesn't hang and is atomic.
141+
Then we could update self._last_ws_attempt after every successful ws send()/recv()
167142
"""
168143
return (
169-
self._last_poll and ((time.time() - self._last_poll) > self._BLOCKED_TIMEOUT)
144+
self._last_ws_attempt
145+
and ((time.time() - self._last_ws_attempt) > self._BLOCKED_TIMEOUT)
170146
)
171147

172148
def health_check(self):
@@ -237,7 +213,7 @@ def __init__(self, port, timeout, domains=None):
237213
self._message_producer_lock = Lock() # Lock making sure we don't create 2 ws connections
238214
self._last_not_ok = None
239215
self._message_producer_not_ok_count = 0
240-
self._send_queue = NotifiableDeque()
216+
self._send_queue = collections.deque()
241217

242218
self.port = port
243219
self._message_producer = None

tests/integrationtests/test_wssessionmanager.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from browserdebuggertools.exceptions import DevToolsTimeoutException, MaxRetriesException
1111
from browserdebuggertools.wssessionmanager import (
12-
WSSessionManager, _WSMessageProducer, NotifiableDeque
12+
WSSessionManager, _WSMessageProducer
1313
)
1414

1515
MODULE_PATH = "browserdebuggertools.WSSessionManager."
@@ -92,7 +92,6 @@ def test_locked_get_events(self):
9292
with patch.object(
9393
_WSMessageProducer, "_get_websocket", new=MagicMock(return_value=self.ws)
9494
):
95-
NotifiableDeque._MAX_QUEUE_BUFFER = 9999
9695
self.session_manager = WSSessionManager(1234, 1, {"Network": {}})
9796

9897
events = list(reversed(self.session_manager.get_events("Network", clear=True)))
@@ -159,6 +158,12 @@ def unblock(self):
159158
self._continue = False
160159

161160

161+
class TimeoutBlockingWS(BlockingWS):
162+
163+
def send(self, data):
164+
pass
165+
166+
162167
class ExceptionThrowingWS(_DummyWebsocket):
163168

164169
exceptions = 0
@@ -229,14 +234,14 @@ def test_thread_blocked_twice(self):
229234
def test_thread_blocks_causes_timeout(self):
230235

231236
with patch.object(_WSMessageProducer, "_get_websocket",
232-
new=MagicMock(return_value=BlockingWS(times_to_block=1))):
237+
new=MagicMock(return_value=TimeoutBlockingWS())):
233238

234239
self.session_manager = WSSessionManager(1234, 3)
235240
self.resetWS()
236241
with self.assertRaises(DevToolsTimeoutException):
237242
start = time.time()
238243
self.session_manager.execute("Network", "enable")
239-
self.assertLess(time.time() - start, 5)
244+
self.assertLess(time.time() - start, 5)
240245

241246
def test_max_thread_blocks_exceeded(self):
242247

tests/unittests/test_wssessionmanager.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
import collections
12
import copy
23
import socket
3-
from threading import Event
44
from unittest import TestCase
55

66
from mock import patch, MagicMock, call, PropertyMock
@@ -13,7 +13,7 @@
1313
MaxRetriesException
1414
)
1515
from browserdebuggertools.wssessionmanager import (
16-
WSSessionManager, _WSMessageProducer, NotifiableDeque
16+
WSSessionManager, _WSMessageProducer
1717
)
1818

1919
MODULE_PATH = "browserdebuggertools.wssessionmanager."
@@ -31,7 +31,7 @@ def _get_websocket(self):
3131
return MagicMock()
3232

3333
def setUp(self):
34-
self.send_queue = NotifiableDeque()
34+
self.send_queue = collections.deque()
3535
self.messaging_thread = self.MockWSMessageProducer(1111, self.send_queue, MagicMock())
3636
self.ws_message_producer = self.messaging_thread
3737

@@ -179,7 +179,7 @@ def test_fail(self):
179179
class Test__WSMessageProducer_run(WSMessageProducerTest):
180180

181181
def prepare(self, time):
182-
NotifiableDeque._POLL_INTERVAL = 0
182+
_WSMessageProducer._POLL_INTERVAL = 0
183183
self.next_time = 0
184184

185185
def increment_time():
@@ -197,7 +197,7 @@ def test(self, time):
197197

198198
self.ws_message_producer.run()
199199

200-
self.assertEqual(10, self.ws_message_producer._last_poll)
200+
self.assertEqual(10, self.ws_message_producer._last_ws_attempt)
201201

202202
def test_exception(self, time):
203203
exception = Exception()
@@ -206,15 +206,15 @@ def test_exception(self, time):
206206

207207
self.ws_message_producer.run()
208208

209-
self.assertEqual(0, self.ws_message_producer._last_poll)
209+
self.assertEqual(0, self.ws_message_producer._last_ws_attempt)
210210
self.assertEqual(exception, self.ws_message_producer.exception)
211211

212212

213213
class Test__WSMessagingThread_blocked(WSMessageProducerTest):
214214

215215
def test_thread_not_started(self):
216216

217-
self.messaging_thread._last_poll = None
217+
self.messaging_thread._last_ws_attempt = None
218218

219219
self.assertFalse(self.messaging_thread.blocked)
220220

@@ -224,7 +224,7 @@ def test_thread_blocked(self, _time):
224224
now = 100
225225
_time.time.return_value = now
226226

227-
self.messaging_thread._last_poll = now - self.messaging_thread._BLOCKED_TIMEOUT - 1
227+
self.messaging_thread._last_ws_attempt = now - self.messaging_thread._BLOCKED_TIMEOUT - 1
228228

229229
self.assertTrue(self.messaging_thread.blocked)
230230

@@ -234,7 +234,7 @@ def test_thread_not_blocked(self, _time):
234234
now = 100
235235
_time.time.return_value = now
236236

237-
self.messaging_thread._last_poll = now - self.messaging_thread._BLOCKED_TIMEOUT + 1
237+
self.messaging_thread._last_ws_attempt = now - self.messaging_thread._BLOCKED_TIMEOUT + 1
238238

239239
self.assertFalse(self.messaging_thread.blocked)
240240

0 commit comments

Comments
 (0)