Skip to content

Commit 3dd2415

Browse files
YvesDupEpicWink
authored andcommitted
Fix queue shutdown
* Remove ctypes import (use string identifiers for value types) * Use queue's context to get shared value for queue state * Include queue state in pickle * Factor out queue-state checks and updates to methods * Logic fixes in put and get * Move shutdown method to before close * Raise when shutting down closed queue * Don't re-notify if immediately shutting down a queue already immediately shut-down * Support shutdown in JoinableQueue * Handle in task_done and join * Logic fixes in put and shutdown * Updated tests * Document feature added in 3.13
1 parent 75f6067 commit 3dd2415

File tree

3 files changed

+315
-45
lines changed

3 files changed

+315
-45
lines changed

Doc/library/multiprocessing.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,7 @@ For an example of the usage of queues for interprocess communication see
887887
All blocked callers of put() will be unblocked, and also get()
888888
and join() if *immediate* is true.
889889

890-
.. versionadded:: 3.12
890+
.. versionadded:: 3.13
891891

892892
:class:`multiprocessing.Queue` has a few additional methods not found in
893893
:class:`queue.Queue`. These methods are usually unnecessary for most

Lib/multiprocessing/queues.py

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import types
1818
import weakref
1919
import errno
20-
import ctypes
2120

2221
from queue import Empty, Full, ShutDown
2322

@@ -55,21 +54,21 @@ def __init__(self, maxsize=0, *, ctx):
5554
# For use by concurrent.futures
5655
self._ignore_epipe = False
5756
self._reset()
58-
self._shutdown_state = context._default_context.Value(
59-
ctypes.c_uint8, lock=self._rlock
60-
)
57+
self._shutdown_state = ctx.Value('i', _queue_alive)
6158

6259
if sys.platform != 'win32':
6360
register_after_fork(self, Queue._after_fork)
6461

6562
def __getstate__(self):
6663
context.assert_spawning(self)
6764
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
68-
self._rlock, self._wlock, self._sem, self._opid)
65+
self._rlock, self._wlock, self._sem, self._opid,
66+
self._shutdown_state)
6967

7068
def __setstate__(self, state):
7169
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
72-
self._rlock, self._wlock, self._sem, self._opid) = state
70+
self._rlock, self._wlock, self._sem, self._opid,
71+
self._shutdown_state) = state
7372
self._reset()
7473

7574
def _after_fork(self):
@@ -91,55 +90,77 @@ def _reset(self, after_fork=False):
9190
self._recv_bytes = self._reader.recv_bytes
9291
self._poll = self._reader.poll
9392

93+
def _is_alive(self):
94+
return self._shutdown_state.value == _queue_alive
95+
96+
def _is_shutdown(self):
97+
return self._shutdown_state.value == _queue_shutdown
98+
99+
def _is_shutdown_immediate(self):
100+
return self._shutdown_state.value == _queue_shutdown_immediate
101+
102+
def _set_shutdown(self):
103+
self._shutdown_state.value = _queue_shutdown
104+
105+
def _set_shutdown_immediate(self):
106+
self._shutdown_state.value = _queue_shutdown_immediate
107+
94108
def put(self, obj, block=True, timeout=None):
95109
if self._closed:
96110
raise ValueError(f"Queue {self!r} is closed")
97-
if self._shutdown_state.value != _queue_alive:
111+
if not self._is_alive():
98112
raise ShutDown
99113
if not self._sem.acquire(block, timeout):
114+
if not self._is_alive():
115+
raise ShutDown
100116
raise Full
101117

102118
with self._notempty:
103-
if self._shutdown_state.value != _queue_alive:
104-
raise ShutDown
105119
if self._thread is None:
106120
self._start_thread()
107121
self._buffer.append(obj)
108122
self._notempty.notify()
109123

110124
def get(self, block=True, timeout=None):
111-
if self._shutdown_state.value == _queue_shutdown_immediate:
112-
raise ShutDown
113125
if self._closed:
114126
raise ValueError(f"Queue {self!r} is closed")
115127
if block and timeout is None:
116128
with self._rlock:
117-
if self._shutdown_state.value != _queue_alive:
129+
# checks shutdown state
130+
if (self._is_shutdown_immediate()
131+
or (self._is_shutdown() and self.empty())):
118132
raise ShutDown
119133
res = self._recv_bytes()
120134
self._sem.release()
121135
else:
122136
if block:
123137
deadline = time.monotonic() + timeout
124138
if not self._rlock.acquire(block, timeout):
139+
if (self._is_shutdown_immediate()
140+
or (self._is_shutdown() and self.empty())):
141+
raise ShutDown
125142
raise Empty
126143
try:
127144
if block:
128145
timeout = deadline - time.monotonic()
129146
if not self._poll(timeout):
130-
if self._shutdown_state.value != _queue_alive:
147+
if not self._is_alive():
131148
raise ShutDown
132149
raise Empty
133-
if self._shutdown_state.value != _queue_alive :
134-
raise ShutDown
135150
elif not self._poll():
151+
if not self._is_alive():
152+
raise ShutDown
136153
raise Empty
154+
155+
# here queue is not empty
156+
if self._is_shutdown_immediate():
157+
raise ShutDown
158+
# here shutdown state queue is alive or shutdown
137159
res = self._recv_bytes()
138160
self._sem.release()
139161
finally:
140162
self._rlock.release()
141-
if self._shutdown_state.value == _queue_shutdown:
142-
raise ShutDown
163+
143164
# unserialize the data after having released the lock
144165
return _ForkingPickler.loads(res)
145166

@@ -159,6 +180,19 @@ def get_nowait(self):
159180
def put_nowait(self, obj):
160181
return self.put(obj, False)
161182

183+
def shutdown(self, immediate=False):
184+
if self._closed:
185+
raise ValueError(f"Queue {self!r} is closed")
186+
with self._shutdown_state.get_lock():
187+
if self._is_shutdown_immediate():
188+
return
189+
if immediate:
190+
self._set_shutdown_immediate()
191+
with self._notempty:
192+
self._notempty.notify_all()
193+
else:
194+
self._set_shutdown()
195+
162196
def close(self):
163197
self._closed = True
164198
close = self._close
@@ -332,7 +366,11 @@ def __setstate__(self, state):
332366
def put(self, obj, block=True, timeout=None):
333367
if self._closed:
334368
raise ValueError(f"Queue {self!r} is closed")
369+
if not self._is_alive():
370+
raise ShutDown
335371
if not self._sem.acquire(block, timeout):
372+
if not self._is_alive():
373+
raise ShutDown
336374
raise Full
337375

338376
with self._notempty, self._cond:
@@ -344,17 +382,28 @@ def put(self, obj, block=True, timeout=None):
344382

345383
def task_done(self):
346384
with self._cond:
385+
if self._is_shutdown_immediate():
386+
raise ShutDown
347387
if not self._unfinished_tasks.acquire(False):
348388
raise ValueError('task_done() called too many times')
349389
if self._unfinished_tasks._semlock._is_zero():
350390
self._cond.notify_all()
351391

352392
def join(self):
353393
with self._cond:
354-
if self._shutdown_state.value == _queue_shutdown_immediate:
355-
return
394+
if self._is_shutdown_immediate():
395+
raise ShutDown
356396
if not self._unfinished_tasks._semlock._is_zero():
357397
self._cond.wait()
398+
if self._is_shutdown_immediate():
399+
raise ShutDown
400+
401+
def shutdown(self, immediate=False):
402+
with self._cond:
403+
is_alive = self._is_alive()
404+
super().shutdown(immediate)
405+
if is_alive:
406+
self._cond.notify_all()
358407

359408
#
360409
# Simplified Queue type -- really just a locked pipe

0 commit comments

Comments
 (0)