Poller fixes #266

Open
JamesWrigley wants to merge 2 commits into master from poller


@JamesWrigley
Member

See commit messages for details.

i.e. if the socket isn't closed but is inoperable because the context was
shut down.
The poller will notify the `FDWatcher`s of sockets with a `WAKEUP` event to
ensure that the waiter tasks wake up when calling `wait(socket)`. These events
were previously incorrectly cleared by calling `notify(socket)` with the
expectation that the new flag would completely override the `FDWatcher` state,
but this is not the case. Now we explicitly clear the `WAKEUP` flag on the
internal `_FDWatcher` struct.

Also fixed `close(::Poller)` so that it wakes up any waiting tasks;
previously it would hang if some tasks were executing `wait(socket)`.
@JamesWrigley JamesWrigley self-assigned this Feb 1, 2026
@codecov-commenter

codecov-commenter commented Feb 1, 2026

Codecov Report

❌ Patch coverage is 75.00000% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 91.81%. Comparing base (04fcdc1) to head (7ab9405).

Files with missing lines | Patch % | Lines
src/socket.jl | 42.85% | 4 Missing ⚠️
src/poller.jl | 92.30% | 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #266      +/-   ##
==========================================
- Coverage   92.50%   91.81%   -0.69%     
==========================================
  Files          11       11              
  Lines         560      574      +14     
==========================================
+ Hits          518      527       +9     
- Misses         42       47       +5     



if (event.events & WAKEUP) == WAKEUP
# If it was a dummy event from the poller then do nothing
continue

I think it would make more sense to handle (i.e. clear) the WAKEUP event immediately? The wakeup doesn't need to persist except to wake up from the wait/zmq_poll loop and then continue here (and it does need to be cleared before the next wait). The later control flow should still be correct whether the poller remains open or is closed?

  • Poller open: poller.channel is still open (loop continues) and we wait at waiter_wait(poller.barrier) to rearm.
  • Poller closed: poller.channel is closed (loop exits)
Suggested change
pollfd = getfield(item.socket, :pollfd) # don't need to check if socket was closed because that would be caught by the socket error check just above
pollfd.watcher.events &= ~WAKEUP
continue

Member Author

@JamesWrigley JamesWrigley Feb 2, 2026

I had a go but it was quite tricky because of how wait() and close() wake up all the sockets regardless, and clearing it directly before the polling calls is a race condition. Clearing it in the same task that does the wakeups is the simplest way I think.

@halleysfifthinc

So my suggestion was definitely wrong, but I think the current state of this PR might be treating a symptom and not the cause?

wait(::FDWatcher) always clears the watched-for events (line 644 of FileWatching.jl). A reproducer:

using ZMQ
pull1 = ZMQ.Socket(ZMQ.PULL)
bind(pull1, "inproc://pull1")

notify(pull1, ZMQ.WAKEUP)

wait(pull1) # returns WAKEUP and clears the event in the fdwatcher
getfield(pull1, :pollfd).watcher.events == 0
wait(pull1) # would block because WAKEUP was cleared

So the presence of any leftover WAKEUPs means that some sockets are being notified/woken after they've already synchronized. I played around with this a bit and could not figure out the right logic to avoid the redundant WAKEUPs.

In any case, I noticed from the FileWatching source that we should make sure to grab the fdwatcher.notify lock before clearing events:

@lock pollfd.watcher.notify pollfd.watcher.events &= ~WAKEUP

@JamesWrigley
Member Author

> wait(::FDWatcher) always clears the watched-for events (line 644 of FileWatching.jl). A reproducer:
>
> using ZMQ
> pull1 = ZMQ.Socket(ZMQ.PULL)
> bind(pull1, "inproc://pull1")
>
> notify(pull1, ZMQ.WAKEUP)
>
> wait(pull1) # returns WAKEUP and clears the event in the fdwatcher
> getfield(pull1, :pollfd).watcher.events == 0
> wait(pull1) # would block because WAKEUP was cleared

I don't think that line quite explains the behaviour though: https://github.com/JuliaLang/julia/blob/5735163b340af5047919b31e3857bf8b866731c6/stdlib/FileWatching/src/FileWatching.jl#L644
It's only masking the flags that are being watched, so the effect should be that only UV_READABLE is masked. So I'm not sure how the wakeup flag is getting masked too in your MWE 🤔 Will need to grok the code.

> So the presence of any leftover WAKEUPs means that some sockets are being notified/woken after they've already synchronized. I played around with this a bit and could not figure out the right logic to avoid the redundant WAKEUPs.

Yeah, that's done explicitly by both wait() and close() to ensure that all of the waiters synchronize at the barrier, even if they were already woken up by a readable/writable event.

> In any case, I noticed from the FileWatching source that we should make sure to grab the fdwatcher.notify lock before clearing events:
>
> @lock pollfd.watcher.notify pollfd.watcher.events &= ~WAKEUP

I considered that but figured it wasn't necessary since an invariant is that at those points there will be no other task acting on the socket or its watcher. But I'll add it, can't hurt anyway 👍

@halleysfifthinc

> It's only masking the flags that are being watched, so the effect should be that only UV_READABLE is masked.

Don't forget that all sockets now also include the WAKEUP flag in their mask:

setfield!(socket, :pollfd, FDWatcher(fd(socket), FDEvent(UV_READABLE | WAKEUP)))
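
To make the masking concrete, here is a minimal sketch of the arithmetic, assuming `wait` clears whichever pending bits match the watcher's mask. The flag values and the `take_events` helper are hypothetical and only for illustration; they are not the real FileWatching or ZMQ.jl definitions.

```julia
# Illustrative flag values only; the real constants may differ.
const READABLE = 0x01
const WAKEUP_FLAG = 0x08

# Hypothetical model of the masking step: return the pending events that
# match `mask`, plus the pending events left after those bits are cleared.
function take_events(pending::UInt8, mask::UInt8)
    matched = pending & mask
    return matched, pending & ~matched
end

pending = WAKEUP_FLAG

# Mask without the wakeup flag: nothing matches, the wakeup bit stays pending.
matched_r, remaining_r = take_events(pending, READABLE)

# Mask with the wakeup flag: the wakeup bit is returned and cleared.
matched_rw, remaining_rw = take_events(pending, READABLE | WAKEUP_FLAG)
```

With a mask that includes the wakeup bit, the first `wait` consumes the event, which would be consistent with the second `wait` in the reproducer blocking.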

@JamesWrigley
Member Author

Ah yes you're right, it's the wakeup event that's being caught rather than a readable event.
