Skip to content

Commit b50f70b

Browse files
committed
Refactored worker a bit.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 7825e8a commit b50f70b

File tree

1 file changed

+66
-47
lines changed

1 file changed

+66
-47
lines changed

taskiq/cli/worker.py

Lines changed: 66 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,31 @@
1414
logger = getLogger("taskiq.worker")
1515

1616

17+
restart_workers = True
18+
worker_processes: List[Process] = []
19+
20+
21+
def signal_handler(_signal: int, _frame: Any) -> None:
22+
"""
23+
This handler is used only by main process.
24+
25+
If the OS sent you SIGINT or SIGTERM,
26+
we should kill all spawned processes.
27+
28+
:param _signal: incoming signal.
29+
:param _frame: current execution frame.
30+
"""
31+
global restart_workers # noqa: WPS420
32+
global worker_processes # noqa: WPS420
33+
34+
restart_workers = False # noqa: WPS442
35+
for process in worker_processes:
36+
# This is how we send SIGTERM to child
37+
# processes.
38+
process.terminate()
39+
process.join()
40+
41+
1742
def import_broker(broker_spec: str) -> Any:
1843
"""
1944
It parses broker spec and imports it.
@@ -88,7 +113,43 @@ def start_listen(args: TaskiqArgs) -> None:
88113
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")
89114

90115

91-
def run_worker(args: TaskiqArgs) -> None: # noqa: C901, WPS210, WPS213
116+
def watch_workers_restarts(args: TaskiqArgs) -> None:
117+
"""
118+
Infinate loop for main process.
119+
120+
This loop restarts worker processes
121+
if they exit with error returncodes.
122+
123+
:param args: cli arguements.
124+
"""
125+
global worker_processes # noqa: WPS420
126+
global restart_workers # noqa: WPS420
127+
128+
while worker_processes and restart_workers:
129+
# List of processes to remove.
130+
sleep(1)
131+
process_to_remove = []
132+
for worker_id, worker in enumerate(worker_processes):
133+
if worker.is_alive():
134+
continue
135+
if worker.exitcode is not None and worker.exitcode > 0 and restart_workers:
136+
logger.info("Trying to restart the worker-%s" % worker_id)
137+
worker_processes[worker_id] = Process(
138+
target=start_listen,
139+
kwargs={"args": args},
140+
name=f"worker-{worker_id}",
141+
)
142+
worker_processes[worker_id].start()
143+
else:
144+
logger.info("Worker-%s has finished." % worker_id)
145+
worker.join()
146+
process_to_remove.append(worker)
147+
148+
for dead_process in process_to_remove:
149+
worker_processes.remove(dead_process)
150+
151+
152+
def run_worker(args: TaskiqArgs) -> None:
92153
"""
93154
This function starts worker processes.
94155
@@ -102,7 +163,9 @@ def run_worker(args: TaskiqArgs) -> None: # noqa: C901, WPS210, WPS213
102163
format=("[%(asctime)s][%(levelname)-7s][%(processName)s] %(message)s"),
103164
)
104165
logger.info("Starting %s worker processes." % args.workers)
105-
worker_processes: List[Process] = []
166+
167+
global worker_processes # noqa: WPS420
168+
106169
for process in range(args.workers):
107170
work_proc = Process(
108171
target=start_listen,
@@ -119,51 +182,7 @@ def run_worker(args: TaskiqArgs) -> None: # noqa: C901, WPS210, WPS213
119182
)
120183
worker_processes.append(work_proc)
121184

122-
# This flag signalizes that we do need to restart processes.
123-
do_restarts = True
124-
125-
def signal_handler(_signal: int, _frame: Any) -> None:
126-
"""
127-
This handler is used only by main process.
128-
129-
If the OS sent you SIGINT or SIGTERM,
130-
we should kill all spawned processes.
131-
132-
:param _signal: incoming signal.
133-
:param _frame: current execution frame.
134-
"""
135-
nonlocal do_restarts # noqa: WPS420
136-
nonlocal worker_processes # noqa: WPS420
137-
138-
do_restarts = False # noqa: WPS442
139-
for process in worker_processes: # noqa: WPS442
140-
# This is how we send SIGTERM to child
141-
# processes.
142-
process.terminate()
143-
process.join()
144-
145185
signal.signal(signal.SIGINT, signal_handler)
146186
signal.signal(signal.SIGTERM, signal_handler)
147187

148-
while worker_processes and do_restarts:
149-
# List of processes to remove.
150-
sleep(1)
151-
process_to_remove = []
152-
for worker_id, worker in enumerate(worker_processes):
153-
if worker.is_alive():
154-
continue
155-
if worker.exitcode is not None and worker.exitcode > 0 and do_restarts:
156-
logger.info("Trying to restart the worker-%s" % worker_id)
157-
worker_processes[worker_id] = Process(
158-
target=start_listen,
159-
kwargs={"args": args},
160-
name=f"worker-{worker_id}",
161-
)
162-
worker_processes[worker_id].start()
163-
else:
164-
logger.info("Worker-%s has finished." % worker_id)
165-
worker.join()
166-
process_to_remove.append(worker)
167-
168-
for dead_process in process_to_remove:
169-
worker_processes.remove(dead_process)
188+
watch_workers_restarts(args=args)

0 commit comments

Comments
 (0)