@@ -6521,6 +6521,10 @@ BENCHMARK_CAPTURE(rpc_uring55, public, networking_route_t::public_k, 256 /* mess
65216521/**
65226522 * @brief A minimal RPC @b server using @b `io_uring` functionality
65236523 * to setup the UDP socket, and process many requests concurrently.
6524+ *
6525+ * Unlike the `rpc_uring55_server`, this version:
6526+ * - registers buffers and off-loads buffer selection to the kernel
6527+ * - reduces the number of receive operations, using multi-shot receive
65246528 */
65256529class rpc_uring60_server {
65266530
@@ -6550,7 +6554,7 @@ class rpc_uring60_server {
65506554
65516555 // Initialize `io_uring` with one slot for each receive/send operation
65526556 // TODO: |= IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_SUBMIT_ALL
6553- auto io_uring_setup_flags = IORING_SETUP_SQPOLL ;
6557+ auto io_uring_setup_flags = 0 ;
65546558 if (io_uring_queue_init (max_concurrency * 2 , &ring_, io_uring_setup_flags) < 0 )
65556559 raise_system_error (" Failed to initialize io_uring 6.0 server" );
65566560 if (io_uring_register_files (&ring_, &socket_descriptor_, 1 ) < 0 )
@@ -6586,18 +6590,16 @@ class rpc_uring60_server {
65866590 void stop () noexcept { should_stop_.store (true , std::memory_order_seq_cst); }
65876591
65886592 void operator ()() noexcept {
6589- // Submit initial receive operations
6590- for (message_t &message : messages_) {
6591- message.status = status_t ::receiving_k;
6592- memset (&message.peer_address , 0 , sizeof (sockaddr_in));
6593+ // Submit the initial receive operation
6594+ {
6595+ message_t &message = *messages_.begin ();
65936596 struct io_uring_sqe *receive_entry = io_uring_get_sqe (&ring_);
6594- // TODO: Switch to multishot:
6595- // io_uring_prep_recvmsg_multishot( receive_entry, socket_descriptor_, &message.header,
6596- // IOSQE_BUFFER_SELECT | IOSQE_FIXED_FILE) ;
6597- io_uring_prep_recvmsg ( receive_entry, socket_descriptor_, &message. header , 0 ) ;
6597+ io_uring_prep_recvmsg_multishot (receive_entry, socket_descriptor_, &message. header , MSG_TRUNC);
6598+ receive_entry-> flags |= IOSQE_FIXED_FILE;
6599+ receive_entry-> flags |= IOSQE_BUFFER_SELECT ;
6600+ receive_entry-> buf_group = 0 ;
65986601 io_uring_sqe_set_data (receive_entry, &message);
65996602 }
6600-
66016603 io_uring_submit (&ring_);
66026604
66036605 while (!should_stop_.load (std::memory_order_seq_cst)) {
@@ -6613,6 +6615,7 @@ class rpc_uring60_server {
66136615 struct io_uring_sqe *send_entry = io_uring_get_sqe (&ring_);
66146616 message.status = status_t ::sending_k;
66156617 io_uring_prep_sendmsg_zc (send_entry, socket_descriptor_, &message.header , 0 );
6618+ send_entry->flags |= IOSQE_FIXED_FILE;
66166619 io_uring_sqe_set_data (send_entry, &message);
66176620 }
66186621
@@ -6622,6 +6625,7 @@ class rpc_uring60_server {
66226625 message.status = status_t ::receiving_k;
66236626 memset (&message.peer_address , 0 , sizeof (sockaddr_in));
66246627 io_uring_prep_recvmsg (receive_entry, socket_descriptor_, &message.header , 0 );
6628+ receive_entry->flags |= IOSQE_FIXED_FILE;
66256629 io_uring_sqe_set_data (receive_entry, &message);
66266630 }
66276631
@@ -6668,7 +6672,7 @@ class rpc_uring60_client {
66686672 // Initialize io_uring with one slot for each send/receive/timeout operation,
66696673 // as well as a batch-level timeout operation and a cancel operation for the
66706674 // batch-level timeout.
6671- auto io_uring_setup_flags = IORING_SETUP_SQPOLL ;
6675+ auto io_uring_setup_flags = 0 ;
66726676 if (io_uring_queue_init (concurrency * 3 + 1 + 1 , &ring_, io_uring_setup_flags) < 0 )
66736677 raise_system_error (" Failed to initialize io_uring 6.0 client" );
66746678 if (io_uring_register_files (&ring_, &socket_descriptor_, 1 ) < 0 )
@@ -6832,86 +6836,105 @@ BENCHMARK_CAPTURE(rpc_uring60, public, networking_route_t::public_k, 256 /* mess
68326836
68336837class rpc_asio_server {
68346838
6835- asio::io_context & context_;
6839+ asio::io_context context_;
68366840 asio::ip::udp::socket socket_;
6841+ std::thread context_thread_;
68376842
68386843 // / @brief Buffers, one per concurrent request
68396844 std::vector<rpc_buffer_t > buffers_;
68406845 // / @brief Where did the packets come from
68416846 std::vector<asio::ip::udp::endpoint> clients_;
6847+ // / @brief Which buffers are available?
6848+ std::vector<std::size_t > buffers_available_;
68426849 // / @brief Flag to stop the server without corrupting the state
68436850 std::atomic_bool should_stop_;
6844- // / @brief Maximum time for this entire batch
6845- std::chrono::microseconds max_cycle_duration_ ;
6851+ // Use a work guard so the io_context doesn't run out of work and exit.
6852+ asio::executor_work_guard<asio::io_context::executor_type> work_guard_ ;
68466853
68476854 std::size_t failed_receptions_ = 0 ;
68486855 std::size_t failed_responses_ = 0 ;
68496856
68506857 public:
6851- rpc_asio_server ( //
6852- asio::io_context &ctx, std::string const &address, std::uint16_t port, //
6853- std::size_t max_concurrency, std::chrono::microseconds max_cycle_duration)
6854- : context_(ctx), socket_(context_, asio::ip::udp::endpoint(asio::ip::make_address(address), port)),
6855- buffers_ (max_concurrency), clients_(max_concurrency), max_cycle_duration_(max_cycle_duration) {}
6858+ rpc_asio_server (std::string const &address, std::uint16_t port, std::size_t max_concurrency)
6859+ : context_(), socket_(context_), buffers_(max_concurrency), clients_(max_concurrency),
6860+ work_guard_ (asio::make_work_guard(context_)) {
6861+ // Use your helper function to create and bind the native socket.
6862+ auto server = rpc_server_socket (port, address);
6863+ // Now assign the native socket to the ASIO socket.
6864+ socket_.assign (asio::ip::udp::v4 (), server.socket_descriptor );
6865+ }
68566866
68576867 void stop () { should_stop_.store (true , std::memory_order_seq_cst); }
6868+ void close () {
6869+ socket_.cancel ();
6870+ context_.stop ();
6871+ if (context_thread_.joinable ()) context_thread_.join ();
6872+ }
68586873
68596874 void operator ()() {
6860- while (!should_stop_.load (std::memory_order_seq_cst)) one_batch ();
6875+ // For per-operation cancellations we could use the `asio::cancellation_signal`.
6876+ // Let's issue a receive operation for each buffer, which will call a chain of
6877+ // operations to process the packet and send a response, and repeat again.
6878+ for (std::size_t job = 0 ; job < buffers_.size (); ++job) reuse_buffer (job);
6879+ // Start listening for incoming packets.
6880+ context_thread_ = std::thread ([this ] { context_.run (); });
68616881 }
68626882
6863- void one_batch () {
6864- // For per-operation cancellations we could use the `asio::cancellation_signal`,
6865- // but this is the simple lucky case when we only want to cancel all the outstanding
6866- // transfers at once.
6867- std::atomic<std::size_t > remaining = 0 ;
6868- for (std::size_t job = 0 ; job < buffers_.size (); ++job, ++remaining) {
6869- auto finalize = [this , &remaining](std::error_code error, std::size_t ) {
6870- remaining--;
6871- if (error) failed_responses_++;
6872- };
6873- auto respond = [this , job, finalize, &remaining](std::error_code error, std::size_t bytes) {
6874- if (error) { remaining--; }
6875- else { socket_.async_send_to (asio::buffer (buffers_[job], bytes), clients_[job], finalize); }
6876- };
6877- socket_.async_receive_from (asio::buffer (buffers_[job]), clients_[job], respond);
6878- }
6879- std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now () + max_cycle_duration_;
6880- asio::steady_timer timer (context_, expiry);
6881- timer.wait ();
6882- if (remaining) socket_.cancel (); // Forcibly abort all ops on this socket
6883+ private:
6884+ void reuse_buffer (std::size_t job) {
6885+ auto finalize = [this , job](std::error_code error, std::size_t ) {
6886+ if (error) failed_responses_++;
6887+ if (should_stop_.load (std::memory_order_seq_cst)) return ;
6888+ reuse_buffer (job);
6889+ };
6890+ auto respond = [this , finalize, job](std::error_code error, std::size_t bytes) {
6891+ if (error) { reuse_buffer (job); }
6892+ else { socket_.async_send_to (asio::buffer (buffers_[job], bytes), clients_[job], finalize); }
6893+ };
6894+ socket_.async_receive_from (asio::buffer (buffers_[job]), clients_[job], respond);
68836895 }
68846896};
68856897
68866898class rpc_asio_client {
68876899
6888- asio::io_context & context_;
6900+ asio::io_context context_;
68896901 asio::ip::udp::socket socket_;
68906902 asio::ip::udp::endpoint server_;
6903+ std::thread context_thread_;
68916904
68926905 // / @brief Buffers, one per concurrent request
68936906 std::vector<rpc_buffer_t > buffers_;
68946907 // / @brief Track the send timestamps for each slot to measure latency
68956908 std::vector<std::chrono::steady_clock::time_point> send_times_;
6896- // / @brief Maximum time for this entire batch
6897- std::chrono::microseconds max_cycle_duration_ ;
6909+ // Work guard to keep the io_context running.
6910+ asio::executor_work_guard<asio::io_context::executor_type> work_guard_ ;
68986911
68996912 public:
6900- rpc_asio_client ( //
6901- asio::io_context &ctx, std::string const &server_addr, std::uint16_t port, //
6902- std::size_t concurrency, std::chrono::microseconds max_cycle_duration)
6903- : context_(ctx), socket_(ctx, asio::ip::udp::endpoint(asio::ip::udp::v4(), 0 )), buffers_(concurrency),
6904- send_times_ (concurrency), max_cycle_duration_(max_cycle_duration) {
6905-
6906- // Resolve the server address
6907- asio::ip::udp::resolver resolver (context_);
6908- asio::ip::udp::resolver::results_type endpoints = resolver.resolve (server_addr, std::to_string (port));
6909- server_ = *endpoints.begin (); // Take the first resolved endpoint
6913+ rpc_asio_client (std::string const &server_addr, std::uint16_t port, std::size_t concurrency)
6914+ : context_(), socket_(context_), buffers_(concurrency), send_times_(concurrency),
6915+ work_guard_ (asio::make_work_guard(context_)) {
6916+
6917+ // Use the helper function to create the native client socket.
6918+ auto client = rpc_client_socket (server_addr, port);
6919+ // Assign the native socket to the ASIO socket.
6920+ socket_.assign (asio::ip::udp::v4 (), client.socket_descriptor );
6921+ // Convert the native `sockaddr_in` from our `rpc_client_socket` to an ASIO endpoint.
6922+ server_ = asio::ip::udp::endpoint ( //
6923+ asio::ip::address_v4 (ntohl (client.server_address .sin_addr .s_addr )), //
6924+ ntohs (client.server_address .sin_port ));
6925+ // Start listening for incoming packets.
6926+ context_thread_ = std::thread ([this ] { context_.run (); });
69106927
69116928 // Fill each buffer with some pattern (just 'X's, for example)
69126929 for (auto &buf : buffers_) buf.fill (' X' );
69136930 }
69146931
6932+ ~rpc_asio_client () {
6933+ socket_.cancel ();
6934+ context_.stop ();
6935+ if (context_thread_.joinable ()) context_thread_.join ();
6936+ }
6937+
69156938 rpc_batch_result operator ()() { return one_batch (); }
69166939
69176940 private:
@@ -6942,56 +6965,30 @@ class rpc_asio_client {
69426965 socket_.async_send_to (asio::buffer (buffers_[job]), server_, receive);
69436966 result.sent_packets ++;
69446967 }
6945- std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now () + max_cycle_duration_;
6968+
6969+ std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now () + rpc_batch_timeout_k;
69466970 asio::steady_timer timer (context_, expiry);
69476971 timer.wait ();
69486972 if (remaining) socket_.cancel (); // Forcibly abort all ops on this socket
69496973 return result;
69506974 }
69516975};
69526976
6953- static void rpc_asio ( //
6954- bm::State &state, std::string const &address, //
6955- std::size_t batch_size, std::size_t packet_size, std::chrono::microseconds timeout) {
6956-
6957- constexpr std::uint16_t rpc_port_k = 12345 ;
6958-
6959- // Create server and client
6960- asio::io_context server_context;
6961- asio::io_context client_context;
6962-
6963- rpc_asio_server server (server_context, address, rpc_port_k, batch_size, timeout);
6964- rpc_asio_client client (client_context, address, rpc_port_k, batch_size, timeout);
6965-
6966- // The order of the following thread-initializations is important
6967- std::thread server_context_thread ([&]() { server_context.run (); });
6968- std::thread client_context_thread ([&]() { client_context.run (); });
6969- std::thread server_thread (std::ref (server));
6970- std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
6971-
6972- // Benchmark the round-trip time
6973- rpc_batch_result stats;
6974- for (auto _ : state) stats += client ();
6975-
6976- // Stop contexts and wait for threads to finish
6977- server.stop ();
6978- server_context.stop (); // Stops the server context
6979- client_context.stop (); // Stops the client context
6980- server_thread.join (); // Wait for the server itself to finish
6981- server_context_thread.join (); // Wait for the server thread to finish
6982- client_context_thread.join (); // Wait for the client thread to finish
6983-
6984- // Process and report stats
6985- stats.batch_latency =
6986- stats.received_packets ? stats.batch_latency / state.iterations () : std::chrono::nanoseconds::zero ();
6987- state.SetItemsProcessed (stats.sent_packets );
6988- state.SetBytesProcessed (stats.sent_packets * packet_size);
6989- state.counters [" drop,%" ] = 100.0 * (stats.sent_packets - stats.received_packets ) / stats.sent_packets ;
6990- state.counters [" batch_latency,ns" ] = stats.batch_latency .count ();
6991- state.counters [" max_packet_latency,ns" ] = stats.max_packet_latency .count ();
6977+ static void rpc_asio (bm::State &state, networking_route_t route, std::size_t batch_size, std::size_t packet_size) {
6978+ return rpc<rpc_asio_server, rpc_asio_client>(state, route, batch_size, packet_size);
69926979}
69936980
6994- BENCHMARK_CAPTURE (rpc_asio, local, " 127.0.0.1" , 32 , 1024 , std::chrono::microseconds(50'000 ))->MinTime(2 )->UseRealTime();
6981+ BENCHMARK_CAPTURE (rpc_asio, loopback, networking_route_t ::loopback_k, 256 /* messages per batch */ ,
6982+ 1024 /* bytes per packet */ )
6983+ ->MinTime(2 )
6984+ ->UseManualTime()
6985+ ->Unit(benchmark::kMicrosecond );
6986+
6987+ BENCHMARK_CAPTURE (rpc_asio, public, networking_route_t ::public_k, 256 /* messages per batch */ ,
6988+ 1024 /* bytes per packet */ )
6989+ ->MinTime(2 )
6990+ ->UseManualTime()
6991+ ->Unit(benchmark::kMicrosecond );
69956992
69966993#pragma endregion // ASIO
69976994
0 commit comments