@@ -77,7 +77,6 @@ def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconn
7777 self ._asset_counter = 0
7878 self ._position_counter = 0
7979 self ._order_counter = 0
80- # self.subscriptions = {} # subscription callbacks indexed by subscriber's ID
8180
8281 self .subscribed_symbols = None
8382 self .quote_changed = None
@@ -86,6 +85,8 @@ def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconn
8685 self .order_changed = None
8786 self .connect_callback = None
8887 self .disconnect_callback = None
88+ self .subscribe_callback = None
89+ self .unsubscribe_callback = None
8990 self .error_callback = None
9091 self ._connection_timeout = connection_timeout
9192 self ._auto_reconnect = auto_reconnect
@@ -226,11 +227,21 @@ def on_message(self, headers, body):
226227 items .append ((ORDER_KEYS_MAPPINGS .get (key ), value ))
227228 if items :
228229 self .order_changed (account , items )
230+ elif response_type == str (ResponseType .GET_SUBSCRIBE_END .value ):
231+ if self .subscribe_callback :
232+ self .subscribe_callback (headers .get ('destination' ), json .loads (body ))
233+ elif response_type == str (ResponseType .GET_CANCEL_SUBSCRIBE_END .value ):
234+ if self .unsubscribe_callback :
235+ self .unsubscribe_callback (headers .get ('destination' ), json .loads (body ))
236+ elif response_type == str (ResponseType .ERROR_END .value ):
237+ if self .error_callback :
238+ self .error_callback (body )
229239 except Exception as e :
230240 logging .error (e , exc_info = True )
231241
232242 def on_error (self , headers , body ):
233- pass
243+ if self .error_callback :
244+ self .error_callback (body )
234245
235246 @staticmethod
236247 def _get_subscribe_id (counter ):
0 commit comments