Make message receive and handling async #1140
Conversation
(force-pushed from 5e55f55 to 6e2a01b)
This sounds like a good idea, but it absolutely needs tests before merging. At some point I'll start writing tests for more of the internals which you should be able to modify for this PR, but feel free to have a go already if you have time :)
👍 I will wait until you've added more internals tests before I do anything further. I am/have been running IJulia with this PR to give any bugs the opportunity to surface.
If you rebase this on master I think we can continue with it 🙂 Couple things:
Will do! The use of …
Keeping it on the interactive threads makes sense, but for that we should use …
Right... Is that not equivalent to …? Happy to learn more if I'm wrong; this was my first serious foray into async/concurrent programming!
That is technically true, but …
(force-pushed from 6e2a01b to 382b660)
Codecov Report: ❌ Patch coverage is …

    @@            Coverage Diff             @@
    ##           master    #1140      +/-   ##
    ==========================================
    + Coverage   68.65%   68.96%   +0.30%
    ==========================================
      Files          16       16
      Lines        1056     1089      +33
    ==========================================
    + Hits          725      751      +26
    - Misses        331      338       +7
halleysfifthinc left a comment
I've left some comments to explain some design decisions and/or about open questions I have.
I'm still unsure how to add tests for this, and I'd welcome any brainstorming.
    if isa(e, InterruptException)
        eventloop(socket, kernel)
I can't think of a reason for this to be recursive instead of using the outer loop (which I reorganized as well)?
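For illustration only, a rough sketch of the non-recursive shape; the `recv_ipython`, `handlers`, and `unknown_request` names are assumptions modelled on IJulia's existing internals, not this PR's code:

```julia
# Hedged sketch: handle the interrupt in the surrounding loop instead of
# restarting eventloop() recursively. Names are assumed, not from this PR.
function eventloop(socket, kernel)
    while isopen(socket)
        try
            msg = recv_ipython(socket, kernel)
            handler = get(handlers, msg.header["msg_type"], unknown_request)
            handler(socket, kernel, msg)
        catch e
            # an interrupt only aborts the current message; loop back to recv
            isa(e, InterruptException) || rethrow()
        end
    end
end
```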
    @@ -76,12 +125,14 @@
            # send interrupts (user SIGINT) to the code-execution task
            if isa(e, InterruptException)
                @async Base.throwto(kernel.requests_task[], e)
                @async Base.throwto(kernel.iopub_task[], e)
            else
                rethrow()
            end
        finally
            wait(control_task)
            wait(kernel.requests_task[])
            wait(kernel.iopub_task[])
        end
    end
I'm not sure that this needs to be in a while loop vs something like
    try
        waitall([control_task, kernel.requests_task[], kernel.iopub_task[]])
    catch e
        # send interrupts (user SIGINT) to the code-execution task
        if isa(e, InterruptException)
            @async Base.throwto(kernel.requests_task[], e)
            @async Base.throwto(kernel.iopub_task[], e)
        else
            rethrow()
        end
    finally
        wait(kernel.close_event)
    end
And maybe not even the finally clause? Basically, with the wait, this task shouldn't be scheduled again unless one of the message handling tasks fails, which we aren't trying to recover from. So if we do get back here, it's because we want to/have to stop.
Yeah I agree, I was looking at this recently and thought the control flow was a bit strange 😅
    rethrow()
    end
    end
    yield()
My recollection is that the scattering of yields helped make message handling more even/fair when viewed using Tracy.
    const iopub_handlers = Dict{String,Function}(
        "comm_open" => comm_open,
        "comm_msg" => comm_msg,
        "comm_close" => comm_close,
I am now wondering if the async handling should be expanded to most messages besides "execute_request"? In particular, "complete_request" and "inspect_request" are (should be?) side-effect free, and it would be really convenient to be able to e.g. see the docs for a function when writing a new cell while another cell is mid-execution.
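To make that concrete, a hedged sketch (not this PR's code; the `handlers` dict, `unknown_request`, and `msg.header` field are assumptions based on IJulia's existing message handling) of dispatching the presumed side-effect-free requests on their own tasks:

```julia
# Hypothetical: request types assumed safe to handle while execute_request is busy.
const async_safe_msg_types = Set(["complete_request", "inspect_request"])

function dispatch(socket, kernel, msg)
    handler = get(handlers, msg.header["msg_type"], unknown_request)
    if msg.header["msg_type"] in async_safe_msg_types
        # a same-thread task, so completions/docs can be served mid-execution
        @async handler(socket, kernel, msg)
    else
        handler(socket, kernel, msg)
    end
end
```

Whether those handlers really are side-effect free (e.g. completion hooks that touch the module currently executing) would need checking first.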
    kernel.socket_send_locks[s] = ReentrantLock()
    kernel.socket_recv_locks[s] = ReentrantLock()
My (now dated) recollection of reading the ZMQ docs and code is that (at least for ROUTER sockets) sending and receiving are independent, such that there's no need to prevent receiving while a (multi-part) message is being sent?
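To spell out what the two locks buy us, a hedged sketch of send/recv wrappers; only the `socket_send_locks`/`socket_recv_locks` fields come from the diff above, the wrapper names are hypothetical:

```julia
using ZMQ

function locked_send(kernel, socket, data)
    # hold the send lock for the whole send so two tasks can't interleave
    # messages on the same socket
    lock(kernel.socket_send_locks[socket]) do
        ZMQ.send(socket, data)
    end
end

function locked_recv(kernel, socket)
    # a separate recv lock, so a task blocked here waiting for input doesn't
    # stop another task from sending on this socket in the meantime
    lock(kernel.socket_recv_locks[socket]) do
        ZMQ.recv(socket)
    end
end
```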
    # tasks must all be on the same thread as the `waitloop` calling thread, because
    # `throwto` can't cross/change threads
Looking back into `@async` vs `Threads.@spawn`:

1. `@async` isn't officially deprecated, its use is just warned against.
2. More importantly, `Base.throwto` (and explicit task switches in general) cannot cross threads, so switching to `@spawn` breaks our interrupting. When I had originally looked into this, I couldn't figure out another reliable way to force interrupts on tasks on different threads.
I don't think 2. is true? e.g. on Julia 1.12:
    julia> t = Threads.@spawn :interactive sleep(60)
    Task (runnable, started) @0x00007fa4287822c0

    julia> Threads.@spawn :default Base.throwto(t, InterruptException())
    Task (runnable, started) @0x00007fa4255ff850

    julia> t
    Task (failed) @0x00007fa4287822c0
    InterruptException:
    Stacktrace:
      [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
        @ Base ./task.jl:1128
      [2] wait()
        @ Base ./task.jl:1200
      [3] wait(c::Base.GenericCondition{Base.Threads.SpinLock}; first::Bool)
        @ Base ./condition.jl:141
      [4] wait
        @ ./condition.jl:136 [inlined]
      [5] _trywait(t::Timer)
        @ Base ./asyncevent.jl:185
      [6] wait
        @ ./asyncevent.jl:202 [inlined]
      [7] sleep
        @ ./asyncevent.jl:312 [inlined]
      [8] (::var"#2#3")()
        @ Main ./REPL[1]:1

But in any case I'm kinda ok with `@async` here since people doing multithreaded stuff should be using `@spawn` anyway.
I think that example only works because of task migration. `task.c` can/will still throw an error when manually switching between tasks on different threads. You can trigger that by changing `t` to an `@async` task (to make it sticky).
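For reference, a sketch of that trigger (needs a multi-threaded session; whether the error actually fires depends on the spawned task landing on a different thread than `t`):

```julia
# t is created with @async, so it is sticky: pinned to the current thread.
t = @async sleep(60)

# throwto needs an explicit switch into t; doing that from a task on another
# thread should hit task.c's "cannot switch to task running on other thread" error.
thrower = Threads.@spawn Base.throwto(t, InterruptException())
try
    wait(thrower)
catch err
    @show err   # expected when the spawned task ran on a different thread
end
```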
Sorry I missed this 🙈 I'll try to review it this week but feel free to ping me if I forget.
JamesWrigley left a comment
I'm not quite convinced that what we're doing here is safe. If I understand correctly the reasoning is:
- ZMQ sockets are not thread-safe.
- Thus we use `@async` to ensure that all tasks are running on the same thread.
- Thus we can safely recv/send in different tasks as long as we lock appropriately to prevent one recv being interleaved with another recv (likewise for send).
But that's making the assumption that ZMQ.jl's recv and send don't do anything to the socket internally that may conflict with each other, and I don't think that's true. Imagine this sequence:
- Task 1 is sending and yields immediately after calling `zmq_msg_send()`: https://github.com/JuliaInterop/ZMQ.jl/blob/1e1b458180311b19127937e8dd0befa79a93d54f/src/comm.jl#L8
  Let's say that `zmq_msg_send()` fails because we have to try again (EAGAIN).
- Task 2 is receiving and yields immediately after calling `zmq_msg_recv()`: https://github.com/JuliaInterop/ZMQ.jl/blob/1e1b458180311b19127937e8dd0befa79a93d54f/src/comm.jl#L80
  Let's say it fails for some non-EAGAIN reason (maybe a corrupted message or something). This overwrites the internal error code from `zmq_msg_send()`.
- Control switches back to Task 1, which calls `zmq_errno()`, which returns the error code from the call to `zmq_msg_recv()` and thus incorrectly fails instead of trying again.
Now I'm pretty sure that neither send() nor recv() will yield in those places, so in practice this particular situation couldn't happen right now, but that's an implementation detail of ZMQ and certainly not something we can rely on. But I also can't think of a good alternative yet 🤔
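A toy illustration of that kind of race (plain Julia, not ZMQ: two tasks on one thread sharing a single "last error" slot, with a yield between the failing call and the error check):

```julia
# Stand-in for a shared errno: whichever task wrote it last wins.
const last_errno = Ref(0)

function fake_send()
    last_errno[] = 11        # pretend zmq_msg_send() failed with EAGAIN
    yield()                  # task switch before the caller inspects the error
    return last_errno[] == 11 ? :retry : :hard_failure
end

fake_recv() = (last_errno[] = 22)  # pretend zmq_msg_recv() failed with something else

t1 = @async fake_send()
yield()       # let t1 run up to its internal yield()
fake_recv()   # the "recv" failure overwrites the pending EAGAIN
fetch(t1)     # :hard_failure — the retry signal was clobbered before t1 checked it
```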
Also, I fixed some lingering-task issues in #1190 which seems to have caused some merge conflicts, sorry about that 🙈
Hmm, a nice design would be to use a poller that could poll the iopub socket and an internal …
Using timeouts would also work, but myeh 🤷
So the … The actual motivating factor behind splitting the socket locks into read/write is that the read channel/task yields (waiting to read from the socket) while holding the lock. This caused a deadlock when another task tried to send, even though the socket was otherwise quiet (not actively receiving). To avoid the split locks, we need a way for the receive channel/task to release the lock on a yielding wait (i.e. when the socket has nothing to read and the task yields). I couldn't figure out how to do that back when I first made this PR. I'll take another look to see if I can figure it out now.
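Not part of the PR, but one possible shape for that, as a hedged sketch: wait for readability outside the lock and only take the lock for the non-blocking read. It assumes ZMQ.jl's `Base.wait(::Socket)`, the `socket.events` property, and `ZMQ.POLLIN` behave as described; `socket_locks` is a hypothetical merged single-lock-per-socket field:

```julia
using ZMQ

function recv_when_ready(kernel, socket)
    while true
        wait(socket)   # suspend on the socket's FD without holding any lock
        msg = lock(kernel.socket_locks[socket]) do
            (socket.events & ZMQ.POLLIN) != 0 ? recv(socket) : nothing
        end
        msg !== nothing && return msg
        # spurious wakeup, or another task consumed the message; wait again
    end
end
```

With something like this a sender only contends for the lock during the brief non-blocking read, which might remove the need for separate send/recv locks.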
Motivation
All messages from the front-end/server are received and handled synchronously, including custom comm messages (`comm_open`, `comm_msg`, and `comm_close`). So, any currently executing cell blocks the IJulia kernel from receiving and handling any IOPub/comm messages. For example, in the following WebIO MWE, a JS function updates an "output" `Observable`, and the JS function is triggered by setting an ("input") observable: you can't observe a new `s["out"]` value (aka the result of the JS function) during execution of the same cell that sets `s["in"]` (which triggers the JS function).

Example Julia function that fails (hangs) without async comms

*This example function isn't thread-safe. (The `scp["in"]` observable isn't locked, so concurrently setting it could lead to interleaved/mismatched updates to the `scp["out"]` observable.)

One example of an actual use-case/benefit is `PlotlyJS.to_image`, which uses the same Julia => JS => Julia observable setup to retrieve the results of a plotly.js function call. Currently, the `PlotlyJS.to_image` function soft-fails because the observable that holds the generated image is only updated after the current cell finishes execution (when IJulia can process the `comm_msg` from WebIO in the Jupyter frontend/client).

Testing
I've manually tested that the above WebIO MWE works with this PR, and that interrupting still works. I realize this is a fairly fundamental rearchitecting of the message receiving/handling, but I'm not sure what else to test and/or if there is a good way to test any of this in CI. I'm open to any hints/pointers if you want more thorough testing/test cases.
Fixes #858.
P.S. Breadcrumb for the future: This new architecture has a lot of parallels (easily adapted) to the new subshells feature that was recently implemented in ipython/ipykernel#1249.