Skip to content

Commit 55e430f

Browse files
committed
Refactor the shutdown logic to be threadsafe
Occasionally CI would hang during the `run_kernel()` test. This is most likely because Julia will sometimes hang if there are tasks still running upon exit, and `shutdown_handler()` previously did not bother closing all the eventloop tasks before calling `exit()`. Now `shutdown_handler()` will begin the shutdown process and allow `run_kernel()` to close the kernel properly.
1 parent 365650e commit 55e430f

File tree

6 files changed

+53
-67
lines changed

6 files changed

+53
-67
lines changed

src/IJulia.jl

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,15 @@ REPL.REPLDisplay(repl::MiniREPL) = repl.display
120120
# from the front-end.
121121
comms::Dict{String, Comm} = Dict{String, Comm}()
122122

123-
shutdown::Function = exit
123+
shutdown::Function = start_shutdown
124124

125125
# the following constants need to be initialized in init().
126126
publish::RefValue{Socket} = Ref{Socket}()
127127
raw_input::RefValue{Socket} = Ref{Socket}()
128128
requests::RefValue{Socket} = Ref{Socket}()
129129
control::RefValue{Socket} = Ref{Socket}()
130130
heartbeat::RefValue{Socket} = Ref{Socket}()
131-
heartbeat_context::RefValue{Context} = Ref{Context}()
131+
zmq_context::RefValue{Context} = Ref{Context}()
132132
profile::Dict{String, Any} = Dict{String, Any}()
133133
connection_file::Union{String, Nothing} = nothing
134134
read_stdout::RefValue{Base.PipeEndpoint} = Ref{Base.PipeEndpoint}()
@@ -180,6 +180,24 @@ function Base.wait(kernel::Kernel)
180180
end
181181
end
182182

183+
function start_shutdown(kernel::Kernel)
184+
IJulia._shutting_down[] = true
185+
kernel.inited = false
186+
187+
# First we call zmq_ctx_shutdown() to close the context and stop all sockets
188+
# from working. We don't call ZMQ.close(::Context) directly because that
189+
# currently isn't threadsafe:
190+
# https://github.com/JuliaInterop/ZMQ.jl/issues/256
191+
ZMQ.lib.zmq_ctx_shutdown(kernel.zmq_context[])
192+
193+
# Wait for the heartbeat thread to stop
194+
@ccall uv_thread_join(kernel.heartbeat_threadid::Ptr{Int})::Cint
195+
196+
# Now all the sockets should have been cancelled and the eventloop tasks
197+
# should be ready to shutdown.
198+
notify(kernel.stop_event)
199+
end
200+
183201
function Base.close(kernel::Kernel)
184202
# Reset the IO streams first so that any later errors get printed
185203
if kernel.capture_stdout
@@ -205,17 +223,16 @@ function Base.close(kernel::Kernel)
205223
end
206224
popdisplay()
207225

226+
start_shutdown(kernel)
227+
wait(kernel)
228+
208229
# Close all sockets
209230
close(kernel.publish[])
210231
close(kernel.raw_input[])
211232
close(kernel.requests[])
212233
close(kernel.control[])
213-
stop_heartbeat(kernel)
214-
215-
# The waitloop should now be ready to exit
216-
kernel.inited = false
217-
notify(kernel.stop_event)
218-
wait(kernel)
234+
close(kernel.heartbeat[])
235+
close(kernel.zmq_context[])
219236

220237
# Reset global variables
221238
IJulia.n = 0

src/handlers.jl

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -181,26 +181,26 @@ function kernel_info_request(socket, kernel, msg)
181181
send_ipython(socket, kernel,
182182
msg_reply(msg, "kernel_info_reply",
183183
Dict("protocol_version" => "5.4",
184-
"implementation" => "ijulia",
185-
"implementation_version" => string(pkgversion(@__MODULE__)),
186-
"language_info" =>
187-
Dict("name" => "julia",
188-
"version" =>
184+
"implementation" => "ijulia",
185+
"implementation_version" => string(pkgversion(@__MODULE__)),
186+
"language_info" =>
187+
Dict("name" => "julia",
188+
"version" =>
189189
string(VERSION.major, '.',
190190
VERSION.minor, '.',
191191
VERSION.patch),
192-
"mimetype" => "application/julia",
193-
"file_extension" => ".jl"),
194-
"banner" => "Julia: A fresh approach to technical computing.",
195-
"help_links" => [
196-
Dict("text"=>"Julia Home Page",
197-
"url"=>"http://julialang.org/"),
198-
Dict("text"=>"Julia Documentation",
199-
"url"=>"http://docs.julialang.org/"),
200-
Dict("text"=>"Julia Packages",
201-
"url"=>"https://juliahub.com/ui/Packages")
202-
],
203-
"status" => "ok")))
192+
"mimetype" => "application/julia",
193+
"file_extension" => ".jl"),
194+
"banner" => "Julia: A fresh approach to technical computing.",
195+
"help_links" => [
196+
Dict("text"=>"Julia Home Page",
197+
"url"=>"http://julialang.org/"),
198+
Dict("text"=>"Julia Documentation",
199+
"url"=>"http://docs.julialang.org/"),
200+
Dict("text"=>"Julia Packages",
201+
"url"=>"https://juliahub.com/ui/Packages")
202+
],
203+
"status" => "ok")))
204204
end
205205

206206
"""
@@ -226,23 +226,11 @@ request](https://jupyter-client.readthedocs.io/en/latest/messaging.html#kernel-s
226226
sending the reply this will exit the process.
227227
"""
228228
function shutdown_request(socket, kernel, msg)
229-
# stop heartbeat thread
230-
stop_heartbeat(kernel)
231-
232-
# Shutdown the `requests` socket handler before sending any messages. This
233-
# is necessary because otherwise the event loop will be calling
234-
# `recv_ipython()` and holding a lock on `requests`, which will cause a
235-
# deadlock when we try to send a message to it from the `control` socket
236-
# handler.
237-
IJulia._shutting_down[] = true
238-
@async Base.throwto(kernel.requests_task[], InterruptException())
239-
240-
# In protocol 5.4 the shutdown reply moved to the control socket
241-
shutdown_socket = VersionNumber(msg) >= v"5.4" ? kernel.control[] : kernel.requests[]
242-
send_ipython(shutdown_socket, kernel,
229+
send_ipython(kernel.control[], kernel,
243230
msg_reply(msg, "shutdown_reply", msg.content))
244231
sleep(0.1) # short delay (like in ipykernel), to hopefully ensure shutdown_reply is sent
245-
kernel.shutdown()
232+
233+
kernel.shutdown(kernel)
246234

247235
nothing
248236
end

src/heartbeat.jl

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,3 @@ function start_heartbeat(kernel)
2525
ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Ptr{Cvoid}),
2626
kernel.heartbeat_threadid, heartbeat_c, heartbeat)
2727
end
28-
29-
function stop_heartbeat(kernel)
30-
if !isopen(kernel.heartbeat_context[])
31-
# Do nothing if it has already been stopped (which can happen in the tests)
32-
return
33-
end
34-
35-
# First we call zmq_ctx_shutdown() to ensure that the zmq_proxy() call
36-
# returns. We don't call ZMQ.close(::Context) directly because that
37-
# currently isn't threadsafe:
38-
# https://github.com/JuliaInterop/ZMQ.jl/issues/256
39-
ZMQ.lib.zmq_ctx_shutdown(kernel.heartbeat_context[])
40-
@ccall uv_thread_join(kernel.heartbeat_threadid::Ptr{Int})::Cint
41-
42-
# Now that the heartbeat thread has joined and its guaranteed to no longer
43-
# be working on the heartbeat socket, we can safely close it and then the
44-
# context.
45-
close(kernel.heartbeat[])
46-
close(kernel.heartbeat_context[])
47-
end

src/init.jl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,12 @@ function init(args, kernel, profile=nothing)
108108
kernel.hmac_key = collect(UInt8, profile["key"])
109109
end
110110

111-
kernel.publish[] = Socket(PUB)
112-
kernel.raw_input[] = Socket(ROUTER)
113-
kernel.requests[] = Socket(ROUTER)
114-
kernel.control[] = Socket(ROUTER)
115-
kernel.heartbeat_context[] = Context()
116-
kernel.heartbeat[] = Socket(kernel.heartbeat_context[], ROUTER)
111+
kernel.zmq_context[] = Context()
112+
kernel.publish[] = Socket(kernel.zmq_context[], PUB)
113+
kernel.raw_input[] = Socket(kernel.zmq_context[], ROUTER)
114+
kernel.requests[] = Socket(kernel.zmq_context[], ROUTER)
115+
kernel.control[] = Socket(kernel.zmq_context[], ROUTER)
116+
kernel.heartbeat[] = Socket(kernel.zmq_context[], ROUTER)
117117
sep = profile["transport"]=="ipc" ? "-" : ":"
118118
bind(kernel.publish[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["iopub_port"])")
119119
bind(kernel.requests[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["shell_port"])")

src/kernel.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ function run_kernel()
2525
end
2626

2727
wait(IJulia._default_kernel::Kernel)
28+
close(IJulia._default_kernel::Kernel)
2829
end

test/kernel.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ end
217217
end
218218

219219
shutdown_called = false
220-
Kernel(profile; capture_stdout=false, capture_stderr=false, shutdown=() -> shutdown_called = true) do kernel
220+
Kernel(profile; capture_stdout=false, capture_stderr=false, shutdown=(_) -> shutdown_called = true) do kernel
221221
jupyter_client(profile) do client
222222
@testset "Comms" begin
223223
# Try opening a Comm without a target_name, which should

0 commit comments

Comments
 (0)