Skip to content

Commit 9b45e69

Browse files
ref archive task to mimic workload
1 parent ed52682 commit 9b45e69

File tree

2 files changed

+29
-7
lines changed

2 files changed

+29
-7
lines changed

storage-app/src/shared/archive_helpers.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,17 @@ def __init__(self, file_name: str, as_file: bool = False):
3636
self.queue = Queue()
3737
self._archive_id = None
3838
self._as_file = as_file
39+
self._done = False
3940

4041
def stop(self): self.queue.put(None)
4142

4243
def write(self, task: tuple[BytesIO, int]): self.queue.put(task)
4344

4445
def result(self) -> Optional[str]: return self._archive_id
4546

47+
@property
48+
def ready(self) -> bool: return self._done
49+
4650
def _get_write_in(self):
4751
return (
4852
open(self.file_name, "wb")
@@ -75,6 +79,7 @@ def run(self):
7579
self._archive_id = dest._id
7680
SyncDataBase.close_connection()
7781

82+
self._done = True
7883

7984
class ZipConsumer(Thread):
8085
DUMP_THRESHOLD: int = 10 << 20
@@ -91,9 +96,13 @@ def __init__(
9196
self.file_list = []
9297
self.writer = writer
9398
self._local_dir_end = 0
99+
self._done = False
94100

95101
def tell(self) -> int: return self._local_dir_end
96102

103+
@property
104+
def ready(self) -> bool: return self._done
105+
97106
def _dump_buffer(self, buffer: BytesIO, zip_buffer: ZipFile):
98107
dest_offset = self.tell()
99108

@@ -214,6 +223,8 @@ def run(self):
214223

215224
self._finalize()
216225

226+
self._done = True
227+
217228

218229
class FileProducer:
219230
def __init__(
@@ -225,6 +236,10 @@ def __init__(
225236
self.object_set = object_set
226237
self.queue = queue
227238
self.max_concurrent = max_concurrent
239+
self._done = False
240+
241+
@property
242+
def ready(self) -> bool: return self._done
228243

229244
async def produce(self):
230245
tasks = []
@@ -244,6 +259,8 @@ async def produce(self):
244259
await gather(*tasks)
245260
self.queue.put(None)
246261

262+
self._done = True
263+
247264
async def next(self, file: GridOut):
248265
try: self.queue.put((self._get_file_name(file), await file.read()))
249266
except Exception as e: print(f"next err {str(e)}")

storage-app/src/shared/worker_services.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,14 @@
44
SECRET_KEY,
55
SECRET_ALGO,
66
APP_BACKEND_URL,
7-
TEMP_ZIP,
87
ASYNC_PRODUCER_MAX_CONCURRENT as MAX_CONCURENT
98
)
10-
from asyncio import get_event_loop
9+
from asyncio import get_event_loop, sleep as async_stall_for, create_task
1110
from shared.app_services import Bucket
1211
from shared.utils import emit_token
1312
from shared.embedding_db import EmbeddingStorage
1413
from typing import Any, Optional
1514
import requests
16-
from os import mkdir, path
1715
from .hasher import VHash, IHash
1816
from queue import Queue
1917
from .archive_helpers import FileProducer, ZipConsumer, ZipWriter
@@ -44,8 +42,6 @@ def __init__(self, bucket_name: str, file_ids: list[str]) -> None:
4442
self.bucket_name = bucket_name
4543

4644
async def archive_objects(self) -> Optional[bool]:
47-
if not path.exists(TEMP_ZIP): mkdir(TEMP_ZIP)
48-
4945
json_data: Any = ("annotation.json", dumps(self.annotation, indent=4).encode("utf-8"))
5046

5147
queue = Queue()
@@ -54,12 +50,21 @@ async def archive_objects(self) -> Optional[bool]:
5450
writer = ZipWriter(f"{self.bucket_name}_dataset")
5551
consumer = ZipConsumer(queue, [json_data], writer)
5652

53+
producer_task = create_task(producer.produce())
5754
consumer.start()
5855
writer.start()
5956

60-
await producer.produce()
61-
await self.object_set.close()
57+
wait_item = lambda t, n: type("wi", (object,), {"task": t, "next": n})
58+
wait_list = wait_item(producer, wait_item(consumer, wait_item(writer, None)))
6259

60+
while wait_list:
61+
if wait_list.task.ready:
62+
wait_list = wait_list.next
63+
continue
64+
await async_stall_for(1)
65+
66+
await producer_task
67+
await self.object_set.close()
6368
consumer.join()
6469
writer.join()
6570

0 commit comments

Comments
 (0)