diff --git a/parsl/__init__.py b/parsl/__init__.py index 5baafb9e6e..551e0f796b 100644 --- a/parsl/__init__.py +++ b/parsl/__init__.py @@ -15,9 +15,6 @@ """ import logging -import multiprocessing as _multiprocessing -import os -import platform from parsl.app.app import bash_app, join_app, python_app from parsl.config import Config @@ -32,9 +29,6 @@ from parsl.monitoring import MonitoringHub from parsl.version import VERSION -if platform.system() == 'Darwin': - _multiprocessing.set_start_method('fork', force=True) - __author__ = 'The Parsl Team' __version__ = VERSION @@ -74,6 +68,3 @@ logging.getLogger('parsl').addHandler(logging.NullHandler()) - -if platform.system() == 'Darwin': - os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES' diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 26cd893424..a55735a296 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -346,9 +346,9 @@ def start(self, exception_happened = False while (not self._kill_event.is_set() or - self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or - self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or - resource_queue.qsize() != 0): + not self.pending_priority_queue.empty() or not self.pending_resource_queue.empty() or + not self.pending_node_queue.empty() or not self.pending_block_queue.empty() or + not resource_queue.empty()): """ WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages) @@ -357,9 +357,9 @@ def start(self, try: logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format( self._kill_event.is_set(), - self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0, - self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0, - resource_queue.qsize() != 0)) + not self.pending_priority_queue.empty(), not self.pending_resource_queue.empty(), + not self.pending_node_queue.empty(), not self.pending_block_queue.empty(), + not resource_queue.empty())) # This is the list of resource messages which can be reprocessed as if they # had just arrived because the corresponding first task message has been @@ -558,9 +558,9 @@ def start(self, def _migrate_logs_to_internal(self, logs_queue: mpq.Queue, kill_event: threading.Event) -> None: logger.info("Starting _migrate_logs_to_internal") - while not kill_event.is_set() or logs_queue.qsize() != 0: + while not kill_event.is_set() or not logs_queue.empty(): logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s", - kill_event.is_set(), logs_queue.qsize() != 0) + kill_event.is_set(), not logs_queue.empty()) try: x = logs_queue.get(timeout=0.1) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index b4a005e21c..8020a82576 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -11,9 +11,9 @@ from parsl.monitoring.types import TaggedMonitoringMessage from parsl.multiprocessing import ( - SizedQueue, SpawnEvent, SpawnProcess, + SpawnQueue, join_terminate_close_proc, ) from parsl.utils import RepresentationMixin @@ -126,7 +126,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.monitoring_hub_active = True self.resource_msgs: Queue[TaggedMonitoringMessage] - self.resource_msgs = SizedQueue() + self.resource_msgs = SpawnQueue() self.dbm_exit_event: ms.Event self.dbm_exit_event = SpawnEvent() diff --git a/parsl/monitoring/radios/udp_router.py b/parsl/monitoring/radios/udp_router.py index 4fddef23b7..edfac5eedb 100644 --- a/parsl/monitoring/radios/udp_router.py +++ b/parsl/monitoring/radios/udp_router.py @@ -21,9 +21,9 @@ from parsl.monitoring.radios.base import MonitoringRadioReceiver from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.multiprocessing import ( - SizedQueue, SpawnEvent, SpawnProcess, + SpawnQueue, join_terminate_close_proc, ) from parsl.process_loggers import wrap_with_logs @@ -198,7 +198,7 @@ def start_udp_receiver(*, hmac_digest: str) -> UDPRadioReceiver: udp_comm_q: Queue[Union[int, str]] - udp_comm_q = SizedQueue(maxsize=10) + udp_comm_q = SpawnQueue(maxsize=10) router_exit_event = SpawnEvent() diff --git a/parsl/monitoring/radios/zmq_router.py b/parsl/monitoring/radios/zmq_router.py index d715023cce..a557dd8228 100644 --- a/parsl/monitoring/radios/zmq_router.py +++ b/parsl/monitoring/radios/zmq_router.py @@ -19,9 +19,9 @@ from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.multiprocessing import ( - SizedQueue, SpawnEvent, SpawnProcess, + SpawnQueue, join_terminate_close_proc, ) from parsl.process_loggers import wrap_with_logs @@ -158,7 +158,7 @@ def start_zmq_receiver(*, port_range: Tuple[int, int], logdir: str, worker_debug: bool) -> ZMQRadioReceiver: - comm_q = SizedQueue(maxsize=10) + comm_q = SpawnQueue(maxsize=10) router_exit_event = SpawnEvent() diff --git a/parsl/multiprocessing.py b/parsl/multiprocessing.py index ee3ae420ce..1ed6c94233 100644 --- a/parsl/multiprocessing.py +++ b/parsl/multiprocessing.py @@ -4,7 +4,6 @@ import logging import multiprocessing import multiprocessing.queues -import platform from multiprocessing.context import ForkProcess as ForkProcessType from multiprocessing.context import SpawnProcess as SpawnProcessType from typing import Callable @@ -21,54 +20,6 @@ SpawnQueue = SpawnContext.Queue -class MacSafeQueue(multiprocessing.queues.Queue): - """ Multiprocessing queues do not have qsize attributes on MacOS. - This is slower but more portable version of the multiprocessing Queue - that adds a explicit counter - - Reference : https://github.com/keras-team/autokeras/commit/4ddd568b06b4045ace777bc0fb7bc18573b85a75 - """ - - def __init__(self, *args, **kwargs): - if 'ctx' not in kwargs: - kwargs['ctx'] = multiprocessing.get_context('spawn') - super().__init__(*args, **kwargs) - self._counter = multiprocessing.Value('i', 0) - - def put(self, *args, **kwargs): - # logger.critical("Putting item {}".format(args)) - x = super().put(*args, **kwargs) - with self._counter.get_lock(): - self._counter.value += 1 - return x - - def get(self, *args, **kwargs): - x = super().get(*args, **kwargs) - with self._counter.get_lock(): - self._counter.value -= 1 - # logger.critical("Getting item {}".format(x)) - return x - - def qsize(self): - return self._counter.value - - def empty(self): - return not self._counter.value - - -# SizedQueue should be constructable using the same calling -# convention as multiprocessing.Queue but that entire signature -# isn't expressible in mypy 0.790 -SizedQueue: Callable[..., multiprocessing.Queue] - - -if platform.system() != 'Darwin': - import multiprocessing - SizedQueue = SpawnQueue -else: - SizedQueue = MacSafeQueue - - def join_terminate_close_proc(process: SpawnProcessType, *, timeout: int = 30) -> None: """Increasingly aggressively terminate a process. diff --git a/parsl/tests/test_regression/test_854.py b/parsl/tests/test_regression/test_854.py deleted file mode 100644 index e11d7fe8ac..0000000000 --- a/parsl/tests/test_regression/test_854.py +++ /dev/null @@ -1,62 +0,0 @@ -import multiprocessing -import random -import time - -import pytest - -from parsl.multiprocessing import MacSafeQueue - - -def consumer(in_q, out_q, delay=0): - while True: - x = in_q.get() - time.sleep(delay) - if x == 'STOP': - out_q.put('STOPPED') - break - else: - out_q.put(x) - - -@pytest.mark.local -def test_mac_safe_queue(): - """ Regression test for HTEX being broken on Mac OS: https://github.com/Parsl/parsl/issues/854 - This test doesn't test the fix on mac's however it tests a multiprocessing queue replacement - that is safe to run on Mac OS. - """ - task_q = MacSafeQueue() - result_q = MacSafeQueue() - - p = multiprocessing.Process(target=consumer, args=(task_q, result_q,)) - p.start() - for i in range(10): - task_q.put(i) - result_q.get() - task_q.put('STOP') - r = result_q.get() - assert r == 'STOPPED', "Did not get stopped confirmation, got:{}".format(r) - p.terminate() - - -@pytest.mark.local -def test_mac_safe_queue_size(): - """ Regression test for HTEX being broken on Mac OS: https://github.com/Parsl/parsl/issues/854 - This test doesn't test the fix on mac's however it tests a multiprocessing queue replacement - that is safe to run on Mac OS. - """ - task_q = MacSafeQueue() - result_q = MacSafeQueue() - - x = random.randint(1, 100) - - [task_q.put(i) for i in range(x)] - assert task_q.empty() is False, "Task queue should not be empty" - assert task_q.qsize() == x, "Task queue should be {}; instead got {}".format(x, task_q.qsize()) - - p = multiprocessing.Process(target=consumer, args=(task_q, result_q,)) - p.start() - task_q.put('STOP') - p.join() - assert result_q.empty() is False, "Result queue should not be empty" - qlen = result_q.qsize() - assert qlen == x + 1, "Result queue should be {}; instead got {}".format(x + 1, qlen)