Skip to content

Commit 9f87336

Browse files
committed
Use native asyncio Future & Task in Python 3.6
1 parent 89f3f1e commit 9f87336

File tree

4 files changed

+241
-226
lines changed

4 files changed

+241
-226
lines changed

uvloop/chain_futs.pyx

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,14 @@ cdef _chain_future(source, destination):
5353
source_loop = None
5454
dest_loop = None
5555

56-
source_type = type(source)
57-
dest_type = type(destination)
58-
59-
if source_type is uvloop_Future:
56+
if _is_uvloop_future(source):
6057
source_loop = (<BaseFuture>source)._loop
61-
elif source_type is not cc_Future and isfuture(source):
58+
elif isfuture(source):
6259
source_loop = source._loop
6360

64-
if dest_type is uvloop_Future:
61+
if _is_uvloop_future(destination):
6562
dest_loop = (<BaseFuture>destination)._loop
66-
elif dest_type is not cc_Future and isfuture(destination):
63+
elif isfuture(destination):
6764
dest_loop = destination._loop
6865

6966
def _set_state(future, other):

uvloop/future.pyx

Lines changed: 235 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,240 @@ cdef class BaseFuture:
246246
return self._result_impl() # May raise too.
247247

248248

249-
class Future(BaseFuture, aio_Future):
250-
# Inherit asyncio.Future.__del__ and __repr__
251-
pass
249+
cdef class BaseTask(BaseFuture):
250+
cdef:
251+
readonly object _coro
252+
readonly object _fut_waiter
253+
readonly bint _must_cancel
254+
public bint _log_destroy_pending
255+
256+
def __init__(self, coro not None, Loop loop):
257+
BaseFuture.__init__(self, loop)
258+
259+
self._coro = coro
260+
self._fut_waiter = None
261+
self._must_cancel = False
262+
self._log_destroy_pending = True
263+
264+
self.__class__._all_tasks.add(self)
265+
266+
self._loop._call_soon_handle(
267+
new_MethodHandle1(
268+
self._loop,
269+
"Task._step",
270+
<method1_t>self._fast_step,
271+
self,
272+
None))
273+
274+
def cancel(self):
275+
if self.done():
276+
return False
277+
if self._fut_waiter is not None:
278+
if self._fut_waiter.cancel():
279+
# Leave self._fut_waiter; it may be a Task that
280+
# catches and ignores the cancellation so we may have
281+
# to cancel it again later.
282+
return True
283+
# It must be the case that self._step is already scheduled.
284+
self._must_cancel = True
285+
return True
286+
287+
cdef _raise_wrong_loop(self, fut):
288+
ex = RuntimeError(
289+
'Task {!r} got Future {!r} attached to a '
290+
'different loop'.format(self, fut))
291+
self._loop._call_soon_handle(
292+
new_MethodHandle1(
293+
self._loop,
294+
"Task._step",
295+
<method1_t>self._fast_step,
296+
self,
297+
ex))
298+
299+
cdef _raise_yield(self, fut):
300+
ex = RuntimeError(
301+
'yield was used instead of yield from '
302+
'in task {!r} with {!r}'.format(self, fut))
303+
self._loop._call_soon_handle(
304+
new_MethodHandle1(
305+
self._loop,
306+
"Task._step",
307+
<method1_t>self._fast_step,
308+
self,
309+
ex))
310+
311+
cdef _raise_generator(self, val):
312+
ex = RuntimeError(
313+
'yield was used instead of yield from for '
314+
'generator in task {!r} with {}'.format(self, val))
315+
self._loop._call_soon_handle(
316+
new_MethodHandle1(
317+
self._loop,
318+
"Task._step",
319+
<method1_t>self._fast_step,
320+
self,
321+
ex))
322+
323+
cdef _raise_else(self, val):
324+
ex = RuntimeError('Task got bad yield: {!r}'.format(val))
325+
self._loop._call_soon_handle(
326+
new_MethodHandle1(
327+
self._loop,
328+
"Task._step",
329+
<method1_t>self._fast_step,
330+
self,
331+
ex))
332+
333+
cdef _skip_oneloop(self):
334+
self._loop._call_soon_handle(
335+
new_MethodHandle1(
336+
self._loop,
337+
"Task._step",
338+
<method1_t>self._fast_step,
339+
self,
340+
None))
341+
342+
cdef _fast_step(self, exc):
343+
cdef:
344+
BaseFuture nfut
345+
object meth
346+
object _current_tasks = self.__class__._current_tasks
347+
348+
if self._state != _FUT_PENDING:
349+
raise AssertionError(
350+
'_step(): already done: {!r}, {!r}'.format(self, exc))
351+
352+
if self._must_cancel:
353+
if not isinstance(exc, aio_CancelledError):
354+
exc = aio_CancelledError()
355+
self._must_cancel = False
356+
357+
self._fut_waiter = None
358+
359+
# Let it fail early with an AttributeError if self._coro
360+
# is not a coroutine/generator.
361+
if exc is None:
362+
meth = self._coro.send
363+
else:
364+
meth = self._coro.throw
365+
366+
_current_tasks[self._loop] = self
367+
# Call either coro.throw(exc) or coro.send(None).
368+
try:
369+
if exc is None:
370+
# We use the `send` method directly, because coroutines
371+
# don't have `__iter__` and `__next__` methods.
372+
result = meth(None)
373+
else:
374+
result = meth(exc)
375+
except StopIteration as exc:
376+
self.set_result(exc.value)
377+
except aio_CancelledError as exc:
378+
BaseFuture._cancel(self) # I.e., Future.cancel(self).
379+
except Exception as exc:
380+
self.set_exception(exc)
381+
except BaseException as exc:
382+
self.set_exception(exc)
383+
raise
384+
else:
385+
result_type = type(result)
386+
if result_type is uvloop_Future:
387+
# Yielded Future must come from Future.__iter__().
388+
nfut = <BaseFuture>result
389+
if nfut._loop is not self._loop:
390+
self._raise_wrong_loop(result)
391+
elif nfut._blocking:
392+
nfut._blocking = False
393+
nfut._add_done_callback(self._wakeup)
394+
self._fut_waiter = result
395+
if self._must_cancel:
396+
if self._fut_waiter.cancel():
397+
self._must_cancel = False
398+
else:
399+
self._raise_yield(result)
400+
401+
elif result_type is aio_Future or isfuture(result):
402+
# Yielded Future must come from Future.__iter__().
403+
if result._loop is not self._loop:
404+
self._raise_wrong_loop(result)
405+
elif _future_get_blocking(result):
406+
_future_set_blocking(result, False)
407+
result.add_done_callback(self._wakeup)
408+
self._fut_waiter = result
409+
if self._must_cancel:
410+
if self._fut_waiter.cancel():
411+
self._must_cancel = False
412+
else:
413+
self._raise_yield(result)
414+
415+
elif result is None:
416+
# Bare yield relinquishes control for one event loop iteration.
417+
self._skip_oneloop()
418+
419+
elif inspect_isgenerator(result):
420+
# Yielding a generator is just wrong.
421+
self._raise_generator(result)
422+
423+
else:
424+
# Yielding something else is an error.
425+
self._raise_else(result)
426+
finally:
427+
_current_tasks.pop(self._loop)
428+
429+
cdef _fast_wakeup(self, future):
430+
try:
431+
if type(future) is uvloop_Future:
432+
(<BaseFuture>future)._result_impl()
433+
else:
434+
future.result()
435+
except Exception as exc:
436+
# This may also be a cancellation.
437+
self._fast_step(exc)
438+
else:
439+
# Don't pass the value of `future.result()` explicitly,
440+
# as `Future.__iter__` and `Future.__await__` don't need it.
441+
self._fast_step(None)
442+
443+
def _step(self, exc=None):
444+
self._fast_step(exc)
445+
self = None
446+
447+
def _wakeup(self, future):
448+
self._fast_wakeup(future)
449+
self = None
450+
451+
452+
cdef uvloop_Future = None
453+
454+
cdef future_factory
455+
cdef task_factory
456+
457+
458+
if sys.version_info >= (3, 6):
459+
# In Python 3.6 Task and Future are implemented in C and
460+
# are already fast.
461+
462+
future_factory = aio_Future
463+
task_factory = aio_Task
464+
465+
Future = aio_Future
466+
467+
else:
468+
class Future(BaseFuture, aio_Future):
469+
# Inherit asyncio.Future.__del__ and __repr__
470+
pass
471+
472+
473+
class Task(BaseTask, aio_Task):
474+
# Inherit asyncio.Task.__del__ and __repr__ and a bunch
475+
# of class methods.
476+
pass
477+
478+
uvloop_Future = Future
479+
480+
future_factory = Future
481+
task_factory = Task
252482

253483

254-
cdef uvloop_Future = Future
484+
cdef _is_uvloop_future(fut):
485+
return uvloop_Future is not None and type(fut) == uvloop_Future

uvloop/loop.pyx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ cdef class Loop:
499499
"than the current one")
500500

501501
cdef inline _new_future(self):
502-
return uvloop_Future(self)
502+
return future_factory(loop=self)
503503

504504
cdef _track_transport(self, UVBaseTransport transport):
505505
self._transports[transport._fileno()] = transport
@@ -1086,7 +1086,7 @@ cdef class Loop:
10861086
"""
10871087
self._check_closed()
10881088
if self._task_factory is None:
1089-
task = uvloop_Task(coro, self)
1089+
task = task_factory(coro, loop=self)
10901090
else:
10911091
task = self._task_factory(self, coro)
10921092
return task
@@ -2450,7 +2450,6 @@ include "server.pyx"
24502450

24512451
include "future.pyx"
24522452
include "chain_futs.pyx"
2453-
include "task.pyx"
24542453

24552454

24562455
# Used in UVProcess

0 commit comments

Comments
 (0)