Skip to content

Commit d1c3a3a

Browse files
committed
refactoring to dedupe gql ws opening, refactor pixel watcher
1 parent d57f68c commit d1c3a3a

File tree

3 files changed

+129
-84
lines changed

3 files changed

+129
-84
lines changed

pixel_watcher.py

Lines changed: 103 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
import asyncio
2+
import functools
23
import json
3-
import random
4+
import logging
5+
from concurrent.futures import ThreadPoolExecutor
6+
from functools import lru_cache
47

58
import backoff
6-
from gql import Client
7-
from gql.transport.websockets import WebsocketsTransport
9+
from gql import gql
10+
from gql.dsl import DSLQuery, DSLSchema, dsl_gql
811

9-
from placedump.common import ctx_aioredis, ctx_redis, get_token, headers
10-
from placedump.constants import query_get_pixel_10x
12+
from placedump.common import ctx_aioredis, get_async_gql_client, get_token, headers
1113
from placedump.tasks.pixels import update_pixel
1214

15+
log = logging.getLogger("info")
16+
pool = ThreadPoolExecutor(max_workers=8)
1317
running = True
1418
tasks = []
1519

@@ -21,88 +25,125 @@ async def main():
2125
await asyncio.gather(*tasks)
2226

2327

24-
@backoff.on_exception(backoff.constant, Exception, interval=1, max_time=300)
28+
@lru_cache
def generate_history_mutation(count: int):
    """Build (and memoize per batch size) a GraphQL mutation fetching tile
    history for `count` pixels in a single round trip.

    The document declares variables ``$input1 .. $inputN`` of type
    ``ActInput!`` and emits one aliased ``act`` selection per input, so each
    ``inputN`` alias in the response can be correlated with the pixel it was
    queried for.

    Args:
        count: number of pixel-history lookups batched into one mutation.

    Returns:
        A parsed gql document ready for ``session.execute``.
    """
    # One variable declaration per batched pixel: "$input1: ActInput!, ..."
    # (join replaces the old +=/rstrip(", ") construction).
    input_header = ", ".join(
        f"$input{index + 1}: ActInput!" for index in range(count)
    )

    # One aliased `act` selection per pixel; GraphQL ignores whitespace,
    # so the embedded indentation is cosmetic only.
    inputs = "".join(
        """
        input%s: act(input: $input%s) {
          data {
            ... on BasicMessage {
              id
              data {
                ... on GetTileHistoryResponseMessageData {
                  lastModifiedTimestamp
                  userInfo {
                    userID
                    username
                  }
                }
              }
            }
          }
        }"""
        % (index + 1, index + 1)
        for index in range(count)
    )

    return gql(
        """mutation pixelHistory({input_header}) {{
            {inputs}
        }}""".format(
            input_header=input_header,
            inputs=inputs,
        )
    )
68+
69+
70+
async def bulk_update(pixels: dict, gql_results: dict):
    """Fan out Celery ``update_pixel`` tasks for one batch of GraphQL results.

    Args:
        pixels: maps alias name ("input1", ...) to the pixel descriptor dict
            holding "board", "x" and "y" keys.
        gql_results: maps the same alias names to raw GraphQL ``act`` payloads.

    ``apply_async`` is a blocking call, so each enqueue is pushed onto the
    module-level thread pool and all of them are awaited concurrently.
    """
    # get_running_loop() (not the deprecated-in-coroutines get_event_loop())
    # — this coroutine always executes inside a running loop. Hoisted out of
    # the loop since it is invariant.
    loop = asyncio.get_running_loop()
    updates = []

    for input_name, gql_res in gql_results.items():
        pixel_info = pixels[input_name]
        # The first message in the act response carries the history payload.
        pixel_data = gql_res["data"][0]["data"]

        updates.append(
            loop.run_in_executor(
                pool,
                functools.partial(
                    update_pixel.apply_async,
                    kwargs=dict(
                        board_id=pixel_info["board"],
                        x=pixel_info["x"],
                        y=pixel_info["y"],
                        pixel_data=pixel_data,
                    ),
                    priority=5,
                ),
            )
        )

    await asyncio.gather(*updates)
94+
95+
96+
@backoff.on_exception(backoff.fibo, Exception, max_time=30)
async def graphql_parser():
    """Drain ``queue:pixels`` in batches of 24, resolve each pixel's tile
    history over the GraphQL websocket, and hand results to ``bulk_update``.

    On any exception the whole coroutine is retried with fibonacci backoff,
    reopening both the redis and websocket connections. Pixels popped from
    the queue before a failed execute are lost — acceptable for this
    best-effort archiver.
    """
    # NOTE(review): the old `token = await get_token()` and the
    # `place:meta`/`highest_board` read were dead after this refactor —
    # get_async_gql_client() fetches its own token, and the random-padding
    # loop that consumed highest_board was removed.
    async with ctx_aioredis() as redis:
        # `async with` on the client opens the websocket transport and
        # yields a session to execute queries on.
        async with get_async_gql_client() as session:
            log.info("socket connected")
            pixels_index = {}

            while running:
                variables = {}

                # Up to 24 pixels per mutation round trip.
                pairs_raw = await redis.spop("queue:pixels", 24)
                for index, pixel in enumerate(pairs_raw):
                    # Alias names must line up with the variables declared
                    # by generate_history_mutation.
                    pixels_index["input" + str(index + 1)] = json.loads(pixel)

                # Sleep briefly if the queue was empty, then retry.
                if len(pixels_index) == 0:
                    await asyncio.sleep(0.1)
                    continue

                for key, pixel in pixels_index.items():
                    variables[key] = {
                        "actionName": "r/replace:get_tile_history",
                        "PixelMessageData": {
                            "canvasIndex": pixel["board"],
                            "colorIndex": 0,
                            "coordinate": {"x": pixel["x"], "y": pixel["y"]},
                        },
                    }

                # Mutation documents are memoized per batch size.
                gql_query = generate_history_mutation(len(pixels_index))

                result = await session.execute(
                    gql_query,
                    variable_values=variables,
                )

                await bulk_update(pixels_index, result)
                log.info(
                    "batch completed, batch: %s remaining: %s",
                    len(pixels_index),
                    await redis.scard("queue:pixels"),
                )

                pixels_index.clear()
106147

107148

108149
if __name__ == "__main__":

placedump/common.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from b2sdk.v2 import B2Api, InMemoryAccountInfo
1212
from gql import Client
1313
from gql.transport.aiohttp import AIOHTTPTransport
14+
from gql.transport.websockets import WebsocketsTransport
1415

1516
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost")
1617
TOKEN_REGEX = re.compile(r'"accessToken":"([^"]+)"')
@@ -96,6 +97,28 @@ def get_gql_client(token: Optional[str] = None) -> Client:
9697
return Client(transport=transport, fetch_schema_from_transport=False)
9798

9899

100+
@asynccontextmanager
async def get_async_gql_client():
    """Async context manager yielding a connected GraphQL websocket session.

    Fetches a fresh access token, opens the reddit realtime websocket
    transport, and yields the client session (the schema is fetched from
    the transport on connect). The connection is closed on exit.

    Note: the previous ``-> Client`` annotation was incorrect — an
    ``@asynccontextmanager`` generator yields the session produced by
    ``async with Client(...)``, it does not return a ``Client``.
    """
    token = await get_token()

    transport = WebsocketsTransport(
        url="wss://gql-realtime-2.reddit.com/query",
        headers={
            "Authorization": f"Bearer {token}",
            "Sec-WebSocket-Protocol": "graphql-ws",
            "Origin": "https://hot-potato.reddit.com",
            "User-Agent": "r/place archiver u/nepeat nepeat#0001",
        },
        # 2s ping keeps the socket from idling out — TODO confirm the
        # server-side timeout this is working around.
        ping_interval=2.0,
    )

    async with Client(
        transport=transport,
        fetch_schema_from_transport=True,
    ) as session:
        yield session
120+
121+
99122
@lru_cache
100123
def get_b2_api() -> B2Api:
101124
info = InMemoryAccountInfo()

proto_gql_dump.py

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,8 @@
44

55
import aioredis
66
import backoff
7-
from gql import Client, gql
8-
from gql.transport.websockets import WebsocketsTransport
9-
from httpx import head
107

11-
from placedump.common import ctx_aioredis, get_token
8+
from placedump.common import ctx_aioredis, get_async_gql_client, get_token
129
from placedump.constants import config_gql, socket_key, sub_gql
1310
from placedump.tasks.parse import parse_message
1411

@@ -43,21 +40,8 @@ async def main():
4340
await asyncio.gather(*tasks)
4441

4542

46-
@backoff.on_exception(backoff.constant, Exception, interval=1, max_time=300)
43+
@backoff.on_exception(backoff.fibo, Exception, max_time=30)
4744
async def graphql_parser(canvas_id):
48-
token = await get_token()
49-
50-
transport = WebsocketsTransport(
51-
url="wss://gql-realtime-2.reddit.com/query",
52-
headers={
53-
"Authorization": f"Bearer {token}",
54-
"Sec-WebSocket-Protocol": "graphql-ws",
55-
"Origin": "https://hot-potato.reddit.com",
56-
"User-Agent": "r/place archiver u/nepeat nepeat#0001",
57-
},
58-
ping_interval=2.0,
59-
)
60-
6145
# pick the correct gql schema and pick variables for canvas / config grabs.
6246
if canvas_id == "config":
6347
schema = config_gql
@@ -86,10 +70,7 @@ async def graphql_parser(canvas_id):
8670
log.info("socket connecting for canvas %s", canvas_id)
8771

8872
async with ctx_aioredis() as redis:
89-
async with Client(
90-
transport=transport,
91-
fetch_schema_from_transport=True,
92-
) as session:
73+
async with get_async_gql_client() as session:
9374
log.info("socket connected for canvas %s", canvas_id)
9475
async for result in session.subscribe(schema, variable_values=variables):
9576
# append canvas id to messages

0 commit comments

Comments
 (0)