11import asyncio
22import json
3+ import time
34import re
45import os
56import websockets
@@ -17,10 +18,17 @@ def __init__(self, key_id=None):
1718 ).rstrip ('/' )
1819 self ._handlers = {}
1920 self ._ws = None
21+ self ._retry = int (os .environ .get ('APCA_RETRY_MAX' , 3 ))
22+ self ._retry_wait = int (os .environ .get ('APCA_RETRY_WAIT' , 3 ))
23+ self ._retries = 0
2024
2125 async def connect (self ):
22- print ('connecting to polygon' )
26+ await self ._dispatch ('status' ,
27+ {'ev' : 'status' ,
28+ 'status' : 'connecting' ,
29+ 'message' : 'Connecting to Polygon' })
2330 ws = await websockets .connect (self ._endpoint )
31+
2432 await ws .send (json .dumps ({
2533 'action' : 'auth' ,
2634 'params' : self ._key_id
@@ -35,14 +43,16 @@ async def connect(self):
3543 .format (msg ))
3644 )
3745
46+ self ._retries = 0
3847 self ._ws = ws
3948 await self ._dispatch ('authorized' , msg [0 ])
4049
4150 asyncio .ensure_future (self ._consume_msg ())
42- return ws
4351
4452 async def _consume_msg (self ):
4553 ws = self ._ws
54+ if not ws :
55+ return
4656 try :
4757 while True :
4858 r = await ws .recv ()
@@ -53,14 +63,31 @@ async def _consume_msg(self):
5363 stream = update .get ('ev' )
5464 if stream is not None :
5565 await self ._dispatch (stream , update )
66+ except websockets .exceptions .ConnectionClosedError :
67+ await self ._dispatch ('status' ,
68+ {'ev' : 'status' ,
69+ 'status' : 'disconnected' ,
70+ 'message' :
71+ 'Polygon Disconnected Unexpectedly' })
5672 finally :
57- await ws .close ()
73+ if self ._ws is not None :
74+ await self ._ws .close ()
5875 self ._ws = None
76+ asyncio .ensure_future (self ._ensure_ws ())
5977
6078 async def _ensure_ws (self ):
6179 if self ._ws is not None :
6280 return
63- self ._ws = await self .connect ()
81+ try :
82+ await self .connect ()
83+ except Exception :
84+ self ._ws = None
85+ self ._retries += 1
86+ time .sleep (self ._retry_wait )
87+ if self ._retries <= self ._retry :
88+ asyncio .ensure_future (self ._ensure_ws ())
89+ else :
90+ raise ConnectionError ("Max Retries Exceeded" )
6491
6592 async def subscribe (self , channels ):
6693 '''Start subscribing channels.
0 commit comments