make event dispatching non-blocking #6762
benjamin-stacks wants to merge 28 commits into stacks-network:develop
Conversation
@@ -0,0 +1,322 @@
use std::path::PathBuf;
Hey! Please add the copyright header to each source file. Thanks!
Ah, good call, will do. Is "Stacks Open Internet Foundation" still the correct copyright holder?
Also, there's a whole bunch of files that lack that header, I'm wondering if there's a good way to automate this.
Codecov Report
@@ Coverage Diff @@
## develop #6762 +/- ##
===========================================
+ Coverage 72.67% 76.86% +4.19%
===========================================
Files 411 419 +8
Lines 221663 223340 +1677
Branches 0 338 +338
===========================================
+ Hits 161086 171678 +10592
+ Misses 60577 51662 -8915
... and 253 files with indirect coverage changes
Force-pushed from 98b3395 to 0d62bd4
This commit is the main implementation work for stacks-network#6543. It moves event dispatcher HTTP requests to a separate thread, so that a slow event observer doesn't block the node from continuing its work. The node will only start blocking again if your event observers are so slow that it continuously produces events faster than they can be delivered, because the queue of pending requests is bounded (at 1,000 right now; I picked that number out of a hat and am happy to change it if anyone has thoughts).

Each new event payload is stored in the event observer DB, and its ID is then sent to the subthread, which makes the request and then deletes the DB entry. That way, if a node is shut down while there are pending requests, they're in the DB ready to be retried after restart via `process_pending_payloads()` (which blocks until completion). That's exactly as before (except that previously there couldn't have been more than one or two pending payloads).
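The flow described above (persist first, send only the ID over a bounded channel, delete after delivery) can be sketched roughly like this. Note this is a hedged simplification, not the PR's actual code: `dispatch` and `deliver` are hypothetical names, and a `HashMap` stands in for the persistent observer DB.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the event observer DB; the real code persists payloads
// so they survive a restart.
type PayloadDb = Arc<Mutex<HashMap<u64, String>>>;

// Placeholder for the real HTTP delivery to an event observer.
fn deliver(payload: &str) -> bool {
    !payload.is_empty()
}

// Persist the payload first, then hand only its ID to the worker thread.
// `send` blocks only once the bounded queue is full.
fn dispatch(db: &PayloadDb, tx: &SyncSender<u64>, id: u64, payload: String) {
    db.lock().unwrap().insert(id, payload);
    tx.send(id).unwrap();
}

fn main() {
    let db: PayloadDb = Arc::new(Mutex::new(HashMap::new()));
    let (tx, rx) = sync_channel::<u64>(1_000);

    let worker_db = Arc::clone(&db);
    let worker = thread::spawn(move || {
        for id in rx {
            let payload = worker_db.lock().unwrap().get(&id).cloned();
            if let Some(p) = payload {
                if deliver(&p) {
                    // Delete only after successful delivery, so pending
                    // payloads survive a shutdown and can be retried.
                    worker_db.lock().unwrap().remove(&id);
                }
            }
        }
    });

    dispatch(&db, &tx, 1, "block event".to_string());
    drop(tx); // closing the channel ends the worker loop
    worker.join().unwrap();
    assert!(db.lock().unwrap().is_empty());
}
```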
This fixes [this integration test failure](https://github.com/stacks-network/stacks-core/actions/runs/20749024845/job/59577684952?pr=6762), caused by the fact that event delivery wasn't complete by the time the assertions were made.
Doing this work in the RunLoop implementations' startup code is *almost* the same thing, but not quite, since the nakamoto run loop might be started later (after an epoch 3 transition), at which point the event DB may already have new items from the current run of the application, which should *not* be touched by `process_pending_payloads`. This used to not be a problem, but now that that DB is used for the actual queue of the (concurrently running) EventDispatcherWorker, it has become one.
This is like 72437b2, but it works for all the tests instead of only the one. While only that one test failed blatantly, the issue exists for pretty much all of the integration tests, because they rely on the test_observer to capture all relevant data. Things are usually fast enough, which is why we've only seen one blatant failure so far, but 1) it's going to be flaky (I can produce a whole lot of test failures by adding a small artificial delay to event delivery), and 2) it might actually be *hiding* test failures (in some cases, such as neon_integrations::deep_contract, we're asserting that certain things are *not* in the data, and if the data is incomplete to begin with, those assertions are moot).
Force-pushed from 478efa3 to d5fa2fc
When switching runloops at the epoch 2/3 transition, this ensures that the same event dispatcher worker thread is handling delivery, which in turn ensures that all payloads are delivered in order
Thanks Hank for the tip!
Not sure why this wasn't caught in the pre-commit hook, I'd have assumed the checks are the same.
The logic is slightly tricky here, because the size of the queue (the max number of in-flight requests before we start blocking the thread) is implemented through the `bound` parameter of the `sync_channel`, but those two values aren't actually the same. See the comment at the top of `EventDispatcherWorker::new()` for details.
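For illustration, here is a minimal demonstration of the `sync_channel` bound semantics at play here: the `bound` counts only *buffered* messages, so a message the worker has already received no longer counts toward it, which is presumably why the configured queue size and the channel bound aren't the same value. With a bound of 0, every `send` blocks until the receiver picks the message up.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    // Bound 0 makes this a "rendezvous channel": no buffering at all,
    // so `send` blocks until the receiver actually calls `recv`.
    let (tx, rx) = sync_channel::<u32>(0);
    let start = Instant::now();
    let receiver = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        rx.recv().unwrap()
    });
    tx.send(7).unwrap(); // blocks ~100ms until the recv happens
    assert!(start.elapsed() >= Duration::from_millis(100));
    assert_eq!(receiver.join().unwrap(), 7);
}
```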
See the discussion in stacks-network#6543 for some background.
/// to `true`, as no in-flight requests are allowed.
/// ---
/// @default: `1_000`
pub event_dispatcher_queue_size: usize,
This value ultimately ends up as the bound argument to sync_channel, which is of type usize.
I don't know if we have any concerns about having a platform-dependent number type on the configuration object -- if so, we can also use something else here. Arguably, any reasonable value for this setting should fit into 16 bits anyway. If you need your queue to be bigger than 64k, you should reconsider your architecture.
// we waited 500ms previously, so it should take on the order of 1.5s until
// the first request is complete
assert!(
So, this logic has the potential to be flaky since it's making assumptions about how long processed_mined_nakamoto_block_event() will take. Is there a way to make this more robust?
I thought about that, but I don't think there's a way to completely get around this theoretical possibility, short of #[cfg(test)] code that reports "I promise, I blocked!", which wouldn't be a true test of actual behavior.
The only way to assert that thread A blocked until the completion of thread B is to assert that B finished before A, but that will always make the assumption that waiting for B was the reason that A didn't finish earlier.
And conversely, the only way to assert that thread A was not blocked is to assert that it continues before B is finished, but there can't be a 100% guarantee that B isn't faster -- the CPU starvation theory might as well apply to thread A.
In addition, some such assertions would require a complex setup needing a third thread to coordinate, which would make the test harder to reason about and increase the chance of testing the wrong thing.
That is why I decided to rely on timing conditions that realistically could only be achieved by correct behavior. I also made sure to avoid false negatives by asserting that the measured durations are neither too short nor too long. And I picked durations that I felt are long enough to make the likelihood extremely small that other effects are causing the behavior -- hundreds and thousands of milliseconds seem like an eternity in CPU land (but I'm happy to increase them even more if you disagree).
Since these aren't end-to-end tests, there's no interplay between a bitcoin daemon, a signer, a chainstate coordinator, etc. This further reduces the chance for such flakes. Yes, process_mined_nakamoto_block_event() could theoretically take a long time, but realistically it's simple enough that that's unlikely.
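The two-sided timing window described here can be sketched like so; `blocking_dispatch` is a hypothetical stand-in that simulates the observer delay with a sleep, not the PR's actual test code.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for the code path under test: a dispatch that is
// expected to block until a slow observer (simulated by a sleep) finishes.
fn blocking_dispatch(observer_delay: Duration) {
    thread::sleep(observer_delay);
}

fn main() {
    let start = Instant::now();
    blocking_dispatch(Duration::from_millis(300));
    let elapsed = start.elapsed();
    // Bound the measurement on both sides: long enough to show we really
    // waited for the observer, short enough to rule out unrelated stalls.
    assert!(elapsed >= Duration::from_millis(300), "dispatcher did not block");
    assert!(elapsed < Duration::from_secs(5), "dispatcher blocked far too long");
}
```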
assert_eq!(start_count.load(Ordering::SeqCst), 2);
assert_eq!(end_count.load(Ordering::SeqCst), 1);
thread::sleep(Duration::from_secs(2));
Same here -- this logic can easily flake out if the event observer thread is starved of CPU time for long enough. Can we use something other than wall-clock time?
"dispatcher did not block while sending event"
);
thread::sleep(Duration::from_millis(100));
Flagging this as well, to try and use something more robust to verify that the event dispatcher is making progress correctly.
debug!("Event Dispatcher Worker: doing payload {id}");
// This will block forever if we were passed a non-existing ID. Don't do that.
Can the worker just abort in this case? Or better, can the worker send the process a SIGTERM to initiate a clean shutdown?
Yeah that's a fair point.
This code needs to be retry-able because it involves I/O, but the behavior should differ between "failure because the record simply doesn't exist" and "failure because of an I/O timeout". Will change.
// If the sending fails (i.e. the receiver has been dropped), that means a logic bug
// has been introduced to the code -- at time of writing, the main function is waiting
// for this message a few lines down, outside the thread closure.
// We log this, but we still start the loop.
I'm not sure I agree with this line of reasoning. If the worker thread cannot reliably communicate with the supervisor, then the worker should terminate as soon as possible. Otherwise, we'd make it impossible to shut down the Stacks node with a termination signal, since this thread may still be running in the background with no means of cancellation short of a SIGKILL.
This is about the channel over which the worker thread reports to the main thread that it's ready, not about communication to the worker.
Right -- if the worker can't reach the supervisor, then the worker should die and (ideally) the node would shut down since something is seriously amiss.
Yeah, that's fair. I'll change this to panic instead of just logging. Our panic hook will then kill the whole node.
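A minimal sketch of that change, assuming a simple ready-handshake channel between worker and supervisor (all names here are illustrative, not the PR's actual code):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    // Hypothetical "ready" handshake: instead of merely logging a failed
    // send, the worker panics, so a process-wide panic hook can take the
    // whole node down rather than leaving an orphaned background thread.
    let (ready_tx, ready_rx) = channel::<()>();
    let worker = thread::spawn(move || {
        ready_tx
            .send(())
            .expect("supervisor dropped the ready channel; aborting worker");
        // ... the event delivery loop would start here ...
    });
    ready_rx.recv().expect("worker died before signaling readiness");
    worker.join().unwrap();
}
```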
let mut payload = conn.get_payload_with_retry(id);
// Deliberately not handling the error case of `duration_since()` -- if the `timestamp`
// is *after* `now` (which should be extremely rare), the most likely reason is a *slight*
This very failure mode has happened to us before, and has led to node crashes that warranted an emergency hotfix. Please gracefully handle the case where duration_since() fails, since time can go backwards due to NTP sync (as you mention).
It is handled gracefully -- by simply doing nothing. This is only used for logging a warning if events are old (i.e. have been stuck in the queue). If the payload comes from the future (for all we know), it's definitely not late.
// is *after* `now` (which should be extremely rare), the most likely reason is a *slight*
// adjustment to the system clock (e.g. NTP sync) that happened between storing the
// entity and retrieving it, and that should be fine.
// If there was a *major* adjustment, all bets are off anyway. You shouldn't mess with your
No, we should be robust in the face of clock changes. Please use the system monotonic clock instead of the wall clock. We do elsewhere.
Can you point me to an example of where we do that? I honestly don't even understand how it could be possible to do that. And all instances of serializing time stamps that I've seen in the code are either strings or unix time stamp integers, all based on wall clock time.
In my understanding, a monotonic clock can at most be expected to be reliable between operating system restarts. But a database file survives a restart, and could even theoretically be moved to a different machine (even a different platform!).
Now, what we could do is additionally pass an Instant to the worker from the calling thread if the event was in fact generated in the current execution of the application, and only fall back to the DB-stored timestamp if it's a retry across restarts.
However, if we want to log a warning if payloads are older than a certain threshold, we would still have to keep this code around that you're objecting to. And if we don't want to log that, I could just remove all the timestamp-related code here, since the logging is all we're using it for.
Thoughts?
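For what it's worth, the "do nothing on skew" behavior described in this thread can be sketched as follows; `payload_age` is a hypothetical helper, not the PR's actual code. A stored timestamp that lies in the future (e.g. after an NTP step backwards) yields an age of zero instead of an error, so the staleness warning is simply skipped.

```rust
use std::time::{Duration, SystemTime};

// If `stored` is after `now`, the payload is definitely not late:
// report an age of zero rather than propagating the SystemTimeError.
fn payload_age(stored: SystemTime, now: SystemTime) -> Duration {
    now.duration_since(stored).unwrap_or(Duration::ZERO)
}

fn main() {
    let now = SystemTime::now();
    // A payload stored 5s ago is 5s old.
    assert_eq!(payload_age(now - Duration::from_secs(5), now), Duration::from_secs(5));
    // A payload "from the future": zero age, no panic.
    assert_eq!(payload_age(now + Duration::from_secs(5), now), Duration::ZERO);
}
```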
// Cap the backoff at 3x the timeout
let max_backoff = data.timeout.saturating_mul(3);
loop {
Could this be factored to use the with_retry() function above?
Also, I see that this code performs retry jitter, whereas with_retry() does not. Is there a reason for this?
> Could this be factored to use the with_retry() function above?
Possibly, but as you noted, the two behave differently. I didn't use with_retry() for the HTTP request because I didn't want to introduce unrelated functional changes or make with_retry() more complex to support a single use case.
As for the question why that difference is there, I could guess, but I don't know for sure because that code predates me by a long time.
The with_retry logic was added in #5358, I just moved it to a helper so I could reuse it for get_payload_with_retry().
The backoff jitter for the HTTP request retries was added in #5327.
Since both of those came from @brice-stacks, I guess he could add some color here, but in either case, I don't think a change to any of this needs to be in this PR.
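As a rough illustration of the capped-backoff-with-jitter scheme discussed above (all names and constants here are assumptions; the real code's parameters and RNG differ, and the xorshift step is only a toy, dependency-free jitter source):

```rust
use std::time::Duration;

// Exponential backoff, capped at 3x the request timeout, plus up to
// 24% jitter so that retrying clients don't synchronize.
fn next_backoff(attempt: u32, timeout: Duration, seed: &mut u64) -> Duration {
    let max_backoff = timeout.saturating_mul(3);
    let base = Duration::from_millis(100).saturating_mul(2u32.saturating_pow(attempt));
    let capped = base.min(max_backoff);
    // xorshift64: cheap pseudo-randomness, fine for jitter purposes
    *seed ^= *seed << 13;
    *seed ^= *seed >> 7;
    *seed ^= *seed << 17;
    let jitter_pct = (*seed % 25) as u32;
    capped + capped / 100 * jitter_pct
}

fn main() {
    let mut seed = 0x9E37_79B9_7F4A_7C15u64;
    let timeout = Duration::from_secs(1);
    for attempt in 0..8 {
        let d = next_backoff(attempt, timeout, &mut seed);
        // Never more than the 3s cap plus 24% jitter.
        assert!(d <= Duration::from_secs(3) + Duration::from_millis(720));
        println!("attempt {attempt}: backing off {d:?}");
    }
}
```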
stacks-node/src/event_dispatcher.rs (outdated)
pub stackerdb_channel: Arc<Mutex<StackerDBChannel>>,
/// Path to the database where pending payloads are stored.
db_path: PathBuf,
/// The worker thread that performs the actual HTTP requests so that they don't block
stacks-node/src/event_dispatcher.rs (outdated)
static ALL_WORKERS: Mutex<Vec<Weak<EventDispatcherWorker>>> = Mutex::new(Vec::new());

#[cfg(test)]
pub fn catch_up_all_event_dispatchers() {
I'm not sure why this and ALL_WORKERS are necessary? Isn't it the case that an event dispatcher configured with a queue size of zero ought to synchronously deliver events? And, isn't that what all the existing tests expect the event dispatcher to do?
Ooohh this is an interesting point. You're absolutely right, but there's some history and nuance here.
I originally implemented all this to always be non-blocking. This meant that the e2e tests had to explicitly wait for the dispatcher to catch up in order to be able to assert on certain events.
Then later, per your and Aaron's feedback in this thread, I added back the option to make it blocking and also made that the default behavior.
Indeed that means this catch_up code isn't actually necessary right now.
However, this also means that the asynchronous logic no longer gets any coverage from the integration tests. We could change the config for the integration tests to run with a positive queue size instead, in which case this catching up would be needed again.
The more I think about this though, the less I think this is necessary. Even with a queue size of zero, all the bits and pieces of the asynchronous implementation (DB persistence, channel message, worker thread) still get coverage. And for the tricky bits (like blocking on a full queue), I added unit tests.
Long story short, I tend to agree with you that this can be removed again, and will do that. But let me know if you have any additional thoughts based on this context.
jcnelson left a comment
This overall looks good, but I have a few concerns about the threadpool and the handling of clock skew (among other lesser things).
as [Jude points out](stacks-network#6762 (comment)), it's no longer necessary since the default behavior is still blocking
addresses #6543
Checklist
- docs/rpc/openapi.yaml and rpc-endpoints.md for v2 endpoints, event-dispatcher.md for new events
- New clarity functions have corresponding PR in clarity-benchmarking repo