|
1 | 1 | import asyncio
|
2 |
| -import functools |
3 | 2 | import json
|
4 | 3 | import logging
|
5 |
| -import random |
6 |
| -import re |
7 |
| -from concurrent.futures import ThreadPoolExecutor |
8 | 4 |
|
9 |
| -import aiohttp |
10 | 5 | import aioredis
|
| 6 | +import backoff |
11 | 7 |
|
12 |
| -from placedump.common import ctx_aioredis, get_token, headers |
13 |
| -from placedump.constants import socket_key |
| 8 | +from placedump.common import ctx_aioredis, get_async_gql_client, get_token |
| 9 | +from placedump.constants import config_gql, socket_key, sub_gql |
14 | 10 | from placedump.tasks.parse import parse_message
|
15 | 11 |
|
16 |
| -logging.basicConfig(level=logging.INFO) |
17 | 12 | log = logging.getLogger(__name__)
|
18 |
| - |
19 | 13 | tasks = []
|
20 | 14 |
|
21 |
| -pool = ThreadPoolExecutor(max_workers=8) |
22 |
| - |
23 |
| -PAYLOAD_CONFIG = """ |
24 |
| - subscription configuration($input: SubscribeInput!) { |
25 |
| - subscribe(input: $input) { |
26 |
| - id |
27 |
| - ... on BasicMessage { |
28 |
| - data { |
29 |
| - __typename |
30 |
| - ... on ConfigurationMessageData { |
31 |
| - colorPalette { |
32 |
| - colors { |
33 |
| - hex |
34 |
| - index |
35 |
| - } |
36 |
| - } |
37 |
| - canvasConfigurations { |
38 |
| - index |
39 |
| - dx |
40 |
| - dy |
41 |
| - } |
42 |
| - canvasWidth |
43 |
| - canvasHeight |
44 |
| - } |
45 |
| - } |
46 |
| - } |
47 |
| - } |
48 |
| - } |
49 |
| -""" |
50 |
| - |
51 |
| -PAYLOAD_REPLACE = """ |
52 |
| - subscription replace($input: SubscribeInput!) { |
53 |
| - subscribe(input: $input) { |
54 |
| - id |
55 |
| - ... on BasicMessage { |
56 |
| - data { |
57 |
| - __typename |
58 |
| - ... on FullFrameMessageData { |
59 |
| - __typename |
60 |
| - name |
61 |
| - timestamp |
62 |
| - } |
63 |
| - ... on DiffFrameMessageData { |
64 |
| - __typename |
65 |
| - name |
66 |
| - currentTimestamp |
67 |
| - previousTimestamp |
68 |
| - } |
69 |
| - } |
70 |
| - } |
71 |
| - } |
72 |
| - } |
73 |
| -""" |
| 15 | + |
| 16 | +async def get_meta() -> dict: |
| 17 | + async with ctx_aioredis() as redis: |
| 18 | + result = await redis.hgetall("place:meta") |
| 19 | + return result or {} |
74 | 20 |
|
75 | 21 |
|
76 |
| -async def push_to_key(redis: aioredis.Redis, key: str, payload: dict): |
| 22 | +async def push_to_key(redis: aioredis.Redis, key: str, payload: dict, canvas_id: int): |
77 | 23 | await redis.xadd(key, payload, maxlen=2000000)
|
78 | 24 |
|
79 | 25 | message = payload["message"]
|
80 |
| - if isinstance(message, bytes): |
81 |
| - message = message.decode("utf8") |
82 |
| - |
83 | 26 | await redis.publish(key, message)
|
84 |
| - parse_message.delay(message) |
| 27 | + parse_message.delay(message, canvas_id) |
85 | 28 |
|
86 | 29 |
|
87 |
| -async def get_meta() -> dict: |
88 |
| - async with ctx_aioredis() as redis: |
89 |
| - result = await redis.hgetall("place:meta") |
90 |
| - return result or {} |
| 30 | +async def main(): |
| 31 | + meta = await get_meta() |
| 32 | + highest_board = int(meta.get("index", "0")) |
| 33 | + log.info(meta) |
91 | 34 |
|
| 35 | + tasks.append(asyncio.create_task(graphql_parser("config"))) |
92 | 36 |
|
93 |
| -async def connect_socket(session: aiohttp.ClientSession, url: str): |
94 |
| - token = await get_token() |
95 |
| - meta = await get_meta() |
| 37 | + for x in range(0, highest_board + 1): |
| 38 | + tasks.append(asyncio.create_task(graphql_parser(x))) |
| 39 | + |
| 40 | + await asyncio.gather(*tasks) |
96 | 41 |
|
97 |
| - log.info("socket connecting") |
98 |
| - log.info(meta) |
99 | 42 |
|
100 |
| - async with session.ws_connect( |
101 |
| - url, |
102 |
| - headers={ |
103 |
| - "Sec-WebSocket-Protocol": "graphql-ws", |
104 |
| - "Origin": "https://hot-potato.reddit.com", |
105 |
| - }, |
106 |
| - ) as ws: |
107 |
| - log.info("socket connected") |
108 |
| - await ws.send_str( |
109 |
| - json.dumps( |
110 |
| - { |
111 |
| - "type": "connection_init", |
112 |
| - "payload": {"Authorization": f"Bearer {token}"}, |
| 43 | +@backoff.on_exception(backoff.fibo, Exception, max_time=30) |
| 44 | +async def graphql_parser(canvas_id): |
| 45 | + # pick the corrent gql schema and pick variables for canvas / config grabs. |
| 46 | + if canvas_id == "config": |
| 47 | + schema = config_gql |
| 48 | + variables = { |
| 49 | + "input": { |
| 50 | + "channel": { |
| 51 | + "category": "CONFIG", |
| 52 | + "teamOwner": "AFD2022", |
113 | 53 | }
|
114 |
| - ) |
115 |
| - ) |
116 |
| - |
117 |
| - await ws.send_json( |
118 |
| - { |
119 |
| - "id": "1", |
120 |
| - "payload": { |
121 |
| - "extensions": {}, |
122 |
| - "operationName": "configuration", |
123 |
| - "query": PAYLOAD_CONFIG, |
124 |
| - "variables": { |
125 |
| - "input": { |
126 |
| - "channel": { |
127 |
| - "category": "CONFIG", |
128 |
| - "teamOwner": "AFD2022", |
129 |
| - } |
130 |
| - } |
131 |
| - }, |
132 |
| - }, |
133 |
| - "type": "start", |
134 | 54 | }
|
135 |
| - ) |
136 |
| - |
137 |
| - highest_board = int(meta.get("index", "0")) |
138 |
| - |
139 |
| - for x in range(0, highest_board + 1): |
140 |
| - log.info("launching for board %d, ws id %d", x, 2 + x) |
141 |
| - await ws.send_json( |
142 |
| - { |
143 |
| - "id": str(2 + highest_board), |
144 |
| - "payload": { |
145 |
| - "extensions": {}, |
146 |
| - "operationName": "replace", |
147 |
| - "query": PAYLOAD_REPLACE, |
148 |
| - "variables": { |
149 |
| - "input": { |
150 |
| - "channel": { |
151 |
| - "category": "CANVAS", |
152 |
| - "tag": str(x), |
153 |
| - "teamOwner": "AFD2022", |
154 |
| - } |
155 |
| - } |
156 |
| - }, |
157 |
| - }, |
158 |
| - "type": "start", |
| 55 | + } |
| 56 | + else: |
| 57 | + schema = sub_gql |
| 58 | + variables = { |
| 59 | + "input": { |
| 60 | + "channel": { |
| 61 | + "category": "CANVAS", |
| 62 | + "teamOwner": "AFD2022", |
| 63 | + "tag": str(canvas_id), |
159 | 64 | }
|
160 |
| - ) |
161 |
| - |
162 |
| - async with ctx_aioredis(decode_responses=False) as redis: |
163 |
| - async for msg in ws: |
164 |
| - if msg.type == aiohttp.WSMsgType.TEXT: |
165 |
| - await push_to_key( |
166 |
| - redis, |
167 |
| - socket_key, |
168 |
| - { |
169 |
| - "message": msg.data.encode("utf8"), |
170 |
| - "type": "text", |
171 |
| - }, |
172 |
| - ) |
173 |
| - elif msg.type == aiohttp.WSMsgType.BINARY: |
174 |
| - await push_to_key( |
175 |
| - redis, |
176 |
| - socket_key, |
177 |
| - { |
178 |
| - "message": msg.data, |
179 |
| - "type": "binary", |
180 |
| - }, |
181 |
| - ) |
182 |
| - elif msg.type == aiohttp.WSMsgType.ERROR: |
183 |
| - break |
| 65 | + } |
| 66 | + } |
184 | 67 |
|
| 68 | + # Using `async with` on the client will start a connection on the transport |
| 69 | + # and provide a `session` variable to execute queries on this connection |
| 70 | + log.info("socket connecting for canvas %s", canvas_id) |
185 | 71 |
|
186 |
| -async def main(): |
187 |
| - async with aiohttp.ClientSession(headers=headers) as session: |
188 |
| - while True: |
189 |
| - try: |
190 |
| - await connect_socket(session, "wss://gql-realtime-2.reddit.com/query") |
191 |
| - print("Socket disconnected!") |
192 |
| - except aiohttp.client_exceptions.WSServerHandshakeError as e: |
193 |
| - print(e.request_info) |
194 |
| - print("Handshake error!", e) |
195 |
| - |
196 |
| - await asyncio.sleep(1) |
| 72 | + async with ctx_aioredis() as redis: |
| 73 | + async with get_async_gql_client() as session: |
| 74 | + log.info("socket connected for canvas %s", canvas_id) |
| 75 | + async for result in session.subscribe(schema, variable_values=variables): |
| 76 | + # append canvas id to messages |
| 77 | + result["canvas_id"] = canvas_id |
| 78 | + |
| 79 | + await push_to_key( |
| 80 | + redis, |
| 81 | + socket_key, |
| 82 | + { |
| 83 | + "message": json.dumps(result), |
| 84 | + "type": "text", |
| 85 | + }, |
| 86 | + canvas_id=canvas_id, |
| 87 | + ) |
197 | 88 |
|
198 | 89 |
|
199 | 90 | if __name__ == "__main__":
|
|
0 commit comments