Skip to content

Conversation

@halleysfifthinc
Copy link
Contributor

Motivation

All messages from the front-end/server are received and handled synchronously, including custom comm messages (comm_open, comm_msg, and comm_close). So, any currently executing cell blocks the IJulia kernel from receiving and handling any IOPub/comm messages. For example, in the following WebIO MWE, a JS function updates an "output" Observable, and the JS function is triggered by setting an ("input") observable:

using WebIO, Observables
s = Scope()
s["in"] = Observable{String}("")
s["out"] = Observable{String}("")
onjs(s["in"], js"""
function (val)
    _webIOScope.setObservableValue("out",val);
end""")

you can't observe a new s["out"] value (aka the result of the JS function) during execution of the same cell that set s["in"] (which triggers the JS function).

Example Julia function that fails (hangs) without async comms

function julia_js_julia(_in, out, str)
    ch = Channel{String}()
    obsf = on(out) do val
        put!(ch, val)
    end
    t = @async take!(ch)
    _in[] = str
    out = fetch(t)
    off(obsf)
        
    return out
end

*This example function isn't thread-safe. (The scp["in"] observable isn't locked, so concurrently setting it could lead to interleaved/mismatched updates to the scp["out"] observable.)

One example of an actual use-case/benefit is PlotlyJS.to_image, which uses the same
Julia => JS => Julia observable setup to retrieve the results of a plotly.js function call.
Currently, the PlotlyJS.to_image function soft-fails because the observable that holds the generated
image is only updated after the current cell finishes execution (when IJulia can process the
comm_msg from WebIO in the Jupyter frontend/client).

Testing

I've manually tested that the above WebIO MWE works with this PR, and that interrupting still works. I realize this is a fairly fundamental rearchitecturing of the message receiving/handling, but I'm not sure what else to test and/or if there is a good way to test any of this in CI. I'm open to any hints/pointers if you want more thorough testing/test cases.

Fixes #858.

P.S. Breadcrumb for the future: This new architecture has a lot of parallels (easily adapted) to the new subshells feature that was recently implemented in ipython/ipykernel#1249.

@halleysfifthinc halleysfifthinc changed the title WIP: Make message receive and handling async Make message receive and handling async Jan 15, 2025
@JamesWrigley
Copy link
Member

This sounds like a good idea, but it absolutely needs tests before merging. At some point I'll start writing tests for more of the internals which you should be able to modify for this PR, but feel free to have a go already if you have time :)

@halleysfifthinc
Copy link
Contributor Author

👍 I will wait until you've added more internals tests before I do anything further. I am/have been running IJulia with this PR to give any bugs the opportunity to surface.

@JamesWrigley
Copy link
Member

If you rebase this on master I think we can continue with it 🙂 Couple things:

  • We should use Threads.@spawn instead of @async.
  • We should run CI with multiple threads by default to try to catch any race conditions.

@halleysfifthinc
Copy link
Contributor Author

Will do! The use of @async was actually intentional. My goal was to keep IJulia specific activity on the interactive thread. We could potentially go even further and @spawn cell execution on non-interactive threads to more intentionally separate user and IJulia activity. That could theoretically be helpful in some situations, but this PR will already be a(nother) significant rearchitecture of a core part of IJulia. (And I can't think of a specific motivating example.)

@JamesWrigley
Copy link
Member

Keeping it on the interactive threads make sense, but for that we should use Threads.@spawn :interactive. @async has the unfortunate side-effect of pinning the parent task to the same thread so it's kinda discouraged now.

@halleysfifthinc
Copy link
Contributor Author

pinning the parent task to the same thread

Right.. Is that not equivalent to Threads.@spawn :interactive? The rest of the IJulia kernel is synchronous/not using tasks, so it will always be on the first (aka interactive) thread, and we want the rest of the IJulia activity to stay on that thread too, just allowed to be asynchronous/concurrent?

Happy to learn more if I'm wrong, this was my first serious foray into async/concurrent programming!

@JamesWrigley
Copy link
Member

That is technically true, but @async is still deprecated so I'd prefer we stick with Threads.@spawn and explicitly specifying the threadpool. One other advantage is that if there's multiple threads in the interactive threadpool then we can use all of them instead of one.

@halleysfifthinc halleysfifthinc marked this pull request as draft October 2, 2025 17:56
@codecov
Copy link

codecov bot commented Oct 2, 2025

Codecov Report

❌ Patch coverage is 80.59701% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.96%. Comparing base (ab44427) to head (382b660).

Files with missing lines Patch % Lines
src/eventloop.jl 80.00% 12 Missing ⚠️
src/handlers.jl 50.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1140      +/-   ##
==========================================
+ Coverage   68.65%   68.96%   +0.30%     
==========================================
  Files          16       16              
  Lines        1056     1089      +33     
==========================================
+ Hits          725      751      +26     
- Misses        331      338       +7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Contributor Author

@halleysfifthinc halleysfifthinc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left some comments to explain some design decisions and/or about open questions I have.

I'm still unsure how to add tests for this, and I'd welcome any brainstorming.

Comment on lines -51 to -52
if isa(e, InterruptException)
eventloop(socket, kernel)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a reason for this to be recursive instead of the outer loop as I reorganized it too?

Comment on lines 72 to 86
@@ -76,12 +125,14 @@
# send interrupts (user SIGINT) to the code-execution task
if isa(e, InterruptException)
@async Base.throwto(kernel.requests_task[], e)
@async Base.throwto(kernel.iopub_task[], e)
else
rethrow()
end
finally
wait(control_task)
wait(kernel.requests_task[])
wait(kernel.iopub_task[])
end
end
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that this needs to be in a while loop vs something like

Suggested change
try
waitall([control_task, kernel.requests_task[], kernel.iopub_task[]])
catch
# send interrupts (user SIGINT) to the code-execution task
if isa(e, InterruptException)
@async Base.throwto(kernel.requests_task[], e)
@async Base.throwto(kernel.iopub_task[], e)
else
rethrow()
end
finally
wait(kernel.close_event)
end

And maybe not even the finally clause? Basically, with the wait, this task shouldn't be scheduled again unless one of the message handling tasks fails, which we aren't trying to recover from. So if we do get back here, its because we want to/have to stop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree, I was looking at this recently and thought the control flow was a bit strange 😅

rethrow()
end
end
yield()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My recollection is that the scattering of yields helped make message handling more even/fair when viewed using Tracy.

const iopub_handlers = Dict{String,Function}(
"comm_open" => comm_open,
"comm_msg" => comm_msg,
"comm_close" => comm_close,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am now wondering if the async handling should be expanded to most messages besides "execute_request"? In particular, "complete_request" and "inspect_request" are (should be?) side-effect free, and would be really convenient to be able to e.g. see the docs for a functions when writing a new cell while another cell is mid-execution.

Comment on lines +119 to +120
kernel.socket_send_locks[s] = ReentrantLock()
kernel.socket_recv_locks[s] = ReentrantLock()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My (now dated) recollection of reading the ZMQ docs and code is that (at least for ROUTER sockets) sending and receiving are independent, such that there's no need to prevent receiving while a (multi-part) message is being sent?

Comment on lines +98 to +99
# tasks must all be on the same thread as the `waitloop` calling thread, because
# `throwto` can't cross/change threads
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking back into @async vs Threads.@spawn:

  1. @async isn't officially deprecated, it just is warned against use.
  2. More importantly, Base.throwto (and explicit task switches in general) cannot cross threads, so switching to @spawn breaks our interrupting. When I had originally looked into this, I couldn't figure out another reliable way to force interrupts on tasks on different threads.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think 2. is true? e.g. on Julia 1.12:

julia> t = Threads.@spawn :interactive sleep(60)
Task (runnable, started) @0x00007fa4287822c0

julia> Threads.@spawn :default Base.throwto(t, InterruptException())
Task (runnable, started) @0x00007fa4255ff850

julia> t
Task (failed) @0x00007fa4287822c0
InterruptException:
Stacktrace:
 [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
   @ Base ./task.jl:1128
 [2] wait()
   @ Base ./task.jl:1200
 [3] wait(c::Base.GenericCondition{Base.Threads.SpinLock}; first::Bool)
   @ Base ./condition.jl:141
 [4] wait
   @ ./condition.jl:136 [inlined]
 [5] _trywait(t::Timer)
   @ Base ./asyncevent.jl:185
 [6] wait
   @ ./asyncevent.jl:202 [inlined]
 [7] sleep
   @ ./asyncevent.jl:312 [inlined]
 [8] (::var"#2#3")()
   @ Main ./REPL[1]:1

But in any case I'm kinda ok with @async here since people doing multithreaded stuff should be using @spawn anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that example only works because of task migration. The task.c can/will still throw an error if manually switching tasks on different threads. You can trigger that by changing t to an @async task (to make it sticky).

@JamesWrigley
Copy link
Member

Sorry I missed this 🙈 I'll try to review it this week but feel free to ping me if I forget.

Copy link
Member

@JamesWrigley JamesWrigley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite convinced that what we're doing here is safe. If I understand correctly the reasoning is:

  1. ZMQ sockets are not thread-safe.
  2. Thus we use @async to ensure that all tasks are running on the same thread.
  3. Thus we can safely recv/send in different tasks as long as we lock appropriately to prevent one recv being interleaved with another recv (likewise for send)

But that's making the assumption that ZMQ.jl's recv and send don't do anything to the socket internally that may conflict with each other, and I don't think that's true. Imagine this sequence:

Now I'm pretty sure that neither send() or recv() will yield in those places so in practice this particular situation couldn't happen right now, but that's an implementation detail of ZMQ and certainly not something we can rely on. But I also can't think of a good alternative yet 🤔

Also, I fixed some lingering-task issues in #1190 which seems to have caused some merge conflicts, sorry about that 🙈

Comment on lines +98 to +99
# tasks must all be on the same thread as the `waitloop` calling thread, because
# `throwto` can't cross/change threads
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think 2. is true? e.g. on Julia 1.12:

julia> t = Threads.@spawn :interactive sleep(60)
Task (runnable, started) @0x00007fa4287822c0

julia> Threads.@spawn :default Base.throwto(t, InterruptException())
Task (runnable, started) @0x00007fa4255ff850

julia> t
Task (failed) @0x00007fa4287822c0
InterruptException:
Stacktrace:
 [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
   @ Base ./task.jl:1128
 [2] wait()
   @ Base ./task.jl:1200
 [3] wait(c::Base.GenericCondition{Base.Threads.SpinLock}; first::Bool)
   @ Base ./condition.jl:141
 [4] wait
   @ ./condition.jl:136 [inlined]
 [5] _trywait(t::Timer)
   @ Base ./asyncevent.jl:185
 [6] wait
   @ ./asyncevent.jl:202 [inlined]
 [7] sleep
   @ ./asyncevent.jl:312 [inlined]
 [8] (::var"#2#3")()
   @ Main ./REPL[1]:1

But in any case I'm kinda ok with @async here since people doing multithreaded stuff should be using @spawn anyway.

Comment on lines 72 to 86
@@ -76,12 +125,14 @@
# send interrupts (user SIGINT) to the code-execution task
if isa(e, InterruptException)
@async Base.throwto(kernel.requests_task[], e)
@async Base.throwto(kernel.iopub_task[], e)
else
rethrow()
end
finally
wait(control_task)
wait(kernel.requests_task[])
wait(kernel.iopub_task[])
end
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree, I was looking at this recently and thought the control flow was a bit strange 😅

@JamesWrigley
Copy link
Member

Hmm a nice design would be to use a poller that could poll the iopub socket and an internal inproc socket that we send messages to. But ZMQ doesn't have a poller yet... JuliaInterop/ZMQ.jl#52

@JamesWrigley
Copy link
Member

Using timeouts would also work, but myeh 🤷

@halleysfifthinc
Copy link
Contributor Author

  1. ZMQ sockets are not thread-safe.
  2. Thus we use @async to ensure that all tasks are running on the same thread.
  3. Thus we can safely recv/send in different tasks as long as we lock appropriately to prevent one recv being interleaved with another recv (likewise for send)

So the @async keeping things on the same thread is unrelated to the ZMQ sockets.

The actual motivating factor behind splitting the socket locks into read/write is because the read channel/task yields (waiting to read from the socket) while holding the lock. This caused a deadlock when another task tries to send, even though the socket is otherwise quiet (not actively receiving).

To avoid the split locks, we need a way to (in the receive channel/task) release the lock on a yielding wait (i.e. the socket doesn't have anything to read so the task yields). I couldn't figure out how to do that back when I first made this PR. I'll take another look to see if I can figure it out now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Async comm stuff

2 participants