73 changes: 51 additions & 22 deletions Lib/multiprocessing/synchronize.py
@@ -328,41 +328,70 @@ def wait_for(self, predicate, timeout=None):
class Event(object):

    def __init__(self, *, ctx):
-        self._cond = ctx.Condition(ctx.Lock())
-        self._flag = ctx.Semaphore(0)
+        self._flag = ctx.Value('i', 0)
+        # Allocate a ctypes.c_ulonglong to hold the set_id. c_ulonglong represents the
+        # C unsigned long long datatype; its constructor accepts an optional integer
+        # initializer and no overflow checking is done.
+        # From https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong
+        # See multiprocessing/sharedctypes.py for the typecode-to-ctypes definitions.
+        self._set_id = ctx.Value('Q', 0)

    def is_set(self):
-        with self._cond:
-            if self._flag.acquire(False):
-                self._flag.release()
-                return True
-            return False
+        # From https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value:
+        # "If lock is True (the default) then a new recursive lock object is created to
+        # synchronize access to the value."
+        with self._flag:
+            return self._flag.value == 1

    def set(self):
-        with self._cond:
-            self._flag.acquire(False)
-            self._flag.release()
-            self._cond.notify_all()
+        # If `set` is called from a signal handler, this is fine: the lock is recursive,
+        # so it won't deadlock. If the thread interrupted by the signal handler is
+        # wait()-ing and the signal handler calls set(), that is also fine: wait() spins
+        # on the value, so the thread will return from wait() once the signal handler is done.
+        # Fixes https://github.com/python/cpython/issues/85772
+        with self._flag:
+            with self._set_id:
+                if self._flag.value == 0:
+                    # There is a theoretical chance of a race here. It requires the
+                    # following conditions: the interrupted thread must be wait()-ing;
+                    # set() must then be called reentrantly as many times as the maximum
+                    # value of c_ulonglong, with every interruption happening exactly
+                    # after `if self._flag.value == 0:`, so that _set_id wraps around;
+                    # and clear() must be called before the original wait() code
+                    # continues. The wait() code would then (incorrectly) keep waiting.
+                    # This case is safe to ignore: the stack would grow far too large
+                    # before there is any chance of it actually happening.
+
+                    self._flag.value = 1
+                    self._set_id.value += 1
+                    # Reentrant set() cannot race when `self._set_id.value` reaches its
+                    # maximum: ctypes.c_ulonglong overflows without raising an exception
+                    # (https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong:
+                    # "no overflow checking is done"), so we do not need to check for a
+                    # maximum value; C wraps it around for us.

    def clear(self):
-        with self._cond:
-            self._flag.acquire(False)
+        with self._flag:
+            self._flag.value = 0

    def wait(self, timeout=None):
-        with self._cond:
-            if self._flag.acquire(False):
-                self._flag.release()
-            else:
-                self._cond.wait(timeout)
-
-            if self._flag.acquire(False):
-                self._flag.release()
+        start_time = time.monotonic()
+        set_id = self._set_id.value
+        while True:
+            if self._flag.value == 1:
                return True
-            return False
+            elif set_id != self._set_id.value:
+                # The flag is unset but set_id changed, so there must have been a set()
+                # followed by a clear() during time.sleep().
+                # Fixes https://github.com/python/cpython/issues/95826
+                return True
+            elif timeout is not None and (time.monotonic() - start_time) > timeout:
+                return False
+            else:
+                # Fixes https://github.com/python/cpython/issues/85772 by spinning and sleeping.
Review comment (Member):
Iff (let's not assume we should go this route - see my comment on the issue) we're going to abandon use of the native OS platform APIs which properly implement timeouts on inter-process semaphores without a busy loop via _multiprocessing.SemLock in Modules/_multiprocessing/semaphore.c, sleeping in the loop should be done in an exponential back-off fashion as was the case even in threading itself before we started using proper OS APIs there. See https://github.com/python/cpython/blob/v3.1.3/Lib/threading.py#L227 for the old exponential back-off delay example.

Doing a busy loop with sleeps as a low level primitive in 2024 feels very wrong to me. They've always been really unfriendly to both latency due to unnecessary delays and power usage from frequent unnecessary wakes.

I suggested an alternate idea in #126434.

History

Prior to CPython 3.2 threading.Condition was implemented with a back-off in a similar manner. 3.2 improved on that old hack by using the OS APIs for lock timeouts in 7c3e577
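
For reference, a rough sketch of what that exponential back-off could look like if applied to the new wait() loop. This is an illustration only — the function and parameter names below are made up for the example, and the delay values follow the old threading.py code linked above; it is not code from this PR:

import time

def wait_with_backoff(flag, set_id, timeout=None):
    # Sketch: spin on `flag` and `set_id` (the PR's two shared Values) with an
    # exponentially growing sleep instead of a fixed 10 ms sleep.
    start_time = time.monotonic()
    initial_set_id = set_id.value
    delay = 0.0005  # 500 microseconds; doubles on every miss, as in old threading.py
    while True:
        if flag.value == 1:
            return True
        if initial_set_id != set_id.value:
            return True  # a set() followed by a clear() happened while we slept
        remaining = None
        if timeout is not None:
            remaining = timeout - (time.monotonic() - start_time)
            if remaining <= 0:
                return False
        # Never sleep longer than 50 ms, or longer than the time remaining.
        delay = min(delay * 2, 0.05) if remaining is None else min(delay * 2, remaining, 0.05)
        time.sleep(delay)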

Reply (Author):
Thanks @gpshead and @ZeroIntensity

I will read your comments and suggestions thoroughly tomorrow, and come back to you.

Thanks and kind regards.

+                time.sleep(0.010)  # sleep 10 milliseconds

    def __repr__(self) -> str:
        set_status = 'set' if self.is_set() else 'unset'
        return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>"

#
# Barrier
#
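As an aside (not part of the diff): the wrap-around behaviour the set() comments rely on can be checked with plain ctypes, assuming it masks out-of-range values as its documentation states ("no overflow checking is done"):

import ctypes

counter = ctypes.c_ulonglong(2**64 - 1)  # maximum unsigned 64-bit value
counter.value += 1                        # 2**64 does not fit in 64 bits...
print(counter.value)                      # ...so it silently wraps around to 0
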
29 changes: 29 additions & 0 deletions Lib/test/multiprocessingdata/is_set_set.py
@@ -0,0 +1,29 @@
import multiprocessing
import os
import signal
import concurrent.futures
import time


def send_sigint(pid):
    time.sleep(1)
    os.kill(pid, signal.SIGINT)


def run_signal_handler_set_is_set_test():
    shutdown_event = multiprocessing.Event()

    def sigterm_handler(_signo, _stack_frame):
        shutdown_event.set()

    signal.signal(signal.SIGINT, sigterm_handler)

    with concurrent.futures.ProcessPoolExecutor() as executor:
        f = executor.submit(send_sigint, os.getpid())
        while not shutdown_event.is_set():
            pass
        f.result()


if __name__ == '__main__':
    run_signal_handler_set_is_set_test()
36 changes: 36 additions & 0 deletions Lib/test/multiprocessingdata/set_clear_race.py
@@ -0,0 +1,36 @@
import multiprocessing
import sys


# Reproduction code copied and modified from https://github.com/python/cpython/issues/95826
# Demonstrates that the issue above is fixed

class SimpleRepro:
    def __init__(self):
        self.heartbeat_event = multiprocessing.Event()
        self.shutdown_event = multiprocessing.Event()
        self.child_proc = multiprocessing.Process(target=self.child_process, daemon=True)
        self.child_proc.start()

    def child_process(self):
        while True:
            if self.shutdown_event.is_set():
                return
            self.heartbeat_event.set()
            self.heartbeat_event.clear()

    def test_heartbeat(self):
        exit_code = 0
        for i in range(2000):
            success = self.heartbeat_event.wait(100)
            if not success:
                exit_code = 1
                break
        self.shutdown_event.set()
        self.child_proc.join()
        sys.exit(exit_code)


if __name__ == '__main__':
    foo = SimpleRepro()
    foo.test_heartbeat()
30 changes: 30 additions & 0 deletions Lib/test/multiprocessingdata/wait_set_no_deadlock.py
@@ -0,0 +1,30 @@
import multiprocessing
import signal
import concurrent.futures
import time
import os


# Shows that https://github.com/python/cpython/issues/85772 is fixed

def send_sigint(pid):
    time.sleep(1)  # Make sure shutdown_event.wait() is called
    os.kill(pid, signal.SIGINT)


def run_signal_handler_wait_set_test():
    shutdown_event = multiprocessing.Event()

    def sigterm_handler(_signo, _stack_frame):
        shutdown_event.set()

    signal.signal(signal.SIGINT, sigterm_handler)

    with concurrent.futures.ProcessPoolExecutor() as executor:
        f = executor.submit(send_sigint, os.getpid())
        shutdown_event.wait()
        f.result()


if __name__ == '__main__':
    run_signal_handler_wait_set_test()
5 changes: 5 additions & 0 deletions Lib/test/test_multiprocessing_event/__init__.py
@@ -0,0 +1,5 @@
import os.path
from test import support

def load_tests(*args):
    return support.load_package_tests(os.path.dirname(__file__), *args)
73 changes: 73 additions & 0 deletions Lib/test/test_multiprocessing_event/test_event.py
@@ -0,0 +1,73 @@
import unittest
from test import support
import sys
import signal
import os


try:
    import multiprocessing
    from concurrent.futures import ProcessPoolExecutor
    _have_multiprocessing = True
except (NotImplementedError, ModuleNotFoundError):
    _have_multiprocessing = False


@unittest.skipUnless(_have_multiprocessing,
                     "requires multiprocessing")
@unittest.skipUnless(hasattr(signal, 'signal'),
                     "Requires signal.signal")
@unittest.skipUnless(hasattr(signal, 'SIGINT'),
                     "Requires signal.SIGINT")
@unittest.skipUnless(hasattr(os, 'kill'),
                     "Requires os.kill")
@unittest.skipUnless(hasattr(os, 'getppid'),
                     "Requires os.getppid")
@support.requires_subprocess()
class TestEventSignalHandling(unittest.TestCase):
    def test_no_race_for_is_set_set(self):
        import subprocess
Review comment (Member):
Put this import up top; the module is always importable regardless of whether or not it actually works on the platform, which is what requires_subprocess tells you.
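
A minimal illustration of that suggestion (sketch only, not the reviewer's code):

# subprocess always imports, so it can live at module level; the class-level
# @support.requires_subprocess() decorator already skips these tests on
# platforms where spawning subprocesses does not actually work.
import subprocess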

        script = support.findfile("is_set_set.py", subdir="multiprocessingdata")
        for x in range(10):
            try:
                assert 0 == subprocess.call([sys.executable, script], timeout=60)
Review comment (Member):
Use test.support.assert_python_ok or assert_python_failure rather than launching sys.executable yourself. Child interpreters may need flags passed; that takes care of it.
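
A rough sketch of that suggestion (note that assert_python_ok lives in test.support.script_helper; the test body below mirrors the existing test, it is not the reviewer's exact code):

from test.support.script_helper import assert_python_ok  # module-level import

    def test_no_race_for_is_set_set(self):
        script = support.findfile("is_set_set.py", subdir="multiprocessingdata")
        for x in range(10):
            # assert_python_ok runs the script in a child interpreter with the
            # appropriate flags and fails if the child exits with a non-zero status.
            assert_python_ok(script)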

            except subprocess.TimeoutExpired:
                assert False, 'subprocess.TimeoutExpired for is_set_set.py'

    def test_no_race_set_clear(self):
        import subprocess
        script = support.findfile("set_clear_race.py", subdir="multiprocessingdata")
        assert 0 == subprocess.call([sys.executable, script])

    def test_wait_set_no_deadlock(self):
        import subprocess
        script = support.findfile("wait_set_no_deadlock.py", subdir="multiprocessingdata")
        assert 0 == subprocess.call([sys.executable, script])

    def test_wait_timeout(self):
        event = multiprocessing.Event()
        # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Event
        # multiprocessing.Event: A clone of threading.Event.

        # threading.Event: https://docs.python.org/3/library/threading.html#threading.Event

        # threading.Event.wait(): https://docs.python.org/3/library/threading.html#threading.Event.wait
        # Block as long as the internal flag is false and the timeout, if given, has not expired.
        # The return value represents the reason that this blocking method returned;
        # True if returning because the internal flag is set to true, or
        # False if a timeout is given and the internal flag did not become true within the given wait time.

        # When the timeout argument is present and not None, it should be a floating-point number
        # specifying a timeout for the operation in seconds, or fractions thereof.

        # wait() supports both integer and float timeouts:
        flag_set = event.wait(1)
        assert flag_set == False

        flag_set = event.wait(0.1)
        assert flag_set == False


if __name__ == '__main__':
    unittest.main()

2 changes: 2 additions & 0 deletions Makefile.pre.in
@@ -2449,6 +2449,7 @@ TESTSUBDIRS= idlelib/idle_test \
test/leakers \
test/libregrtest \
test/mathdata \
test/multiprocessingdata \
test/regrtestdata \
test/regrtestdata/import_from_tests \
test/regrtestdata/import_from_tests/test_regrtest_b \
@@ -2519,6 +2520,7 @@ TESTSUBDIRS= idlelib/idle_test \
test/test_interpreters \
test/test_json \
test/test_module \
test/test_multiprocessing_event \
test/test_multiprocessing_fork \
test/test_multiprocessing_forkserver \
test/test_multiprocessing_spawn \
Review comment (Member):
This needs rewriting to describe the current approach. Also, please update the PR title and description.

Reply (Author):
Thanks. I've updated the description now.

What do you think, @gpshead?

@@ -0,0 +1 @@
All methods of :mod:`multiprocessing`'s ``Event`` are now reentrant and thread-safe, and the event can thus be used from signal handlers.