Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1937,7 +1937,7 @@ def call_exception_handler(self, context):

def _add_callback(self, handle):
"""Add a Handle to _ready."""
if not handle._cancelled:
if not handle.cancelled():
self._ready.append(handle)

def _add_callback_signalsafe(self, handle):
Expand Down Expand Up @@ -1966,7 +1966,7 @@ def _run_once(self):
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
if handle.cancelled():
handle._scheduled = False
else:
new_scheduled.append(handle)
Expand Down Expand Up @@ -2016,7 +2016,7 @@ def _run_once(self):
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
if handle.cancelled():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is redundant because _run checks now

continue
if self._debug:
try:
Expand Down
101 changes: 75 additions & 26 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,73 +34,123 @@

from . import format_helpers

class _HandleCancelled:
@staticmethod
def callback(*args):
pass

args = ()

_HANDLE_CANCELLED = _HandleCancelled()


class _HandlePartial:
__slots__ = ("callback", "args")

def __init__(self, callback, args):
self.callback = callback
self.args = args

def __eq__(self, other):
if isinstance(other, _HandlePartial):
return (self.callback == other.callback and
self.args == other.args)
return NotImplemented


class Handle:
"""Object returned by callback registration methods."""

__slots__ = ('_callback', '_args', '_cancelled', '_loop',
__slots__ = ('_callback_partial', '_loop',
'_source_traceback', '_repr', '__weakref__',
'_context')

def __init__(self, callback, args, loop, context=None):
if context is None:
context = contextvars.copy_context()
self._context = context
self._loop = loop
self._callback = callback
self._args = args
self._cancelled = False
self._context = context
self._callback_partial = _HandlePartial(callback, args)
self._repr = None
if self._loop.get_debug():
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))
else:
self._source_traceback = None

def _repr_info(self):
def _repr_info(self, cancelling, callback_partial):
info = [self.__class__.__name__]
if self._cancelled:
if cancelling:
info.append('cancelled')
if self._callback is not None:
if callback_partial is _HANDLE_CANCELLED:
info.append('cancelled')
callback = None
args = None
else:
callback = callback_partial.callback
args = callback_partial.args

if callback is not None:
info.append(format_helpers._format_callback_source(
self._callback, self._args,
callback, args,
debug=self._loop.get_debug()))
if self._source_traceback:
frame = self._source_traceback[-1]
info.append(f'created at {frame[0]}:{frame[1]}')
return info

def _repr_atomic(self, cancelling, callback_partial):
info = self._repr_info(cancelling, callback_partial)
return '<{}>'.format(' '.join(info))

def __repr__(self):
if self._repr is not None:
return self._repr
info = self._repr_info()
return '<{}>'.format(' '.join(info))
return self._repr_atomic(cancelling=False, callback_partial=self._callback_partial)

def get_context(self):
return self._context

def cancel(self):
if not self._cancelled:
self._cancelled = True
callback_partial = self._callback_partial
self._callback_partial = _HANDLE_CANCELLED
if callback_partial is not _HANDLE_CANCELLED:
if self._loop.get_debug():
# Keep a representation in debug mode to keep callback and
# parameters. For example, to log the warning
# "Executing <Handle...> took 2.5 second"
self._repr = repr(self)
self._callback = None
self._args = None
self._repr = self._repr_atomic(cancelling=True, callback_partial=callback_partial)

def cancelled(self):
return self._cancelled
return self._callback_partial is _HANDLE_CANCELLED

@property
def _cancelled(self):
return self.cancelled()

@property
def _callback(self):
callback_partial = self._callback_partial
if callback_partial is _HANDLE_CANCELLED:
return None
return callback_partial.callback

@property
def _args(self):
callback_partial = self._callback_partial
if callback_partial is _HANDLE_CANCELLED:
return None
return callback_partial.args

def _run(self):
callback_partial = self._callback_partial
try:
self._context.run(self._callback, *self._args)
self._context.run(callback_partial.callback, *callback_partial.args)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
cb = format_helpers._format_callback_source(
self._callback, self._args,
callback_partial.callback, callback_partial.args,
debug=self._loop.get_debug())
msg = f'Exception in callback {cb}'
context = {
Expand All @@ -111,6 +161,7 @@ def _run(self):
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
callback_partial = None
self = None # Needed to break cycles when an exception occurs.


Expand All @@ -126,9 +177,9 @@ def __init__(self, when, callback, args, loop, context=None):
self._when = when
self._scheduled = False

def _repr_info(self):
info = super()._repr_info()
pos = 2 if self._cancelled else 1
def _repr_info(self, cancelling, callback_args):
info = super()._repr_info(cancelling, callback_args)
pos = 2 if (cancelling or callback_args is _HANDLE_CANCELLED) else 1
info.insert(pos, f'when={self._when}')
return info

Expand Down Expand Up @@ -158,13 +209,11 @@ def __ge__(self, other):
def __eq__(self, other):
if isinstance(other, TimerHandle):
return (self._when == other._when and
self._callback == other._callback and
self._args == other._args and
self._cancelled == other._cancelled)
self._callback_partial == other._callback_partial)
return NotImplemented

def cancel(self):
if not self._cancelled:
if not self.cancelled():
self._loop._timer_handle_cancelled(self)
super().cancel()

Expand Down
2 changes: 1 addition & 1 deletion Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def _handle_signal(self, sig):
handle = self._signal_handlers.get(sig)
if handle is None:
return # Assume it's some race condition.
if handle._cancelled:
if handle.cancelled():
self.remove_signal_handler(sig) # Remove it properly.
else:
self._add_callback_signalsafe(handle)
Expand Down
4 changes: 2 additions & 2 deletions Lib/test/test_asyncio/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1810,11 +1810,11 @@ def call_later(delay, callback, *args):
loop.call_later = call_later
test_utils.run_briefly(loop)

self.assertFalse(handle._cancelled)
self.assertFalse(handle.cancelled())

t.cancel()
test_utils.run_briefly(loop)
self.assertTrue(handle._cancelled)
self.assertTrue(handle.cancelled())

def test_task_cancel_sleeping_task(self):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
make ``asyncio.events.Handle.cancel`` atomic for use in call_soon_threadsafe
Loading