diff --git a/src/IJulia.jl b/src/IJulia.jl index 5ae631bd..4d4bbce5 100644 --- a/src/IJulia.jl +++ b/src/IJulia.jl @@ -133,7 +133,8 @@ REPL.REPLDisplay(repl::MiniREPL) = repl.display connection_file::Union{String, Nothing} = nothing read_stdout::RefValue{Base.PipeEndpoint} = Ref{Base.PipeEndpoint}() read_stderr::RefValue{Base.PipeEndpoint} = Ref{Base.PipeEndpoint}() - socket_locks::Dict{Socket, ReentrantLock} = Dict{Socket, ReentrantLock}() + socket_send_locks::Dict{Socket, ReentrantLock} = Dict{Socket, ReentrantLock}() + socket_recv_locks::Dict{Socket, ReentrantLock} = Dict{Socket, ReentrantLock}() sha_ctx::RefValue{SHA.SHA_CTX} = Ref{SHA.SHA_CTX}() hmac_key::Vector{UInt8} = UInt8[] @@ -141,6 +142,7 @@ REPL.REPLDisplay(repl::MiniREPL) = repl.display waitloop_task::RefValue{Task} = Ref{Task}() requests_task::RefValue{Task} = Ref{Task}() + iopub_task::RefValue{Task} = Ref{Task}() watch_stdout_task::RefValue{Task} = Ref{Task}() watch_stderr_task::RefValue{Task} = Ref{Task}() watch_stdout_timer::RefValue{Timer} = Ref{Timer}() diff --git a/src/eventloop.jl b/src/eventloop.jl index 7faf9c83..41a83ab5 100644 --- a/src/eventloop.jl +++ b/src/eventloop.jl @@ -1,73 +1,122 @@ """ - eventloop(socket, kernel) + eventloop(socket, kernel, msgs, handlers) Generic event loop for one of the [kernel sockets](https://jupyter-client.readthedocs.io/en/latest/messaging.html#introduction). 
""" -function eventloop(socket, kernel) - task_local_storage(:IJulia_task, "write task") - try - while true - local msg +function eventloop(socket, kernel, msgs, handlers) + while isopen(msgs) + try + while isopen(msgs) + msg = take!(msgs) # can throw if `msgs` is closed while waiting on it + try + send_status("busy", kernel, msg) + invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, kernel, msg) + catch e + if e isa InterruptException && IJulia._shutting_down[] + # If we're shutting down, just return immediately + return + elseif !isa(e, InterruptException) + # Try to keep going if we get an exception, but + # send the exception traceback to the front-ends. + # (Ignore SIGINT since this may just be a user-requested + # kernel interruption to interrupt long calculations.) + content = error_content(e, msg="KERNEL EXCEPTION") + map(s -> println(orig_stderr[], s), content["traceback"]) + send_ipython(kernel.publish[], kernel, msg_pub(kernel.execute_msg, "error", content)) + end + finally + flush_all() + send_status("idle", kernel, msg) + end + yield() + end + catch e + if IJulia._shutting_down[] || isa(e, ZMQ.StateError) || isa(e, InvalidStateException) + # a ZMQ.StateError is almost certainly because of a closed socket + # an InvalidStateException is because of a closed channel + return + elseif !isa(e, InterruptException) + # the Jupyter manager may send us a SIGINT if the user + # chooses to interrupt the kernel; don't crash for that + rethrow() + end + end + yield() + end +end + +""" + waitloop(kernel) + +Main loop of a kernel. Runs the event loops for the control, shell, and iopub sockets +(note: in IJulia the shell socket is called `requests`). 
+""" +function waitloop(kernel) + control_msgs = Channel{Msg}(32) do ch + task_local_storage(:IJulia_task, "control msgs receive task") + while isopen(kernel.control[]) try - msg = recv_ipython(socket, kernel) + msg::Msg = recv_ipython(kernel.control[], kernel) + put!(ch, msg) catch e - if isa(e, EOFError) - # The socket was closed + if IJulia._shutting_down[] || isa(e, EOFError) + # an EOFError is because of a closed socket return else rethrow() end end + yield() + end + end + iopub_msgs = Channel{Msg}(32) + request_msgs = Channel{Msg}(32) do ch + task_local_storage(:IJulia_task, "request msgs receive task") + while isopen(kernel.requests[]) try - send_status("busy", kernel, msg) - invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, kernel, msg) + msg::Msg = recv_ipython(kernel.requests[], kernel) + if haskey(iopub_handlers, msg.header["msg_type"]) + put!(iopub_msgs, msg) + else + put!(ch, msg) + end catch e - if e isa InterruptException && IJulia._shutting_down[] - # If we're shutting down, just return immediately + if IJulia._shutting_down[] || isa(e, EOFError) + close(iopub_msgs) # otherwise iopub_msgs would remain open, but with no producer anymore + # an EOFError is because of a closed socket return - elseif !isa(e, InterruptException) - # Try to keep going if we get an exception, but - # send the exception traceback to the front-ends. - # (Ignore SIGINT since this may just be a user-requested - # kernel interruption to interrupt long calculations.) 
- content = error_content(e, msg="KERNEL EXCEPTION") - map(s -> println(orig_stderr[], s), content["traceback"]) - send_ipython(kernel.publish[], kernel, msg_pub(kernel.execute_msg, "error", content)) + else + rethrow() end - finally - flush_all() - send_status("idle", kernel, msg) end - end - catch e - if IJulia._shutting_down[] - return - end - - # the Jupyter manager may send us a SIGINT if the user - # chooses to interrupt the kernel; don't crash on this - if isa(e, InterruptException) - eventloop(socket, kernel) - elseif isa(e, ZMQ.StateError) - # This is almost certainly because of a closed socket - return - else - rethrow() + yield() end end -end -""" - waitloop(kernel) + # tasks must all be on the same thread as the `waitloop` calling thread, because + # `throwto` can't cross/change threads + control_task = @async begin + task_local_storage(:IJulia_task, "control handle/write task") + eventloop(kernel.control[], kernel, control_msgs, handlers) + end + kernel.requests_task[] = @async begin + task_local_storage(:IJulia_task, "requests handle/write task") + eventloop(kernel.requests[], kernel, request_msgs, handlers) + end + kernel.iopub_task[] = @async begin + task_local_storage(:IJulia_task, "iopub handle/write task") + eventloop(kernel.requests[], kernel, iopub_msgs, iopub_handlers) + end -Main loop of a kernel. Runs the event loops for the control and shell sockets -(note: in IJulia the shell socket is called `requests`). 
-""" -function waitloop(kernel) - control_task = @async eventloop(kernel.control[], kernel) - kernel.requests_task[] = @async eventloop(kernel.requests[], kernel) + # msg channels should close when tasks are terminated + bind(control_msgs, control_task) + bind(request_msgs, kernel.requests_task[]) + # unhandled errors in iopub_task should also kill the request_msgs channel (since we + # currently don't restart a failed iopub task) + bind(request_msgs, kernel.iopub_task[]) + bind(iopub_msgs, kernel.iopub_task[]) while kernel.inited try @@ -76,12 +125,14 @@ function waitloop(kernel) # send interrupts (user SIGINT) to the code-execution task if isa(e, InterruptException) @async Base.throwto(kernel.requests_task[], e) + @async Base.throwto(kernel.iopub_task[], e) else rethrow() end finally wait(control_task) wait(kernel.requests_task[]) + wait(kernel.iopub_task[]) end end end diff --git a/src/handlers.jl b/src/handlers.jl index 040ebd60..73c7b87f 100644 --- a/src/handlers.jl +++ b/src/handlers.jl @@ -229,13 +229,9 @@ function shutdown_request(socket, kernel, msg) # stop heartbeat thread stop_heartbeat(kernel) - # Shutdown the `requests` socket handler before sending any messages. This - # is necessary because otherwise the event loop will be calling - # `recv_ipython()` and holding a lock on `requests`, which will cause a - # deadlock when we try to send a message to it from the `control` socket - # handler. IJulia._shutting_down[] = true @async Base.throwto(kernel.requests_task[], InterruptException()) + @async Base.throwto(kernel.iopub_task[], InterruptException()) # In protocol 5.4 the shutdown reply moved to the control socket shutdown_socket = VersionNumber(msg) >= v"5.4" ? kernel.control[] : kernel.requests[] @@ -445,6 +441,7 @@ will throw an `InterruptException` to the currently executing request handler. 
""" function interrupt_request(socket, kernel, msg) @async Base.throwto(kernel.requests_task[], InterruptException()) + @async Base.throwto(kernel.iopub_task[], InterruptException()) send_ipython(socket, kernel, msg_reply(msg, "interrupt_reply", Dict())) end @@ -465,5 +462,11 @@ const handlers = Dict{String,Function}( "comm_open" => comm_open, "comm_info_request" => comm_info_request, "comm_msg" => comm_msg, - "comm_close" => comm_close + "comm_close" => comm_close, +) + +const iopub_handlers = Dict{String,Function}( + "comm_open" => comm_open, + "comm_msg" => comm_msg, + "comm_close" => comm_close, ) diff --git a/src/init.jl b/src/init.jl index 624fea21..d05c86cc 100644 --- a/src/init.jl +++ b/src/init.jl @@ -116,7 +116,8 @@ function init(args, kernel, profile=nothing) # associate a lock with each socket so that multi-part messages # on a given socket don't get inter-mingled between tasks. for s in (kernel.publish[], kernel.raw_input[], kernel.requests[], kernel.control[]) - kernel.socket_locks[s] = ReentrantLock() + kernel.socket_send_locks[s] = ReentrantLock() + kernel.socket_recv_locks[s] = ReentrantLock() end start_heartbeat(kernel) @@ -154,5 +155,5 @@ function init(args, kernel, profile=nothing) kernel.In = Dict{Int, String}() kernel.Out = Dict{Int, Any}() - kernel.waitloop_task[] = @async waitloop(kernel) + kernel.waitloop_task[] = Threads.@spawn :interactive waitloop(kernel) end diff --git a/src/msg.jl b/src/msg.jl index 6da64703..883629f5 100644 --- a/src/msg.jl +++ b/src/msg.jl @@ -40,8 +40,7 @@ end Send a message `m`. This will lock `socket`. 
""" function send_ipython(socket::ZMQ.Socket, kernel::Kernel, m::Msg) - lock(kernel.socket_locks[socket]) - try + @lock kernel.socket_send_locks[socket] begin @vprintln("SENDING ", m) for i in m.idents send(socket, i, more=true) @@ -56,8 +55,6 @@ function send_ipython(socket::ZMQ.Socket, kernel::Kernel, m::Msg) send(socket, parent_header, more=true) send(socket, metadata, more=true) send(socket, content) - finally - unlock(kernel.socket_locks[socket]) end end @@ -67,8 +64,7 @@ end Wait for and get a message. This will lock `socket`. """ function recv_ipython(socket::ZMQ.Socket, kernel::Kernel) - lock(kernel.socket_locks[socket]) - try + @lock kernel.socket_recv_locks[socket] begin idents = String[] s = recv(socket, String) @vprintln("got msg part $s") @@ -99,8 +95,6 @@ function recv_ipython(socket::ZMQ.Socket, kernel::Kernel) m = Msg(idents, JSON.parse(header), JSON.parse(content), JSON.parse(parent_header), JSON.parse(metadata)) @vprintln("RECEIVED $m") return m - finally - unlock(kernel.socket_locks[socket]) end end