Skip to content

Commit 47e60f0

Browse files
authored
Programmer error protection: there can be only one (#3756)
Just assert what's already happening (and _should_ be happening). Use it to remove an indent level. # Changed Behaviour Should be no changed behavior for users. ## Type of change - Code maintenance/cleanup
1 parent 572d411 commit 47e60f0

File tree

1 file changed

+14
-11
lines changed

1 file changed

+14
-11
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -431,8 +431,6 @@ def start(self):
431431
self._start_result_queue_thread()
432432
self._start_local_interchange_process()
433433

434-
logger.debug("Created result queue thread: %s", self._result_queue_thread)
435-
436434
self.initialize_scaling()
437435

438436
@wrap_with_logs
@@ -529,6 +527,8 @@ def _start_local_interchange_process(self) -> None:
529527
get the worker task and result ports that the interchange has bound to.
530528
"""
531529

530+
assert self.interchange_proc is None, f"Already exists! {self.interchange_proc!r}"
531+
532532
interchange_config = {"client_address": self.loopback_address,
533533
"client_ports": (self.outgoing_q.port,
534534
self.incoming_q.port,
@@ -563,23 +563,26 @@ def _start_local_interchange_process(self) -> None:
563563
except CommandClientTimeoutError:
564564
logger.error("Interchange has not completed initialization. Aborting")
565565
raise Exception("Interchange failed to start")
566-
logger.debug("Got worker ports")
566+
logger.debug(
567+
"Interchange process started (%r). Worker ports: %d, %d",
568+
self.interchange_proc,
569+
self.worker_task_port,
570+
self.worker_result_port
571+
)
567572

568573
def _start_result_queue_thread(self):
569574
"""Method to start the result queue thread as a daemon.
570575
571576
Checks if a thread already exists, then starts it.
572577
Could be used later as a restart if the result queue thread dies.
573578
"""
574-
if self._result_queue_thread is None:
575-
logger.debug("Starting result queue thread")
576-
self._result_queue_thread = threading.Thread(target=self._result_queue_worker, name="HTEX-Result-Queue-Thread")
577-
self._result_queue_thread.daemon = True
578-
self._result_queue_thread.start()
579-
logger.debug("Started result queue thread")
579+
assert self._result_queue_thread is None, f"Already exists! {self._result_queue_thread!r}"
580580

581-
else:
582-
logger.error("Result queue thread already exists, returning")
581+
logger.debug("Starting result queue thread")
582+
self._result_queue_thread = threading.Thread(target=self._result_queue_worker, name="HTEX-Result-Queue-Thread")
583+
self._result_queue_thread.daemon = True
584+
self._result_queue_thread.start()
585+
logger.debug("Started result queue thread: %r", self._result_queue_thread)
583586

584587
def hold_worker(self, worker_id: str) -> None:
585588
"""Puts a worker on hold, preventing scheduling of additional tasks to it.

0 commit comments

Comments
 (0)