diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index cb82fea377697b..78971956a911e1 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -834,7 +834,7 @@ item to the buffer only needs to wake up one consumer thread. .. versionadded:: 3.2 - .. method:: notify(n=1) + .. method:: notify(n=1, timeout=None) By default, wake up one thread waiting on this condition, if any. If the calling thread has not acquired the lock when this method is called, a @@ -852,7 +852,10 @@ item to the buffer only needs to wake up one consumer thread. call until it can reacquire the lock. Since :meth:`notify` does not release the lock, its caller should. - .. method:: notify_all() + .. versionchanged:: 3.14 + The *timeout* parameter is new. + + .. method:: notify_all(timeout=None) Wake up all threads waiting on this condition. This method acts like :meth:`notify`, but wakes up all waiting threads instead of one. If the @@ -861,6 +864,9 @@ item to the buffer only needs to wake up one consumer thread. The method ``notifyAll`` is a deprecated alias for this method. + .. versionchanged:: 3.14 + The *timeout* parameter is new. + .. _semaphore-objects: diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 3ccbfe311c71f3..c7c177709937f7 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -274,7 +274,7 @@ def wait(self, timeout=None): for i in range(count): self._lock.acquire() - def notify(self, n=1): + def notify(self, n=1, timeout=None): assert self._lock._semlock._is_mine(), 'lock is not owned' assert not self._wait_semaphore.acquire( False), ('notify: Should not have been able to acquire ' @@ -282,26 +282,25 @@ def notify(self, n=1): # to take account of timeouts since last notify*() we subtract # woken_count from sleeping_count and rezero woken_count - while self._woken_count.acquire(False): - res = self._sleeping_count.acquire(False) + while self._woken_count.acquire(False, timeout=timeout): + res = self._sleeping_count.acquire(False, timeout=timeout) assert res, ('notify: Bug in sleeping_count.acquire' + '- res should not be False') sleepers = 0 - while sleepers < n and self._sleeping_count.acquire(False): + while sleepers < n and self._sleeping_count.acquire(False, timeout=timeout): self._wait_semaphore.release() # wake up one sleeper sleepers += 1 if sleepers: for i in range(sleepers): - self._woken_count.acquire() # wait for a sleeper to wake - + self._woken_count.acquire(timeout=timeout) # wait for a sleeper to wake # rezero wait_semaphore in case some timeouts just happened - while self._wait_semaphore.acquire(False): + while self._wait_semaphore.acquire(False, timeout=timeout): pass - def notify_all(self): - self.notify(n=sys.maxsize) + def notify_all(self, timeout=None): + self.notify(n=sys.maxsize, timeout=timeout) def wait_for(self, predicate, timeout=None): result = predicate() diff --git a/Lib/threading.py b/Lib/threading.py index 94ea2f08178369..ab4e863c141ffd 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -395,7 +395,7 @@ def wait_for(self, predicate, timeout=None): result = predicate() return result - def notify(self, n=1): + def notify(self, n=1, timeout=None): """Wake up one or more threads waiting on this condition, if any. If the calling thread has not acquired the lock when this method is @@ -425,14 +425,14 @@ def notify(self, n=1): except ValueError: pass - def notify_all(self): + def notify_all(self, timeout=None): """Wake up all threads waiting on this condition. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised. """ - self.notify(len(self._waiters)) + self.notify(len(self._waiters), timeout=timeout) def notifyAll(self): """Wake up all threads waiting on this condition. @@ -723,7 +723,7 @@ def wait(self, timeout=None): try: if index + 1 == self._parties: # We release the barrier - self._release() + self._release(timeout=timeout) else: # We wait until someone releases us self._wait(timeout) @@ -746,13 +746,13 @@ def _enter(self): # Optionally run the 'action' and release the threads waiting # in the barrier. - def _release(self): + def _release(self, timeout=None): try: if self._action: self._action() # enter draining state self._state = 1 - self._cond.notify_all() + self._cond.notify_all(timeout=timeout) except: #an exception during the _action handler. Break and reraise self._break() diff --git a/Misc/NEWS.d/next/Library/2024-10-16-19-58-13.gh-issue-123899.VfXZse.rst b/Misc/NEWS.d/next/Library/2024-10-16-19-58-13.gh-issue-123899.VfXZse.rst new file mode 100644 index 00000000000000..448210a6e34dca --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-10-16-19-58-13.gh-issue-123899.VfXZse.rst @@ -0,0 +1,2 @@ +Add timeout argument for notify_all/notify method in Condition class in +threading/multiprocessing module