|
| 1 | +import datetime |
| 2 | +import sys |
| 3 | +from concurrent.futures import ThreadPoolExecutor |
| 4 | +from typing import List |
| 5 | + |
| 6 | +import ujson as json |
| 7 | +from placedump.common import get_b2_api, get_redis |
| 8 | +from placedump.constants import socket_key |
| 9 | +from placedump.model import URL, Pixel, sm |
| 10 | +from placedump.tasks.pixels import download_url |
| 11 | +from sqlalchemy import func, select |
| 12 | + |
# Session used for the initial read of every URL row.
db = sm()

# B2 API handle and the bucket whose listing supplies the file sizes.
b2 = get_b2_api()
bucket = b2.get_bucket_by_name("erin-reddit-afd2022")

# Pending {"id": ..., "size": ...} mappings awaiting a bulk update.
update_batch: list = []
# Lookup from URL string to the id of its URL row.
url_map: dict = {}
# Progress counter, reported each time a batch is committed.
i: int = 0
| 20 | + |
| 21 | + |
def commit_batch(sql, batch: List[dict]):
    """Bulk-update ``URL`` rows from ``batch``, commit, and empty the batch.

    Args:
        sql: Session-like object providing ``bulk_update_mappings`` and
            ``commit``.
        batch: Mappings passed to ``bulk_update_mappings`` for ``URL``
            (this file's caller supplies dicts with ``"id"`` and ``"size"``
            keys). The list is cleared in place once the commit returns,
            so the caller can keep appending to the same object.
    """
    sql.bulk_update_mappings(URL, batch)
    # Commit on the session that was handed in. The previous version
    # committed via the module-level ``db_insert`` name, which only exists
    # once the main ``with`` block has started and silently ignores ``sql``.
    sql.commit()
    # ``i`` is the module-level progress counter maintained by the caller.
    print("commited", i)
    batch.clear()
| 30 | + |
| 31 | + |
# Fill the url -> id lookup from every URL row so that listed files can be
# matched to their database rows without a query per file.
url_map.update(
    (row.url, row.id) for row in db.execute(select(URL)).scalars()
)

print("loaded", len(url_map))
| 36 | + |
# Walk the bucket listing and record the stored size of every file whose
# URL is already known, writing the sizes back to the database in batches.
with sm() as db_insert:
    # The second element yielded by ``bucket.ls`` is not needed here.
    for file_version, _folder_name in bucket.ls(
        folder_to_list="hot-potato.reddit.com/media/canvas-images",
        latest_only=True,
    ):
        # URL rows are keyed by the full URL; the listing yields the file
        # name without a scheme, so prepend it before looking it up.
        fixed_url = "https://" + file_version.file_name

        # Skip files with no (or zero) size and files with no URL row.
        if file_version.size and fixed_url in url_map:
            update_batch.append(
                {
                    "id": url_map[fixed_url],
                    "size": file_version.size,
                }
            )
        # NOTE(review): the original indentation was not recoverable; the
        # counter is assumed to count every listed file - confirm.
        i += 1

        # Flush once the batch grows past 1024 entries to bound memory use
        # and transaction size.
        if len(update_batch) > 1024:
            commit_batch(db_insert, update_batch)

    # Flush whatever is left. Skipped when nothing is pending so that no
    # empty bulk update / commit / progress line is emitted.
    if update_batch:
        commit_batch(db_insert, update_batch)
0 commit comments