Skip to content

Commit 75f6067

Browse files
committed
Add multiprocessing queue shutdown
* Include docs
1 parent f508800 commit 75f6067

File tree

3 files changed

+82
-3
lines changed

3 files changed

+82
-3
lines changed

Doc/library/multiprocessing.rst

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,8 @@ For an example of the usage of queues for interprocess communication see
845845
free slot was available within that time. Otherwise (*block* is
846846
``False``), put an item on the queue if a free slot is immediately
847847
available, else raise the :exc:`queue.Full` exception (*timeout* is
848-
ignored in that case).
848+
ignored in that case). Raises :exc:`ShutDown` if the queue has been shut
849+
down.
849850

850851
.. versionchanged:: 3.8
851852
If the queue is closed, :exc:`ValueError` is raised instead of
@@ -863,7 +864,9 @@ For an example of the usage of queues for interprocess communication see
863864
it blocks at most *timeout* seconds and raises the :exc:`queue.Empty`
864865
exception if no item was available within that time. Otherwise (block is
865866
``False``), return an item if one is immediately available, else raise the
866-
:exc:`queue.Empty` exception (*timeout* is ignored in that case).
867+
:exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises
868+
:exc:`queue.ShutDown` if the queue has been shut down and is empty, or if
869+
the queue has been shut down immediately.
867870

868871
.. versionchanged:: 3.8
869872
If the queue is closed, :exc:`ValueError` is raised instead of
@@ -873,6 +876,19 @@ For an example of the usage of queues for interprocess communication see
873876

874877
Equivalent to ``get(False)``.
875878

879+
.. method:: shutdown(immediate=False)
880+
881+
Shut-down the queue, making queue gets and puts raise
882+
:exc:`queue.ShutDown`.
883+
884+
By default, gets will only raise once the queue is empty. Set
885+
*immediate* to true to make gets raise immediately instead.
886+
887+
All blocked callers of put() will be unblocked, and also get()
888+
and join() if *immediate* is true.
889+
890+
.. versionadded:: 3.12
891+
876892
:class:`multiprocessing.Queue` has a few additional methods not found in
877893
:class:`queue.Queue`. These methods are usually unnecessary for most
878894
code:
@@ -962,6 +978,8 @@ For an example of the usage of queues for interprocess communication see
962978
Raises a :exc:`ValueError` if called more times than there were items
963979
placed in the queue.
964980

981+
Raises :exc:`queue.ShutDown` if the queue has been shut down immediately.
982+
965983

966984
.. method:: join()
967985

@@ -973,6 +991,8 @@ For an example of the usage of queues for interprocess communication see
973991
it is complete. When the count of unfinished tasks drops to zero,
974992
:meth:`~queue.Queue.join` unblocks.
975993

994+
Raises :exc:`queue.ShutDown` if the queue has been shut down immediately.
995+
976996

977997
Miscellaneous
978998
~~~~~~~~~~~~~

Lib/multiprocessing/queues.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
import types
1818
import weakref
1919
import errno
20+
import ctypes
2021

21-
from queue import Empty, Full
22+
from queue import Empty, Full, ShutDown
2223

2324
import _multiprocessing
2425

@@ -28,6 +29,10 @@
2829

2930
from .util import debug, info, Finalize, register_after_fork, is_exiting
3031

32+
_queue_alive = 0
33+
_queue_shutdown = 1
34+
_queue_shutdown_immediate = 2
35+
3136
#
3237
# Queue type using a pipe, buffer and thread
3338
#
@@ -50,6 +55,9 @@ def __init__(self, maxsize=0, *, ctx):
5055
# For use by concurrent.futures
5156
self._ignore_epipe = False
5257
self._reset()
58+
self._shutdown_state = context._default_context.Value(
59+
ctypes.c_uint8, lock=self._rlock
60+
)
5361

5462
if sys.platform != 'win32':
5563
register_after_fork(self, Queue._after_fork)
@@ -86,20 +94,28 @@ def _reset(self, after_fork=False):
8694
def put(self, obj, block=True, timeout=None):
8795
if self._closed:
8896
raise ValueError(f"Queue {self!r} is closed")
97+
if self._shutdown_state.value != _queue_alive:
98+
raise ShutDown
8999
if not self._sem.acquire(block, timeout):
90100
raise Full
91101

92102
with self._notempty:
103+
if self._shutdown_state.value != _queue_alive:
104+
raise ShutDown
93105
if self._thread is None:
94106
self._start_thread()
95107
self._buffer.append(obj)
96108
self._notempty.notify()
97109

98110
def get(self, block=True, timeout=None):
111+
if self._shutdown_state.value == _queue_shutdown_immediate:
112+
raise ShutDown
99113
if self._closed:
100114
raise ValueError(f"Queue {self!r} is closed")
101115
if block and timeout is None:
102116
with self._rlock:
117+
if self._shutdown_state.value != _queue_alive:
118+
raise ShutDown
103119
res = self._recv_bytes()
104120
self._sem.release()
105121
else:
@@ -111,13 +127,19 @@ def get(self, block=True, timeout=None):
111127
if block:
112128
timeout = deadline - time.monotonic()
113129
if not self._poll(timeout):
130+
if self._shutdown_state.value != _queue_alive:
131+
raise ShutDown
114132
raise Empty
133+
if self._shutdown_state.value != _queue_alive :
134+
raise ShutDown
115135
elif not self._poll():
116136
raise Empty
117137
res = self._recv_bytes()
118138
self._sem.release()
119139
finally:
120140
self._rlock.release()
141+
if self._shutdown_state.value == _queue_shutdown:
142+
raise ShutDown
121143
# unserialize the data after having released the lock
122144
return _ForkingPickler.loads(res)
123145

@@ -329,6 +351,8 @@ def task_done(self):
329351

330352
def join(self):
331353
with self._cond:
354+
if self._shutdown_state.value == _queue_shutdown_immediate:
355+
return
332356
if not self._unfinished_tasks._semlock._is_zero():
333357
self._cond.wait()
334358

Lib/test/_test_multiprocessing.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,41 @@ def test_closed_queue_put_get_exceptions(self):
12771277
q.put('foo')
12781278
with self.assertRaisesRegex(ValueError, 'is closed'):
12791279
q.get()
1280+
1281+
def test_shutdown_empty(self):
1282+
q = multiprocessing.Queue()
1283+
q.shutdown()
1284+
try:
1285+
q.put("data")
1286+
self.fail("Didn't appear to shut-down queue")
1287+
except pyqueue.ShutDown:
1288+
pass
1289+
try:
1290+
q.get()
1291+
self.fail("Didn't appear to shut-down queue")
1292+
except pyqueue.ShutDown:
1293+
pass
1294+
1295+
def test_shutdown_nonempty(self):
1296+
q = multiprocessing.Queue()
1297+
q.put("data")
1298+
q.shutdown()
1299+
q.get()
1300+
try:
1301+
q.get()
1302+
self.fail("Didn't appear to shut-down queue")
1303+
except pyqueue.ShutDown:
1304+
pass
1305+
1306+
def test_shutdown_immediate(self):
1307+
q = multiprocessing.Queue()
1308+
q.put("data")
1309+
q.shutdown(immediate=True)
1310+
try:
1311+
q.get()
1312+
self.fail("Didn't appear to shut-down queue")
1313+
except pyqueue.ShutDown:
1314+
pass
12801315
#
12811316
#
12821317
#

0 commit comments

Comments
 (0)