|
1 | 1 | # /// script |
2 | 2 | # requires-python = ">=3.14" |
3 | 3 | # dependencies = [ |
| 4 | +# "aiofiles", |
4 | 5 | # "boto3", |
5 | 6 | # "polars==1.36.1", |
6 | 7 | # "rich==14.2", |
|
21 | 22 | from multiprocessing import Value |
22 | 23 | from pathlib import Path |
23 | 24 |
|
| 25 | +import aiofiles |
24 | 26 | import s3fs |
25 | 27 | import polars as pl |
26 | 28 | from rich.progress import ( |
|
48 | 50 | DATABASES = ["full", "img", "genomes", "sra"] |
49 | 51 |
|
50 | 52 |
|
51 | | -async def download_original(client, location): |
52 | | - data = io.BytesIO() |
| 53 | +async def download_original(client, location, fp): |
53 | 54 | try: |
54 | 55 | f = await client.open_async(location) |
55 | 56 | h = hashlib.new("sha256") |
56 | 57 | while (chnk := await f.read(1024 * 1024)) != b"": |
57 | 58 | h.update(chnk) |
58 | | - data.write(chnk) |
| 59 | + await fp.write(chnk) |
59 | 60 | sha256 = h.hexdigest() |
60 | | - data.flush() |
| 61 | + await fp.flush() |
61 | 62 | finally: |
62 | 63 | await f.close() |
63 | 64 |
|
64 | | - return (data, sha256) |
| 65 | + return sha256 |
65 | 66 |
|
66 | 67 |
|
67 | 68 | async def upload_mirror(client, data, location): |
@@ -304,20 +305,21 @@ async def process_sig( |
304 | 305 | # this was already uploaded, so just return the records |
305 | 306 | pass |
306 | 307 | else: |
307 | | - # TODO: save to temp file, instead of memory |
308 | | - (data, sha256) = await download_original(src_fs, s3_path) |
309 | | - |
310 | | - raw_sig = data.getvalue() |
311 | | - del data |
312 | | - sig = load_signatures(raw_sig) |
313 | | - |
314 | | - loop = asyncio.get_running_loop() |
315 | | - extract = partial(extract_record, sig, key, sha256, last_modified, size) |
316 | | - records = await loop.run_in_executor(None, extract) |
317 | | - del sig |
318 | | - |
319 | | - if not uploaded: |
320 | | - _result = await upload_mirror(upload_fs, raw_sig, s3_path) |
| 308 | + async with aiofiles.tempfile.NamedTemporaryFile() as data: |
| 309 | + # save to temp file, instead of memory |
| 310 | + sha256 = await download_original(src_fs, s3_path, data) |
| 311 | + await data.flush() |
| 312 | + |
| 313 | + await data.seek(0) |
| 314 | + sig = load_signatures(data.name) |
| 315 | + loop = asyncio.get_running_loop() |
| 316 | + extract = partial(extract_record, sig, key, sha256, last_modified, size) |
| 317 | + records = await loop.run_in_executor(None, extract) |
| 318 | + del sig |
| 319 | + |
| 320 | + await data.seek(0) |
| 321 | + if not uploaded: |
| 322 | + _result = await upload_mirror(upload_fs, data, s3_path) |
321 | 323 |
|
322 | 324 | with current_tasks.get_lock(): |
323 | 325 | current_tasks.value += 1 |
|
0 commit comments