1
1
import os
2
2
from enum import Enum
3
3
from typing import Optional , Union , List , Set , Callable , Awaitable
4
+ import logging
4
5
import json
5
6
import asyncio
6
7
import ssl
7
8
import certifi
8
9
from .models import *
9
10
from websockets .client import connect , WebSocketClientProtocol
10
11
from websockets .exceptions import ConnectionClosedOK , ConnectionClosedError
12
+ from ..logging import get_logger
13
+ import logging
11
14
12
15
env_key = "POLYGON_API_KEY"
16
+ logger = get_logger ("WebSocketClient" )
13
17
14
18
15
19
class AuthError (Exception ):
@@ -35,7 +39,7 @@ def __init__(
35
39
:param api_key: Your API keYour API key.
36
40
:param feed: The feed to subscribe to.
37
41
:param raw: Whether to pass raw Union[str, bytes] to user callback.
38
- :param verbose: Whether to print client and server status messages.
42
+ :param verbose: Whether to log client and server status messages.
39
43
:param subscriptions: List of subscription parameters.
40
44
:param max_reconnects: How many times to reconnect on network outage before ending .connect event loop.
41
45
:return: A client.
@@ -48,7 +52,8 @@ def __init__(
48
52
self .feed = feed
49
53
self .market = market
50
54
self .raw = raw
51
- self .verbose = verbose
55
+ if verbose :
56
+ logger .setLevel (logging .DEBUG )
52
57
self .websocket_cfg = kwargs
53
58
if isinstance (feed , Enum ):
54
59
feed = feed .value
@@ -82,8 +87,7 @@ async def connect(
82
87
:raises AuthError: If invalid API key is supplied.
83
88
"""
84
89
reconnects = 0
85
- if self .verbose :
86
- print ("connect:" , self .url )
90
+ logger .debug ("connect: %s" , self .url )
87
91
# darwin needs some extra <3
88
92
ssl_context = None
89
93
if self .url .startswith ("wss://" ):
@@ -96,21 +100,19 @@ async def connect(
96
100
self .websocket = s
97
101
try :
98
102
msg = await s .recv ()
99
- if self .verbose :
100
- print ("connected:" , msg )
101
- if self .verbose :
102
- print ("authing:" )
103
+ logger .debug ("connected: %s" , msg )
104
+ logger .debug ("authing..." )
103
105
await s .send (json .dumps ({"action" : "auth" , "params" : self .api_key }))
104
106
auth_msg = await s .recv ()
105
107
auth_msg_parsed = json .loads (auth_msg )
106
- if self .verbose :
107
- print ("authed:" , auth_msg )
108
+ logger .debug ("authed: %s" , auth_msg )
108
109
if auth_msg_parsed [0 ]["status" ] == "auth_failed" :
109
110
raise AuthError (auth_msg_parsed [0 ]["message" ])
110
111
while True :
111
112
if self .schedule_resub :
112
- if self .verbose :
113
- print ("reconciling:" , self .subs , self .scheduled_subs )
113
+ logger .debug (
114
+ "reconciling: %s %s" , self .subs , self .scheduled_subs
115
+ )
114
116
new_subs = self .scheduled_subs .difference (self .subs )
115
117
await self ._subscribe (new_subs )
116
118
old_subs = self .subs .difference (self .scheduled_subs )
@@ -126,21 +128,18 @@ async def connect(
126
128
msgJson = json .loads (cmsg ) # type: ignore
127
129
for m in msgJson :
128
130
if m ["ev" ] == "status" :
129
- if self .verbose :
130
- print ("status:" , m ["message" ])
131
+ logger .debug ("status: %s" , m ["message" ])
131
132
continue
132
133
if not self .raw :
133
- cmsg = parse (msgJson )
134
+ cmsg = parse (msgJson , logger )
134
135
135
136
if len (cmsg ) > 0 :
136
137
await processor (cmsg ) # type: ignore
137
138
except ConnectionClosedOK :
138
- if self .verbose :
139
- print ("connection closed (OK)" )
139
+ logger .debug ("connection closed (OK)" )
140
140
return
141
141
except ConnectionClosedError :
142
- if self .verbose :
143
- print ("connection closed (error)" )
142
+ logger .debug ("connection closed (ERR)" )
144
143
reconnects += 1
145
144
if self .max_reconnects is not None and reconnects > self .max_reconnects :
146
145
return
@@ -172,24 +171,22 @@ async def _subscribe(self, topics: Union[List[str], Set[str]]):
172
171
if self .websocket is None or len (topics ) == 0 :
173
172
return
174
173
subs = "," .join (topics )
175
- if self .verbose :
176
- print ("subbing:" , subs )
174
+ logger .debug ("subbing: %s" , subs )
177
175
await self .websocket .send (json .dumps ({"action" : "subscribe" , "params" : subs }))
178
176
179
177
async def _unsubscribe (self , topics : Union [List [str ], Set [str ]]):
180
178
if self .websocket is None or len (topics ) == 0 :
181
179
return
182
180
subs = "," .join (topics )
183
- if self .verbose :
184
- print ("unsubbing:" , subs )
181
+ logger .debug ("unsubbing: %s" , subs )
185
182
await self .websocket .send (json .dumps ({"action" : "unsubscribe" , "params" : subs }))
186
183
187
184
@staticmethod
188
185
def _parse_subscription (s : str ):
189
186
s = s .strip ()
190
187
split = s .split ("." )
191
188
if len (split ) != 2 :
192
- print ("invalid subscription:" , s )
189
+ logger . warning ("invalid subscription:" , s )
193
190
return [None , None ]
194
191
195
192
return split
@@ -204,8 +201,7 @@ def subscribe(self, *subscriptions: str):
204
201
topic , sym = self ._parse_subscription (s )
205
202
if topic == None :
206
203
continue
207
- if self .verbose :
208
- print ("add:" , s )
204
+ logger .debug ("sub desired: %s" , s )
209
205
self .scheduled_subs .add (s )
210
206
# If user subs to X.*, remove other X.\w+
211
207
if sym == "*" :
@@ -225,8 +221,7 @@ def unsubscribe(self, *subscriptions: str):
225
221
topic , sym = self ._parse_subscription (s )
226
222
if topic == None :
227
223
continue
228
- if self .verbose :
229
- print ("discard:" , s )
224
+ logger .debug ("sub undesired: %s" , s )
230
225
self .scheduled_subs .discard (s )
231
226
232
227
# If user unsubs to X.*, remove other X.\w+
@@ -248,11 +243,10 @@ async def close(self):
248
243
"""
249
244
Close the websocket connection.
250
245
"""
251
- if self .verbose :
252
- print ("closing:" )
246
+ logger .debug ("closing" )
253
247
254
248
if self .websocket :
255
249
await self .websocket .close ()
256
250
self .websocket = None
257
251
else :
258
- print ("no websocket open to close" )
252
+ logger . warning ("no websocket open to close" )
0 commit comments