Skip to content

Commit 88e431d

Browse files
authored
Merge pull request #518 from minrk/async-def
remove some use of old deprecated coroutine syntax
2 parents dc2eb15 + 111e81f commit 88e431d

File tree

3 files changed

+46
-29
lines changed

3 files changed

+46
-29
lines changed

ipyparallel/client/magics.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,6 @@ def nullcontext():
4343
# Distributed under the terms of the BSD License. The full license is in
4444
# the file COPYING, distributed as part of this software.
4545
# -----------------------------------------------------------------------------
46-
# -----------------------------------------------------------------------------
47-
# Imports
48-
# -----------------------------------------------------------------------------
49-
50-
try:
51-
from asyncio import coroutine
52-
except ImportError: # py2
53-
54-
def coroutine(f):
55-
raise NotImplementedError()
56-
5746

5847
import inspect
5948
import re
@@ -79,6 +68,20 @@ def _iscoroutinefunction(f):
7968
return False
8069

8170

71+
def _asyncify(f):
72+
"""Wrap a blocking call in a coroutine
73+
74+
Does not make the call non-blocking,
75+
just conforms to the API assuming it is awaitable.
76+
For use when patching-in replacement methods.
77+
"""
78+
79+
async def async_f(*args, **kwargs):
80+
return f(*args, **kwargs)
81+
82+
return async_f
83+
84+
8285
NO_LAST_RESULT = "%pxresult recalls last %px result, which has not yet been used."
8386

8487

@@ -444,14 +447,14 @@ def _enable_autopx(self):
444447
if _iscoroutinefunction(self.shell.run_cell):
445448
# original is a coroutine,
446449
# wrap ours in a coroutine
447-
pxrun_cell = coroutine(pxrun_cell)
450+
pxrun_cell = _asyncify(pxrun_cell)
448451
self.shell.run_cell = pxrun_cell
449452

450453
pxrun_nodes = self.pxrun_nodes
451454
if _iscoroutinefunction(self.shell.run_ast_nodes):
452455
# original is a coroutine,
453456
# wrap ours in a coroutine
454-
pxrun_nodes = coroutine(pxrun_nodes)
457+
pxrun_nodes = _asyncify(pxrun_nodes)
455458
self.shell.run_ast_nodes = pxrun_nodes
456459

457460
self._autopx = True

ipyparallel/controller/hub.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# Distributed under the terms of the Modified BSD License.
88
from __future__ import print_function
99

10+
import inspect
1011
import json
1112
import os
1213
import sys
@@ -20,8 +21,6 @@
2021
from jupyter_client.jsonutil import parse_date
2122
from jupyter_client.session import Session
2223
from tornado import ioloop
23-
from tornado.gen import coroutine
24-
from tornado.gen import maybe_future
2524
from traitlets import Any
2625
from traitlets import Bytes
2726
from traitlets import Dict
@@ -319,8 +318,7 @@ def dispatch_monitor_traffic(self, msg):
319318
self.log.error("Unrecognized monitor topic: %r", switch)
320319

321320
@util.log_errors
322-
@coroutine
323-
def dispatch_query(self, msg):
321+
async def dispatch_query(self, msg):
324322
"""Route registration requests and queries from clients."""
325323
try:
326324
idents, msg = self.session.feed_identities(msg)
@@ -357,8 +355,8 @@ def dispatch_query(self, msg):
357355

358356
try:
359357
f = handler(idents, msg)
360-
if f:
361-
yield maybe_future(f)
358+
if f and inspect.isawaitable(f):
359+
await f
362360
except Exception:
363361
content = error.wrap_exception()
364362
self.log.error("Error handling request: %r", msg_type, exc_info=True)
@@ -1450,16 +1448,15 @@ def db_query(self, client_id, msg):
14501448
buffers=buffers,
14511449
)
14521450

1453-
@coroutine
1454-
def become_dask(self, client_id, msg):
1451+
async def become_dask(self, client_id, msg):
14551452
"""Start a dask.distributed Scheduler."""
14561453
if self.distributed_scheduler is None:
14571454
kwargs = msg['content'].get('scheduler_args', {})
14581455
self.log.info("Becoming dask.distributed scheduler: %s", kwargs)
14591456
from distributed import Scheduler
14601457

14611458
self.distributed_scheduler = scheduler = Scheduler(**kwargs)
1462-
yield scheduler.start()
1459+
await scheduler.start()
14631460
content = {
14641461
'status': 'ok',
14651462
'ip': self.distributed_scheduler.ip,

ipyparallel/util.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
"""Some generic utilities for dealing with classes, urls, and serialization."""
33
# Copyright (c) IPython Development Team.
44
# Distributed under the terms of the Modified BSD License.
5+
import asyncio
6+
import functools
7+
import inspect
58
import logging
69
import os
710
import re
@@ -23,7 +26,6 @@
2326
import zmq
2427
from dateutil.parser import parse as dateutil_parse
2528
from dateutil.tz import tzlocal
26-
from decorator import decorator
2729
from IPython import get_ipython
2830
from IPython.core.profiledir import ProfileDir
2931
from IPython.core.profiledir import ProfileDirError
@@ -101,17 +103,32 @@ def get(self, key, default=None):
101103
# -----------------------------------------------------------------------------
102104

103105

104-
@decorator
105-
def log_errors(f, self, *args, **kwargs):
106+
def log_errors(f):
106107
"""decorator to log unhandled exceptions raised in a method.
107108
108109
For use wrapping on_recv callbacks, so that exceptions
109110
do not cause the stream to be closed.
110111
"""
111-
try:
112-
return f(self, *args, **kwargs)
113-
except Exception as e:
114-
self.log.error("Uncaught exception in %r" % f, exc_info=True)
112+
113+
@functools.wraps(f)
114+
def logs_errors(self, *args, **kwargs):
115+
try:
116+
result = f(self, *args, **kwargs)
117+
except Exception:
118+
self.log.error("Uncaught exception in {f}: {future.exception()}")
119+
return
120+
121+
if inspect.isawaitable(result):
122+
# if it's async, schedule logging for when the future resolves
123+
future = asyncio.ensure_future(result)
124+
125+
def _log_error(future):
126+
if future.exception():
127+
self.log.error("Uncaught exception in {f}: {future.exception()}")
128+
129+
future.add_done_callback(_log_error)
130+
131+
return logs_errors
115132

116133

117134
def is_url(url):

0 commit comments

Comments
 (0)