Skip to content

Commit 382b660

Browse files
Refactor eventloop to enable async comms
1 parent c954180 commit 382b660

File tree

4 files changed

+110
-55
lines changed

4 files changed

+110
-55
lines changed

src/IJulia.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ REPL.REPLDisplay(repl::MiniREPL) = repl.display
142142
waitloop_task::RefValue{Task} = Ref{Task}()
143143

144144
requests_task::RefValue{Task} = Ref{Task}()
145+
iopub_task::RefValue{Task} = Ref{Task}()
145146
watch_stdout_task::RefValue{Task} = Ref{Task}()
146147
watch_stderr_task::RefValue{Task} = Ref{Task}()
147148
watch_stdout_timer::RefValue{Timer} = Ref{Timer}()

src/eventloop.jl

Lines changed: 99 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,122 @@
11
"""
2-
eventloop(socket, kernel)
2+
eventloop(socket, kernel, msgs, handlers)
33
44
Generic event loop for one of the [kernel
55
sockets](https://jupyter-client.readthedocs.io/en/latest/messaging.html#introduction).
66
"""
7-
function eventloop(socket, kernel)
8-
task_local_storage(:IJulia_task, "write task")
9-
try
10-
while true
11-
local msg
7+
function eventloop(socket, kernel, msgs, handlers)
8+
while isopen(msgs)
9+
try
10+
while isopen(msgs)
11+
msg = take!(msgs) # can throw if `msgs` is closed while waiting on it
12+
try
13+
send_status("busy", kernel, msg)
14+
invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, kernel, msg)
15+
catch e
16+
if e isa InterruptException && IJulia._shutting_down[]
17+
# If we're shutting down, just return immediately
18+
return
19+
elseif !isa(e, InterruptException)
20+
# Try to keep going if we get an exception, but
21+
# send the exception traceback to the front-ends.
22+
# (Ignore SIGINT since this may just be a user-requested
23+
# kernel interruption to interrupt long calculations.)
24+
content = error_content(e, msg="KERNEL EXCEPTION")
25+
map(s -> println(orig_stderr[], s), content["traceback"])
26+
send_ipython(kernel.publish[], kernel, msg_pub(kernel.execute_msg, "error", content))
27+
end
28+
finally
29+
flush_all()
30+
send_status("idle", kernel, msg)
31+
end
32+
yield()
33+
end
34+
catch e
35+
if IJulia._shutting_down[] || isa(e, ZMQ.StateError) || isa(e, InvalidStateException)
36+
# a ZMQ.StateError is almost certainly because of a closed socket
37+
# an InvalidStateException is because of a closed channel
38+
return
39+
elseif !isa(e, InterruptException)
40+
# the Jupyter manager may send us a SIGINT if the user
41+
# chooses to interrupt the kernel; don't crash for that
42+
rethrow()
43+
end
44+
end
45+
yield()
46+
end
47+
end
48+
49+
"""
50+
waitloop(kernel)
51+
52+
Main loop of a kernel. Runs the event loops for the control, shell, and iopub sockets
53+
(note: in IJulia the shell socket is called `requests`).
54+
"""
55+
function waitloop(kernel)
56+
control_msgs = Channel{Msg}(32) do ch
57+
task_local_storage(:IJulia_task, "control msgs receive task")
58+
while isopen(kernel.control[])
1259
try
13-
msg = recv_ipython(socket, kernel)
60+
msg::Msg = recv_ipython(kernel.control[], kernel)
61+
put!(ch, msg)
1462
catch e
15-
if isa(e, EOFError)
16-
# The socket was closed
63+
if IJulia._shutting_down[] || isa(e, EOFError)
64+
# an EOFError is because of a closed socket
1765
return
1866
else
1967
rethrow()
2068
end
2169
end
70+
yield()
71+
end
72+
end
2273

74+
iopub_msgs = Channel{Msg}(32)
75+
request_msgs = Channel{Msg}(32) do ch
76+
task_local_storage(:IJulia_task, "request msgs receive task")
77+
while isopen(kernel.requests[])
2378
try
24-
send_status("busy", kernel, msg)
25-
invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, kernel, msg)
79+
msg::Msg = recv_ipython(kernel.requests[], kernel)
80+
if haskey(iopub_handlers, msg.header["msg_type"])
81+
put!(iopub_msgs, msg)
82+
else
83+
put!(ch, msg)
84+
end
2685
catch e
27-
if e isa InterruptException && IJulia._shutting_down[]
28-
# If we're shutting down, just return immediately
86+
if IJulia._shutting_down[] || isa(e, EOFError)
87+
close(iopub_msgs) # otherwise iopubs_msg would remain open, but with no producer anymore
88+
# an EOFError is because of a closed socket
2989
return
30-
elseif !isa(e, InterruptException)
31-
# Try to keep going if we get an exception, but
32-
# send the exception traceback to the front-ends.
33-
# (Ignore SIGINT since this may just be a user-requested
34-
# kernel interruption to interrupt long calculations.)
35-
content = error_content(e, msg="KERNEL EXCEPTION")
36-
map(s -> println(orig_stderr[], s), content["traceback"])
37-
send_ipython(kernel.publish[], kernel, msg_pub(kernel.execute_msg, "error", content))
90+
else
91+
rethrow()
3892
end
39-
finally
40-
flush_all()
41-
send_status("idle", kernel, msg)
4293
end
43-
end
44-
catch e
45-
if IJulia._shutting_down[]
46-
return
47-
end
48-
49-
# the Jupyter manager may send us a SIGINT if the user
50-
# chooses to interrupt the kernel; don't crash on this
51-
if isa(e, InterruptException)
52-
eventloop(socket, kernel)
53-
elseif isa(e, ZMQ.StateError)
54-
# This is almost certainly because of a closed socket
55-
return
56-
else
57-
rethrow()
94+
yield()
5895
end
5996
end
60-
end
6197

62-
"""
63-
waitloop(kernel)
98+
# tasks must all be on the same thread as the `waitloop` calling thread, because
99+
# `throwto` can't cross/change threads
100+
control_task = @async begin
101+
task_local_storage(:IJulia_task, "control handle/write task")
102+
eventloop(kernel.control[], kernel, control_msgs, handlers)
103+
end
104+
kernel.requests_task[] = @async begin
105+
task_local_storage(:IJulia_task, "requests handle/write task")
106+
eventloop(kernel.requests[], kernel, request_msgs, handlers)
107+
end
108+
kernel.iopub_task[] = @async begin
109+
task_local_storage(:IJulia_task, "iopub handle/write task")
110+
eventloop(kernel.requests[], kernel, iopub_msgs, iopub_handlers)
111+
end
64112

65-
Main loop of a kernel. Runs the event loops for the control and shell sockets
66-
(note: in IJulia the shell socket is called `requests`).
67-
"""
68-
function waitloop(kernel)
69-
control_task = @async eventloop(kernel.control[], kernel)
70-
kernel.requests_task[] = @async eventloop(kernel.requests[], kernel)
113+
# msg channels should close when tasks are terminated
114+
bind(control_msgs, control_task)
115+
bind(request_msgs, kernel.requests_task[])
116+
# unhandled errors in iopub_task should also kill the request_msgs channel (since we
117+
# currently don't restart a failed iopub task)
118+
bind(request_msgs, kernel.iopub_task[])
119+
bind(iopub_msgs, kernel.iopub_task[])
71120

72121
while kernel.inited
73122
try
@@ -76,12 +125,14 @@ function waitloop(kernel)
76125
# send interrupts (user SIGINT) to the code-execution task
77126
if isa(e, InterruptException)
78127
@async Base.throwto(kernel.requests_task[], e)
128+
@async Base.throwto(kernel.iopub_task[], e)
79129
else
80130
rethrow()
81131
end
82132
finally
83133
wait(control_task)
84134
wait(kernel.requests_task[])
135+
wait(kernel.iopub_task[])
85136
end
86137
end
87138
end

src/handlers.jl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,13 +229,9 @@ function shutdown_request(socket, kernel, msg)
229229
# stop heartbeat thread
230230
stop_heartbeat(kernel)
231231

232-
# Shutdown the `requests` socket handler before sending any messages. This
233-
# is necessary because otherwise the event loop will be calling
234-
# `recv_ipython()` and holding a lock on `requests`, which will cause a
235-
# deadlock when we try to send a message to it from the `control` socket
236-
# handler.
237232
IJulia._shutting_down[] = true
238233
@async Base.throwto(kernel.requests_task[], InterruptException())
234+
@async Base.throwto(kernel.iopub_task[], InterruptException())
239235

240236
# In protocol 5.4 the shutdown reply moved to the control socket
241237
shutdown_socket = VersionNumber(msg) >= v"5.4" ? kernel.control[] : kernel.requests[]
@@ -445,6 +441,7 @@ will throw an `InterruptException` to the currently executing request handler.
445441
"""
446442
function interrupt_request(socket, kernel, msg)
447443
@async Base.throwto(kernel.requests_task[], InterruptException())
444+
@async Base.throwto(kernel.iopub_task[], InterruptException())
448445
send_ipython(socket, kernel, msg_reply(msg, "interrupt_reply", Dict()))
449446
end
450447

@@ -465,5 +462,11 @@ const handlers = Dict{String,Function}(
465462
"comm_open" => comm_open,
466463
"comm_info_request" => comm_info_request,
467464
"comm_msg" => comm_msg,
468-
"comm_close" => comm_close
465+
"comm_close" => comm_close,
466+
)
467+
468+
const iopub_handlers = Dict{String,Function}(
469+
"comm_open" => comm_open,
470+
"comm_msg" => comm_msg,
471+
"comm_close" => comm_close,
469472
)

src/init.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,5 +155,5 @@ function init(args, kernel, profile=nothing)
155155
kernel.In = Dict{Int, String}()
156156
kernel.Out = Dict{Int, Any}()
157157

158-
kernel.waitloop_task[] = @async waitloop(kernel)
158+
kernel.waitloop_task[] = Threads.@spawn :interactive waitloop(kernel)
159159
end

0 commit comments

Comments
 (0)