-
Notifications
You must be signed in to change notification settings - Fork 7
add uring_context #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
b9c6010
643270c
ec4c068
2ec9cc5
017c56b
017838f
945dca7
9043802
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,321 @@ | ||||||||||
| // include/beman/net/detail/uring_context.hpp -*-C++-*- | ||||||||||
| // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception | ||||||||||
|
|
||||||||||
| #ifndef INCLUDED_BEMAN_NET_DETAIL_URING_CONTEXT | ||||||||||
| #define INCLUDED_BEMAN_NET_DETAIL_URING_CONTEXT | ||||||||||
|
|
||||||||||
| #include <beman/net/detail/container.hpp> | ||||||||||
| #include <beman/net/detail/context_base.hpp> | ||||||||||
|
|
||||||||||
| #include <cassert> | ||||||||||
| #include <cstdint> | ||||||||||
| #include <system_error> | ||||||||||
| #include <tuple> | ||||||||||
| #include <liburing.h> | ||||||||||
|
|
||||||||||
| namespace beman::net::detail { | ||||||||||
|
|
||||||||||
| // io_context implementation based on liburing | ||||||||||
| struct uring_context final : context_base { | ||||||||||
| static constexpr unsigned QUEUE_DEPTH = 128; | ||||||||||
| ::io_uring ring; | ||||||||||
| container<native_handle_type> sockets; | ||||||||||
| task* tasks = nullptr; | ||||||||||
| ::std::size_t submitting = 0; // sqes not yet submitted | ||||||||||
| ::std::size_t outstanding = 0; // cqes expected | ||||||||||
|
|
||||||||||
| uring_context() { | ||||||||||
| int flags = 0; | ||||||||||
| int r = ::io_uring_queue_init(QUEUE_DEPTH, &ring, flags); | ||||||||||
| if (r < 0) { | ||||||||||
| throw ::std::system_error(-r, ::std::system_category(), "io_uring_queue_init failed"); | ||||||||||
| } | ||||||||||
| } | ||||||||||
| ~uring_context() override { ::io_uring_queue_exit(&ring); } | ||||||||||
|
|
||||||||||
| auto make_socket(int fd) -> socket_id override { return sockets.insert(fd); } | ||||||||||
|
|
||||||||||
| auto make_socket(int d, int t, int p, ::std::error_code& error) -> socket_id override { | ||||||||||
| int fd(::socket(d, t, p)); | ||||||||||
| if (fd < 0) { | ||||||||||
| error = ::std::error_code(errno, ::std::system_category()); | ||||||||||
| return socket_id::invalid; | ||||||||||
| } | ||||||||||
| return make_socket(fd); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| auto release(socket_id id, ::std::error_code& error) -> void override { | ||||||||||
| const native_handle_type handle = sockets[id]; | ||||||||||
| sockets.erase(id); | ||||||||||
| if (::close(handle) < 0) { | ||||||||||
| error = ::std::error_code(errno, ::std::system_category()); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| auto native_handle(socket_id id) -> native_handle_type override { return sockets[id]; } | ||||||||||
|
|
||||||||||
| auto set_option(socket_id id, int level, int name, const void* data, ::socklen_t size, ::std::error_code& error) | ||||||||||
| -> void override { | ||||||||||
| if (::setsockopt(native_handle(id), level, name, data, size) < 0) { | ||||||||||
| error = ::std::error_code(errno, ::std::system_category()); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| auto bind(socket_id id, const endpoint& ep, ::std::error_code& error) -> void override { | ||||||||||
| if (::bind(native_handle(id), ep.data(), ep.size()) < 0) { | ||||||||||
| error = ::std::error_code(errno, ::std::system_category()); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| auto listen(socket_id id, int no, ::std::error_code& error) -> void override { | ||||||||||
| if (::listen(native_handle(id), no) < 0) { | ||||||||||
| error = ::std::error_code(errno, ::std::system_category()); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| auto submit() -> void { | ||||||||||
| int r = ::io_uring_submit(&ring); | ||||||||||
| if (r < 0) { | ||||||||||
| throw ::std::system_error(-r, ::std::system_category(), "io_uring_submit failed"); | ||||||||||
| } | ||||||||||
| assert(submitting >= r); | ||||||||||
| submitting -= r; | ||||||||||
| outstanding += r; | ||||||||||
|
||||||||||
| } | ||||||||||
|
|
||||||||||
| auto get_sqe(io_base* completion) -> ::io_uring_sqe* { | ||||||||||
| auto sqe = ::io_uring_get_sqe(&ring); | ||||||||||
| while (sqe == nullptr) { | ||||||||||
| // if the submission queue is full, flush and try again | ||||||||||
| submit(); | ||||||||||
| sqe = ::io_uring_get_sqe(&ring); | ||||||||||
| } | ||||||||||
|
Comment on lines
+87
to
+91
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This bit I tripped over the last time I tried to review the code thoroughly: this is, effectively, a busy wait for an indefinite amount of time. The assumption would be that it actually isn't hit (especially if the interface eventually gets enhanced to cope with sequences of results with just one submission) so it shouldn't be a problem (and I'm won't push back on this one). I was thinking about alternative approaches. One idea is to keep track of unsubmitted work. The Also, as the submission is communication between the program and the kernel, this loop shouldn't block indefinitely in any case. I may be entirely considering the wrong problem.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in a single-threaded application, i am assuming that if many threads are submitting work, then it's possible for enough
|
||||||||||
| ::io_uring_sqe_set_data(sqe, completion); | ||||||||||
| ++submitting; | ||||||||||
| return sqe; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| auto wait() -> ::std::tuple<int, io_base*> { | ||||||||||
| ::io_uring_cqe* cqe = nullptr; | ||||||||||
| int r = ::io_uring_wait_cqe(&ring, &cqe); | ||||||||||
| if (r < 0) { | ||||||||||
| throw ::std::system_error(-r, ::std::system_category(), "io_uring_wait_cqe failed"); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| assert(outstanding > 0); | ||||||||||
| --outstanding; | ||||||||||
|
|
||||||||||
| const int res = cqe->res; | ||||||||||
| const auto completion = ::io_uring_cqe_get_data(cqe); | ||||||||||
| ::io_uring_cqe_seen(&ring, cqe); | ||||||||||
|
|
||||||||||
| return {res, static_cast<io_base*>(completion)}; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| auto run_one() -> ::std::size_t override { | ||||||||||
| if (auto count = process_task(); count) { | ||||||||||
| return count; | ||||||||||
| } | ||||||||||
|
Comment on lines
+116
to
+118
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This special treatment of tasks gives the tasks priority over outstanding I/O. I would consider scheduling the tasks as a
If using
If that is believed to give too much priority to I/O operations a variation alternating between preferring I/O or scheduled tasks could be used. Submitting
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if these "tasks" are related to scheduling, is it safe to assume that for example, consider 3 concurrent calls to it's probably worth prioritizing completions over what do you think about the following order?
|
||||||||||
|
|
||||||||||
| if (submitting) { | ||||||||||
| // if we have anything to submit, batch the submit and wait in a | ||||||||||
| // single system call. this allows io_uring_wait_cqe() below to be | ||||||||||
| // served directly from memory | ||||||||||
| unsigned wait_nr = 1; | ||||||||||
| int r = ::io_uring_submit_and_wait(&ring, wait_nr); | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#batching recommends the use of my understanding is that:
|
||||||||||
| if (r < 0) { | ||||||||||
| throw ::std::system_error(-r, ::std::system_category(), "io_uring_submit_and_wait failed"); | ||||||||||
| } | ||||||||||
| assert(submitting >= r); | ||||||||||
| submitting -= r; | ||||||||||
| outstanding += r; | ||||||||||
|
||||||||||
| } | ||||||||||
|
|
||||||||||
| if (!outstanding) { | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think testing Of course, if
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
i did apply this change to before the change to since this breaks our implementation either way, maybe it's best to assert or throw on an unexpected 0-return? if we do end up finding cases where this happens, the reproducer may give us more information to guide the design |
||||||||||
| // nothing to submit and nothing to wait on, we're done | ||||||||||
| return 0; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // read the next completion, waiting if necessary | ||||||||||
| auto [res, completion] = wait(); | ||||||||||
|
|
||||||||||
| if (completion) { | ||||||||||
|
||||||||||
| // work() functions depend on res, so pass it in via 'extra' | ||||||||||
| completion->extra.reset(&res); | ||||||||||
| completion->work(*this, completion); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| return 1; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| auto cancel(io_base* cancel_op, io_base* op) -> void override { | ||||||||||
| auto sqe = get_sqe(nullptr); | ||||||||||
| int flags = 0; | ||||||||||
| ::io_uring_prep_cancel(sqe, op, flags); | ||||||||||
|
|
||||||||||
| // use io_uring_prep_cancel() for asynchronous cancellation of op. | ||||||||||
| // cancel_op, aka sender_state::cancel_callback, lives inside of op's | ||||||||||
| // operation state. op's completion may race with this cancellation, | ||||||||||
| // causing that sender_state and its cancel_callback to be destroyed. | ||||||||||
| // so we can't pass cancel_op to io_uring_sqe_set_data() and attach a | ||||||||||
| // cancel_op->work() function to handle its completion in run_one(). | ||||||||||
| // instead, we just complete it here without waiting for the result | ||||||||||
| cancel_op->complete(); | ||||||||||
|
||||||||||
| auto complete() -> void override final { | |
| d_callback.reset(); | |
| if (0 == --this->d_outstanding) { | |
| this->d_data.set_value(*this, ::std::move(this->d_receiver)); |
the crash no longer reproduces if i move the d_callback.reset() line inside that if-block. does that look right?
If the logic I tried to outline above doesn't work as described, it is a bug which needs fixing (this wouldn't be part of this pull request: it would be a separate issue).
i added this change as an additional commit to this pr, but i'm happy to raise a separate pr too if you like. similar to the demo::task fix, it only applies to async cancellation which is specific to uring_context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks as if this code is stamped out a few times below (I haven't verified if the blocks are actually mostly identical). I think I would factor most of the logic into a function which may need to take two [lambda] functions (one wrapping the "prep" function and one producing the result) as arguments. Also, I probably wouldn't create variables out of the various op accesses but just pass them to the "prep" function. I do admit that it helps with readability spelling out what the various members are, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I probably wouldn't create variables out of the various op accesses but just pass them to the "prep" function. I do admit that it helps with readability spelling out what the various members are, though.
i do like to give names to function arguments. while it uses some extra vertical space, each of these functions still fits nicely on a single page in the editor 🤷
It looks as if this code is stamped out a few times below (I haven't verified if the blocks are actually mostly identical).
each of the completion lambdas differ slightly in how they map error codes and store return values, if any. each of the prepare blocks differ significantly in the io_uring_prep_* function and arguments
I think I would factor most of the logic into a function which may need to take two [lambda] functions (one wrapping the "prep" function and one producing the result) as arguments.
i don't really see the benefit of a helper function that combines the two, assuming it just looks something like this:
auto prepare_sqe(io_base* op, auto prepare, auto complete) -> submit_result {
op->work = std::move(complete);
return prepare(op);
}
Uh oh!
There was an error while loading. Please reload this page.