4 changes: 3 additions & 1 deletion src/IJulia.jl
@@ -133,14 +133,16 @@ REPL.REPLDisplay(repl::MiniREPL) = repl.display
connection_file::Union{String, Nothing} = nothing
read_stdout::RefValue{Base.PipeEndpoint} = Ref{Base.PipeEndpoint}()
read_stderr::RefValue{Base.PipeEndpoint} = Ref{Base.PipeEndpoint}()
socket_locks::Dict{Socket, ReentrantLock} = Dict{Socket, ReentrantLock}()
socket_send_locks::Dict{Socket, ReentrantLock} = Dict{Socket, ReentrantLock}()
socket_recv_locks::Dict{Socket, ReentrantLock} = Dict{Socket, ReentrantLock}()
sha_ctx::RefValue{SHA.SHA_CTX} = Ref{SHA.SHA_CTX}()
hmac_key::Vector{UInt8} = UInt8[]

stop_event::Base.Event = Base.Event()
waitloop_task::RefValue{Task} = Ref{Task}()

requests_task::RefValue{Task} = Ref{Task}()
iopub_task::RefValue{Task} = Ref{Task}()
watch_stdout_task::RefValue{Task} = Ref{Task}()
watch_stderr_task::RefValue{Task} = Ref{Task}()
watch_stdout_timer::RefValue{Timer} = Ref{Timer}()
147 changes: 99 additions & 48 deletions src/eventloop.jl
@@ -1,73 +1,122 @@
"""
eventloop(socket, kernel)
eventloop(socket, kernel, msgs, handlers)

Generic event loop for one of the [kernel
sockets](https://jupyter-client.readthedocs.io/en/latest/messaging.html#introduction).
"""
function eventloop(socket, kernel)
task_local_storage(:IJulia_task, "write task")
try
while true
local msg
function eventloop(socket, kernel, msgs, handlers)
while isopen(msgs)
try
while isopen(msgs)
msg = take!(msgs) # can throw if `msgs` is closed while waiting on it
try
send_status("busy", kernel, msg)
invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, kernel, msg)
catch e
if e isa InterruptException && IJulia._shutting_down[]
# If we're shutting down, just return immediately
return
elseif !isa(e, InterruptException)
# Try to keep going if we get an exception, but
# send the exception traceback to the front-ends.
# (Ignore SIGINT since this may just be a user-requested
# kernel interruption to interrupt long calculations.)
content = error_content(e, msg="KERNEL EXCEPTION")
map(s -> println(orig_stderr[], s), content["traceback"])
send_ipython(kernel.publish[], kernel, msg_pub(kernel.execute_msg, "error", content))
end
finally
flush_all()
send_status("idle", kernel, msg)
end
yield()
end
catch e
if IJulia._shutting_down[] || isa(e, ZMQ.StateError) || isa(e, InvalidStateException)
# a ZMQ.StateError is almost certainly because of a closed socket
# an InvalidStateException is because of a closed channel
return
elseif !isa(e, InterruptException)
# the Jupyter manager may send us a SIGINT if the user
# chooses to interrupt the kernel; don't crash for that
rethrow()
end
end
yield()
end
end

"""
waitloop(kernel)

Main loop of a kernel. Runs the event loops for the control, shell, and iopub sockets
(note: in IJulia the shell socket is called `requests`).
"""
function waitloop(kernel)
control_msgs = Channel{Msg}(32) do ch
task_local_storage(:IJulia_task, "control msgs receive task")
while isopen(kernel.control[])
try
msg = recv_ipython(socket, kernel)
msg::Msg = recv_ipython(kernel.control[], kernel)
put!(ch, msg)
catch e
if isa(e, EOFError)
# The socket was closed
if IJulia._shutting_down[] || isa(e, EOFError)
# an EOFError is because of a closed socket
return
else
rethrow()
end
end
yield()
Contributor Author:

My recollection is that scattering these yield() calls helped make message handling more even/fair when profiled with Tracy (see the toy illustration below).

end
end
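Not from the PR, just a toy single-thread illustration of the fairness point in the comment above: Julia tasks are cooperative, so without the explicit yield() calls the first @async loop below runs to completion before the second is ever scheduled, while with them the two interleave.

progress = Int[]
a = @async for _ in 1:3
    push!(progress, 1)
    yield()              # hand control to the other task each iteration
end
b = @async for _ in 1:3
    push!(progress, 2)
    yield()
end
wait(a); wait(b)
@show progress           # with the yields: interleaved, e.g. [1, 2, 1, 2, 1, 2]
                         # without them: [1, 1, 1, 2, 2, 2]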

iopub_msgs = Channel{Msg}(32)
request_msgs = Channel{Msg}(32) do ch
task_local_storage(:IJulia_task, "request msgs receive task")
while isopen(kernel.requests[])
try
send_status("busy", kernel, msg)
invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, kernel, msg)
msg::Msg = recv_ipython(kernel.requests[], kernel)
if haskey(iopub_handlers, msg.header["msg_type"])
put!(iopub_msgs, msg)
else
put!(ch, msg)
end
catch e
if e isa InterruptException && IJulia._shutting_down[]
# If we're shutting down, just return immediately
if IJulia._shutting_down[] || isa(e, EOFError)
close(iopub_msgs) # otherwise iopubs_msg would remain open, but with no producer anymore
# an EOFError is because of a closed socket
return
elseif !isa(e, InterruptException)
# Try to keep going if we get an exception, but
# send the exception traceback to the front-ends.
# (Ignore SIGINT since this may just be a user-requested
# kernel interruption to interrupt long calculations.)
content = error_content(e, msg="KERNEL EXCEPTION")
map(s -> println(orig_stderr[], s), content["traceback"])
send_ipython(kernel.publish[], kernel, msg_pub(kernel.execute_msg, "error", content))
else
rethrow()
end
finally
flush_all()
send_status("idle", kernel, msg)
end
end
catch e
if IJulia._shutting_down[]
return
end

# the Jupyter manager may send us a SIGINT if the user
# chooses to interrupt the kernel; don't crash on this
if isa(e, InterruptException)
eventloop(socket, kernel)
Comment on lines -51 to -52
Contributor Author:

I can't think of a reason for this to be recursive rather than an outer loop, which is how I've reorganized it. Am I missing one?

elseif isa(e, ZMQ.StateError)
# This is almost certainly because of a closed socket
return
else
rethrow()
yield()
end
end
end

"""
waitloop(kernel)
# tasks must all be on the same thread as the `waitloop` calling thread, because
# `throwto` can't cross/change threads
Comment on lines +98 to +99
Contributor Author:

Looking back into @async vs Threads.@spawn:

  1. @async isn't officially deprecated; its use is just warned against in the docs.
  2. More importantly, Base.throwto (and explicit task switches in general) can't cross threads, so switching to @spawn breaks our interrupt handling. When I originally looked into this, I couldn't find another reliable way to force interrupts on tasks running on different threads.

Member:

I don't think 2. is true? e.g. on Julia 1.12:

julia> t = Threads.@spawn :interactive sleep(60)
Task (runnable, started) @0x00007fa4287822c0

julia> Threads.@spawn :default Base.throwto(t, InterruptException())
Task (runnable, started) @0x00007fa4255ff850

julia> t
Task (failed) @0x00007fa4287822c0
InterruptException:
Stacktrace:
 [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
   @ Base ./task.jl:1128
 [2] wait()
   @ Base ./task.jl:1200
 [3] wait(c::Base.GenericCondition{Base.Threads.SpinLock}; first::Bool)
   @ Base ./condition.jl:141
 [4] wait
   @ ./condition.jl:136 [inlined]
 [5] _trywait(t::Timer)
   @ Base ./asyncevent.jl:185
 [6] wait
   @ ./asyncevent.jl:202 [inlined]
 [7] sleep
   @ ./asyncevent.jl:312 [inlined]
 [8] (::var"#2#3")()
   @ Main ./REPL[1]:1

But in any case I'm kinda ok with @async here since people doing multithreaded stuff should be using @spawn anyway.

Contributor Author:

I think that example only works because of task migration. task.c will still throw an error if you explicitly switch to a task running on a different thread. You can trigger that by changing t to an @async task (which makes it sticky).
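A hypothetical variant of the session above (untested here; the names t and other are just placeholders): making the target task sticky with @async should hit the task.c restriction being described, since an explicit switch to a started task pinned to another thread is rejected rather than handled by migration.

t = @async sleep(60)    # sticky: pinned to the thread that created it

other = Threads.@spawn :default Base.throwto(t, InterruptException())
wait(other)             # expected to fail with a TaskFailedException wrapping an
                        # error along the lines of "cannot switch to task running
                        # on other thread"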

control_task = @async begin
task_local_storage(:IJulia_task, "control handle/write task")
eventloop(kernel.control[], kernel, control_msgs, handlers)
end
kernel.requests_task[] = @async begin
task_local_storage(:IJulia_task, "requests handle/write task")
eventloop(kernel.requests[], kernel, request_msgs, handlers)
end
kernel.iopub_task[] = @async begin
task_local_storage(:IJulia_task, "iopub handle/write task")
eventloop(kernel.requests[], kernel, iopub_msgs, iopub_handlers)
end

Main loop of a kernel. Runs the event loops for the control and shell sockets
(note: in IJulia the shell socket is called `requests`).
"""
function waitloop(kernel)
control_task = @async eventloop(kernel.control[], kernel)
kernel.requests_task[] = @async eventloop(kernel.requests[], kernel)
# msg channels should close when tasks are terminated
bind(control_msgs, control_task)
bind(request_msgs, kernel.requests_task[])
# unhandled errors in iopub_task should also kill the request_msgs channel (since we
# currently don't restart a failed iopub task)
bind(request_msgs, kernel.iopub_task[])
bind(iopub_msgs, kernel.iopub_task[])

while kernel.inited
try
@@ -76,12 +125,14 @@ function waitloop(kernel)
# send interrupts (user SIGINT) to the code-execution task
if isa(e, InterruptException)
@async Base.throwto(kernel.requests_task[], e)
@async Base.throwto(kernel.iopub_task[], e)
else
rethrow()
end
finally
wait(control_task)
wait(kernel.requests_task[])
wait(kernel.iopub_task[])
end
end
end
15 changes: 9 additions & 6 deletions src/handlers.jl
@@ -229,13 +229,9 @@ function shutdown_request(socket, kernel, msg)
# stop heartbeat thread
stop_heartbeat(kernel)

# Shutdown the `requests` socket handler before sending any messages. This
# is necessary because otherwise the event loop will be calling
# `recv_ipython()` and holding a lock on `requests`, which will cause a
# deadlock when we try to send a message to it from the `control` socket
# handler.
IJulia._shutting_down[] = true
@async Base.throwto(kernel.requests_task[], InterruptException())
@async Base.throwto(kernel.iopub_task[], InterruptException())

# In protocol 5.4 the shutdown reply moved to the control socket
shutdown_socket = VersionNumber(msg) >= v"5.4" ? kernel.control[] : kernel.requests[]
@@ -445,6 +441,7 @@ will throw an `InterruptException` to the currently executing request handler.
"""
function interrupt_request(socket, kernel, msg)
@async Base.throwto(kernel.requests_task[], InterruptException())
@async Base.throwto(kernel.iopub_task[], InterruptException())
send_ipython(socket, kernel, msg_reply(msg, "interrupt_reply", Dict()))
end

Expand All @@ -465,5 +462,11 @@ const handlers = Dict{String,Function}(
"comm_open" => comm_open,
"comm_info_request" => comm_info_request,
"comm_msg" => comm_msg,
"comm_close" => comm_close
"comm_close" => comm_close,
)

const iopub_handlers = Dict{String,Function}(
"comm_open" => comm_open,
"comm_msg" => comm_msg,
"comm_close" => comm_close,
Contributor Author:

I am now wondering if the async handling should be expanded to most messages besides "execute_request"? In particular, "complete_request" and "inspect_request" are (should be?) side-effect free, and it would be really convenient to be able to, e.g., see the docs for a function when writing a new cell while another cell is mid-execution. (A rough sketch of that idea follows the iopub_handlers definition below.)

)
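Purely as a sketch of that idea, not part of this PR: routing those requests through the out-of-band channel could look roughly like the following, assuming the existing complete_request / inspect_request handlers (not shown in this hunk) are actually safe to run while an execute_request is in flight, which is exactly the open question.

const iopub_handlers = Dict{String,Function}(
    "comm_open"        => comm_open,
    "comm_msg"         => comm_msg,
    "comm_close"       => comm_close,
    "complete_request" => complete_request,   # hypothetical addition
    "inspect_request"  => inspect_request,    # hypothetical addition
)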
5 changes: 3 additions & 2 deletions src/init.jl
@@ -116,7 +116,8 @@ function init(args, kernel, profile=nothing)
# associate a lock with each socket so that multi-part messages
# on a given socket don't get inter-mingled between tasks.
for s in (kernel.publish[], kernel.raw_input[], kernel.requests[], kernel.control[])
kernel.socket_locks[s] = ReentrantLock()
kernel.socket_send_locks[s] = ReentrantLock()
kernel.socket_recv_locks[s] = ReentrantLock()
Comment on lines +119 to +120
Contributor Author:

My (now dated) recollection of reading the ZMQ docs and code is that (at least for ROUTER sockets) sending and receiving are independent, such that there's no need to prevent receiving while a (multi-part) message is being sent?
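Not from the PR, just a stripped-down sketch of the locking scheme this comment argues for, using only the send(socket, part, more=true) / recv(socket, String) calls and @lock already visible in the msg.jl hunk below. With independent locks, a task blocked in a multi-part receive no longer blocks a concurrent send on the same socket. locked_send and locked_recv are hypothetical helpers, and socket is assumed to be an already-connected ZMQ ROUTER socket.

using ZMQ  # for send/recv

send_lock = ReentrantLock()
recv_lock = ReentrantLock()

function locked_send(socket, parts)
    @lock send_lock begin                # serializes multi-part sends only
        for p in parts[1:end-1]
            send(socket, p, more=true)   # keep the frames of one message together
        end
        send(socket, parts[end])
    end
end

function locked_recv(socket, nparts)
    @lock recv_lock begin                # independent of send_lock, so a blocked
        [recv(socket, String) for _ in 1:nparts]  # receive doesn't stall sends
    end
end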

end

start_heartbeat(kernel)
Expand Down Expand Up @@ -154,5 +155,5 @@ function init(args, kernel, profile=nothing)
kernel.In = Dict{Int, String}()
kernel.Out = Dict{Int, Any}()

kernel.waitloop_task[] = @async waitloop(kernel)
kernel.waitloop_task[] = Threads.@spawn :interactive waitloop(kernel)
end
10 changes: 2 additions & 8 deletions src/msg.jl
@@ -40,8 +40,7 @@ end
Send a message `m`. This will lock `socket`.
"""
function send_ipython(socket::ZMQ.Socket, kernel::Kernel, m::Msg)
lock(kernel.socket_locks[socket])
try
@lock kernel.socket_send_locks[socket] begin
@vprintln("SENDING ", m)
for i in m.idents
send(socket, i, more=true)
Expand All @@ -56,8 +55,6 @@ function send_ipython(socket::ZMQ.Socket, kernel::Kernel, m::Msg)
send(socket, parent_header, more=true)
send(socket, metadata, more=true)
send(socket, content)
finally
unlock(kernel.socket_locks[socket])
end
end

@@ -67,8 +64,7 @@ end
Wait for and get a message. This will lock `socket`.
"""
function recv_ipython(socket::ZMQ.Socket, kernel::Kernel)
lock(kernel.socket_locks[socket])
try
@lock kernel.socket_recv_locks[socket] begin
idents = String[]
s = recv(socket, String)
@vprintln("got msg part $s")
@@ -99,8 +95,6 @@ function recv_ipython(socket::ZMQ.Socket, kernel::Kernel)
m = Msg(idents, JSON.parse(header), JSON.parse(content), JSON.parse(parent_header), JSON.parse(metadata))
@vprintln("RECEIVED $m")
return m
finally
unlock(kernel.socket_locks[socket])
end
end
