From 75f6067e28c27780294eb08e1dbe1226ff9b6c8f Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 1 Sep 2022 18:33:18 +1000 Subject: [PATCH 1/7] Add multiprocessing queue shutdown * Include docs --- Doc/library/multiprocessing.rst | 24 +++++++++++++++++++-- Lib/multiprocessing/queues.py | 26 ++++++++++++++++++++++- Lib/test/_test_multiprocessing.py | 35 +++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 3 deletions(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 8454296b815b41..4e8e91749cc81e 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -845,7 +845,8 @@ For an example of the usage of queues for interprocess communication see free slot was available within that time. Otherwise (*block* is ``False``), put an item on the queue if a free slot is immediately available, else raise the :exc:`queue.Full` exception (*timeout* is - ignored in that case). + ignored in that case). Raises :exc:`ShutDown` if the queue has been shut + down. .. versionchanged:: 3.8 If the queue is closed, :exc:`ValueError` is raised instead of @@ -863,7 +864,9 @@ For an example of the usage of queues for interprocess communication see it blocks at most *timeout* seconds and raises the :exc:`queue.Empty` exception if no item was available within that time. Otherwise (block is ``False``), return an item if one is immediately available, else raise the - :exc:`queue.Empty` exception (*timeout* is ignored in that case). + :exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises + :exc:`queue.ShutDown` if the queue has been shut down and is empty, or if + the queue has been shut down immediately. .. versionchanged:: 3.8 If the queue is closed, :exc:`ValueError` is raised instead of @@ -873,6 +876,19 @@ For an example of the usage of queues for interprocess communication see Equivalent to ``get(False)``. + .. method:: shutdown(immediate=False) + + Shut-down the queue, making queue gets and puts raise + :exc:`queue.ShutDown`. + + By default, gets will only raise once the queue is empty. Set + *immediate* to true to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if *immediate* is true. + + .. versionadded:: 3.12 + :class:`multiprocessing.Queue` has a few additional methods not found in :class:`queue.Queue`. These methods are usually unnecessary for most code: @@ -962,6 +978,8 @@ For an example of the usage of queues for interprocess communication see Raises a :exc:`ValueError` if called more times than there were items placed in the queue. + Raises :exc:`queue.ShutDown` if the queue has been shut down immediately. + .. method:: join() @@ -973,6 +991,8 @@ For an example of the usage of queues for interprocess communication see it is complete. When the count of unfinished tasks drops to zero, :meth:`~queue.Queue.join` unblocks. + Raises :exc:`queue.ShutDown` if the queue has been shut down immediately. + Miscellaneous ~~~~~~~~~~~~~ diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index daf9ee94a19431..f9854cc1b7e227 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -17,8 +17,9 @@ import types import weakref import errno +import ctypes -from queue import Empty, Full +from queue import Empty, Full, ShutDown import _multiprocessing @@ -28,6 +29,10 @@ from .util import debug, info, Finalize, register_after_fork, is_exiting +_queue_alive = 0 +_queue_shutdown = 1 +_queue_shutdown_immediate = 2 + # # Queue type using a pipe, buffer and thread # @@ -50,6 +55,9 @@ def __init__(self, maxsize=0, *, ctx): # For use by concurrent.futures self._ignore_epipe = False self._reset() + self._shutdown_state = context._default_context.Value( + ctypes.c_uint8, lock=self._rlock + ) if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) @@ -86,20 +94,28 @@ def _reset(self, after_fork=False): def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") + if self._shutdown_state.value != _queue_alive: + raise ShutDown if not self._sem.acquire(block, timeout): raise Full with self._notempty: + if self._shutdown_state.value != _queue_alive: + raise ShutDown if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() def get(self, block=True, timeout=None): + if self._shutdown_state.value == _queue_shutdown_immediate: + raise ShutDown if self._closed: raise ValueError(f"Queue {self!r} is closed") if block and timeout is None: with self._rlock: + if self._shutdown_state.value != _queue_alive: + raise ShutDown res = self._recv_bytes() self._sem.release() else: @@ -111,13 +127,19 @@ def get(self, block=True, timeout=None): if block: timeout = deadline - time.monotonic() if not self._poll(timeout): + if self._shutdown_state.value != _queue_alive: + raise ShutDown raise Empty + if self._shutdown_state.value != _queue_alive : + raise ShutDown elif not self._poll(): raise Empty res = self._recv_bytes() self._sem.release() finally: self._rlock.release() + if self._shutdown_state.value == _queue_shutdown: + raise ShutDown # unserialize the data after having released the lock return _ForkingPickler.loads(res) @@ -329,6 +351,8 @@ def task_done(self): def join(self): with self._cond: + if self._shutdown_state.value == _queue_shutdown_immediate: + return if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 9a2db24b4bd597..5d525257a9ad30 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1277,6 +1277,41 @@ def test_closed_queue_put_get_exceptions(self): q.put('foo') with self.assertRaisesRegex(ValueError, 'is closed'): q.get() + + def test_shutdown_empty(self): + q = multiprocessing.Queue() + q.shutdown() + try: + q.put("data") + self.fail("Didn't appear to shut-down queue") + except pyqueue.ShutDown: + pass + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except pyqueue.ShutDown: + pass + + def test_shutdown_nonempty(self): + q = multiprocessing.Queue() + q.put("data") + q.shutdown() + q.get() + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except pyqueue.ShutDown: + pass + + def test_shutdown_immediate(self): + q = multiprocessing.Queue() + q.put("data") + q.shutdown(immediate=True) + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except pyqueue.ShutDown: + pass # # # From 3dd24152aa11b82ae894b604b60f33786b433a5e Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:52:33 +0100 Subject: [PATCH 2/7] Fix queue shutdown * Remove ctypes import (use string identifiers for value types) * Use queue's context to get shared value for queue state * Include queue state in pickle * Factor out queue-state checks and updates to methods * Logic fixes in put and get * Move shutdown method to before close * Raise when shutting down closed queue * Don't re-notify if immediately shutting down a queue already immediately shut-down * Support shutdown in JoinableQueue * Handle in task_done and join * Logic fixes in put and shutdown * Updated tests * Document feature added in 3.13 --- Doc/library/multiprocessing.rst | 2 +- Lib/multiprocessing/queues.py | 87 +++++++--- Lib/test/_test_multiprocessing.py | 271 +++++++++++++++++++++++++++--- 3 files changed, 315 insertions(+), 45 deletions(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 4e8e91749cc81e..7f1835e98ef91e 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -887,7 +887,7 @@ For an example of the usage of queues for interprocess communication see All blocked callers of put() will be unblocked, and also get() and join() if *immediate* is true. - .. versionadded:: 3.12 + .. versionadded:: 3.13 :class:`multiprocessing.Queue` has a few additional methods not found in :class:`queue.Queue`. These methods are usually unnecessary for most diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index f9854cc1b7e227..c9d5d4b567b4cd 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -17,7 +17,6 @@ import types import weakref import errno -import ctypes from queue import Empty, Full, ShutDown @@ -55,9 +54,7 @@ def __init__(self, maxsize=0, *, ctx): # For use by concurrent.futures self._ignore_epipe = False self._reset() - self._shutdown_state = context._default_context.Value( - ctypes.c_uint8, lock=self._rlock - ) + self._shutdown_state = ctx.Value('i', _queue_alive) if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) @@ -65,11 +62,13 @@ def __init__(self, maxsize=0, *, ctx): def __getstate__(self): context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) + self._rlock, self._wlock, self._sem, self._opid, + self._shutdown_state) def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) = state + self._rlock, self._wlock, self._sem, self._opid, + self._shutdown_state) = state self._reset() def _after_fork(self): @@ -91,30 +90,45 @@ def _reset(self, after_fork=False): self._recv_bytes = self._reader.recv_bytes self._poll = self._reader.poll + def _is_alive(self): + return self._shutdown_state.value == _queue_alive + + def _is_shutdown(self): + return self._shutdown_state.value == _queue_shutdown + + def _is_shutdown_immediate(self): + return self._shutdown_state.value == _queue_shutdown_immediate + + def _set_shutdown(self): + self._shutdown_state.value = _queue_shutdown + + def _set_shutdown_immediate(self): + self._shutdown_state.value = _queue_shutdown_immediate + def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") - if self._shutdown_state.value != _queue_alive: + if not self._is_alive(): raise ShutDown if not self._sem.acquire(block, timeout): + if not self._is_alive(): + raise ShutDown raise Full with self._notempty: - if self._shutdown_state.value != _queue_alive: - raise ShutDown if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() def get(self, block=True, timeout=None): - if self._shutdown_state.value == _queue_shutdown_immediate: - raise ShutDown if self._closed: raise ValueError(f"Queue {self!r} is closed") if block and timeout is None: with self._rlock: - if self._shutdown_state.value != _queue_alive: + # checks shutdown state + if (self._is_shutdown_immediate() + or (self._is_shutdown() and self.empty())): raise ShutDown res = self._recv_bytes() self._sem.release() @@ -122,24 +136,31 @@ def get(self, block=True, timeout=None): if block: deadline = time.monotonic() + timeout if not self._rlock.acquire(block, timeout): + if (self._is_shutdown_immediate() + or (self._is_shutdown() and self.empty())): + raise ShutDown raise Empty try: if block: timeout = deadline - time.monotonic() if not self._poll(timeout): - if self._shutdown_state.value != _queue_alive: + if not self._is_alive(): raise ShutDown raise Empty - if self._shutdown_state.value != _queue_alive : - raise ShutDown elif not self._poll(): + if not self._is_alive(): + raise ShutDown raise Empty + + # here queue is not empty + if self._is_shutdown_immediate(): + raise ShutDown + # here shutdown state queue is alive or shutdown res = self._recv_bytes() self._sem.release() finally: self._rlock.release() - if self._shutdown_state.value == _queue_shutdown: - raise ShutDown + # unserialize the data after having released the lock return _ForkingPickler.loads(res) @@ -159,6 +180,19 @@ def get_nowait(self): def put_nowait(self, obj): return self.put(obj, False) + def shutdown(self, immediate=False): + if self._closed: + raise ValueError(f"Queue {self!r} is closed") + with self._shutdown_state.get_lock(): + if self._is_shutdown_immediate(): + return + if immediate: + self._set_shutdown_immediate() + with self._notempty: + self._notempty.notify_all() + else: + self._set_shutdown() + def close(self): self._closed = True close = self._close @@ -332,7 +366,11 @@ def __setstate__(self, state): def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") + if not self._is_alive(): + raise ShutDown if not self._sem.acquire(block, timeout): + if not self._is_alive(): + raise ShutDown raise Full with self._notempty, self._cond: @@ -344,6 +382,8 @@ def put(self, obj, block=True, timeout=None): def task_done(self): with self._cond: + if self._is_shutdown_immediate(): + raise ShutDown if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero(): @@ -351,10 +391,19 @@ def task_done(self): def join(self): with self._cond: - if self._shutdown_state.value == _queue_shutdown_immediate: - return + if self._is_shutdown_immediate(): + raise ShutDown if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() + if self._is_shutdown_immediate(): + raise ShutDown + + def shutdown(self, immediate=False): + with self._cond: + is_alive = self._is_alive() + super().shutdown(immediate) + if is_alive: + self._cond.notify_all() # # Simplified Queue type -- really just a locked pipe diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 5d525257a9ad30..ae76c697c71ac1 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1279,39 +1279,260 @@ def test_closed_queue_put_get_exceptions(self): q.get() def test_shutdown_empty(self): - q = multiprocessing.Queue() - q.shutdown() - try: - q.put("data") - self.fail("Didn't appear to shut-down queue") - except pyqueue.ShutDown: - pass - try: - q.get() - self.fail("Didn't appear to shut-down queue") - except pyqueue.ShutDown: - pass + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + q.shutdown() + _wait() + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): + q.put("data") + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): + q.get() def test_shutdown_nonempty(self): - q = multiprocessing.Queue() - q.put("data") - q.shutdown() - q.get() - try: + for q in multiprocessing.Queue(1), multiprocessing.JoinableQueue(1): + q.put("data") + q.shutdown() + _wait() q.get() - self.fail("Didn't appear to shut-down queue") - except pyqueue.ShutDown: - pass + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): + q.get() def test_shutdown_immediate(self): - q = multiprocessing.Queue() - q.put("data") - q.shutdown(immediate=True) + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + q.put("data") + q.shutdown(immediate=True) + _wait() + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): + q.get() + + def test_shutdown_allowed_transitions(self): + # allowed transitions would be from `alive`` via `shutdown` to `shutdown_immediate`` + mod_q = multiprocessing.queues + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + self.assertEqual(mod_q._queue_alive, q._shutdown_state.value) + + # default -> immediate=False + q.shutdown() + self.assertEqual(mod_q._queue_shutdown, q._shutdown_state.value) + + q.shutdown(immediate=True) + self.assertEqual(mod_q._queue_shutdown_immediate, q._shutdown_state.value) + + q.shutdown(immediate=False) + self.assertNotEqual(mod_q._queue_shutdown, q._shutdown_state.value) + + def _shutdown_all_methods_in_one_process(self, immediate): + # part 1: Queue + q = multiprocessing.Queue(2) + q.put("L") + _wait() # Give time to simulate many processes + q.put_nowait("O") + q.shutdown(immediate) + _wait() # simulate time of synchro primitive + + with self.assertRaises(pyqueue.ShutDown): + q.put("E") + with self.assertRaises(pyqueue.ShutDown): + q.put_nowait("W") + if immediate: + with self.assertRaises(pyqueue.ShutDown): + q.get() + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() + else: + # Neither `task_done`, neither `join`methods` to test + self.assertEqual(q.get(), "L") + self.assertEqual(q.get_nowait(), "O") + _wait() + + # on shutdown(immediate=False) + # when queue is empty, should raise ShutDown Exception + with self.assertRaises(pyqueue.ShutDown): + q.get() # p.get(True) + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() # q.get(False) + with self.assertRaises(pyqueue.ShutDown): + q.get(True, 1.0) + + # part 2: JoinableQueue + q = multiprocessing.JoinableQueue(2) + q.put("L") + _wait() + q.put_nowait("O") + q.shutdown(immediate) + _wait() + + with self.assertRaises(pyqueue.ShutDown): + q.put("E") + with self.assertRaises(pyqueue.ShutDown): + q.put_nowait("W") + if immediate: + with self.assertRaises(pyqueue.ShutDown): + q.get() + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() + with self.assertRaises(pyqueue.ShutDown): + q.task_done() + with self.assertRaises(pyqueue.ShutDown): + q.join() + else: + self.assertEqual(q.get(), "L") + q.task_done() + _wait() + self.assertEqual(q.get(), "O") + q.task_done() + _wait() + q.join() + # when `shutdown` queue is empty, should raise ShutDown Exception + with self.assertRaises(pyqueue.ShutDown): + q.get() # p.get(True) + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() # p.get(False) + with self.assertRaises(pyqueue.ShutDown): + q.get(True, 1.0) + + def test_shutdown_all_methods_in_one_process(self): + return self._shutdown_all_methods_in_one_process(False) + + def test_shutdown_immediate_all_methods_in_one_process(self): + return self._shutdown_all_methods_in_one_process(True) + + @classmethod + def _write_msg_process(cls, q, n, results, delay, + i_when_exec_shutdown, + event_start, event_end): + event_start.wait() + for i in range(1, n+1): + try: + q.put((i, "YDLO")) + results.append(True) + except pyqueue.ShutDown: + results.append(False) + # triggers shutdown of queue + if i == i_when_exec_shutdown: + event_end.set() + time.sleep(delay) + # end of all puts + if isinstance(q, type(multiprocessing.JoinableQueue())): + try: + q.join() + except pyqueue.ShutDown: + pass + + @classmethod + def _read_msg_process(cls, q, nb, results, delay, event_start): + event_start.wait() + block = True + while nb: + time.sleep(delay) + try: + # Get at least one message + q.get(block) + block = False + if isinstance(q, type(multiprocessing.JoinableQueue())): + q.task_done() + results.append(True) + nb -= 1 + except pyqueue.ShutDown: + results.append(False) + nb -= 1 + except pyqueue.Empty: + pass + # end of all gets + if isinstance(q, type(multiprocessing.JoinableQueue())): + try: + q.join() + except pyqueue.ShutDown: + pass + + @classmethod + def _shutdown_process(cls, q, event_end, immediate): + event_end.wait() + q.shutdown(immediate) + if isinstance(q, type(multiprocessing.JoinableQueue())): + try: + q.join() + except pyqueue.ShutDown: + pass + + @classmethod + def _join_process(cls, q, delay, event_start): + event_start.wait() + time.sleep(delay) try: - q.get() - self.fail("Didn't appear to shut-down queue") + q.join() except pyqueue.ShutDown: pass + + #@classmethod + def _shutdown_all_methods_in_many_processes(self, immediate): + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + ps = [] + ev_start = multiprocessing.Event() + ev_exec_shutdown = multiprocessing.Event() + m = multiprocessing.Manager() + res_puts = m.list() + res_gets = m.list() + delay = 1e-4 + read_process = 4 + nb_msgs = read_process * 16 + nb_msgs_r = nb_msgs // read_process + when_exec_shutdown = nb_msgs // 2 + if isinstance(q, type(multiprocessing.Queue())): + lprocs = ( + (self._write_msg_process, 1, (q, nb_msgs, res_puts, delay, + when_exec_shutdown, + ev_start, ev_exec_shutdown)), + (self._read_msg_process, read_process, (q, nb_msgs_r, + res_gets, delay*2, + ev_start)), + (self._shutdown_process, 1, (q, ev_exec_shutdown, immediate)), + ) + else: + # add 2 self._join process processes + lprocs = ( + (self._write_msg_process, 1, (q, nb_msgs, res_puts, delay, + when_exec_shutdown, + ev_start, ev_exec_shutdown)), + (self._read_msg_process, read_process, (q, nb_msgs_r, + res_gets, delay*2, + ev_start)), + (self._join_process, 2, (q, delay*2, ev_start)), + (self._shutdown_process, 1, (q, ev_exec_shutdown, immediate)), + ) + # start all processes + for func, n, args in lprocs: + for i in range(n): + ps.append(multiprocessing.Process(target=func, args=args)) + ps[-1].start() + # set event in order to run q.shutdown() + ev_start.set() + _wait() + # wait + if isinstance(q, type(multiprocessing.Queue())): + for p in ps: + p.join() + + if not immediate: + self.assertTrue(q.empty()) + self.assertEqual(res_gets.count(True), res_puts.count(True)) + else: + self.assertTrue(res_gets.count(True) <= res_puts.count(True)) + + def test_shutdown_all_methods_in_many_processes(self): + return self._shutdown_all_methods_in_many_processes(False) + + def test_shutdown_immediate_all_methods_in_many_processes(self): + return self._shutdown_all_methods_in_many_processes(True) + + # # # From 7a8569556de24bf0987ff49a0ef5cd78ae03bb52 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 6 May 2023 05:17:07 +0000 Subject: [PATCH 3/7] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst diff --git a/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst b/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst new file mode 100644 index 00000000000000..e70049dc26f218 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst @@ -0,0 +1 @@ +Add :class:`multiprocessing.Queue` and :class:`multiprocessing.JoinableQueue` termination with ``shutdown`` method. From 82364c2cffcba5efad1b2e414fb3740c6d27e735 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 9 May 2023 20:36:08 +1000 Subject: [PATCH 4/7] Add ShutDown exception to threading queues module --- Lib/queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/queue.py b/Lib/queue.py index 55f50088460f9e..b49c8945d0f4b8 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -25,6 +25,10 @@ class Full(Exception): pass +class ShutDown(Exception): + '''Raised when put/get with shut-down queue.''' + + class Queue: '''Create a queue object with a given maximum size. From f5fcdd6be4a26c9c70edd5ef5f235b446dcf9e97 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 20 Feb 2024 21:27:10 +1000 Subject: [PATCH 5/7] Add references in docs and news entry --- Doc/library/multiprocessing.rst | 14 ++++++++------ .../2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst | 3 ++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 35ceaf7f46319f..e7fc9df595300b 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -884,14 +884,16 @@ For an example of the usage of queues for interprocess communication see .. method:: shutdown(immediate=False) - Shut-down the queue, making queue gets and puts raise - :exc:`queue.ShutDown`. + Shut-down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` + raise :exc:`queue.ShutDown`. - By default, gets will only raise once the queue is empty. Set - *immediate* to true to make gets raise immediately instead. + By default, :meth:`~Queue.get` on a shut down queue will only raise once + the queue is empty. Set *immediate* to true to make :meth:`~Queue.get` + raise immediately instead. - All blocked callers of put() will be unblocked, and also get() - and join() if *immediate* is true. + All blocked callers of :meth:`~Queue.put` will be unblocked. If + *immediate* is true, also unblock callers of :meth:`~Queue.get` and + :meth:`~Queue.join`. .. versionadded:: 3.13 diff --git a/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst b/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst index e70049dc26f218..3ff73e83ea0068 100644 --- a/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst +++ b/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst @@ -1 +1,2 @@ -Add :class:`multiprocessing.Queue` and :class:`multiprocessing.JoinableQueue` termination with ``shutdown`` method. +Add :class:`multiprocessing.Queue` and :class:`multiprocessing.JoinableQueue` +termination with :py:meth:`~multiprocessing.Queue.shutdown` method. From ee8d4df7ebd9e38fbe9ae01c8bc8881bbfa5c12d Mon Sep 17 00:00:00 2001 From: Laurie O Date: Fri, 22 Mar 2024 16:07:26 +1000 Subject: [PATCH 6/7] Update docs, add to what's-new --- Doc/library/multiprocessing.rst | 19 +++++++++---------- Doc/whatsnew/3.13.rst | 6 ++++++ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index e7fc9df595300b..81cdd2a8a368c7 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -851,8 +851,8 @@ For an example of the usage of queues for interprocess communication see free slot was available within that time. Otherwise (*block* is ``False``), put an item on the queue if a free slot is immediately available, else raise the :exc:`queue.Full` exception (*timeout* is - ignored in that case). Raises :exc:`ShutDown` if the queue has been shut - down. + ignored in that case). Raises the :exc:`queue.ShutDown` if the queue has + been shut down. .. versionchanged:: 3.8 If the queue is closed, :exc:`ValueError` is raised instead of @@ -871,8 +871,8 @@ For an example of the usage of queues for interprocess communication see exception if no item was available within that time. Otherwise (block is ``False``), return an item if one is immediately available, else raise the :exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises - :exc:`queue.ShutDown` if the queue has been shut down and is empty, or if - the queue has been shut down immediately. + the :exc:`queue.ShutDown` exception if the queue has been shut down and + is empty, or if the queue has been shut down immediately. .. versionchanged:: 3.8 If the queue is closed, :exc:`ValueError` is raised instead of @@ -884,14 +884,14 @@ For an example of the usage of queues for interprocess communication see .. method:: shutdown(immediate=False) - Shut-down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` + Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` raise :exc:`queue.ShutDown`. By default, :meth:`~Queue.get` on a shut down queue will only raise once the queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise immediately instead. - All blocked callers of :meth:`~Queue.put` will be unblocked. If + All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate* is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`. @@ -983,11 +983,12 @@ For an example of the usage of queues for interprocess communication see items have been processed (meaning that a :meth:`task_done` call was received for every item that had been :meth:`~Queue.put` into the queue). + ``shutdown(immediate=True)`` calls :meth:`task_done` for each remaining + item in the queue. + Raises a :exc:`ValueError` if called more times than there were items placed in the queue. - Raises :exc:`queue.ShutDown` if the queue has been shut down immediately. - .. method:: join() @@ -999,8 +1000,6 @@ For an example of the usage of queues for interprocess communication see it is complete. When the count of unfinished tasks drops to zero, :meth:`~queue.Queue.join` unblocks. - Raises :exc:`queue.ShutDown` if the queue has been shut down immediately. - Miscellaneous ^^^^^^^^^^^^^ diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index 7e6c79dbf50aac..8fc5f98ee4e08d 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -439,6 +439,12 @@ mmap the file descriptor specified by *fileno* will not be duplicated. (Contributed by Zackery Spytz and Petr Viktorin in :gh:`78502`.) +multiprocessing +--------------- + +* Add :meth:`multiprocessing.Queue.shutdown` for queue termination. + (Contributed by Laurie Opperman in :gh:`104230`.) + opcode ------ From 42fbea6323296cff903008cd1bd19b9c8ade201b Mon Sep 17 00:00:00 2001 From: Laurie O Date: Fri, 22 Mar 2024 17:24:16 +1000 Subject: [PATCH 7/7] WIP: consume queue on immediate shutdown --- Lib/multiprocessing/queues.py | 91 +++---- Lib/test/_test_multiprocessing.py | 393 +++++++++++++----------------- 2 files changed, 202 insertions(+), 282 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index ab1b6f44163d89..b605a05c55bda7 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -26,10 +26,6 @@ from .util import debug, info, Finalize, register_after_fork, is_exiting -_queue_alive = 0 -_queue_shutdown = 1 -_queue_shutdown_immediate = 2 - # # Queue type using a pipe, buffer and thread # @@ -52,7 +48,7 @@ def __init__(self, maxsize=0, *, ctx): # For use by concurrent.futures self._ignore_epipe = False self._reset() - self._shutdown_state = ctx.Value('i', _queue_alive) + self._is_shutdown = ctx.Value('B', False) if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) @@ -61,12 +57,12 @@ def __getstate__(self): context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid, - self._shutdown_state) + self._is_shutdown) def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid, - self._shutdown_state) = state + self._is_shutdown) = state self._reset() def _after_fork(self): @@ -88,32 +84,19 @@ def _reset(self, after_fork=False): self._recv_bytes = self._reader.recv_bytes self._poll = self._reader.poll - def _is_alive(self): - return self._shutdown_state.value == _queue_alive - - def _is_shutdown(self): - return self._shutdown_state.value == _queue_shutdown - - def _is_shutdown_immediate(self): - return self._shutdown_state.value == _queue_shutdown_immediate - - def _set_shutdown(self): - self._shutdown_state.value = _queue_shutdown - - def _set_shutdown_immediate(self): - self._shutdown_state.value = _queue_shutdown_immediate - def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") - if not self._is_alive(): + if self._is_shutdown.value: raise ShutDown if not self._sem.acquire(block, timeout): - if not self._is_alive(): + if self._is_shutdown.value: raise ShutDown raise Full with self._notempty: + if self._is_shutdown.value: + raise ShutDown if self._thread is None: self._start_thread() self._buffer.append(obj) @@ -124,9 +107,7 @@ def get(self, block=True, timeout=None): raise ValueError(f"Queue {self!r} is closed") if block and timeout is None: with self._rlock: - # checks shutdown state - if (self._is_shutdown_immediate() - or (self._is_shutdown() and self.empty())): + if self._is_shutdown.value and self.empty(): raise ShutDown res = self._recv_bytes() self._sem.release() @@ -134,26 +115,21 @@ def get(self, block=True, timeout=None): if block: deadline = time.monotonic() + timeout if not self._rlock.acquire(block, timeout): - if (self._is_shutdown_immediate() - or (self._is_shutdown() and self.empty())): + if self._is_shutdown.value and self.empty(): raise ShutDown raise Empty try: if block: timeout = deadline - time.monotonic() if not self._poll(timeout): - if not self._is_alive(): + if self._is_shutdown.value: raise ShutDown raise Empty elif not self._poll(): - if not self._is_alive(): + if self._is_shutdown.value: raise ShutDown raise Empty - # here queue is not empty - if self._is_shutdown_immediate(): - raise ShutDown - # here shutdown state queue is alive or shutdown res = self._recv_bytes() self._sem.release() finally: @@ -178,18 +154,24 @@ def get_nowait(self): def put_nowait(self, obj): return self.put(obj, False) + def _clear(self): + with self._rlock: + while self._poll(): + self._recv_bytes() + def shutdown(self, immediate=False): if self._closed: raise ValueError(f"Queue {self!r} is closed") - with self._shutdown_state.get_lock(): - if self._is_shutdown_immediate(): - return + with self._is_shutdown.get_lock(): + self._is_shutdown.value = True if immediate: - self._set_shutdown_immediate() - with self._notempty: - self._notempty.notify_all() - else: - self._set_shutdown() + self._clear() + # TODO: unblock all getters to check empty (then shutdown) + for _ in range(self._maxsize): + try: + self._sem.release() + except ValueError: + break def close(self): self._closed = True @@ -384,14 +366,16 @@ def __setstate__(self, state): def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") - if not self._is_alive(): + if self._is_shutdown.value: raise ShutDown if not self._sem.acquire(block, timeout): - if not self._is_alive(): + if self._is_shutdown.value: raise ShutDown raise Full with self._notempty, self._cond: + if self._is_shutdown.value: + raise ShutDown if self._thread is None: self._start_thread() self._buffer.append(obj) @@ -400,8 +384,6 @@ def put(self, obj, block=True, timeout=None): def task_done(self): with self._cond: - if self._is_shutdown_immediate(): - raise ShutDown if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero(): @@ -409,18 +391,15 @@ def task_done(self): def join(self): with self._cond: - if self._is_shutdown_immediate(): - raise ShutDown if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() - if self._is_shutdown_immediate(): - raise ShutDown - def shutdown(self, immediate=False): - with self._cond: - is_alive = self._is_alive() - super().shutdown(immediate) - if is_alive: + def _clear(self): + with self._rlock: + while self._poll(): + self._recv_bytes() + self._unfinished_tasks.acquire(block=False) + with self._cond: self._cond.notify_all() # diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 66359ec08f5cd7..4aa4729a029e93 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -295,6 +295,70 @@ def get_value(self): # Testcases # +class ProcessFuture: + __slots__ = ("process", "connection") + + def __init__(self, process, connection): + self.process = process + self.connection = connection + + def __del__(self): + self.process.kill() + self.connection.close() + + @staticmethod + def _target(conn, fn, args): + try: + result = fn(*args) + except Exception as e: + try: + conn.send((False, e)) + except pickle.PicklingError: + exc = Exception(str(e)) + exc.__class__ = e.__class__ + conn.send((False, exc)) + else: + conn.send((True, result)) + conn.close() + + def _run(self, fn, *args): + def get_result(timeout=None): + process.join(timeout) + result = recv.recv() + if isinstance(result, Exception): + raise result + else: + return result + + recv, send = multiprocessing.Pipe() + process = multiprocessing.Process(target=self._target, args=(send, fn, args)) + process.start() + return process, get_result + + @classmethod + def start(cls, target, args=(), kwargs={}): + recv, send = multiprocessing.Pipe() + process = multiprocessing.Process( + target=cls._target, args=(send, target, args) + ) + process.start() + return cls(process, recv) + + @classmethod + def run(cls, target, args=(), kwargs={}): + future = cls.start(target, args, kwargs) + return future.result() + + def result(self): + self.process.join() + if not self.connection.poll(): + raise RuntimeError("Process failed to send result") + success, result = self.connection.recv() + if success: + return result + else: + raise result + class DummyCallable: def __call__(self, q, c): assert isinstance(c, DummyCallable) @@ -1340,260 +1404,137 @@ def test_closed_queue_put_get_exceptions(self): with self.assertRaisesRegex(ValueError, 'is closed'): q.get() + @classmethod + def _join_worker(cls, q): + q.join() + + @classmethod + def _task_done_worker(cls, q): + q.task_done() + + @classmethod + def _get_worker(cls, q): + return q.get() + + def assertRaisesShutdown(self, msg="Didn't appear to shut-down queue"): + return self.assertRaises(pyqueue.ShutDown, msg=msg) + def test_shutdown_empty(self): for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): - q.shutdown() + if isinstance(q, multiprocessing.JoinableQueue): + join = ProcessFuture.start(self._join_worker, (q,)) + q.shutdown(immediate=False) # [joinable] unfinished tasks: 0 -> 0 _wait() - with self.assertRaises( - pyqueue.ShutDown, msg="Didn't appear to shut-down queue" - ): - q.put("data") - with self.assertRaises( - pyqueue.ShutDown, msg="Didn't appear to shut-down queue" - ): - q.get() + + self.assertEqual(q.qsize(), 0) + + if isinstance(q, multiprocessing.JoinableQueue): + self.assertFalse(join.process.is_alive()) + join.result() + + with self.assertRaisesShutdown(): + ProcessFuture.run(q.put, "data") + with self.assertRaisesShutdown(): + q.put_nowait("data") + + with self.assertRaisesShutdown(): + ProcessFuture.run(q.get) + with self.assertRaisesShutdown(): + q.get_nowait() def test_shutdown_nonempty(self): for q in multiprocessing.Queue(1), multiprocessing.JoinableQueue(1): q.put("data") - q.shutdown() + if isinstance(q, multiprocessing.JoinableQueue): + join = ProcessFuture.start(self._join_worker, (q,)) + q.shutdown(immediate=False) # [joinable] unfinished tasks: 1 -> 1 _wait() - q.get() - with self.assertRaises( - pyqueue.ShutDown, msg="Didn't appear to shut-down queue" - ): - q.get() - - def test_shutdown_immediate(self): - for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): - q.put("data") - q.shutdown(immediate=True) - _wait() - with self.assertRaises( - pyqueue.ShutDown, msg="Didn't appear to shut-down queue" - ): - q.get() - def test_shutdown_allowed_transitions(self): - # allowed transitions would be from `alive`` via `shutdown` to `shutdown_immediate`` - mod_q = multiprocessing.queues - for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): - self.assertEqual(mod_q._queue_alive, q._shutdown_state.value) + self.assertEqual(q.qsize(), 1) - # default -> immediate=False - q.shutdown() - self.assertEqual(mod_q._queue_shutdown, q._shutdown_state.value) + self.assertEqual(ProcessFuture.run(q.get), "data") - q.shutdown(immediate=True) - self.assertEqual(mod_q._queue_shutdown_immediate, q._shutdown_state.value) - - q.shutdown(immediate=False) - self.assertNotEqual(mod_q._queue_shutdown, q._shutdown_state.value) - - def _shutdown_all_methods_in_one_process(self, immediate): - # part 1: Queue - q = multiprocessing.Queue(2) - q.put("L") - _wait() # Give time to simulate many processes - q.put_nowait("O") - q.shutdown(immediate) - _wait() # simulate time of synchro primitive - - with self.assertRaises(pyqueue.ShutDown): - q.put("E") - with self.assertRaises(pyqueue.ShutDown): - q.put_nowait("W") - if immediate: - with self.assertRaises(pyqueue.ShutDown): - q.get() - with self.assertRaises(pyqueue.ShutDown): - q.get_nowait() - else: - # Neither `task_done`, neither `join`methods` to test - self.assertEqual(q.get(), "L") - self.assertEqual(q.get_nowait(), "O") - _wait() + if isinstance(q, multiprocessing.JoinableQueue): + self.assertTrue(join.process.is_alive()) - # on shutdown(immediate=False) - # when queue is empty, should raise ShutDown Exception - with self.assertRaises(pyqueue.ShutDown): - q.get() # p.get(True) - with self.assertRaises(pyqueue.ShutDown): - q.get_nowait() # q.get(False) - with self.assertRaises(pyqueue.ShutDown): - q.get(True, 1.0) - - # part 2: JoinableQueue - q = multiprocessing.JoinableQueue(2) - q.put("L") - _wait() - q.put_nowait("O") - q.shutdown(immediate) - _wait() + with self.assertRaisesShutdown(): + ProcessFuture.run(q.put, "data") + with self.assertRaisesShutdown(): + q.put_nowait("data") - with self.assertRaises(pyqueue.ShutDown): - q.put("E") - with self.assertRaises(pyqueue.ShutDown): - q.put_nowait("W") - if immediate: - with self.assertRaises(pyqueue.ShutDown): - q.get() - with self.assertRaises(pyqueue.ShutDown): + with self.assertRaisesShutdown(): + ProcessFuture.run(q.get) + with self.assertRaisesShutdown(): q.get_nowait() - with self.assertRaises(pyqueue.ShutDown): + + if isinstance(q, multiprocessing.JoinableQueue): q.task_done() - with self.assertRaises(pyqueue.ShutDown): - q.join() - else: - self.assertEqual(q.get(), "L") - q.task_done() - _wait() - self.assertEqual(q.get(), "O") - q.task_done() + + if isinstance(q, multiprocessing.JoinableQueue): + self.assertFalse(join.process.is_alive()) + join.result() + + def test_shutdown_immediate(self): + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + q.put("data") + if isinstance(q, multiprocessing.JoinableQueue): + join = ProcessFuture.start(self._join_worker, (q,)) + q.shutdown(immediate=True) _wait() - q.join() - # when `shutdown` queue is empty, should raise ShutDown Exception - with self.assertRaises(pyqueue.ShutDown): - q.get() # p.get(True) - with self.assertRaises(pyqueue.ShutDown): - q.get_nowait() # p.get(False) - with self.assertRaises(pyqueue.ShutDown): - q.get(True, 1.0) - def test_shutdown_all_methods_in_one_process(self): - return self._shutdown_all_methods_in_one_process(False) + self.assertEqual(q.qsize(), 0) - def test_shutdown_immediate_all_methods_in_one_process(self): - return self._shutdown_all_methods_in_one_process(True) + if isinstance(q, multiprocessing.JoinableQueue): + self.assertFalse(join.process.is_alive()) + join.result() - @classmethod - def _write_msg_process(cls, q, n, results, delay, - i_when_exec_shutdown, - event_start, event_end): - event_start.wait() - for i in range(1, n+1): - try: - q.put((i, "YDLO")) - results.append(True) - except pyqueue.ShutDown: - results.append(False) - # triggers shutdown of queue - if i == i_when_exec_shutdown: - event_end.set() - time.sleep(delay) - # end of all puts - if isinstance(q, type(multiprocessing.JoinableQueue())): - try: - q.join() - except pyqueue.ShutDown: - pass + with self.assertRaisesShutdown(): + ProcessFuture.run(q.put, "data") + with self.assertRaisesShutdown(): + q.put_nowait("data") - @classmethod - def _read_msg_process(cls, q, nb, results, delay, event_start): - event_start.wait() - block = True - while nb: - time.sleep(delay) - try: - # Get at least one message - q.get(block) - block = False - if isinstance(q, type(multiprocessing.JoinableQueue())): + with self.assertRaisesShutdown(): + ProcessFuture.run(q.get) + with self.assertRaisesShutdown(): + q.get_nowait() + + if isinstance(q, multiprocessing.JoinableQueue): + with self.assertRaises( + ValueError, msg="Didn't appear to mark all tasks done" + ): q.task_done() - results.append(True) - nb -= 1 - except pyqueue.ShutDown: - results.append(False) - nb -= 1 - except pyqueue.Empty: - pass - # end of all gets - if isinstance(q, type(multiprocessing.JoinableQueue())): - try: - q.join() - except pyqueue.ShutDown: - pass - @classmethod - def _shutdown_process(cls, q, event_end, immediate): - event_end.wait() - q.shutdown(immediate) - if isinstance(q, type(multiprocessing.JoinableQueue())): - try: - q.join() - except pyqueue.ShutDown: - pass + def test_shutdown_immediate_with_unfinished(self): + q = multiprocessing.JoinableQueue() + q.put("data") + q.put("data") + join = ProcessFuture.start(self._join_worker, (q,)) + self.assertEqual(ProcessFuture.run(q.get), "data") + q.shutdown(immediate=True) + _wait() - @classmethod - def _join_process(cls, q, delay, event_start): - event_start.wait() - time.sleep(delay) - try: - q.join() - except pyqueue.ShutDown: - pass + self.assertEqual(q.qsize(), 0) - #@classmethod - def _shutdown_all_methods_in_many_processes(self, immediate): - for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): - ps = [] - ev_start = multiprocessing.Event() - ev_exec_shutdown = multiprocessing.Event() - m = multiprocessing.Manager() - res_puts = m.list() - res_gets = m.list() - delay = 1e-4 - read_process = 4 - nb_msgs = read_process * 16 - nb_msgs_r = nb_msgs // read_process - when_exec_shutdown = nb_msgs // 2 - if isinstance(q, type(multiprocessing.Queue())): - lprocs = ( - (self._write_msg_process, 1, (q, nb_msgs, res_puts, delay, - when_exec_shutdown, - ev_start, ev_exec_shutdown)), - (self._read_msg_process, read_process, (q, nb_msgs_r, - res_gets, delay*2, - ev_start)), - (self._shutdown_process, 1, (q, ev_exec_shutdown, immediate)), - ) - else: - # add 2 self._join process processes - lprocs = ( - (self._write_msg_process, 1, (q, nb_msgs, res_puts, delay, - when_exec_shutdown, - ev_start, ev_exec_shutdown)), - (self._read_msg_process, read_process, (q, nb_msgs_r, - res_gets, delay*2, - ev_start)), - (self._join_process, 2, (q, delay*2, ev_start)), - (self._shutdown_process, 1, (q, ev_exec_shutdown, immediate)), - ) - # start all processes - for func, n, args in lprocs: - for i in range(n): - ps.append(multiprocessing.Process(target=func, args=args)) - ps[-1].start() - # set event in order to run q.shutdown() - ev_start.set() - _wait() - # wait - if isinstance(q, type(multiprocessing.Queue())): - for p in ps: - p.join() + self.assertTrue(join.process.is_alive()) - if not immediate: - self.assertTrue(q.empty()) - self.assertEqual(res_gets.count(True), res_puts.count(True)) - else: - self.assertTrue(res_gets.count(True) <= res_puts.count(True)) + with self.assertRaisesShutdown(): + ProcessFuture.run(q.put, "data") + with self.assertRaisesShutdown(): + q.put_nowait("data") - def test_shutdown_all_methods_in_many_processes(self): - return self._shutdown_all_methods_in_many_processes(False) + with self.assertRaisesShutdown(): + ProcessFuture.run(q.get) + with self.assertRaisesShutdown(): + q.get_nowait() - def test_shutdown_immediate_all_methods_in_many_processes(self): - return self._shutdown_all_methods_in_many_processes(True) + q.task_done() + with self.assertRaises( + ValueError, msg="Didn't appear to mark all tasks done" + ): + q.task_done() + self.assertFalse(join.process.is_alive()) + join.result() # #