77import json
88import logging
99import sys
10+ import os
1011from collections import defaultdict
1112
1213import stomp
2223 SUBSCRIPTION_TRADE_ORDER
2324from tigeropen .common .consts .push_types import RequestType , ResponseType
2425from tigeropen .common .consts .quote_keys import QuoteChangeKey , QuoteKeyType
26+ from tigeropen .common .exceptions import ApiException
2527from tigeropen .common .util .string_utils import camel_to_underline
2628from tigeropen .common .util .common_utils import get_enum_value
2729from tigeropen .common .util .order_utils import get_order_status
2830from tigeropen .common .util .signature_utils import sign_with_rsa
31+ from tigeropen .push import _patch_ssl
2932
3033HOUR_TRADING_QUOTE_KEYS_MAPPINGS = {'hourTradingLatestPrice' : 'latest_price' , 'hourTradingPreClose' : 'pre_close' ,
3134 'hourTradingLatestTime' : 'latest_time' , 'hourTradingVolume' : 'volume' ,
6770
6871
6972class PushClient (stomp .ConnectionListener ):
70- def __init__ (self , host , port , use_ssl = True , connection_timeout = 120 , auto_reconnect = True ,
71- heartbeats = (30 * 1000 , 30 * 1000 )):
73+ def __init__ (self , host , port , use_ssl = True , connection_timeout = 120 , heartbeats = (30 * 1000 , 30 * 1000 )):
7274 """
7375 :param host:
7476 :param port:
7577 :param use_ssl:
76- :param connection_timeout: second
77- :param auto_reconnect:
78+ :param connection_timeout: unit: second. The timeout value should be greater the heartbeats interval
7879 :param heartbeats: tuple of millisecond
7980 """
8081 self .host = host
@@ -97,8 +98,8 @@ def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconn
9798 self .unsubscribe_callback = None
9899 self .error_callback = None
99100 self ._connection_timeout = connection_timeout
100- self ._auto_reconnect = auto_reconnect
101101 self ._heartbeats = heartbeats
102+ _patch_ssl ()
102103
103104 def _connect (self ):
104105 try :
@@ -108,12 +109,13 @@ def _connect(self):
108109 except :
109110 pass
110111
111- self ._stomp_connection = stomp .Connection12 (host_and_ports = [(self .host , self .port ), ], use_ssl = self . use_ssl ,
112+ self ._stomp_connection = stomp .Connection12 (host_and_ports = [(self .host , self .port )] ,
112113 keepalive = KEEPALIVE , timeout = self ._connection_timeout ,
113114 heartbeats = self ._heartbeats )
114115 self ._stomp_connection .set_listener ('push' , self )
115- self ._stomp_connection .start ()
116116 try :
117+ if self .use_ssl :
118+ self ._stomp_connection .set_ssl ([(self .host , self .port )])
117119 self ._stomp_connection .connect (self ._tiger_id , self ._sign , wait = True , headers = self ._generate_headers ())
118120 except ConnectFailedException as e :
119121 raise e
@@ -128,23 +130,26 @@ def disconnect(self):
128130 if self ._stomp_connection :
129131 self ._stomp_connection .disconnect ()
130132
131- def on_connected (self , headers , body ):
133+ def on_connected (self , frame ):
132134 if self .connect_callback :
133- self .connect_callback ()
135+ self .connect_callback (frame )
134136
135137 def on_disconnected (self ):
136138 if self .disconnect_callback :
137139 self .disconnect_callback ()
138- elif self ._auto_reconnect :
139- self ._connect ()
140140
141- def on_message (self , headers , body ):
141+ def on_message (self , frame ):
142142 """
143143 Called by the STOMP connection when a MESSAGE frame is received.
144144
145- :param dict headers: a dictionary containing all headers sent by the server as key/value pairs.
146- :param body: the frame's payload - the message body.
145+ :param Frame frame: the stomp frame. stomp.utils.Frame
146+ A STOMP frame's attributes:
147+ cmd: the protocol command
148+ headers: a map of headers for the frame
149+ body: the content of the frame.
147150 """
151+ headers = frame .headers
152+ body = frame .body
148153 try :
149154 response_type = headers .get ('ret-type' )
150155 if response_type == str (ResponseType .GET_SUB_SYMBOLS_END .value ):
@@ -255,11 +260,17 @@ def on_message(self, headers, body):
255260 except Exception as e :
256261 logging .error (e , exc_info = True )
257262
258- def on_error (self , headers , body ):
263+ def on_error (self , frame ):
264+ body = json .loads (frame .body )
265+ if body .get ('code' ) == 4001 :
266+ logging .error (body )
267+ self .disconnect_callback = None
268+ raise ApiException (4001 , body .get ('message' ))
269+
259270 if self .error_callback :
260- self .error_callback (body )
271+ self .error_callback (frame )
261272 else :
262- logging .error (body )
273+ logging .error (frame . body )
263274
264275 def _update_subscribe_id (self , destination ):
265276 self ._destination_counter_map [destination ] += 1
0 commit comments