Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

__all__ = [
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
]
]

import threading
import sys
Expand Down Expand Up @@ -157,7 +157,7 @@ def __repr__(self):
except Exception:
value = 'unknown'
return '<%s(value=%s, maxvalue=%s)>' % \
(self.__class__.__name__, value, self._semlock.maxvalue)
(self.__class__.__name__, value, self._semlock.maxvalue)

#
# Non-recursive lock
Expand Down Expand Up @@ -253,7 +253,7 @@ def __repr__(self):

def wait(self, timeout=None):
assert self._lock._semlock._is_mine(), \
'must acquire() condition before using wait()'
'must acquire() condition before using wait()'

# indicate that this thread is going to sleep
self._sleeping_count.release()
Expand Down
105 changes: 99 additions & 6 deletions Lib/signal.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import _signal
import sys
from _signal import *
from enum import IntEnum as _IntEnum
import threading
import queue
import traceback

_globals = globals()

Expand Down Expand Up @@ -42,6 +46,74 @@ def _enum_to_int(value):
except (ValueError, TypeError):
return value

_signal_queue = queue.SimpleQueue() # SimpleQueue has reentrant put, so it can safely be called from signal handlers. https://github.com/python/cpython/issues/59181
_sys_exit_queue = queue.SimpleQueue()
_signal_thread = None
_signo_to_handler = {}

def _init_signal_thread():
assert threading.current_thread() is threading.main_thread()
global _signal_thread
if _signal_thread is None:
_signal_thread = threading.Thread(target=_signal_queue_handler, daemon=True)
_signal_thread.name = 'SignalHandlerThread'
_signal_thread.start()

def _push_signal_to_queue_handler(signo, _stack_frame):
assert threading.current_thread() is threading.main_thread()
global _signal_queue, _sys_exit_queue
try:
exit_code = _sys_exit_queue.get(block=False)
sys.exit(exit_code)
except queue.Empty:
_signal_queue.put(signo)

def _sigint_to_str(signo):
for x in valid_signals():
if x == signo:
return x.name
raise RuntimeError('Could not find signal name')

def _log_missing_signal_handler(signo):
import logging
logger = logging.getLogger(__name__)
str_name = ''
for x in valid_signals():
if x == signo:
str_name = x.name
logger.warning('Handler for signal.%s (%d) was not found.', str_name, signo)

def stop_signal_thread():
global _signal_thread, _signal_queue
if _signal_thread is not None:
_signal_queue.put('STOP_SIGNAL_HANDLER')
_signal_thread.join()
_signal_thread = None

def _signal_queue_handler():
assert threading.current_thread() is not threading.main_thread()
global _signal_queue, _signo_to_handler
while True:
signo = _signal_queue.get()
if signo == 'STOP_SIGNAL_HANDLER':
break
raise_systemexit = False
exitcode = 'NOTSET'
try:
handler = _signo_to_handler.get(signo, None)
if handler is not None:
handler(signo, None)
else:
_log_missing_signal_handler(signo)
except SystemExit as se:
exitcode = se.code
raise_systemexit = True
except Exception:
traceback.print_exc()
if raise_systemexit:
global _sys_exit_queue
_sys_exit_queue.put(exitcode)
raise_signal(signo)

# Similar to functools.wraps(), but only assign __doc__.
# __module__ should be preserved,
Expand All @@ -53,16 +125,37 @@ def decorator(wrapper):
return wrapper
return decorator

@_wraps(_signal.signal)
def signal(signalnum, handler):
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
return _int_to_enum(handler, Handlers)
def signal(signalnum, handler, use_dedicated_thread=True):
if use_dedicated_thread:
assert threading.current_thread() is threading.main_thread()
global _signo_to_handler
signal_int = _enum_to_int(signalnum)
old_handler = _signo_to_handler.get(signal_int, None)
if use_dedicated_thread and callable(handler):
assert callable(handler)
global _signal_thread
if _signal_thread is None:
_init_signal_thread()
_signo_to_handler[signal_int] = handler
handler = _signal.signal(signal_int, _enum_to_int(_push_signal_to_queue_handler))
return old_handler or _int_to_enum(handler, Handlers)
else:
if signal_int in _signo_to_handler:
del _signo_to_handler[signal_int]
if 0 == len(_signo_to_handler):
stop_signal_thread()
handler = _signal.signal(signal_int, _enum_to_int(handler))
return old_handler or _int_to_enum(handler, Handlers)


@_wraps(_signal.getsignal)
def getsignal(signalnum):
handler = _signal.getsignal(signalnum)
return _int_to_enum(handler, Handlers)
global _signo_to_handler
if signalnum in _signo_to_handler:
return _signo_to_handler[signalnum]
else:
handler = _signal.getsignal(signalnum)
return _int_to_enum(handler, Handlers)


if 'pthread_sigmask' in _globals:
Expand Down
29 changes: 29 additions & 0 deletions Lib/test/multiprocessingdata/is_set_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import multiprocessing
import os
import signal
import concurrent.futures
import time


def send_sigint(pid):
time.sleep(1)
os.kill(pid, signal.SIGINT)


def run_signal_handler_set_is_set_test():
shutdown_event = multiprocessing.Event()

def sigterm_handler(_signo, _stack_frame):
shutdown_event.set()

signal.signal(signal.SIGINT, sigterm_handler)

with concurrent.futures.ProcessPoolExecutor() as executor:
f = executor.submit(send_sigint, os.getpid())
while not shutdown_event.is_set():
pass
f.result()


if __name__ == '__main__':
run_signal_handler_set_is_set_test()
36 changes: 36 additions & 0 deletions Lib/test/multiprocessingdata/set_clear_race.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import multiprocessing
import sys


# Reproduction code copied and modified from https://github.com/python/cpython/issues/95826
# Fixes the issue above

class SimpleRepro:
def __init__(self):
self.heartbeat_event = multiprocessing.Event()
self.shutdown_event = multiprocessing.Event()
self.child_proc = multiprocessing.Process(target=self.child_process, daemon=True)
self.child_proc.start()

def child_process(self):
while True:
if self.shutdown_event.is_set():
return
self.heartbeat_event.set()
self.heartbeat_event.clear()

def test_heartbeat(self):
exit_code = 0
for i in range(2000):
success = self.heartbeat_event.wait(100)
if not success:
exit_code = 1
break
self.shutdown_event.set()
self.child_proc.join()
sys.exit(exit_code)


if __name__ == '__main__':
foo = SimpleRepro()
foo.test_heartbeat()
30 changes: 30 additions & 0 deletions Lib/test/multiprocessingdata/wait_set_no_deadlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import multiprocessing
import signal
import concurrent.futures
import time
import os


# Shows that https://github.com/python/cpython/issues/85772 is fixed

def send_sigint(pid):
time.sleep(1) # Make sure shutdown_event.wait() is called
os.kill(pid, signal.SIGINT)


def run_signal_handler_wait_set_test():
shutdown_event = multiprocessing.Event()

def sigterm_handler(_signo, _stack_frame):
shutdown_event.set()

signal.signal(signal.SIGINT, sigterm_handler)

with concurrent.futures.ProcessPoolExecutor() as executor:
f = executor.submit(send_sigint, os.getpid())
shutdown_event.wait()
f.result()


if __name__ == '__main__':
run_signal_handler_wait_set_test()
3 changes: 3 additions & 0 deletions Lib/test/support/threading_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import threading
import time
import unittest
import signal

from test import support

Expand All @@ -28,6 +29,8 @@ def threading_setup():
def threading_cleanup(*original_values):
orig_count, orig_ndangling = original_values

signal.stop_signal_thread()

timeout = 1.0
for _ in support.sleeping_retry(timeout, error=False):
# Copy the thread list to get a consistent output. threading._dangling
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import errno
import math
import platform
import signal
import socket
import sys
import threading
Expand All @@ -26,6 +27,7 @@

def tearDownModule():
asyncio.set_event_loop_policy(None)
signal.stop_signal_thread()


def mock_socket_module():
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_context.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import decimal
import unittest
import signal


def tearDownModule():
asyncio.set_event_loop_policy(None)
signal.stop_signal_thread()


@unittest.skipUnless(decimal.HAVE_CONTEXTVAR, "decimal is built with a thread-local context")
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_futures2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import contextvars
import traceback
import unittest
import signal
from asyncio import tasks


def tearDownModule():
asyncio.set_event_loop_policy(None)
signal.stop_signal_thread()


class FutureTests:
Expand Down
3 changes: 2 additions & 1 deletion Lib/test/test_asyncio/test_locks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Tests for locks.py"""

import signal
import unittest
from unittest import mock
import re
Expand All @@ -21,6 +21,7 @@

def tearDownModule():
asyncio.set_event_loop_policy(None)
signal.stop_signal_thread()


class LockTests(unittest.IsolatedAsyncioTestCase):
Expand Down
3 changes: 2 additions & 1 deletion Lib/test/test_asyncio/test_queues.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""Tests for queues.py"""

import asyncio
import signal
import unittest
from types import GenericAlias


def tearDownModule():
asyncio.set_event_loop_policy(None)

signal.stop_signal_thread()

class QueueBasicTests(unittest.IsolatedAsyncioTestCase):

Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_asyncio/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

def tearDownModule():
asyncio.set_event_loop_policy(None)
signal.stop_signal_thread()


def interrupt_self():
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
import signal
import socket
import time
import threading
Expand All @@ -12,6 +13,7 @@

def tearDownModule():
asyncio.set_event_loop_policy(None)
signal.stop_signal_thread()


class BaseStartServer(func_tests.FunctionalTestCaseMixin):
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_staggered.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import signal
import unittest
from asyncio.staggered import staggered_race

Expand All @@ -9,6 +10,7 @@

def tearDownModule():
asyncio.set_event_loop_policy(None)
signal.stop_signal_thread()


class StaggeredTests(unittest.IsolatedAsyncioTestCase):
Expand Down
4 changes: 2 additions & 2 deletions Lib/test/test_asyncio/test_taskgroups.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Adapted with permission from the EdgeDB project;
# license: PSFL.

import signal
import sys
import gc
import asyncio
Expand All @@ -15,7 +15,7 @@
# To prevent a warning "test altered the execution environment"
def tearDownModule():
asyncio.set_event_loop_policy(None)

signal.stop_signal_thread()

class MyExc(Exception):
pass
Expand Down
Loading
Loading