Skip to content

Commit 47b5a09

Browse files
committed
go shutdown go
1 parent 39cd911 commit 47b5a09

File tree

2 files changed

+41
-25
lines changed

2 files changed

+41
-25
lines changed

Lib/multiprocessing/queues.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,11 @@ def __init__(self, maxsize=0, *, ctx):
4747
self._wlock = ctx.Lock()
4848
self._sem = ctx.BoundedSemaphore(maxsize)
4949

50-
5150
self._lock_shutdown = ctx.Lock()
5251
# Cannot use a ctx.Value because 'ctypes' library is
5352
# not always available on all Linux platforms.
54-
# Using Semaphores instead of heap.BufferWrapper
55-
# as an array of int is more explicit.
53+
# Use of Semaphores instead of an array from `heap.BufferWrapper'
54+
# is here more efficient and explicit.
5655
self._sem_flag_shutdown = ctx.Semaphore(0)
5756
self._sem_flag_shutdown_immediate = ctx.Semaphore(0)
5857
self._sem_pending_getters = ctx.Semaphore(0)
@@ -64,14 +63,6 @@ def __init__(self, maxsize=0, *, ctx):
6463
if sys.platform != 'win32':
6564
register_after_fork(self, Queue._after_fork)
6665

67-
def _is_shutdown(self):
68-
return not self._sem_flag_shutdown.locked()
69-
70-
def _set_shutdown(self, immediate=False):
71-
self._sem_flag_shutdown.release()
72-
if immediate:
73-
self._sem_flag_shutdown_immediate.release()
74-
7566
def __getstate__(self):
7667
context.assert_spawning(self)
7768
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
@@ -107,6 +98,14 @@ def _reset(self, after_fork=False):
10798
self._recv_bytes = self._reader.recv_bytes
10899
self._poll = self._reader.poll
109100

101+
def _is_shutdown(self):
102+
return not self._sem_flag_shutdown.locked()
103+
104+
def _set_shutdown(self, immediate=False):
105+
self._sem_flag_shutdown.release()
106+
if immediate:
107+
self._sem_flag_shutdown_immediate.release()
108+
110109
@contextmanager
111110
def _handle_pending_processes(self, sem):
112111
# Count pending getter or putter processes in a dedicated
@@ -189,7 +188,7 @@ def get(self, block=True, timeout=None):
189188
if self._is_shutdown() \
190189
and isinstance(item, _ShutdownSentinel):
191190
# A pending getter process is just unblocked,
192-
# we try to unblock a next one if exists.
191+
# Unblock a next one if exists.
193192
self._release_pending_getters()
194193
raise ShutDown
195194

@@ -220,7 +219,7 @@ def _put_sentinel(self):
220219
# When put a sentinel into an empty queue,
221220
# dont forget to call to _sem.acquire in order to
222221
# maintain a correct count of acquire/release
223-
#calls for BoudedSempaphore.
222+
# calls for BoudedSempaphore.
224223
self._sem.acquire()
225224

226225
with self._notempty:
@@ -248,13 +247,13 @@ def shutdown(self, immediate=False):
248247
self._set_shutdown(immediate)
249248

250249
# Shut down is immediatly and there is no pending getter,
251-
# we purge the queue (pipe). If data is into the buffer and
252-
# not into pipe, the 'put' thread should erase remaining data.
250+
# we purge the queue (pipe). If there are datas into the buffer
251+
# the 'feeder' thread should erase all of them.
253252
if immediate and not is_pending_getters:
254253
self._clear()
255254

256255
# Starting release one pending getter process.
257-
# Put a first shutdown sentinel into the pipe.
256+
# Put a first shutdown sentinel data into the pipe.
258257
if self.empty() and is_pending_getters:
259258
self._put_sentinel()
260259

@@ -386,12 +385,12 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
386385
writer_close()
387386
return
388387

389-
# When queue shuts down immediatly, don`t insert
388+
# When queue shuts down immediatly, do not insert
390389
# regular data in pipe, only shutdown sentinel.
391390
if is_shutdown_immediate() \
392391
and not isinstance(obj, _ShutdownSentinel):
393392
debug("Queue shuts down immediatly, " \
394-
"don't feed regular data in pipe")
393+
"don't feed regular data to pipe")
395394
continue
396395

397396
# serialize the data before acquiring the lock
@@ -504,7 +503,8 @@ def _clear(self):
504503
super()._clear()
505504

506505
# Data could be in the buffer, not in the pipe.
507-
# Call acquire until Semaphore counter is zero.
506+
# Call acquire method of '_unfinished_tasks' Semaphore
507+
# until counter is zero.
508508
with self._cond:
509509
while not self._unfinished_tasks.locked():
510510
self._unfinished_tasks.acquire(block=False)

Lib/test/_test_multiprocessing.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1687,7 +1687,9 @@ def test_queue_shutdown_count_pending_put(self):
16871687

16881688
self._start_processes_pending(n, target=self._pending_put,
16891689
args=(q, b, results))
1690+
# Wait for all procesess start.
16901691
b.wait()
1692+
16911693
# to be sure that queue is full, and all 'n-size' others processes
16921694
# are pending.
16931695
self._wait()
@@ -1719,7 +1721,9 @@ def test_queue_shutdown_immediate_pending_put(self):
17191721
b = self.Barrier(n+1)
17201722
self._start_processes_pending(n, target=self._pending_put,
17211723
args=(q, b, results))
1724+
# Wait for all procesess start.
17221725
b.wait()
1726+
17231727
# We need to call _wait to be sure that queue is full,
17241728
# and all others processes are pending.
17251729
self._wait()
@@ -1757,8 +1761,12 @@ def test_queue_shutdown_count_pending_get(self):
17571761
b = self.Barrier(n+1)
17581762
self._start_processes_pending(n, target=self._pending_get,
17591763
args=(q, b, results))
1764+
# Wait for all procesess start.
17601765
b.wait()
1761-
self._wait() # wait for all pending get processes to be blocked.
1766+
1767+
# wait for all pending get processes to be blocked.
1768+
self._wait()
1769+
17621770
self.assertTrue(q.empty())
17631771
self.assertEqual(q._sem_pending_getters.get_value(), n)
17641772
self.assertEqual(q._sem_pending_putters.get_value(), 0)
@@ -1784,8 +1792,11 @@ def test_queue_shutdown_immediate_pending_get(self):
17841792
b = self.Barrier(n+1)
17851793
self._start_processes_pending(n, target=self._pending_get,
17861794
args=(q, b, results))
1795+
# Wait for all procesess start.
17871796
b.wait()
1788-
self._wait() # wait for all pending get processes to be blocked.
1797+
1798+
# wait for all pending get processes to be blocked.
1799+
self._wait()
17891800

17901801
q.shutdown(immediate=True)
17911802
self.assertTrue(q.empty())
@@ -1814,8 +1825,11 @@ def test_queue_shutdown_pending_get(self):
18141825
b = self.Barrier(n+1)
18151826
self._start_processes_pending(n, target=self._pending_get,
18161827
args=(q, b, results))
1828+
# Wait for all procesess start.
18171829
b.wait()
1818-
self._wait() # wait for all pending get processes to be blocked.
1830+
1831+
# wait for all pending get processes to be blocked.
1832+
self._wait()
18191833

18201834
q.shutdown(immediate=False)
18211835
self.assertTrue(q._is_shutdown())
@@ -1828,7 +1842,7 @@ def test_queue_shutdown_pending_get(self):
18281842
close_queue(q)
18291843

18301844
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
1831-
def queue_shutdown_immediate_purge_buffer(self):
1845+
def queue_shutdown_immediate_check_purge_buffer(self):
18321846
if self.TYPE != 'processes':
18331847
self.skipTest(f'test not appropriate for {self.TYPE}')
18341848

@@ -1850,7 +1864,7 @@ def queue_shutdown_immediate_purge_buffer(self):
18501864
q.join()
18511865

18521866
log_record = stream.getvalue()
1853-
self.assertIn("don't feed regular data in pipe", log_record)
1867+
self.assertIn("don't feed regular data to pipe", log_record)
18541868

18551869
close_queue(q)
18561870
finally:
@@ -1882,7 +1896,9 @@ def _join_joinablequeue(self, immediate):
18821896
return_process = 1000
18831897
p = self.Process(target=self._shutdown,
18841898
args=(q, immediate, results,
1885-
return_process+immediate))
1899+
return_process+immediate
1900+
)
1901+
)
18861902
p.start()
18871903
q.join()
18881904
p.join()

0 commit comments

Comments
 (0)