Skip to content

Commit 0a321de

Browse files
committed
Implement fast Task & Future.
In some cases this can boost the performance up to 30%.
1 parent 8246724 commit 0a321de

File tree

9 files changed

+1381
-12
lines changed

9 files changed

+1381
-12
lines changed

tests/test_futures.py

Lines changed: 443 additions & 0 deletions
Large diffs are not rendered by default.

tests/test_tasks.py

Lines changed: 425 additions & 0 deletions
Large diffs are not rendered by default.

uvloop/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from . import includes as __includes
66
from . import _patch
7-
from .loop import Loop as __BaseLoop
7+
from .loop import Loop as __BaseLoop, Future
88

99

1010
__all__ = ('new_event_loop', 'EventLoopPolicy')

uvloop/_patch.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import asyncio
2-
3-
from asyncio import coroutines
1+
import sys
42

53

64
def _format_coroutine(coro):
@@ -28,5 +26,49 @@ def _format_coroutine(coro):
2826
return _old_format_coroutine(coro)
2927

3028

31-
_old_format_coroutine = coroutines._format_coroutine
32-
coroutines._format_coroutine = _format_coroutine
29+
async def _wait_for_data(self, func_name):
30+
"""Wait until feed_data() or feed_eof() is called.
31+
32+
If stream was paused, automatically resume it.
33+
"""
34+
if self._waiter is not None:
35+
raise RuntimeError('%s() called while another coroutine is '
36+
'already waiting for incoming data' % func_name)
37+
38+
assert not self._eof, '_wait_for_data after EOF'
39+
40+
# Waiting for data while paused will make deadlock, so prevent it.
41+
if self._paused:
42+
self._paused = False
43+
self._transport.resume_reading()
44+
45+
try:
46+
create_future = self._loop.create_future
47+
except AttributeError:
48+
self._waiter = asyncio.Future(loop=self._loop)
49+
else:
50+
self._waiter = create_future()
51+
52+
try:
53+
await self._waiter
54+
finally:
55+
self._waiter = None
56+
57+
58+
if sys.version_info < (3, 5, 2):
59+
# In Python 3.5.2 (see PEP 478 for 3.5 release schedule)
60+
# we won't need to patch anything.
61+
62+
import asyncio
63+
64+
from asyncio import coroutines
65+
from asyncio import streams
66+
67+
# This is needed to support Cython 'async def' coroutines.
68+
_old_format_coroutine = coroutines._format_coroutine
69+
coroutines._format_coroutine = _format_coroutine
70+
71+
# Fix a possible deadlock, improve performance.
72+
_old_wait_for_data = streams.StreamReader._wait_for_data
73+
_wait_for_data.__module__ = _old_wait_for_data.__module__
74+
streams.StreamReader._wait_for_data = _wait_for_data

uvloop/future.pyx

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
DEF _FUT_PENDING = 1
2+
DEF _FUT_CANCELLED = 2
3+
DEF _FUT_FINISHED = 3
4+
5+
6+
cdef class BaseFuture:
7+
cdef:
8+
int _state
9+
10+
readonly Loop _loop
11+
readonly list _callbacks
12+
readonly object _exception
13+
readonly object _result
14+
public bint _blocking
15+
readonly object _source_traceback
16+
readonly bint _log_traceback
17+
18+
def __init__(self, Loop loop):
19+
if loop is None:
20+
loop = aio_get_event_loop()
21+
if not isinstance(loop, Loop):
22+
raise TypeError('uvloop.Future supports only uvloop.Loop')
23+
24+
self._state = _FUT_PENDING
25+
self._loop = loop
26+
self._callbacks = []
27+
self._result = None
28+
self._exception = None
29+
self._blocking = False
30+
self._log_traceback = False
31+
32+
if loop._debug:
33+
self._source_traceback = tb_extract_stack(sys_getframe(0))
34+
else:
35+
self._source_traceback = None
36+
37+
cdef _schedule_callbacks(self):
38+
cdef:
39+
list callbacks
40+
size_t cb_len = len(self._callbacks)
41+
size_t i
42+
43+
if cb_len == 0:
44+
return
45+
46+
callbacks = self._callbacks
47+
self._callbacks = []
48+
49+
for i from 0 <= i < cb_len:
50+
self._loop.call_soon(callbacks[i], self)
51+
52+
cdef _add_done_callback(self, fn):
53+
if self._state != _FUT_PENDING:
54+
self._loop.call_soon(fn, self)
55+
else:
56+
self._callbacks.append(fn)
57+
58+
cdef _done(self):
59+
return self._state != _FUT_PENDING
60+
61+
cdef _cancel(self):
62+
if self._done():
63+
return False
64+
self._state = _FUT_CANCELLED
65+
self._schedule_callbacks()
66+
return True
67+
68+
# _result would shadow the "_result" property
69+
cdef _result_impl(self):
70+
if self._state == _FUT_CANCELLED:
71+
raise aio_CancelledError
72+
if self._state != _FUT_FINISHED:
73+
raise aio_InvalidStateError('Result is not ready.')
74+
self._log_traceback = False
75+
if self._exception is not None:
76+
raise self._exception
77+
return self._result
78+
79+
cdef _str_state(self):
80+
if self._state == _FUT_PENDING:
81+
return 'PENDING'
82+
elif self._state == _FUT_CANCELLED:
83+
return 'CANCELLED'
84+
elif self._state == _FUT_FINISHED:
85+
return 'FINISHED'
86+
else:
87+
raise RuntimeError('unknown Future state')
88+
89+
property _state:
90+
def __get__(self):
91+
return self._str_state()
92+
93+
def cancel(self):
94+
"""Cancel the future and schedule callbacks.
95+
96+
If the future is already done or cancelled, return False. Otherwise,
97+
change the future's state to cancelled, schedule the callbacks and
98+
return True.
99+
"""
100+
return self._cancel()
101+
102+
def cancelled(self):
103+
"""Return True if the future was cancelled."""
104+
return self._state == _FUT_CANCELLED
105+
106+
def done(self):
107+
"""Return True if the future is done.
108+
109+
Done means either that a result / exception are available, or that the
110+
future was cancelled.
111+
"""
112+
return self._state != _FUT_PENDING
113+
114+
def result(self):
115+
"""Return the result this future represents.
116+
117+
If the future has been cancelled, raises CancelledError. If the
118+
future's result isn't yet available, raises InvalidStateError. If
119+
the future is done and has an exception set, this exception is raised.
120+
"""
121+
return self._result_impl()
122+
123+
def exception(self):
124+
"""Return the exception that was set on this future.
125+
126+
The exception (or None if no exception was set) is returned only if
127+
the future is done. If the future has been cancelled, raises
128+
CancelledError. If the future isn't done yet, raises
129+
InvalidStateError.
130+
"""
131+
if self._state == _FUT_CANCELLED:
132+
raise aio_CancelledError
133+
if self._state != _FUT_FINISHED:
134+
raise aio_InvalidStateError('Exception is not set.')
135+
self._log_traceback = False
136+
return self._exception
137+
138+
def add_done_callback(self, fn):
139+
"""Add a callback to be run when the future becomes done.
140+
141+
The callback is called with a single argument - the future object. If
142+
the future is already done when this is called, the callback is
143+
scheduled with call_soon.
144+
"""
145+
self._add_done_callback(fn)
146+
147+
def remove_done_callback(self, fn):
148+
"""Remove all instances of a callback from the "call when done" list.
149+
150+
Returns the number of callbacks removed.
151+
"""
152+
cdef:
153+
size_t clen = len(self._callbacks)
154+
size_t i
155+
size_t ni = 0
156+
object cb
157+
158+
for i from 0 <= i < clen:
159+
cb = self._callbacks[i]
160+
if cb != fn:
161+
self._callbacks[ni] = cb
162+
ni += 1
163+
164+
if ni != clen:
165+
del self._callbacks[ni:]
166+
167+
return clen - ni
168+
169+
cpdef set_result(self, result):
170+
"""Mark the future done and set its result.
171+
172+
If the future is already done when this method is called, raises
173+
InvalidStateError.
174+
"""
175+
if self._state != _FUT_PENDING:
176+
raise aio_InvalidStateError('{}: {!r}'.format(
177+
self._str_state(), self))
178+
self._result = result
179+
self._state = _FUT_FINISHED
180+
self._schedule_callbacks()
181+
182+
cpdef set_exception(self, exception):
183+
"""Mark the future done and set an exception.
184+
185+
If the future is already done when this method is called, raises
186+
InvalidStateError.
187+
"""
188+
if self._state != _FUT_PENDING:
189+
raise aio_InvalidStateError('{}: {!r}'.format(
190+
self._str_state(), self))
191+
if isinstance(exception, type):
192+
exception = exception()
193+
if type(exception) is StopIteration:
194+
raise TypeError("StopIteration interacts badly with generators "
195+
"and cannot be raised into a Future")
196+
self._exception = exception
197+
self._state = _FUT_FINISHED
198+
self._schedule_callbacks()
199+
self._log_traceback = True
200+
201+
# Copy of __await__
202+
def __iter__(self):
203+
if self._state == _FUT_PENDING:
204+
self._blocking = True
205+
yield self # This tells Task to wait for completion.
206+
207+
if self._state == _FUT_PENDING:
208+
raise AssertionError("yield from wasn't used with future")
209+
210+
return self._result_impl() # May raise too.
211+
212+
# Copy of __iter__
213+
def __await__(self):
214+
if self._state == _FUT_PENDING:
215+
self._blocking = True
216+
yield self # This tells Task to wait for completion.
217+
218+
if self._state == _FUT_PENDING:
219+
raise AssertionError("yield from wasn't used with future")
220+
221+
return self._result_impl() # May raise too.
222+
223+
224+
class Future(BaseFuture, aio_Future):
225+
# Inherit asyncio.Future.__del__ and __repr__
226+
pass
227+
228+
229+
cdef uvloop_Future = Future

uvloop/includes/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import collections
44
import concurrent.futures
55
import functools
6+
import inspect
67
import itertools
78
import os
89
import signal
@@ -11,4 +12,6 @@
1112
import ssl
1213
import sys
1314
import threading
15+
import traceback
16+
import time
1417
import warnings

uvloop/includes/stdlib.pxi

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import asyncio, asyncio.log, asyncio.base_events, \
2-
asyncio.sslproto, asyncio.coroutines
2+
asyncio.sslproto, asyncio.coroutines, \
3+
asyncio.futures
34
import collections
45
import concurrent.futures
56
import functools
7+
import inspect
68
import itertools
79
import os
810
import signal as std_signal
@@ -11,11 +13,15 @@ import subprocess
1113
import ssl
1214
import sys
1315
import threading
16+
import traceback
1417
import time
1518
import warnings
1619

1720

21+
cdef aio_chain_future = asyncio.futures._chain_future
22+
cdef aio_get_event_loop = asyncio.get_event_loop
1823
cdef aio_CancelledError = asyncio.CancelledError
24+
cdef aio_InvalidStateError = asyncio.InvalidStateError
1925
cdef aio_TimeoutError = asyncio.TimeoutError
2026
cdef aio_Future = asyncio.Future
2127
cdef aio_Task = asyncio.Task
@@ -41,6 +47,7 @@ cdef cc_ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor
4147
cdef ft_partial = functools.partial
4248

4349
cdef iter_chain = itertools.chain
50+
cdef inspect_isgenerator = inspect.isgenerator
4451

4552
cdef int has_SO_REUSEPORT = hasattr(socket, 'SO_REUSEPORT')
4653
cdef int SO_REUSEPORT = getattr(socket, 'SO_REUSEPORT', 0)
@@ -80,6 +87,7 @@ cdef sys_ignore_environment = sys.flags.ignore_environment
8087
cdef sys_exc_info = sys.exc_info
8188
cdef sys_set_coroutine_wrapper = sys.set_coroutine_wrapper
8289
cdef sys_get_coroutine_wrapper = sys.get_coroutine_wrapper
90+
cdef sys_getframe = sys._getframe
8391

8492
cdef ssl_SSLContext = ssl.SSLContext
8593

@@ -93,11 +101,15 @@ cdef int signal_NSIG = std_signal.NSIG
93101

94102
cdef time_sleep = time.sleep
95103

104+
cdef tb_extract_stack = traceback.extract_stack
105+
cdef tb_format_list = traceback.format_list
106+
96107
cdef warnings_warn = warnings.warn
97108

98109

99110
# Cython doesn't clean-up imported objects properly in Py3 mode,
100111
# so we delete refs to all modules manually (except sys)
101112
del asyncio, concurrent, collections
102-
del functools, itertools, socket, os, threading, std_signal, subprocess, ssl
103-
del time, warnings
113+
del functools, inspect, itertools, socket, os, threading
114+
del std_signal, subprocess, ssl
115+
del time, traceback, warnings

0 commit comments

Comments
 (0)