Skip to content

Commit 2caaffc

Browse files
add ThreadSafeHandle
1 parent ffece55 commit 2caaffc

File tree

4 files changed

+29
-8
lines changed

4 files changed

+29
-8
lines changed

Lib/asyncio/base_events.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,10 @@ def call_soon_threadsafe(self, callback, *args, context=None):
873873
self._check_closed()
874874
if self._debug:
875875
self._check_callback(callback, 'call_soon_threadsafe')
876-
handle = self._call_soon(callback, args, context)
876+
handle = events._ThreadSafeHandle(callback, args, self, context)
877+
self._ready.append(handle)
878+
if handle._source_traceback:
879+
del handle._source_traceback[-1]
877880
if handle._source_traceback:
878881
del handle._source_traceback[-1]
879882
self._write_to_self()
@@ -1937,7 +1940,7 @@ def call_exception_handler(self, context):
19371940

19381941
def _add_callback(self, handle):
19391942
"""Add a Handle to _ready."""
1940-
if not handle._cancelled:
1943+
if not handle.cancelled():
19411944
self._ready.append(handle)
19421945

19431946
def _add_callback_signalsafe(self, handle):
@@ -1966,7 +1969,7 @@ def _run_once(self):
19661969
# is too high
19671970
new_scheduled = []
19681971
for handle in self._scheduled:
1969-
if handle._cancelled:
1972+
if handle.cancelled():
19701973
handle._scheduled = False
19711974
else:
19721975
new_scheduled.append(handle)
@@ -2016,7 +2019,7 @@ def _run_once(self):
20162019
ntodo = len(self._ready)
20172020
for i in range(ntodo):
20182021
handle = self._ready.popleft()
2019-
if handle._cancelled:
2022+
if handle.cancelled():
20202023
continue
20212024
if self._debug:
20222025
try:

Lib/asyncio/events.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def __init__(self, callback, args, loop, context=None):
5959

6060
def _repr_info(self):
6161
info = [self.__class__.__name__]
62-
if self._cancelled:
62+
if self.cancelled():
6363
info.append('cancelled')
6464
if self._callback is not None:
6565
info.append(format_helpers._format_callback_source(
@@ -113,6 +113,24 @@ def _run(self):
113113
self._loop.call_exception_handler(context)
114114
self = None # Needed to break cycles when an exception occurs.
115115

116+
class _ThreadSafeHandle(Handle):
117+
118+
def __init__(self, callback, args, loop, context=None):
119+
super().__init__(callback, args, loop, context)
120+
self._lock = threading.RLock()
121+
122+
def cancel(self):
123+
with self._lock:
124+
return super().cancel()
125+
126+
def cancelled(self):
127+
with self._lock:
128+
return super().cancelled()
129+
130+
def _run(self):
131+
with self._lock:
132+
return super()._run()
133+
116134

117135
class TimerHandle(Handle):
118136
"""Object returned by timed callback registration methods."""

Lib/asyncio/unix_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def _handle_signal(self, sig):
138138
handle = self._signal_handlers.get(sig)
139139
if handle is None:
140140
return # Assume it's some race condition.
141-
if handle._cancelled:
141+
if handle.cancelled():
142142
self.remove_signal_handler(sig) # Remove it properly.
143143
else:
144144
self._add_callback_signalsafe(handle)

Lib/test/test_asyncio/test_tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,11 +1810,11 @@ def call_later(delay, callback, *args):
18101810
loop.call_later = call_later
18111811
test_utils.run_briefly(loop)
18121812

1813-
self.assertFalse(handle._cancelled)
1813+
self.assertFalse(handle.cancelled())
18141814

18151815
t.cancel()
18161816
test_utils.run_briefly(loop)
1817-
self.assertTrue(handle._cancelled)
1817+
self.assertTrue(handle.cancelled())
18181818

18191819
def test_task_cancel_sleeping_task(self):
18201820

0 commit comments

Comments
 (0)