Skip to content

Commit 6e2a01b

Browse files
Refactor eventloop to enable async comms
1 parent fa415b8 commit 6e2a01b

File tree

2 files changed

+82
-31
lines changed

2 files changed

+82
-31
lines changed

src/eventloop.jl

Lines changed: 74 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,95 @@
1-
function eventloop(socket)
2-
task_local_storage(:IJulia_task, "write task")
3-
try
4-
while true
5-
msg = recv_ipython(socket)
6-
try
7-
send_status("busy", msg)
8-
invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, msg)
9-
catch e
10-
# Try to keep going if we get an exception, but
11-
# send the exception traceback to the front-ends.
12-
# (Ignore SIGINT since this may just be a user-requested
13-
# kernel interruption to interrupt long calculations.)
14-
if !isa(e, InterruptException)
15-
content = error_content(e, msg="KERNEL EXCEPTION")
16-
map(s -> println(orig_stderr[], s), content["traceback"])
17-
send_ipython(publish[], msg_pub(execute_msg, "error", content))
1+
function eventloop(socket::Socket, msgs::Channel, handlers)
2+
while true
3+
try
4+
while true
5+
msg = take!(msgs)
6+
try
7+
send_status("busy", msg)
8+
invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, msg)
9+
catch e
10+
# Try to keep going if we get an exception, but
11+
# send the exception traceback to the front-ends.
12+
# (Ignore SIGINT since this may just be a user-requested
13+
# kernel interruption to interrupt long calculations.)
14+
if !isa(e, InterruptException)
15+
content = error_content(e, msg="KERNEL EXCEPTION")
16+
map(s -> println(orig_stderr[], s), content["traceback"])
17+
send_ipython(publish[], msg_pub(execute_msg, "error", content))
18+
else
19+
rethrow()
20+
end
21+
finally
22+
flush_all()
23+
send_status("idle", msg)
1824
end
19-
finally
20-
flush_all()
21-
send_status("idle", msg)
25+
yield()
26+
end
27+
catch e
28+
# the Jupyter manager may send us a SIGINT if the user
29+
# chooses to interrupt the kernel; don't crash on this
30+
if !isa(e, InterruptException)
31+
rethrow()
2232
end
2333
end
24-
catch e
25-
# the Jupyter manager may send us a SIGINT if the user
26-
# chooses to interrupt the kernel; don't crash on this
27-
if isa(e, InterruptException)
28-
eventloop(socket)
29-
else
30-
rethrow()
31-
end
34+
yield()
3235
end
3336
end
3437

38+
const iopub_task = Ref{Task}()
3539
const requests_task = Ref{Task}()
3640
function waitloop()
37-
@async eventloop(control[])
38-
requests_task[] = @async eventloop(requests[])
41+
control_msgs = Channel{Msg}(32) do ch
42+
task_local_storage(:IJulia_task, "control_msgs task")
43+
while isopen(control[])
44+
msg::Msg = recv_ipython(control[])
45+
put!(ch, msg)
46+
yield()
47+
end
48+
end
49+
50+
iopub_msgs = Channel{Msg}(32)
51+
request_msgs = Channel{Msg}(32) do ch
52+
task_local_storage(:IJulia_task, "request_msgs task")
53+
while isopen(requests[])
54+
msg::Msg = recv_ipython(requests[])
55+
if haskey(iopub_handlers, msg.header["msg_type"])
56+
put!(iopub_msgs, msg)
57+
else
58+
put!(ch, msg)
59+
end
60+
yield()
61+
end
62+
end
63+
64+
control_task = @async begin
65+
task_local_storage(:IJulia_task, "control handle/write task")
66+
eventloop(control[], control_msgs, handlers)
67+
end
68+
requests_task[] = @async begin
69+
task_local_storage(:IJulia_task, "requests handle/write task")
70+
eventloop(requests[], request_msgs, handlers)
71+
end
72+
iopub_task[] = @async begin
73+
task_local_storage(:IJulia_task, "iopub handle/write task")
74+
eventloop(requests[], iopub_msgs, iopub_handlers)
75+
end
76+
77+
bind(control_msgs, control_task)
78+
bind(request_msgs, requests_task[])
79+
bind(iopub_msgs, iopub_task[])
80+
3981
while true
4082
try
4183
wait()
4284
catch e
4385
# send interrupts (user SIGINT) to the code-execution task
4486
if isa(e, InterruptException)
87+
@async Base.throwto(iopub_task[], e)
4588
@async Base.throwto(requests_task[], e)
4689
else
4790
rethrow()
4891
end
4992
end
5093
end
5194
end
95+

src/handlers.jl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ end
271271

272272
function interrupt_request(socket, msg)
273273
@async Base.throwto(requests_task[], InterruptException())
274+
@async Base.throwto(iopub_task[], InterruptException())
274275
send_ipython(requests[], msg_reply(msg, "interrupt_reply", Dict()))
275276
end
276277

@@ -291,5 +292,11 @@ const handlers = Dict{String,Function}(
291292
"comm_open" => comm_open,
292293
"comm_info_request" => comm_info_request,
293294
"comm_msg" => comm_msg,
294-
"comm_close" => comm_close
295+
"comm_close" => comm_close,
296+
)
297+
298+
const iopub_handlers = Dict{String,Function}(
299+
"comm_open" => comm_open,
300+
"comm_msg" => comm_msg,
301+
"comm_close" => comm_close,
295302
)

0 commit comments

Comments
 (0)