Skip to content

Commit 1821120

Browse files
Make radical bulk collector exit more explicitly (#3718)
In addition to the work happening [here](radical-cybertools/radical.pilot#3269) to address this issue #3708. This PR ensures the cleanup of threads generated by RPEX. - Bug fix (partially #3708) --------- Co-authored-by: Ben Clifford <[email protected]>
1 parent a01f7e4 commit 1821120

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

parsl/executors/radical/executor.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ def __init__(self,
133133
self.resource = resource
134134
self._uid = RPEX.lower()
135135
self.bulk_mode = bulk_mode
136+
self._terminate = mt.Event()
136137
self.working_dir = working_dir
137138
self.pilot_kwargs = rpex_pilot_kwargs
138139
self.future_tasks: Dict[str, Future] = {}
@@ -532,7 +533,7 @@ def _bulk_collector(self):
532533

533534
bulk = list()
534535

535-
while True:
536+
while not self._terminate.is_set():
536537

537538
now = time.time() # time of last submission
538539

@@ -552,6 +553,9 @@ def _bulk_collector(self):
552553
if len(bulk) >= self._max_bulk_size:
553554
break
554555

556+
if self._terminate.is_set():
557+
break
558+
555559
if bulk:
556560
logger.debug('submit bulk: %d', len(bulk))
557561
self.tmgr.submit_tasks(bulk)
@@ -588,6 +592,13 @@ def submit(self, func, resource_specification, *args, **kwargs):
588592
def shutdown(self, hub=True, targets='all', block=False):
589593
"""Shutdown the executor, including all RADICAL-Pilot components."""
590594
logger.info("RadicalPilotExecutor is terminating...")
595+
596+
self._terminate.set()
597+
598+
# ensure we are in the bulk submssion mode
599+
if self.bulk_mode:
600+
self._bulk_thread.join()
601+
591602
self.session.close(download=True)
592603
logger.info("RadicalPilotExecutor is terminated.")
593604

0 commit comments

Comments
 (0)