Skip to content

Commit e9db467

Browse files
authored
Hotfixes (#1)
* Fix issue where occasionally two transported would be instantiated Occasionally the _transport property of EngineIO might be accessed from separate threads which might result in creation of two transports. This occasionally leads to issues where messages are lost. Fixed by protecting the _transport property with a lock to avoid entering it while another access is ongoing. * Protect also send_packet with lock * Don't try to send packets if transport is gone * Don't call wait for event if it doesn't exist * Fix errors during socket close * Call transport disconnect when disconnecting * Don't process packet if it's empty * some fixes for hanging connections * small fix that should fix hanging http sessions from the background * some fixes for lock releases.
1 parent baae546 commit e9db467

File tree

3 files changed

+37
-15
lines changed

3 files changed

+37
-15
lines changed

socketIO_client/__init__.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import atexit
2-
2+
from threading import Lock
33
from .exceptions import ConnectionError, TimeoutError, PacketError
44
from .heartbeats import HeartbeatThread
55
from .logs import LoggingMixin
@@ -48,6 +48,7 @@ def __init__(
4848
self._opened = False
4949
self._wants_to_close = False
5050
atexit.register(self._close)
51+
self._transport_lock = Lock()
5152

5253
if Namespace:
5354
self.define(Namespace)
@@ -57,13 +58,16 @@ def __init__(
5758

5859
@property
5960
def _transport(self):
60-
if self._opened:
61-
return self._transport_instance
62-
self._engineIO_session = self._get_engineIO_session()
63-
self._negotiate_transport()
64-
self._connect_namespaces()
65-
self._opened = True
66-
self._reset_heartbeat()
61+
self._transport_lock.acquire()
62+
try:
63+
if not self._opened and not self._wants_to_close:
64+
self._engineIO_session = self._get_engineIO_session()
65+
self._negotiate_transport()
66+
self._connect_namespaces()
67+
self._opened = True
68+
self._reset_heartbeat()
69+
finally:
70+
self._transport_lock.release()
6771
return self._transport_instance
6872

6973
def _get_engineIO_session(self):
@@ -194,15 +198,20 @@ def _close(self):
194198
try:
195199
self._heartbeat_thread.halt()
196200
self._heartbeat_thread.join()
201+
self._heartbeat_thread = None
197202
except AttributeError:
198203
pass
199204
if not hasattr(self, '_opened') or not self._opened:
205+
self._http_session.close()
200206
return
201207
engineIO_packet_type = 1
202208
try:
203209
self._transport_instance.send_packet(engineIO_packet_type)
204210
except (TimeoutError, ConnectionError):
205211
pass
212+
finally:
213+
self._http_session.close()
214+
self._transport_instance.disconnect()
206215
self._opened = False
207216

208217
def _ping(self, engineIO_packet_data=''):
@@ -266,7 +275,8 @@ def wait(self, seconds=None, **kw):
266275
namespace.on_disconnect()
267276
except PacketError:
268277
pass
269-
self._heartbeat_thread.relax()
278+
if self._heartbeat_thread:
279+
self._heartbeat_thread.relax()
270280
self._transport.set_timeout()
271281

272282
def _should_stop_waiting(self):

socketIO_client/heartbeats.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ def run(self):
3131
interval_in_seconds = self._hurry_interval_in_seconds
3232
else:
3333
interval_in_seconds = self._relax_interval_in_seconds
34-
self._rest.wait(interval_in_seconds)
34+
if self._rest and hasattr(self._rest, 'wait'):
35+
self._rest.wait(interval_in_seconds)
3536
except ConnectionError:
3637
logging.debug('[heartbeat connection error]')
3738

socketIO_client/transports.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import requests
22
import six
33
import ssl
4+
from threading import Lock
45
import threading
56
import time
67
from socket import error as SocketError
@@ -45,6 +46,9 @@ def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
4546
def set_timeout(self, seconds=None):
4647
pass
4748

49+
def disconnect(self):
50+
pass
51+
4852

4953
class XHR_PollingTransport(AbstractTransport):
5054

@@ -138,6 +142,7 @@ def __init__(self, http_session, is_secure, url, engineIO_session=None):
138142
self._connection = create_connection(ws_url, **kw)
139143
except Exception as e:
140144
raise ConnectionError(e)
145+
self.lock = Lock()
141146

142147
def recv_packet(self):
143148
try:
@@ -150,24 +155,30 @@ def recv_packet(self):
150155
raise ConnectionError('recv disconnected (%s)' % e)
151156
except SocketError as e:
152157
raise ConnectionError('recv disconnected (%s)' % e)
153-
if not isinstance(packet_text, six.binary_type):
154-
packet_text = six.b(packet_text)
155-
engineIO_packet_type, engineIO_packet_data = parse_packet_text(
156-
packet_text)
157-
yield engineIO_packet_type, engineIO_packet_data
158+
if packet_text:
159+
if not isinstance(packet_text, six.binary_type):
160+
packet_text = six.b(packet_text)
161+
engineIO_packet_type, engineIO_packet_data = parse_packet_text(
162+
packet_text)
163+
yield engineIO_packet_type, engineIO_packet_data
158164

159165
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
160166
packet = format_packet_text(engineIO_packet_type, engineIO_packet_data)
167+
self.lock.acquire()
161168
try:
162169
self._connection.send(packet)
163170
except WebSocketTimeoutException as e:
164171
raise TimeoutError('send timed out (%s)' % e)
165172
except (SocketError, WebSocketConnectionClosedException) as e:
166173
raise ConnectionError('send disconnected (%s)' % e)
174+
finally:
175+
self.lock.release()
167176

168177
def set_timeout(self, seconds=None):
169178
self._connection.settimeout(seconds or self._timeout)
170179

180+
def disconnect(self):
181+
self._connection.close()
171182

172183
def get_response(request, *args, **kw):
173184
try:

0 commit comments

Comments
 (0)