Skip to content

Commit d54c945

Browse files
Move htex interchange tasks incoming thread into main thread (#3752)
This removes one of two non-main threads in the interchange - the task puller thread - and moves the behaviour there (receive a message and put it in an in-process queue) into the main thread (where that in-process queue is ultimately dequeued, anyway) This is aimed at helping with ZMQ-vs-threads issues within the interchange -- most immediately, clean shutdown #3697 performance notes: parsl-perf -t 30, my laptop, no logging before this PR, 2320 tasks/second post this PR, 2344 tasks/second cc @rjmello who expressed especial interest in this # Changed Behaviour Some performance difference, although the brief measurements above are not concerning. ## Type of change - New feature --------- Co-authored-by: Kevin Hunter Kesling <[email protected]>
1 parent 4e3e6b5 commit d54c945

File tree

1 file changed

+19
-27
lines changed

1 file changed

+19
-27
lines changed

parsl/executors/high_throughput/interchange.py

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ def __init__(self,
132132
self.hub_zmq_port = hub_zmq_port
133133

134134
self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)
135+
136+
# count of tasks that have been received from the submit side
137+
self.task_counter = 0
138+
139+
# count of tasks that have been sent out to worker pools
135140
self.count = 0
136141

137142
self.worker_ports = worker_ports
@@ -201,28 +206,6 @@ def get_tasks(self, count: int) -> Sequence[dict]:
201206

202207
return tasks
203208

204-
@wrap_with_logs(target="interchange")
205-
def task_puller(self) -> NoReturn:
206-
"""Pull tasks from the incoming tasks zmq pipe onto the internal
207-
pending task queue
208-
"""
209-
logger.info("Starting")
210-
task_counter = 0
211-
212-
while True:
213-
logger.debug("launching recv_pyobj")
214-
try:
215-
msg = self.task_incoming.recv_pyobj()
216-
except zmq.Again:
217-
# We just timed out while attempting to receive
218-
logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize()))
219-
continue
220-
221-
logger.debug("putting message onto pending_task_queue")
222-
self.pending_task_queue.put(msg)
223-
task_counter += 1
224-
logger.debug(f"Fetched {task_counter} tasks so far")
225-
226209
def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
227210
if monitoring_radio:
228211
logger.info("Sending message {} to MonitoringHub".format(manager))
@@ -326,11 +309,6 @@ def start(self) -> None:
326309

327310
start = time.time()
328311

329-
self._task_puller_thread = threading.Thread(target=self.task_puller,
330-
name="Interchange-Task-Puller",
331-
daemon=True)
332-
self._task_puller_thread.start()
333-
334312
self._command_thread = threading.Thread(target=self._command_server,
335313
name="Interchange-Command",
336314
daemon=True)
@@ -341,6 +319,7 @@ def start(self) -> None:
341319
poller = zmq.Poller()
342320
poller.register(self.task_outgoing, zmq.POLLIN)
343321
poller.register(self.results_incoming, zmq.POLLIN)
322+
poller.register(self.task_incoming, zmq.POLLIN)
344323

345324
# These are managers which we should examine in an iteration
346325
# for scheduling a job (or maybe any other attention?).
@@ -351,6 +330,7 @@ def start(self) -> None:
351330
while not kill_event.is_set():
352331
self.socks = dict(poller.poll(timeout=poll_period))
353332

333+
self.process_task_incoming()
354334
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
355335
self.process_results_incoming(interesting_managers, monitoring_radio)
356336
self.expire_bad_managers(interesting_managers, monitoring_radio)
@@ -362,6 +342,18 @@ def start(self) -> None:
362342
logger.info(f"Processed {self.count} tasks in {delta} seconds")
363343
logger.warning("Exiting")
364344

345+
def process_task_incoming(self) -> None:
346+
"""Process incoming task message(s).
347+
"""
348+
349+
if self.task_incoming in self.socks and self.socks[self.task_incoming] == zmq.POLLIN:
350+
logger.debug("start task_incoming section")
351+
msg = self.task_incoming.recv_pyobj()
352+
logger.debug("putting message onto pending_task_queue")
353+
self.pending_task_queue.put(msg)
354+
self.task_counter += 1
355+
logger.debug(f"Fetched {self.task_counter} tasks so far")
356+
365357
def process_task_outgoing_incoming(
366358
self,
367359
interesting_managers: Set[bytes],

0 commit comments

Comments
 (0)