Skip to content

Commit 93d3ad8

Browse files
committed
upgrade stomp.py
1 parent 7d6c952 commit 93d3ad8

File tree

4 files changed

+47
-17
lines changed

4 files changed

+47
-17
lines changed

requirements.txt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
simplejson==3.17.3
1+
simplejson==3.17.6
22
delorean==1.0.0
33
pandas
44
python-dateutil==2.8.2
5-
pytz==2021.1
5+
pytz==2022.1
66
pyasn1==0.4.8
7-
rsa==4.7.2
8-
stomp.py==4.1.24
9-
getmac==0.8.2
7+
rsa==4.8
8+
stomp.py==8.0.1
9+
getmac==0.8.3

tigeropen/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
55
@author: gaoan
66
"""
7-
__VERSION__ = '2.1.1'
7+
__VERSION__ = '2.1.2'

tigeropen/push/__init__.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,26 @@
33
Created on 2018/10/29
44
55
@author: gaoan
6-
"""
6+
"""
7+
import ssl
8+
import time
9+
10+
11+
def _patch_ssl(wait=0.006):
12+
def new_wrap_socket(self, sock, server_side=False,
13+
do_handshake_on_connect=True,
14+
suppress_ragged_eofs=True,
15+
server_hostname=None, session=None):
16+
time.sleep(wait)
17+
return self.sslsocket_class._create(
18+
sock=sock,
19+
server_side=server_side,
20+
do_handshake_on_connect=do_handshake_on_connect,
21+
suppress_ragged_eofs=suppress_ragged_eofs,
22+
server_hostname=server_hostname,
23+
context=self,
24+
session=session
25+
)
26+
ssl.SSLContext.old_wrap_socket = ssl.SSLContext.wrap_socket
27+
ssl.SSLContext.wrap_socket = new_wrap_socket
28+

tigeropen/push/push_client.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from tigeropen.common.util.common_utils import get_enum_value
2727
from tigeropen.common.util.order_utils import get_order_status
2828
from tigeropen.common.util.signature_utils import sign_with_rsa
29+
from tigeropen.push import _patch_ssl
2930

3031
HOUR_TRADING_QUOTE_KEYS_MAPPINGS = {'hourTradingLatestPrice': 'latest_price', 'hourTradingPreClose': 'pre_close',
3132
'hourTradingLatestTime': 'latest_time', 'hourTradingVolume': 'volume',
@@ -73,7 +74,7 @@ def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconn
7374
:param host:
7475
:param port:
7576
:param use_ssl:
76-
:param connection_timeout: second
77+
param connection_timeout: unit: second. The timeout value should be greater the heartbeats interval
7778
:param auto_reconnect:
7879
:param heartbeats: tuple of millisecond
7980
"""
@@ -99,6 +100,7 @@ def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconn
99100
self._connection_timeout = connection_timeout
100101
self._auto_reconnect = auto_reconnect
101102
self._heartbeats = heartbeats
103+
_patch_ssl()
102104

103105
def _connect(self):
104106
try:
@@ -108,12 +110,13 @@ def _connect(self):
108110
except:
109111
pass
110112

111-
self._stomp_connection = stomp.Connection12(host_and_ports=[(self.host, self.port), ], use_ssl=self.use_ssl,
113+
self._stomp_connection = stomp.Connection12(host_and_ports=[(self.host, self.port)],
112114
keepalive=KEEPALIVE, timeout=self._connection_timeout,
113115
heartbeats=self._heartbeats)
114116
self._stomp_connection.set_listener('push', self)
115-
self._stomp_connection.start()
116117
try:
118+
if self.use_ssl:
119+
self._stomp_connection.set_ssl([(self.host, self.port)])
117120
self._stomp_connection.connect(self._tiger_id, self._sign, wait=True, headers=self._generate_headers())
118121
except ConnectFailedException as e:
119122
raise e
@@ -128,7 +131,7 @@ def disconnect(self):
128131
if self._stomp_connection:
129132
self._stomp_connection.disconnect()
130133

131-
def on_connected(self, headers, body):
134+
def on_connected(self, frame):
132135
if self.connect_callback:
133136
self.connect_callback()
134137

@@ -138,13 +141,18 @@ def on_disconnected(self):
138141
elif self._auto_reconnect:
139142
self._connect()
140143

141-
def on_message(self, headers, body):
144+
def on_message(self, frame):
142145
"""
143146
Called by the STOMP connection when a MESSAGE frame is received.
144147
145-
:param dict headers: a dictionary containing all headers sent by the server as key/value pairs.
146-
:param body: the frame's payload - the message body.
148+
:param Frame frame: the stomp frame. stomp.utils.Frame
149+
A STOMP frame's attributes:
150+
cmd: the protocol command
151+
headers: a map of headers for the frame
152+
body: the content of the frame.
147153
"""
154+
headers = frame.headers
155+
body = frame.body
148156
try:
149157
response_type = headers.get('ret-type')
150158
if response_type == str(ResponseType.GET_SUB_SYMBOLS_END.value):
@@ -255,11 +263,11 @@ def on_message(self, headers, body):
255263
except Exception as e:
256264
logging.error(e, exc_info=True)
257265

258-
def on_error(self, headers, body):
266+
def on_error(self, frame):
259267
if self.error_callback:
260-
self.error_callback(body)
268+
self.error_callback(frame)
261269
else:
262-
logging.error(body)
270+
logging.error(frame.body)
263271

264272
def _update_subscribe_id(self, destination):
265273
self._destination_counter_map[destination] += 1

0 commit comments

Comments
 (0)