diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 44b4444..208e139 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -79,7 +79,7 @@ def on_event(self, event, *args): """ callback, args = find_callback(args) if callback: - callback(*args) + callback(args) def on_error(self, reason, advice): 'Called after server sends an error; you can override this method' @@ -117,7 +117,7 @@ def _find_event_callback(self, event): return getattr( self, 'on_' + event.replace(' ', '_'), - lambda *args: self.on_event(event, *args)) + lambda *args: self.on_event(event, args)) class LoggingNamespace(BaseNamespace): @@ -148,7 +148,7 @@ def on_event(self, event, *args): arguments.append('callback(*args)') self._log(logging.INFO, '%s [event] %s(%s)', self.path, event, ', '.join(arguments)) - super(LoggingNamespace, self).on_event(event, *args) + super(LoggingNamespace, self).on_event(event, args) def on_error(self, reason, advice): self._log(logging.INFO, '%s [error] %s', self.path, advice) @@ -237,7 +237,7 @@ def emit(self, event, *args, **kw): callback, args = find_callback(args, kw) self._transport.emit(path, event, args, callback) - def wait(self, seconds=None, for_callbacks=False): + def wait(self, seconds=None, for_callbacks=False, on_error=None, error_args=None): """Wait in a loop and process events as defined in the namespaces. - Omit seconds, i.e. call wait() without arguments, to wait forever. @@ -254,6 +254,8 @@ def wait(self, seconds=None, for_callbacks=False): pass next(self._heartbeat_pacemaker) except ConnectionError as e: + if on_error is not None and callable(on_error): + on_error(e, error_args) try: warning = Exception('[connection error] %s' % e) warning_screen.throw(warning) @@ -274,8 +276,8 @@ def _process_events(self, timeout=None): def _process_packet(self, packet): code, packet_id, path, data = packet - namespace = self.get_namespace(path) - delegate = self._get_delegate(code) + namespace = self.get_namespace(path or '') + delegate = self._get_delegate(code, packet) delegate(packet, namespace._find_event_callback) def _stop_waiting(self, for_callbacks): @@ -315,7 +317,7 @@ def _transport(self): try: if self.connected: return self.__transport - except AttributeError: + except AttributeError as e: pass socketIO_session = self._get_socketIO_session() supported_transports = self._get_supported_transports(socketIO_session) @@ -395,7 +397,7 @@ def get_namespace(self, path=''): except KeyError: raise PacketError('unhandled namespace path (%s)' % path) - def _get_delegate(self, code): + def _get_delegate(self, code, packet): try: return { '0': self._on_disconnect, @@ -407,9 +409,10 @@ def _get_delegate(self, code): '6': self._on_ack, '7': self._on_error, '8': self._on_noop, + '': self._on_noop }[code] except KeyError: - raise PacketError('unexpected code (%s)' % code) + raise PacketError('unexpected code ({}): {}'.format([code], packet)) def _on_disconnect(self, packet, find_event_callback): find_event_callback('disconnect')() @@ -422,26 +425,27 @@ def _on_heartbeat(self, packet, find_event_callback): def _on_message(self, packet, find_event_callback): code, packet_id, path, data = packet - args = [data] + args = data + self._send_args('message', args, path, packet_id, find_event_callback) + + def _send_args(self, event, args, path, packet_id, find_event_callback): + ev_args = [args] if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback('message')(*args) + ack = self._prepare_to_send_ack(path, packet_id) + ev_args.append(ack) + find_event_callback(event)(*ev_args) def _on_json(self, packet, find_event_callback): code, packet_id, path, data = packet - args = [json.loads(data)] - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback('message')(*args) + args = json.loads(data) + self._send_args('message', args, path, packet_id, find_event_callback) def _on_event(self, packet, find_event_callback): code, packet_id, path, data = packet value_by_name = json.loads(data) event = value_by_name['name'] args = value_by_name.get('args', []) - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback(event)(*args) + self._send_args(event, args, path, packet_id, find_event_callback) def _on_ack(self, packet, find_event_callback): code, packet_id, path, data = packet @@ -452,7 +456,7 @@ def _on_ack(self, packet, find_event_callback): except KeyError: return args = json.loads(data_parts[1]) if len(data_parts) > 1 else [] - ack_callback(*args) + ack_callback(args) def _on_error(self, packet, find_event_callback): code, packet_id, path, data = packet diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 21e046e..e479cd1 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -8,6 +8,7 @@ import sys import time import websocket +import re from .exceptions import ConnectionError, TimeoutError from .symmetries import _get_text @@ -85,6 +86,8 @@ def noop(self, path=''): def send_packet(self, code, path='', data='', callback=None): packet_id = self.set_ack_callback(callback) if callback else '' packet_parts = str(code), packet_id, path, encode_unicode(data) + if not data: + packet_parts = packet_parts[:-1] packet_text = ':'.join(packet_parts) self.send(packet_text) self._log(logging.DEBUG, '[packet sent] %s', packet_text) @@ -95,22 +98,34 @@ def recv_packet(self, timeout=None): yield self._packets.pop(0) except IndexError: pass - for packet_text in self.recv(timeout=timeout): - self._log(logging.DEBUG, '[packet received] %s', packet_text) - try: - packet_parts = packet_text.split(':', 3) - except AttributeError: - self._log(logging.WARNING, '[packet error] %s', packet_text) - continue - code, packet_id, path, data = None, None, None, None - packet_count = len(packet_parts) - if 4 == packet_count: - code, packet_id, path, data = packet_parts - elif 3 == packet_count: - code, packet_id, path = packet_parts - elif 1 == packet_count: - code = packet_parts[0] - yield code, packet_id, path, data + for packet_texts in self.recv(timeout=timeout): + #remove packet separator + packet_texts = re.sub('^\xef\xbf\xbd\w+\xef\xbf\xbd', '', + packet_texts) + + packets = packet_texts.split('\xef\xbf\xbd')[::2] + + for packet_text in packets: + self._log(logging.DEBUG, '[packet received] %s', packet_text) + sep_count = packet_text.count('\xef\xbf\xbd')/2 + + try: + packet_parts = packet_text.split(':', 3) + except AttributeError: + self._log(logging.WARNING, '[packet error] %s', packet_text) + continue + code, packet_id, path, data = None, None, None, None + packet_count = len(packet_parts) + if 4 == packet_count: + code, packet_id, path, data = packet_parts + elif 3 == packet_count: + code, packet_id, path = packet_parts + elif 1 == packet_count: + code = packet_parts[0] + if code and len(code) > 1: + code = code[-1] + + yield code, packet_id, path, data def _enqueue_packet(self, packet): self._packets.append(packet) @@ -143,6 +158,8 @@ def __init__(self, socketIO_session, is_secure, base_url, **kw): http_session = _prepare_http_session(kw) req = http_session.prepare_request(requests.Request('GET', url)) headers = ['%s: %s' % item for item in req.headers.items()] + headers.append('Connection: keep-alive') + try: self._connection = websocket.create_connection(url, header=headers) except socket.timeout as e: @@ -318,10 +335,10 @@ def _get_response(request, *args, **kw): response = request(*args, **kw) except requests.exceptions.Timeout as e: raise TimeoutError(e) - except requests.exceptions.ConnectionError as e: - raise ConnectionError(e) except requests.exceptions.SSLError as e: raise ConnectionError('could not negotiate SSL (%s)' % e) + except requests.exceptions.ConnectionError as e: + raise ConnectionError(e) status = response.status_code if 200 != status: raise ConnectionError('unexpected status code (%s)' % status)