77from .entity import (
88 Quote , Trade , Agg , Entity ,
99)
10+ import logging
1011
1112
1213class StreamConn (object ):
1314 def __init__ (self , key_id = None ):
14- self ._key_id = key_id
15+ self ._key_id = key_id or os . environ . get ( 'APCA_API_KEY_ID' )
1516 self ._endpoint = os .environ .get (
1617 'POLYGON_WS_URL' ,
1718 'wss://alpaca.socket.polygon.io/stocks'
@@ -22,6 +23,7 @@ def __init__(self, key_id=None):
2223 self ._retry = int (os .environ .get ('APCA_RETRY_MAX' , 3 ))
2324 self ._retry_wait = int (os .environ .get ('APCA_RETRY_WAIT' , 3 ))
2425 self ._retries = 0
26+ self .loop = asyncio .get_event_loop ()
2527
2628 async def connect (self ):
2729 await self ._dispatch ({'ev' : 'status' ,
@@ -84,6 +86,9 @@ async def _recv(self):
8486 msg = json .loads (r )
8587 for update in msg :
8688 yield update
89+ except websockets .exceptions .ConnectionClosed :
90+ # This error occurs when we self.close() such as on KeyboardInterrupt
91+ pass
8792 except websockets .exceptions .ConnectionClosedError as e :
8893 await self ._dispatch ({'ev' : 'status' ,
8994 'status' : 'disconnected' ,
@@ -164,12 +169,15 @@ def run(self, initial_channels=[]):
164169 '''Run forever and block until exception is raised.
165170 initial_channels is the channels to start with.
166171 '''
167- loop = asyncio . get_event_loop ()
172+ loop = self . loop
168173 try :
169174 loop .run_until_complete (self .subscribe (initial_channels ))
170175 loop .run_forever ()
176+ except KeyboardInterrupt :
177+ logging .info ("Exiting on Interrupt" )
171178 finally :
172179 loop .run_until_complete (self .close ())
180+ loop .close ()
173181
174182 async def close (self ):
175183 '''Close any of open connections'''
0 commit comments