|
| 1 | +import signal |
| 2 | +import threading |
| 3 | +from typing import Optional, Callable |
| 4 | + |
| 5 | +import websocket |
| 6 | + |
| 7 | +STOCKS_CLUSTER = "stocks" |
| 8 | +FOREX_CLUSTER = "forex" |
| 9 | +CRYPTO_CLUSTER = "crypto" |
| 10 | + |
| 11 | + |
| 12 | +class WebSocketClient(object): |
| 13 | + DEFAULT_HOST = 'socket.polygon.io' |
| 14 | + |
| 15 | + # TODO: Either an instance of the client couples 1:1 with the cluster or an instance of the Client couples 1:3 with |
| 16 | + # the 3 possible clusters (I think I like client per, but then a problem is the user can make multiple clients for |
| 17 | + # the same cluster and that's not desirable behavior, |
| 18 | + # somehow keeping track with multiple Client instances will be the difficulty) |
| 19 | + def __init__(self, cluster: str, auth_key: str, process_message: Optional[Callable[[str], None]] = None): |
| 20 | + self._host = self.DEFAULT_HOST |
| 21 | + self.url = f"wss://{self._host}/{cluster}" |
| 22 | + self.ws: websocket.WebSocketApp = websocket.WebSocketApp(self.url, on_open=self._default_on_open(), |
| 23 | + on_close=self._default_on_close, |
| 24 | + on_error=self._default_on_error, |
| 25 | + on_message=self._default_on_message()) |
| 26 | + self.auth_key = auth_key |
| 27 | + |
| 28 | + self.process_message = process_message |
| 29 | + |
| 30 | + # being authenticated is an event that must occur before any other action is sent to the server |
| 31 | + self._authenticated = threading.Event() |
| 32 | + # self._run_thread is only set if the client is run asynchronously |
| 33 | + self._run_thread: Optional[threading.Thread] = None |
| 34 | + |
| 35 | + signal.signal(signal.SIGINT, self._cleanup_signal_handler()) |
| 36 | + signal.signal(signal.SIGTERM, self._cleanup_signal_handler()) |
| 37 | + |
| 38 | + def run(self): |
| 39 | + self.ws.run_forever() |
| 40 | + |
| 41 | + def run_async(self): |
| 42 | + self._run_thread = threading.Thread(target=self.run) |
| 43 | + self._run_thread.start() |
| 44 | + |
| 45 | + def close_connection(self): |
| 46 | + self.ws.close() |
| 47 | + if self._run_thread: |
| 48 | + self._run_thread.join() |
| 49 | + |
| 50 | + def subscribe(self, *params): |
| 51 | + # TODO: make this a decorator or context manager |
| 52 | + self._authenticated.wait() |
| 53 | + |
| 54 | + sub_message = '{"action":"subscribe","params":"%s"}' % self._format_params(params) |
| 55 | + self.ws.send(sub_message) |
| 56 | + |
| 57 | + def unsubscribe(self, *params): |
| 58 | + # TODO: make this a decorator or context manager |
| 59 | + self._authenticated.wait() |
| 60 | + |
| 61 | + sub_message = '{"action":"unsubscribe","params":"%s"}' % self._format_params(params) |
| 62 | + self.ws.send(sub_message) |
| 63 | + |
| 64 | + def _cleanup_signal_handler(self): |
| 65 | + return lambda signalnum, frame: self.close_connection() |
| 66 | + |
| 67 | + def _authenticate(self, ws): |
| 68 | + ws.send('{"action":"auth","params":"%s"}' % self.auth_key) |
| 69 | + self._authenticated.set() |
| 70 | + |
| 71 | + @staticmethod |
| 72 | + def _format_params(params): |
| 73 | + return ",".join(params) |
| 74 | + |
| 75 | + @property |
| 76 | + def process_message(self): |
| 77 | + return self.__process_message |
| 78 | + |
| 79 | + @process_message.setter |
| 80 | + def process_message(self, pm): |
| 81 | + if pm: |
| 82 | + self.__process_message = pm |
| 83 | + self.ws.on_message = lambda ws, message: self.__process_message(message) |
| 84 | + |
| 85 | + def _default_on_message(self): |
| 86 | + return lambda ws, message: self._default_process_message(message) |
| 87 | + |
| 88 | + @staticmethod |
| 89 | + def _default_process_message(message): |
| 90 | + print(message) |
| 91 | + |
| 92 | + def _default_on_open(self): |
| 93 | + def f(ws): |
| 94 | + self._authenticate(ws) |
| 95 | + |
| 96 | + @staticmethod |
| 97 | + def _default_on_error(ws, error): |
| 98 | + print("error:", error) |
| 99 | + |
| 100 | + @staticmethod |
| 101 | + def _default_on_close(ws): |
| 102 | + print("### closed ###") |
0 commit comments