1313
1414from bintrees import FastRBTree
1515import aiofiles
16- import websockets
16+ import aiohttp
17+ # import websockets
1718
1819import gdax .trader
1920import gdax .utils
@@ -43,11 +44,14 @@ def __init__(self, product_ids='ETH-USD', api_key=None, api_secret=None,
4344 self ._asks = {product_id : FastRBTree () for product_id in product_ids }
4445 self ._bids = {product_id : FastRBTree () for product_id in product_ids }
4546 self ._sequences = {product_id : None for product_id in product_ids }
46- self ._ws = None
47+ self ._ws_session = None
4748 self ._ws_connect = None
49+ self ._ws = None
4850
4951 async def _init (self ):
50- self ._ws_connect = websockets .connect ('wss://ws-feed.gdax.com' )
52+ self ._ws_session = aiohttp .ClientSession ()
53+ self ._ws_connect = self ._ws_session .ws_connect (
54+ 'wss://ws-feed.gdax.com' )
5155 self ._ws = await self ._ws_connect .__aenter__ ()
5256
5357 # subscribe
@@ -89,7 +93,7 @@ async def __aenter__(self):
8993 return self
9094
9195 async def __aexit__ (self , exc_type , exc , traceback ):
92- return await self ._ws_connect .__aexit__ (exc_type , exc , traceback )
96+ return await self ._ws_session .__aexit__ (exc_type , exc , traceback )
9397
9498 async def _open_log_file (self ):
9599 if self .trade_log_file_path is not None :
@@ -101,10 +105,10 @@ async def _close_log_file(self):
101105 await self ._trade_file .__aexit__ (None , None , None )
102106
103107 async def _send (self , ** kwargs ):
104- await self ._ws .send ( json . dumps ( kwargs ) )
108+ await self ._ws .send_json ( kwargs )
105109
106110 async def _recv (self ):
107- json_data = await self ._ws .recv ()
111+ json_data = await self ._ws .receive_str ()
108112 if self ._trade_file :
109113 await self ._trade_file .write (f'W { json_data } \n ' )
110114 return json .loads (json_data )
@@ -132,9 +136,11 @@ async def _subscribe(self):
132136 async def handle_message (self ):
133137 try :
134138 message = await self ._recv ()
135- except websockets .exceptions .ConnectionClosed :
136- await self ._ws_connect .__aexit__ (None , None , None )
137- self ._init ()
139+ except aiohttp .ServerDisconnectedError as exc :
140+ logging .error (
141+ f'Error: Exception: f{ exc } . Re-initializing websocket.' )
142+ await self ._ws_session .__aexit__ (None , None , None )
143+ await self ._init ()
138144 return
139145
140146 product_id = message ['product_id' ]
@@ -146,10 +152,10 @@ async def handle_message(self):
146152 # from getProductOrderBook)
147153 return message
148154 elif sequence > self ._sequences [product_id ] + 1 :
149- logging .info (
155+ logging .error (
150156 'Error: messages missing ({} - {}). Re-initializing websocket.'
151157 .format (sequence , self ._sequences [product_id ]))
152- await self ._ws_connect .__aexit__ (None , None , None )
158+ await self ._ws_session .__aexit__ (None , None , None )
153159 await self ._init ()
154160 return
155161
@@ -243,7 +249,6 @@ def match(self, product_id, order):
243249 self .set_asks (product_id , price , asks )
244250
245251 def change (self , product_id , order ):
246- logging .info ((product_id , order ))
247252 if 'new_size' not in order :
248253 # market order
249254 # TODO
0 commit comments