From ad5d9c679010f179937b5229fcdb5f4185d863d4 Mon Sep 17 00:00:00 2001 From: Manjusaka Date: Wed, 16 Oct 2024 19:57:03 +0800 Subject: [PATCH 1/4] gh-123899: Add timeout argument for notify_all/notify method in Condition class in threading/multiprocessing module --- Lib/multiprocessing/synchronize.py | 17 ++++++++--------- Lib/threading.py | 17 ++++++++++------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 3ccbfe311c71f3..c7c177709937f7 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -274,7 +274,7 @@ def wait(self, timeout=None): for i in range(count): self._lock.acquire() - def notify(self, n=1): + def notify(self, n=1, timeout=None): assert self._lock._semlock._is_mine(), 'lock is not owned' assert not self._wait_semaphore.acquire( False), ('notify: Should not have been able to acquire ' @@ -282,26 +282,25 @@ def notify(self, n=1): # to take account of timeouts since last notify*() we subtract # woken_count from sleeping_count and rezero woken_count - while self._woken_count.acquire(False): - res = self._sleeping_count.acquire(False) + while self._woken_count.acquire(False, timeout=timeout): + res = self._sleeping_count.acquire(False, timeout=timeout) assert res, ('notify: Bug in sleeping_count.acquire' + '- res should not be False') sleepers = 0 - while sleepers < n and self._sleeping_count.acquire(False): + while sleepers < n and self._sleeping_count.acquire(False, timeout=timeout): self._wait_semaphore.release() # wake up one sleeper sleepers += 1 if sleepers: for i in range(sleepers): - self._woken_count.acquire() # wait for a sleeper to wake - + self._woken_count.acquire(timeout=timeout) # wait for a sleeper to wake # rezero wait_semaphore in case some timeouts just happened - while self._wait_semaphore.acquire(False): + while self._wait_semaphore.acquire(False, timeout=timeout): pass - def notify_all(self): - self.notify(n=sys.maxsize) + def notify_all(self, timeout=None): + self.notify(n=sys.maxsize, timeout=timeout) def wait_for(self, predicate, timeout=None): result = predicate() diff --git a/Lib/threading.py b/Lib/threading.py index 94ea2f08178369..67d23f5a9df13e 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -395,7 +395,7 @@ def wait_for(self, predicate, timeout=None): result = predicate() return result - def notify(self, n=1): + def notify(self, n=1, timeout=None): """Wake up one or more threads waiting on this condition, if any. If the calling thread has not acquired the lock when this method is @@ -411,7 +411,10 @@ def notify(self, n=1): while waiters and n > 0: waiter = waiters[0] try: - waiter.release() + if timeout: + waiter.release(timeout) + else: + waiter.release() except RuntimeError: # gh-92530: The previous call of notify() released the lock, # but was interrupted before removing it from the queue. @@ -425,14 +428,14 @@ def notify(self, n=1): except ValueError: pass - def notify_all(self): + def notify_all(self, timeout=None): """Wake up all threads waiting on this condition. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised. """ - self.notify(len(self._waiters)) + self.notify(len(self._waiters), timeout=timeout) def notifyAll(self): """Wake up all threads waiting on this condition. @@ -723,7 +726,7 @@ def wait(self, timeout=None): try: if index + 1 == self._parties: # We release the barrier - self._release() + self._release(timeout=timeout) else: # We wait until someone releases us self._wait(timeout) @@ -746,13 +749,13 @@ def _enter(self): # Optionally run the 'action' and release the threads waiting # in the barrier. - def _release(self): + def _release(self, timeout=None): try: if self._action: self._action() # enter draining state self._state = 1 - self._cond.notify_all() + self._cond.notify_all(timeout=timeout) except: #an exception during the _action handler. Break and reraise self._break() From 51161c20c5d79411ff6f926f2b2543ad97a5e0b9 Mon Sep 17 00:00:00 2001 From: Manjusaka Date: Wed, 16 Oct 2024 19:58:24 +0800 Subject: [PATCH 2/4] add news --- .../next/Library/2024-10-16-19-58-13.gh-issue-123899.VfXZse.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2024-10-16-19-58-13.gh-issue-123899.VfXZse.rst diff --git a/Misc/NEWS.d/next/Library/2024-10-16-19-58-13.gh-issue-123899.VfXZse.rst b/Misc/NEWS.d/next/Library/2024-10-16-19-58-13.gh-issue-123899.VfXZse.rst new file mode 100644 index 00000000000000..448210a6e34dca --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-10-16-19-58-13.gh-issue-123899.VfXZse.rst @@ -0,0 +1,2 @@ +Add timeout argument for notify_all/notify method in Condition class in +threading/multiprocessing module From 3214d981c4cd509386887423ade5eaaf1f8095d9 Mon Sep 17 00:00:00 2001 From: Manjusaka Date: Wed, 16 Oct 2024 20:00:08 +0800 Subject: [PATCH 3/4] add docs --- Doc/library/threading.rst | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index cb82fea377697b..78971956a911e1 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -834,7 +834,7 @@ item to the buffer only needs to wake up one consumer thread. .. versionadded:: 3.2 - .. method:: notify(n=1) + .. method:: notify(n=1, timeout=None) By default, wake up one thread waiting on this condition, if any. If the calling thread has not acquired the lock when this method is called, a @@ -852,7 +852,10 @@ item to the buffer only needs to wake up one consumer thread. call until it can reacquire the lock. Since :meth:`notify` does not release the lock, its caller should. - .. method:: notify_all() + .. versionchanged:: 3.14 + The *timeout* parameter is new. + + .. method:: notify_all(timeout=None) Wake up all threads waiting on this condition. This method acts like :meth:`notify`, but wakes up all waiting threads instead of one. If the @@ -861,6 +864,9 @@ item to the buffer only needs to wake up one consumer thread. The method ``notifyAll`` is a deprecated alias for this method. + .. versionchanged:: 3.14 + The *timeout* parameter is new. + .. _semaphore-objects: From cfb5397888b2026bbd20639e26356b6c05e08a2a Mon Sep 17 00:00:00 2001 From: Manjusaka Date: Wed, 16 Oct 2024 21:13:01 +0800 Subject: [PATCH 4/4] fix test --- Lib/threading.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Lib/threading.py b/Lib/threading.py index 67d23f5a9df13e..ab4e863c141ffd 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -411,10 +411,7 @@ def notify(self, n=1, timeout=None): while waiters and n > 0: waiter = waiters[0] try: - if timeout: - waiter.release(timeout) - else: - waiter.release() + waiter.release() except RuntimeError: # gh-92530: The previous call of notify() released the lock, # but was interrupted before removing it from the queue.