Commit 7e1b647

f
1 parent 59b6c73 commit 7e1b647

File tree

3 files changed: +24 -8 lines changed


storage-app/src/shared/archive_helpers.py

Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
-from asyncio import create_task, wait, FIRST_COMPLETED, gather
+from asyncio import create_task, wait, FIRST_COMPLETED
 from typing import Optional
 from threading import Thread
 from queue import Queue
@@ -257,7 +257,7 @@ async def produce(self):
         self.iter_count += 1
         if not self.iter_count % GC_FREQ: gc_collect()
 
-        await gather(*tasks)
+        for task in tasks: await task
         self.queue.put(None)
 
         self._done = True
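
The change in produce() swaps asyncio.gather for a plain loop over the tasks. A minimal sketch of the two styles, assuming (as the kept create_task import suggests) that `tasks` holds already-scheduled asyncio.Task objects; the demo coroutine and names below are illustrative, not from the repository:

import asyncio

async def job(i: int) -> int:
    await asyncio.sleep(0.1)          # stand-in for real I/O work
    return i

async def main() -> None:
    tasks = [asyncio.create_task(job(i)) for i in range(5)]

    # Old style: one call awaits everything and builds a result list.
    # results = await asyncio.gather(*tasks)

    # New style: await each task in turn; the tasks still run concurrently
    # because create_task already scheduled them, but no result list is built
    # and exceptions surface task by task.
    for task in tasks:
        await task

asyncio.run(main())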

storage-app/src/shared/worker_services.py

Lines changed: 6 additions & 5 deletions
@@ -6,7 +6,8 @@
     APP_BACKEND_URL,
     ASYNC_PRODUCER_MAX_CONCURRENT as MAX_CONCURENT
 )
-from asyncio import get_event_loop, sleep as async_stall_for, create_task
+from asyncio import get_event_loop
+from time import sleep as stall_for
 from shared.app_services import Bucket
 from shared.utils import emit_token
 from shared.embedding_db import EmbeddingStorage
@@ -57,12 +58,13 @@ async def archive_objects(self) -> Optional[bool]:
         writer = ZipWriter(f"{self.bucket_name}_dataset")
         consumer = ZipConsumer(queue, [json_data], writer)
 
-        producer_task = create_task(producer.produce())
         consumer.start()
         writer.start()
 
         wait_item = lambda t, n: type("wi", (object,), {"task": t, "next": n})
-        wait_list = wait_item(producer, wait_item(consumer, wait_item(writer, None)))
+        wait_list = wait_item(consumer, wait_item(writer, None))
+
+        await producer.produce()
 
         while wait_list:
             if wait_list.task.ready:
@@ -71,9 +73,8 @@ async def archive_objects(self) -> Optional[bool]:
 
             print(f"ZIP WORK STALL, {producer.iter_count}")
             self._task.update_state(state="PROGRESS")
-            await async_stall_for(5)
+            stall_for(5)
 
-        await producer_task
         await object_set.close()
         consumer.join()
         writer.join()
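
With this change the producer coroutine is awaited to completion before the polling loop starts, the producer is dropped from the wait list, and the loop stalls with the blocking time.sleep instead of asyncio.sleep. The wait_item helper seen above builds a tiny singly linked list out of throwaway classes; an illustrative sketch (not from the commit, with string stand-ins for the consumer and writer objects):

# type() with three arguments creates an anonymous class; its class attributes
# serve as the node's fields, so each call yields a node with .task and .next.
wait_item = lambda t, n: type("wi", (object,), {"task": t, "next": n})

node = wait_item("consumer", wait_item("writer", None))
print(node.task)         # "consumer"
print(node.next.task)    # "writer"
print(node.next.next)    # None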

storage-app/src/worker.py

Lines changed: 16 additions & 1 deletion
@@ -18,6 +18,9 @@ def default(self, o) -> Any: return getattr(o, "__json__", super().default)(o)
 worker.conf.broker_url = BROKER_URL
 worker.conf.result_backend = RESULT_URL
 worker.conf.broker_transport_options = {"visibility_timeout": 1 * 60 * 60 * 24}
+worker.conf.task_time_limit = None
+worker.conf.task_soft_time_limit = None
+worker.conf.worker_lost_wait = None
 
 register(
     "custom_encoder",
@@ -29,10 +32,22 @@ def default(self, o) -> Any: return getattr(o, "__json__", super().default)(o)
 
 @worker.task(
     bind=True,
-    name="produce_download_task",
+    name="test",
     time_limit=None,
     soft_time_limit=None
 )
+def test(self):
+    from subprocess import PIPE, Popen
+    from time import sleep
+
+    pc = Popen(["sleep", "40"], stdout=PIPE)
+    while not pc.poll() == 0:
+        sleep(5)
+        print("tick")
+    return pc.stdout.read()
+
+
+@worker.task(bind=True, name="produce_download_task",)
 def produce_download_task(self, bucket_name: str, file_ids: list[str]) -> str | None:
     task = Zipper(bucket_name, file_ids, self)
     run(task.archive_objects())
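
The worker config now disables Celery's task_time_limit, task_soft_time_limit, and worker_lost_wait globally, and a diagnostic "test" task is registered alongside produce_download_task. An illustrative sketch (not part of the commit) of queueing both tasks by name from another process, assuming `worker` is the Celery app defined in this module; the bucket name and file ids are hypothetical:

from worker import worker

probe = worker.send_task("test")                 # runs the subprocess probe task
archive = worker.send_task(
    "produce_download_task",
    args=["my-bucket", ["file-1", "file-2"]],    # hypothetical bucket / file ids
)
print(probe.id, archive.id)                      # AsyncResult ids for tracking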
