Skip to content

Commit aafa7ab

Browse files
committed
Replace Session.post by Session.threadsafe_call
Session.post is a hack to allow communication from other threads, but it is much simpler and cleaner to allow callbacks be invoked from the main thread. Also support an optional "setup callback" in the Session.run.
1 parent aee3116 commit aafa7ab

File tree

10 files changed

+77
-93
lines changed

10 files changed

+77
-93
lines changed

neovim/api/common.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,22 +204,23 @@ def __init__(self, session, hook):
204204
self._in = self._hook.from_nvim
205205
self._out = self._hook.to_nvim
206206

207-
def post(self, name, *args):
208-
"""Wrapper for Session.post."""
209-
self._session.post(name, *args)
207+
def threadsafe_call(self, fn, *args, **kwargs):
208+
"""Wrapper for Session.threadsafe_call."""
209+
self._session.threadsafe_call(fn, *args, **kwargs)
210210

211211
def next_message(self):
212212
"""Wrapper for Session.next_message."""
213213
msg = self._session.next_message()
214-
return walk(self._in, msg, self, msg[1], msg[0])
214+
if msg:
215+
return walk(self._in, msg, self, msg[1], msg[0])
215216

216217
def request(self, name, *args):
217218
"""Wrapper for Session.request."""
218219
args = walk(self._out, args, self, name, 'out-request')
219220
return walk(self._in, self._session.request(name, *args), self, name,
220221
'out-request')
221222

222-
def run(self, request_cb, notification_cb):
223+
def run(self, request_cb, notification_cb, setup_cb=None):
223224
"""Wrapper for Session.run."""
224225
def filter_request_cb(name, args):
225226
result = request_cb(self._in(name, self, name, 'request'),
@@ -230,7 +231,7 @@ def filter_notification_cb(name, args):
230231
notification_cb(self._in(name, self, name, 'notification'),
231232
walk(self._in, args, self, name, 'notification'))
232233

233-
self._session.run(filter_request_cb, filter_notification_cb)
234+
self._session.run(filter_request_cb, filter_notification_cb, setup_cb)
234235

235236
def stop(self):
236237
"""Wrapper for Session.stop."""

neovim/msgpack_rpc/async_session.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,9 @@ def __init__(self, msgpack_stream):
2828
2: self._on_notification
2929
}
3030

31-
def post(self, method, args):
32-
"""Post a notification to the queue from another thread.
33-
34-
A msgpack-rpc notification with method `method` and argument `args` is
35-
posted to the notification queue. This can be used to send messages
36-
from other threads.
37-
"""
38-
# We encode method names to be consitent with names coming from Nvim,
39-
# which always come as byte strings
40-
self._msgpack_stream.post((2, method.encode('utf-8'), args,))
31+
def threadsafe_call(self, fn):
32+
"""Wrapper around `MsgpackStream.threadsafe_call`."""
33+
self._msgpack_stream.threadsafe_call(fn)
4134

4235
def request(self, method, args, response_cb):
4336
"""Send a msgpack-rpc request to Nvim.

neovim/msgpack_rpc/event_loop/asyncio.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ def _run(self):
107107
def _stop(self):
108108
self._loop.stop()
109109

110-
def _interrupt(self):
111-
self._loop.call_soon_threadsafe(lambda: self.stop())
110+
def _threadsafe_call(self, fn):
111+
self._loop.call_soon_threadsafe(fn)
112112

113113
def _setup_signals(self, signals):
114114
self._signals = list(signals)

neovim/msgpack_rpc/event_loop/base.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Common code for event loop implementations."""
22
import logging
33
import signal
4-
import threading
54

65

76
logger = logging.getLogger(__name__)
@@ -112,11 +111,13 @@ def send(self, data):
112111
debug("Sending '%s'", data)
113112
self._send(data)
114113

115-
def interrupt(self):
116-
"""Stop the event loop from another thread."""
117-
debug('Interrupted event loop from thread %s',
118-
threading.current_thread())
119-
self._interrupt()
114+
def threadsafe_call(self, fn):
115+
"""Call a function in the event loop thread.
116+
117+
This is the only safe way to interact with a session from other
118+
threads.
119+
"""
120+
self._threadsafe_call(fn)
120121

121122
def run(self, data_cb):
122123
"""Run the event loop."""

neovim/msgpack_rpc/event_loop/uv.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Event loop implementation that uses pyuv(libuv-python bindings)."""
22
import sys
3+
from collections import deque
34

45
import pyuv
56

@@ -12,9 +13,10 @@ class UvEventLoop(BaseEventLoop):
1213

1314
def _init(self):
1415
self._loop = pyuv.Loop()
15-
self._async = pyuv.Async(self._loop, lambda h: self.stop())
16+
self._async = pyuv.Async(self._loop, self._on_async)
1617
self._connection_error = None
1718
self._error_stream = None
19+
self._callbacks = deque()
1820

1921
def _on_connect(self, stream, error):
2022
self.stop()
@@ -96,9 +98,14 @@ def _run(self):
9698
def _stop(self):
9799
self._loop.stop()
98100

99-
def _interrupt(self):
101+
def _threadsafe_call(self, fn):
102+
self._callbacks.append(fn)
100103
self._async.send()
101104

105+
def _on_async(self, handle):
106+
while self._callbacks:
107+
self._callbacks.popleft()()
108+
102109
def _setup_signals(self, signals):
103110
self._signal_handles = []
104111
handler = lambda h, signum: self._on_signal(signum)

neovim/msgpack_rpc/msgpack_stream.py

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""Msgpack handling in the event loop pipeline."""
22
import logging
3-
from collections import deque
43

54
from msgpack import Packer, Unpacker
65

@@ -20,24 +19,17 @@ class MsgpackStream(object):
2019
def __init__(self, event_loop):
2120
"""Wrap `event_loop` on a msgpack-aware interface."""
2221
self._event_loop = event_loop
23-
self._posted = deque()
2422
self._packer = Packer(use_bin_type=True, encoding=None)
2523
self._unpacker = Unpacker()
2624
self._message_cb = None
27-
self._stopped = False
2825

2926
def set_packer_encoding(self, encoding):
3027
"""Switch encoding for Unicode strings."""
3128
self._packer = Packer(use_bin_type=True, encoding=encoding)
3229

33-
def post(self, msg):
34-
"""Post `msg` to the read queue of the `MsgpackStream` instance.
35-
36-
Use the event loop `interrupt()` method to push msgpack objects from
37-
other threads.
38-
"""
39-
self._posted.append(msg)
40-
self._event_loop.interrupt()
30+
def threadsafe_call(self, fn):
31+
"""Wrapper around `BaseEventLoop.threadsafe_call`."""
32+
self._event_loop.threadsafe_call(fn)
4133

4234
def send(self, msg):
4335
"""Queue `msg` for sending to Nvim."""
@@ -51,22 +43,13 @@ def run(self, message_cb):
5143
a message has been successfully parsed from the input stream.
5244
"""
5345
self._message_cb = message_cb
54-
self._run()
46+
self._event_loop.run(self._on_data)
5547
self._message_cb = None
5648

5749
def stop(self):
5850
"""Stop the event loop."""
59-
self._stopped = True
6051
self._event_loop.stop()
6152

62-
def _run(self):
63-
self._stopped = False
64-
while not self._stopped:
65-
if self._posted:
66-
self._message_cb(self._posted.popleft())
67-
continue
68-
self._event_loop.run(self._on_data)
69-
7053
def _on_data(self, data):
7154
self._unpacker.feed(data)
7255
while True:

neovim/msgpack_rpc/session.py

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,18 @@ def __init__(self, async_session):
2525
self._request_cb = self._notification_cb = None
2626
self._pending_messages = deque()
2727
self._is_running = False
28+
self._setup_exception = None
2829

29-
def post(self, name, *args):
30-
"""Simple wrapper around `AsyncSession.post`."""
31-
self._async_session.post(name, args)
30+
def threadsafe_call(self, fn, *args, **kwargs):
31+
"""Wrapper around `AsyncSession.threadsafe_call`."""
32+
def handler():
33+
fn(*args, **kwargs)
34+
35+
def greenlet_wrapper():
36+
gr = greenlet.greenlet(handler)
37+
gr.switch()
38+
39+
self._async_session.threadsafe_call(greenlet_wrapper)
3240

3341
def next_message(self):
3442
"""Block until a message(request or notification) is available.
@@ -42,7 +50,8 @@ def next_message(self):
4250
return self._pending_messages.popleft()
4351
self._async_session.run(self._enqueue_request_and_stop,
4452
self._enqueue_notification_and_stop)
45-
return self._pending_messages.popleft()
53+
if self._pending_messages:
54+
return self._pending_messages.popleft()
4655

4756
def request(self, method, *args):
4857
"""Send a msgpack-rpc request and block until as response is received.
@@ -67,7 +76,7 @@ def request(self, method, *args):
6776
raise self.error_wrapper(err)
6877
return rv
6978

70-
def run(self, request_cb, notification_cb):
79+
def run(self, request_cb, notification_cb, setup_cb=None):
7180
"""Run the event loop to receive requests and notifications from Nvim.
7281
7382
Like `AsyncSession.run()`, but `request_cb` and `notification_cb` are
@@ -76,6 +85,23 @@ def run(self, request_cb, notification_cb):
7685
self._request_cb = request_cb
7786
self._notification_cb = notification_cb
7887
self._is_running = True
88+
self._setup_exception = None
89+
90+
def on_setup():
91+
try:
92+
setup_cb()
93+
except Exception as e:
94+
self._setup_exception = e
95+
self.stop()
96+
97+
if setup_cb:
98+
# Create a new greenlet to handle the setup function
99+
gr = greenlet.greenlet(on_setup)
100+
gr.switch()
101+
102+
if self._setup_exception:
103+
raise self._setup_exception
104+
79105
# Process all pending requests and notifications
80106
while self._pending_messages:
81107
msg = self._pending_messages.popleft()
@@ -85,6 +111,9 @@ def run(self, request_cb, notification_cb):
85111
self._request_cb = None
86112
self._notification_cb = None
87113

114+
if self._setup_exception:
115+
raise self._setup_exception
116+
88117
def stop(self):
89118
"""Stop the event loop."""
90119
self._async_session.stop()

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@
2424
author_email='[email protected]',
2525
license='MIT',
2626
packages=['neovim', 'neovim.api', 'neovim.msgpack_rpc',
27-
'neovim.msgpack_rpc.event_loop', 'neovim.plugins'],
27+
'neovim.msgpack_rpc.event_loop', 'neovim.plugin'],
2828
install_requires=install_requires,
2929
zip_safe=False)

test/test_client_rpc.py

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77

88
@with_setup(setup=cleanup)
99
def test_call_and_reply():
10-
def notification_cb(name, args):
11-
eq(name, 'setup')
10+
def setup_cb():
1211
cmd = 'let g:result = rpcrequest(%d, "client-call", 1, 2, 3)' % cid
1312
vim.command(cmd)
1413
eq(vim.vars['result'], [4, 5, 6])
@@ -19,33 +18,27 @@ def request_cb(name, args):
1918
eq(args, [1, 2, 3])
2019
return [4, 5, 6]
2120

22-
vim.session.post('setup')
23-
vim.session.run(request_cb, notification_cb)
21+
vim.session.run(request_cb, None, setup_cb)
2422

2523

2624
@with_setup(setup=cleanup)
2725
def test_call_api_before_reply():
28-
def notification_cb(name, args):
29-
eq(name, 'setup2')
26+
def setup_cb():
3027
cmd = 'let g:result = rpcrequest(%d, "client-call2", 1, 2, 3)' % cid
3128
vim.command(cmd)
3229
eq(vim.vars['result'], [7, 8, 9])
3330
vim.session.stop()
3431

3532
def request_cb(name, args):
36-
eq(name, 'client-call2')
37-
eq(args, [1, 2, 3])
3833
vim.command('let g:result2 = [7, 8, 9]')
3934
return vim.vars['result2']
4035

41-
vim.session.post('setup2')
42-
vim.session.run(request_cb, notification_cb)
36+
vim.session.run(request_cb, None, setup_cb)
4337

4438

4539
@with_setup(setup=cleanup)
4640
def test_recursion():
47-
def notification_cb(name, args):
48-
eq(name, 'setup3')
41+
def setup_cb():
4942
vim.vars['result1'] = 0
5043
vim.vars['result2'] = 0
5144
vim.vars['result3'] = 0
@@ -71,5 +64,4 @@ def request_cb(name, args):
7164
vim.command(cmd)
7265
return n
7366

74-
vim.session.post('setup3')
75-
vim.session.run(request_cb, notification_cb)
67+
vim.session.run(request_cb, None, setup_cb)

test/test_concurrency.py

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,11 @@
1-
from time import sleep
2-
from random import random
31
from nose.tools import with_setup, eq_ as eq
42
from common import vim, cleanup
5-
from threading import Thread, Timer
6-
7-
8-
@with_setup(setup=cleanup)
9-
def test_custom_messages():
10-
def produce(i):
11-
vim.session.post('custom-message', None)
12-
sleep(0.05 * random())
13-
14-
count = 50
15-
for i in range(count):
16-
t = Thread(target=produce, args=(i,))
17-
t.daemon = True
18-
t.start()
19-
20-
custom_messages = []
21-
while len(custom_messages) < 50:
22-
custom_messages.append(vim.session.next_message())
23-
24-
eq(len(custom_messages), 50)
3+
from threading import Timer
254

265

276
@with_setup(setup=cleanup)
287
def test_interrupt_from_another_thread():
29-
timer = Timer(0.5, lambda: vim.session.post('timeout'))
8+
session = vim.session
9+
timer = Timer(0.5, lambda: session.threadsafe_call(lambda: session.stop()))
3010
timer.start()
31-
msg = vim.session.next_message()
32-
eq(msg[0], 'notification')
33-
eq(msg[1], 'timeout')
11+
eq(vim.session.next_message(), None)

0 commit comments

Comments
 (0)