Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ For an example of the usage of queues for interprocess communication see
All blocked callers of put() will be unblocked, and also get()
and join() if *immediate* is true.

.. versionadded:: 3.12
.. versionadded:: 3.13

:class:`multiprocessing.Queue` has a few additional methods not found in
:class:`queue.Queue`. These methods are usually unnecessary for most
Expand Down
87 changes: 68 additions & 19 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import types
import weakref
import errno
import ctypes

from queue import Empty, Full, ShutDown

Expand Down Expand Up @@ -55,21 +54,21 @@ def __init__(self, maxsize=0, *, ctx):
# For use by concurrent.futures
self._ignore_epipe = False
self._reset()
self._shutdown_state = context._default_context.Value(
ctypes.c_uint8, lock=self._rlock
)
self._shutdown_state = ctx.Value('i', _queue_alive)

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._shutdown_state)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._shutdown_state) = state
self._reset()

def _after_fork(self):
Expand All @@ -91,55 +90,77 @@ def _reset(self, after_fork=False):
self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll

def _is_alive(self):
return self._shutdown_state.value == _queue_alive

def _is_shutdown(self):
return self._shutdown_state.value == _queue_shutdown

def _is_shutdown_immediate(self):
return self._shutdown_state.value == _queue_shutdown_immediate

def _set_shutdown(self):
self._shutdown_state.value = _queue_shutdown

def _set_shutdown_immediate(self):
self._shutdown_state.value = _queue_shutdown_immediate

def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._shutdown_state.value != _queue_alive:
if not self._is_alive():
raise ShutDown
if not self._sem.acquire(block, timeout):
if not self._is_alive():
raise ShutDown
raise Full

with self._notempty:
if self._shutdown_state.value != _queue_alive:
raise ShutDown
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

def get(self, block=True, timeout=None):
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
if self._shutdown_state.value != _queue_alive:
# checks shutdown state
if (self._is_shutdown_immediate()
or (self._is_shutdown() and self.empty())):
raise ShutDown
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
if (self._is_shutdown_immediate()
or (self._is_shutdown() and self.empty())):
raise ShutDown
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
if self._shutdown_state.value != _queue_alive:
if not self._is_alive():
raise ShutDown
raise Empty
if self._shutdown_state.value != _queue_alive :
raise ShutDown
elif not self._poll():
if not self._is_alive():
raise ShutDown
raise Empty

# here queue is not empty
if self._is_shutdown_immediate():
raise ShutDown
# here shutdown state queue is alive or shutdown
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
if self._shutdown_state.value == _queue_shutdown:
raise ShutDown

# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

Expand All @@ -159,6 +180,19 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
with self._shutdown_state.get_lock():
if self._is_shutdown_immediate():
return
if immediate:
self._set_shutdown_immediate()
with self._notempty:
self._notempty.notify_all()
else:
self._set_shutdown()

def close(self):
self._closed = True
close = self._close
Expand Down Expand Up @@ -332,7 +366,11 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._is_alive():
raise ShutDown
if not self._sem.acquire(block, timeout):
if not self._is_alive():
raise ShutDown
raise Full

with self._notempty, self._cond:
Expand All @@ -344,17 +382,28 @@ def put(self, obj, block=True, timeout=None):

def task_done(self):
with self._cond:
if self._is_shutdown_immediate():
raise ShutDown
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()

def join(self):
with self._cond:
if self._shutdown_state.value == _queue_shutdown_immediate:
return
if self._is_shutdown_immediate():
raise ShutDown
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()
if self._is_shutdown_immediate():
raise ShutDown

def shutdown(self, immediate=False):
with self._cond:
is_alive = self._is_alive()
super().shutdown(immediate)
if is_alive:
self._cond.notify_all()

#
# Simplified Queue type -- really just a locked pipe
Expand Down
Loading