Skip to content

Commit 9a18d42

Browse files
authored
Fixed reload atomicity. (#30)
At some point the reload process could get invalid index error. Signed-off-by: Pavel Kirilin <[email protected]>
1 parent ed8826e commit 9a18d42

File tree

1 file changed

+17
-16
lines changed

1 file changed

+17
-16
lines changed

taskiq/cli/worker.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
restart_workers = True
3131
worker_processes: List[Process] = []
3232
observer = Observer()
33-
reload_queue: "Queue[int]" = Queue(-1)
33+
reload_queue: "Queue[bool]" = Queue(-1)
3434

3535

3636
def signal_handler(_signal: int, _frame: Any) -> None:
@@ -73,10 +73,8 @@ def schedule_workers_reload() -> None:
7373
global worker_processes # noqa: WPS420
7474
global reload_queue # noqa: WPS420
7575

76-
logger.info("Reloading workers")
77-
for worker_id, _ in enumerate(worker_processes):
78-
reload_queue.put(worker_id)
79-
logger.info("Worker %s scheduled to reload", worker_id)
76+
reload_queue.put(True)
77+
logger.info("Scheduled workers reload.")
8078
reload_queue.join()
8179

8280

@@ -261,17 +259,20 @@ def watcher_loop(args: TaskiqArgs) -> None: # noqa: C901, WPS213
261259
# List of processes to remove.
262260
sleep(1)
263261
process_to_remove = []
264-
while not reload_queue.empty():
265-
process_id = reload_queue.get()
266-
worker_processes[process_id].terminate()
267-
worker_processes[process_id].join()
268-
worker_processes[process_id] = Process(
269-
target=start_listen,
270-
kwargs={"args": args},
271-
name=f"worker-{process_id}",
272-
)
273-
worker_processes[process_id].start()
274-
reload_queue.task_done()
262+
if not reload_queue.empty():
263+
while not reload_queue.empty():
264+
reload_queue.get()
265+
reload_queue.task_done()
266+
267+
for worker_id, worker in enumerate(worker_processes):
268+
worker.terminate()
269+
worker.join()
270+
worker_processes[worker_id] = Process(
271+
target=start_listen,
272+
kwargs={"args": args},
273+
name=f"worker-{worker_id}",
274+
)
275+
worker_processes[worker_id].start()
275276

276277
for worker_id, worker in enumerate(worker_processes):
277278
if worker.is_alive():

0 commit comments

Comments
 (0)