11import os
22from enum import Enum
33from typing import Optional , Union , List , Set , Callable , Awaitable
4+ import logging
45import json
56import asyncio
67import ssl
78import certifi
89from .models import *
910from websockets .client import connect , WebSocketClientProtocol
1011from websockets .exceptions import ConnectionClosedOK , ConnectionClosedError
12+ from ..logging import get_logger
13+ import logging
1114
1215env_key = "POLYGON_API_KEY"
16+ logger = get_logger ("WebSocketClient" )
1317
1418
1519class AuthError (Exception ):
@@ -35,7 +39,7 @@ def __init__(
3539 :param api_key: Your API keYour API key.
3640 :param feed: The feed to subscribe to.
3741 :param raw: Whether to pass raw Union[str, bytes] to user callback.
38- :param verbose: Whether to print client and server status messages.
42+ :param verbose: Whether to log client and server status messages.
3943 :param subscriptions: List of subscription parameters.
4044 :param max_reconnects: How many times to reconnect on network outage before ending .connect event loop.
4145 :return: A client.
@@ -48,7 +52,8 @@ def __init__(
4852 self .feed = feed
4953 self .market = market
5054 self .raw = raw
51- self .verbose = verbose
55+ if verbose :
56+ logger .setLevel (logging .DEBUG )
5257 self .websocket_cfg = kwargs
5358 if isinstance (feed , Enum ):
5459 feed = feed .value
@@ -82,8 +87,7 @@ async def connect(
8287 :raises AuthError: If invalid API key is supplied.
8388 """
8489 reconnects = 0
85- if self .verbose :
86- print ("connect:" , self .url )
90+ logger .debug ("connect: %s" , self .url )
8791 # darwin needs some extra <3
8892 ssl_context = None
8993 if self .url .startswith ("wss://" ):
@@ -96,21 +100,19 @@ async def connect(
96100 self .websocket = s
97101 try :
98102 msg = await s .recv ()
99- if self .verbose :
100- print ("connected:" , msg )
101- if self .verbose :
102- print ("authing:" )
103+ logger .debug ("connected: %s" , msg )
104+ logger .debug ("authing..." )
103105 await s .send (json .dumps ({"action" : "auth" , "params" : self .api_key }))
104106 auth_msg = await s .recv ()
105107 auth_msg_parsed = json .loads (auth_msg )
106- if self .verbose :
107- print ("authed:" , auth_msg )
108+ logger .debug ("authed: %s" , auth_msg )
108109 if auth_msg_parsed [0 ]["status" ] == "auth_failed" :
109110 raise AuthError (auth_msg_parsed [0 ]["message" ])
110111 while True :
111112 if self .schedule_resub :
112- if self .verbose :
113- print ("reconciling:" , self .subs , self .scheduled_subs )
113+ logger .debug (
114+ "reconciling: %s %s" , self .subs , self .scheduled_subs
115+ )
114116 new_subs = self .scheduled_subs .difference (self .subs )
115117 await self ._subscribe (new_subs )
116118 old_subs = self .subs .difference (self .scheduled_subs )
@@ -126,21 +128,18 @@ async def connect(
126128 msgJson = json .loads (cmsg ) # type: ignore
127129 for m in msgJson :
128130 if m ["ev" ] == "status" :
129- if self .verbose :
130- print ("status:" , m ["message" ])
131+ logger .debug ("status: %s" , m ["message" ])
131132 continue
132133 if not self .raw :
133- cmsg = parse (msgJson )
134+ cmsg = parse (msgJson , logger )
134135
135136 if len (cmsg ) > 0 :
136137 await processor (cmsg ) # type: ignore
137138 except ConnectionClosedOK :
138- if self .verbose :
139- print ("connection closed (OK)" )
139+ logger .debug ("connection closed (OK)" )
140140 return
141141 except ConnectionClosedError :
142- if self .verbose :
143- print ("connection closed (error)" )
142+ logger .debug ("connection closed (ERR)" )
144143 reconnects += 1
145144 if self .max_reconnects is not None and reconnects > self .max_reconnects :
146145 return
@@ -172,24 +171,22 @@ async def _subscribe(self, topics: Union[List[str], Set[str]]):
172171 if self .websocket is None or len (topics ) == 0 :
173172 return
174173 subs = "," .join (topics )
175- if self .verbose :
176- print ("subbing:" , subs )
174+ logger .debug ("subbing: %s" , subs )
177175 await self .websocket .send (json .dumps ({"action" : "subscribe" , "params" : subs }))
178176
179177 async def _unsubscribe (self , topics : Union [List [str ], Set [str ]]):
180178 if self .websocket is None or len (topics ) == 0 :
181179 return
182180 subs = "," .join (topics )
183- if self .verbose :
184- print ("unsubbing:" , subs )
181+ logger .debug ("unsubbing: %s" , subs )
185182 await self .websocket .send (json .dumps ({"action" : "unsubscribe" , "params" : subs }))
186183
187184 @staticmethod
188185 def _parse_subscription (s : str ):
189186 s = s .strip ()
190187 split = s .split ("." )
191188 if len (split ) != 2 :
192- print ("invalid subscription:" , s )
189+ logger . warning ("invalid subscription:" , s )
193190 return [None , None ]
194191
195192 return split
@@ -204,8 +201,7 @@ def subscribe(self, *subscriptions: str):
204201 topic , sym = self ._parse_subscription (s )
205202 if topic == None :
206203 continue
207- if self .verbose :
208- print ("add:" , s )
204+ logger .debug ("sub desired: %s" , s )
209205 self .scheduled_subs .add (s )
210206 # If user subs to X.*, remove other X.\w+
211207 if sym == "*" :
@@ -225,8 +221,7 @@ def unsubscribe(self, *subscriptions: str):
225221 topic , sym = self ._parse_subscription (s )
226222 if topic == None :
227223 continue
228- if self .verbose :
229- print ("discard:" , s )
224+ logger .debug ("sub undesired: %s" , s )
230225 self .scheduled_subs .discard (s )
231226
232227 # If user unsubs to X.*, remove other X.\w+
@@ -248,11 +243,10 @@ async def close(self):
248243 """
249244 Close the websocket connection.
250245 """
251- if self .verbose :
252- print ("closing:" )
246+ logger .debug ("closing" )
253247
254248 if self .websocket :
255249 await self .websocket .close ()
256250 self .websocket = None
257251 else :
258- print ("no websocket open to close" )
252+ logger . warning ("no websocket open to close" )
0 commit comments