Commit 572d411

Remove HTEX interchange command channel thread, moving its handling into the main thread (#3753)
# Description

This is a follow-on from #3752; see that PR for some motivation.

# Changed Behaviour

Some performance reduction.

## Type of change

- New feature
1 parent d54c945 commit 572d411
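The change collapses the interchange's dedicated command-server thread into the main ZMQ poll loop: the command socket is registered with the same `zmq.Poller` as the task and result channels, and handled only when a poll reports it readable. Below is a minimal standalone sketch of that pattern; the address, socket names, and the single `WORKERS` command are illustrative assumptions, not Parsl's actual wiring.

```python
import zmq

context = zmq.Context()

# A REP socket standing in for the interchange's command channel.
command_channel = context.socket(zmq.REP)
command_channel.bind("tcp://127.0.0.1:5560")  # illustrative address

poller = zmq.Poller()
poller.register(command_channel, zmq.POLLIN)
# ... task and result sockets would be registered here too ...


def process_command(socks: dict) -> None:
    # Only touch the socket when the poller reported a waiting request,
    # so this never blocks the main loop.
    if command_channel in socks and socks[command_channel] == zmq.POLLIN:
        command_req = command_channel.recv_pyobj()
        if command_req == "WORKERS":
            reply = 0  # placeholder: a real server would count its workers
        else:
            reply = None
        command_channel.send_pyobj(reply)


while True:
    # One poll drives every channel; no dedicated command thread needed.
    socks = dict(poller.poll(timeout=1000))
    process_command(socks)
    # ... process task and result channels here ...
```

The trade-off, noted under Changed Behaviour, is that command replies now share the main loop's cadence: a reply can wait for up to one poll iteration plus whatever other processing the loop does first, in exchange for one fewer thread to synchronise and shut down.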

parsl/executors/high_throughput/interchange.py

Lines changed: 55 additions & 69 deletions
```diff
@@ -9,7 +9,7 @@
 import sys
 import threading
 import time
-from typing import Any, Dict, List, NoReturn, Optional, Sequence, Set, Tuple, cast
+from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, cast
 
 import zmq
 
@@ -217,79 +217,68 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender
 
         monitoring_radio.send((MessageType.NODE_INFO, d))
 
-    @wrap_with_logs(target="interchange")
-    def _command_server(self) -> NoReturn:
+    def process_command(self, monitoring_radio: Optional[MonitoringRadioSender]) -> None:
         """ Command server to run async command to the interchange
         """
-        logger.debug("Command Server Starting")
-
-        if self.hub_address is not None and self.hub_zmq_port is not None:
-            logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
-            monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
-        else:
-            monitoring_radio = None
+        logger.debug("entering command_server section")
 
         reply: Any  # the type of reply depends on the command_req received (aka this needs dependent types...)
 
-        while True:
-            try:
-                command_req = self.command_channel.recv_pyobj()
-                logger.debug("Received command request: {}".format(command_req))
-                if command_req == "CONNECTED_BLOCKS":
-                    reply = self.connected_block_history
-
-                elif command_req == "WORKERS":
-                    num_workers = 0
-                    for manager in self._ready_managers.values():
-                        num_workers += manager['worker_count']
-                    reply = num_workers
-
-                elif command_req == "MANAGERS":
-                    reply = []
-                    for manager_id in self._ready_managers:
-                        m = self._ready_managers[manager_id]
-                        idle_since = m['idle_since']
-                        if idle_since is not None:
-                            idle_duration = time.time() - idle_since
-                        else:
-                            idle_duration = 0.0
-                        resp = {'manager': manager_id.decode('utf-8'),
-                                'block_id': m['block_id'],
-                                'worker_count': m['worker_count'],
-                                'tasks': len(m['tasks']),
-                                'idle_duration': idle_duration,
-                                'active': m['active'],
-                                'parsl_version': m['parsl_version'],
-                                'python_version': m['python_version'],
-                                'draining': m['draining']}
-                        reply.append(resp)
-
-                elif command_req.startswith("HOLD_WORKER"):
-                    cmd, s_manager = command_req.split(';')
-                    manager_id = s_manager.encode('utf-8')
-                    logger.info("Received HOLD_WORKER for {!r}".format(manager_id))
-                    if manager_id in self._ready_managers:
-                        m = self._ready_managers[manager_id]
-                        m['active'] = False
-                        self._send_monitoring_info(monitoring_radio, m)
-                    else:
-                        logger.warning("Worker to hold was not in ready managers list")
-
-                    reply = None
-
-                elif command_req == "WORKER_PORTS":
-                    reply = (self.worker_task_port, self.worker_result_port)
-
-                else:
-                    logger.error(f"Received unknown command: {command_req}")
-                    reply = None
-
-                logger.debug("Reply: {}".format(reply))
-                self.command_channel.send_pyobj(reply)
-
-            except zmq.Again:
-                logger.debug("Command thread is alive")
-                continue
+        if self.command_channel in self.socks and self.socks[self.command_channel] == zmq.POLLIN:
+
+            command_req = self.command_channel.recv_pyobj()
+            logger.debug("Received command request: {}".format(command_req))
+            if command_req == "CONNECTED_BLOCKS":
+                reply = self.connected_block_history
+
+            elif command_req == "WORKERS":
+                num_workers = 0
+                for manager in self._ready_managers.values():
+                    num_workers += manager['worker_count']
+                reply = num_workers
+
+            elif command_req == "MANAGERS":
+                reply = []
+                for manager_id in self._ready_managers:
+                    m = self._ready_managers[manager_id]
+                    idle_since = m['idle_since']
+                    if idle_since is not None:
+                        idle_duration = time.time() - idle_since
+                    else:
+                        idle_duration = 0.0
+                    resp = {'manager': manager_id.decode('utf-8'),
+                            'block_id': m['block_id'],
+                            'worker_count': m['worker_count'],
+                            'tasks': len(m['tasks']),
+                            'idle_duration': idle_duration,
+                            'active': m['active'],
+                            'parsl_version': m['parsl_version'],
+                            'python_version': m['python_version'],
+                            'draining': m['draining']}
+                    reply.append(resp)
+
+            elif command_req.startswith("HOLD_WORKER"):
+                cmd, s_manager = command_req.split(';')
+                manager_id = s_manager.encode('utf-8')
+                logger.info("Received HOLD_WORKER for {!r}".format(manager_id))
+                if manager_id in self._ready_managers:
+                    m = self._ready_managers[manager_id]
+                    m['active'] = False
+                    self._send_monitoring_info(monitoring_radio, m)
+                else:
+                    logger.warning("Worker to hold was not in ready managers list")
+
+                reply = None
+
+            elif command_req == "WORKER_PORTS":
+                reply = (self.worker_task_port, self.worker_result_port)
+
+            else:
+                logger.error(f"Received unknown command: {command_req}")
+                reply = None
+
+            logger.debug("Reply: {}".format(reply))
+            self.command_channel.send_pyobj(reply)
 
     @wrap_with_logs
     def start(self) -> None:
@@ -309,17 +298,13 @@ def start(self) -> None:
 
         start = time.time()
 
-        self._command_thread = threading.Thread(target=self._command_server,
-                                                name="Interchange-Command",
-                                                daemon=True)
-        self._command_thread.start()
-
         kill_event = threading.Event()
 
         poller = zmq.Poller()
         poller.register(self.task_outgoing, zmq.POLLIN)
         poller.register(self.results_incoming, zmq.POLLIN)
         poller.register(self.task_incoming, zmq.POLLIN)
+        poller.register(self.command_channel, zmq.POLLIN)
 
         # These are managers which we should examine in an iteration
         # for scheduling a job (or maybe any other attention?).
@@ -330,6 +315,7 @@ def start(self) -> None:
         while not kill_event.is_set():
             self.socks = dict(poller.poll(timeout=poll_period))
 
+            self.process_command(monitoring_radio)
             self.process_task_incoming()
             self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
             self.process_results_incoming(interesting_managers, monitoring_radio)
```
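For context, the command channel is a request/reply pair, so clients are unaffected by this move: the wire protocol is unchanged, and only the server-side scheduling of replies differs. A hypothetical client for the sketch above (not Parsl's actual client code):

```python
import zmq

context = zmq.Context()

# REQ pairs with the server's REP socket from the sketch above.
client = context.socket(zmq.REQ)
client.connect("tcp://127.0.0.1:5560")  # illustrative address

client.send_pyobj("WORKERS")  # ask how many workers are connected
# Blocks until the server's main loop polls the command socket and replies;
# with the old dedicated thread, the reply could arrive between poll iterations.
print(client.recv_pyobj())
```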
