|
1 | 1 | import asyncio |
| 2 | +from collections import defaultdict |
2 | 3 | import logging |
3 | 4 | import json |
4 | 5 | from typing import List, Optional |
@@ -58,6 +59,7 @@ def __init__(self, |
58 | 59 | } |
59 | 60 | self._name = 'data' |
60 | 61 | self._should_run = True |
| 62 | + self._max_frame_size = 32768 |
61 | 63 |
|
62 | 64 | async def _connect(self): |
63 | 65 | self._ws = await websockets.connect( |
@@ -186,17 +188,16 @@ def _subscribe(self, handler, symbols, handlers): |
186 | 188 | ).result() |
187 | 189 |
|
188 | 190 | async def _subscribe_all(self): |
189 | | - if any( |
190 | | - v for k, v in self._handlers.items() |
191 | | - if k not in ("cancelErrors", "corrections") |
192 | | - ): |
193 | | - msg = { |
194 | | - k: tuple(v.keys()) |
195 | | - for k, v in self._handlers.items() |
196 | | - if v |
197 | | - } |
198 | | - msg['action'] = 'subscribe' |
199 | | - await self._ws.send(msgpack.packb(msg)) |
| 191 | + msg = defaultdict(list) |
| 192 | + for k, v in self._handlers.items(): |
| 193 | + if k not in ("cancelErrors", "corrections") and v: |
| 194 | + for s in v.keys(): |
| 195 | + msg[k].append(s) |
| 196 | + msg['action'] = 'subscribe' |
| 197 | + bs = msgpack.packb(msg) |
| 198 | + frames = (bs[i:i+self._max_frame_size] |
| 199 | + for i in range(0, len(bs), self._max_frame_size)) |
| 200 | + await self._ws.send(frames) |
200 | 201 |
|
201 | 202 | async def _unsubscribe(self, |
202 | 203 | trades=(), |
|
0 commit comments