@@ -6817,86 +6817,105 @@ BENCHMARK_CAPTURE(rpc_uring60, public, networking_route_t::public_k, 256 /* mess
68176817
68186818class rpc_asio_server {
68196819
6820- asio::io_context & context_;
6820+ asio::io_context context_;
68216821 asio::ip::udp::socket socket_;
6822+ std::thread context_thread_;
68226823
68236824 // / @brief Buffers, one per concurrent request
68246825 std::vector<rpc_buffer_t > buffers_;
68256826 // / @brief Where did the packets come from
68266827 std::vector<asio::ip::udp::endpoint> clients_;
6828+ // / @brief Which buffers are available?
6829+ std::vector<std::size_t > buffers_available_;
68276830 // / @brief Flag to stop the server without corrupting the state
68286831 std::atomic_bool should_stop_;
6829- // / @brief Maximum time for this entire batch
6830- std::chrono::microseconds max_cycle_duration_ ;
6832+ // Use a work guard so the io_context doesn’t run out of work and exit.
6833+ asio::executor_work_guard<asio::io_context::executor_type> work_guard_ ;
68316834
68326835 std::size_t failed_receptions_ = 0 ;
68336836 std::size_t failed_responses_ = 0 ;
68346837
68356838 public:
6836- rpc_asio_server ( //
6837- asio::io_context &ctx, std::string const &address, std::uint16_t port, //
6838- std::size_t max_concurrency, std::chrono::microseconds max_cycle_duration)
6839- : context_(ctx), socket_(context_, asio::ip::udp::endpoint(asio::ip::make_address(address), port)),
6840- buffers_ (max_concurrency), clients_(max_concurrency), max_cycle_duration_(max_cycle_duration) {}
6839+ rpc_asio_server (std::string const &address, std::uint16_t port, std::size_t max_concurrency)
6840+ : context_(), socket_(context_), buffers_(max_concurrency), clients_(max_concurrency),
6841+ work_guard_ (asio::make_work_guard(context_)) {
6842+ // Use your helper function to create and bind the native socket.
6843+ auto server = rpc_server_socket (port, address);
6844+ // Now assign the native socket to the ASIO socket.
6845+ socket_.assign (asio::ip::udp::v4 (), server.socket_descriptor );
6846+ }
68416847
68426848 void stop () { should_stop_.store (true , std::memory_order_seq_cst); }
6849+ void close () {
6850+ socket_.cancel ();
6851+ context_.stop ();
6852+ if (context_thread_.joinable ()) context_thread_.join ();
6853+ }
68436854
68446855 void operator ()() {
6845- while (!should_stop_.load (std::memory_order_seq_cst)) one_batch ();
6856+ // For per-operation cancellations we could use the `asio::cancellation_signal`.
6857+ // Let's issue a receive operation for each buffer, which will call a chain of
6858+ // operations to process the packet and send a response, and repeat again.
6859+ for (std::size_t job = 0 ; job < buffers_.size (); ++job) reuse_buffer (job);
6860+ // Start listening for incoming packets.
6861+ context_thread_ = std::thread ([this ] { context_.run (); });
68466862 }
68476863
6848- void one_batch () {
6849- // For per-operation cancellations we could use the `asio::cancellation_signal`,
6850- // but this is the simple lucky case when we only want to cancel all the outstanding
6851- // transfers at once.
6852- std::atomic<std::size_t > remaining = 0 ;
6853- for (std::size_t job = 0 ; job < buffers_.size (); ++job, ++remaining) {
6854- auto finalize = [this , &remaining](std::error_code error, std::size_t ) {
6855- remaining--;
6856- if (error) failed_responses_++;
6857- };
6858- auto respond = [this , job, finalize, &remaining](std::error_code error, std::size_t bytes) {
6859- if (error) { remaining--; }
6860- else { socket_.async_send_to (asio::buffer (buffers_[job], bytes), clients_[job], finalize); }
6861- };
6862- socket_.async_receive_from (asio::buffer (buffers_[job]), clients_[job], respond);
6863- }
6864- std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now () + max_cycle_duration_;
6865- asio::steady_timer timer (context_, expiry);
6866- timer.wait ();
6867- if (remaining) socket_.cancel (); // Forcibly abort all ops on this socket
6864+ private:
6865+ void reuse_buffer (std::size_t job) {
6866+ auto finalize = [this , job](std::error_code error, std::size_t ) {
6867+ if (error) failed_responses_++;
6868+ if (should_stop_.load (std::memory_order_seq_cst)) return ;
6869+ reuse_buffer (job);
6870+ };
6871+ auto respond = [this , finalize, job](std::error_code error, std::size_t bytes) {
6872+ if (error) { reuse_buffer (job); }
6873+ else { socket_.async_send_to (asio::buffer (buffers_[job], bytes), clients_[job], finalize); }
6874+ };
6875+ socket_.async_receive_from (asio::buffer (buffers_[job]), clients_[job], respond);
68686876 }
68696877};
68706878
68716879class rpc_asio_client {
68726880
6873- asio::io_context & context_;
6881+ asio::io_context context_;
68746882 asio::ip::udp::socket socket_;
68756883 asio::ip::udp::endpoint server_;
6884+ std::thread context_thread_;
68766885
68776886 // / @brief Buffers, one per concurrent request
68786887 std::vector<rpc_buffer_t > buffers_;
68796888 // / @brief Track the send timestamps for each slot to measure latency
68806889 std::vector<std::chrono::steady_clock::time_point> send_times_;
6881- // / @brief Maximum time for this entire batch
6882- std::chrono::microseconds max_cycle_duration_ ;
6890+ // Work guard to keep the io_context running.
6891+ asio::executor_work_guard<asio::io_context::executor_type> work_guard_ ;
68836892
68846893 public:
6885- rpc_asio_client ( //
6886- asio::io_context &ctx, std::string const &server_addr, std::uint16_t port, //
6887- std::size_t concurrency, std::chrono::microseconds max_cycle_duration)
6888- : context_(ctx), socket_(ctx, asio::ip::udp::endpoint(asio::ip::udp::v4(), 0 )), buffers_(concurrency),
6889- send_times_ (concurrency), max_cycle_duration_(max_cycle_duration) {
6890-
6891- // Resolve the server address
6892- asio::ip::udp::resolver resolver (context_);
6893- asio::ip::udp::resolver::results_type endpoints = resolver.resolve (server_addr, std::to_string (port));
6894- server_ = *endpoints.begin (); // Take the first resolved endpoint
6894+ rpc_asio_client (std::string const &server_addr, std::uint16_t port, std::size_t concurrency)
6895+ : context_(), socket_(context_), buffers_(concurrency), send_times_(concurrency),
6896+ work_guard_ (asio::make_work_guard(context_)) {
6897+
6898+ // Use the helper function to create the native client socket.
6899+ auto client = rpc_client_socket (server_addr, port);
6900+ // Assign the native socket to the ASIO socket.
6901+ socket_.assign (asio::ip::udp::v4 (), client.socket_descriptor );
6902+ // Convert the native `sockaddr_in` from our `rpc_client_socket` to an ASIO endpoint.
6903+ server_ = asio::ip::udp::endpoint ( //
6904+ asio::ip::address_v4 (ntohl (client.server_address .sin_addr .s_addr )), //
6905+ ntohs (client.server_address .sin_port ));
6906+ // Start listening for incoming packets.
6907+ context_thread_ = std::thread ([this ] { context_.run (); });
68956908
68966909 // Fill each buffer with some pattern (just 'X's, for example)
68976910 for (auto &buf : buffers_) buf.fill (' X' );
68986911 }
68996912
6913+ ~rpc_asio_client () {
6914+ socket_.cancel ();
6915+ context_.stop ();
6916+ if (context_thread_.joinable ()) context_thread_.join ();
6917+ }
6918+
69006919 rpc_batch_result operator ()() { return one_batch (); }
69016920
69026921 private:
@@ -6927,56 +6946,30 @@ class rpc_asio_client {
69276946 socket_.async_send_to (asio::buffer (buffers_[job]), server_, receive);
69286947 result.sent_packets ++;
69296948 }
6930- std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now () + max_cycle_duration_;
6949+
6950+ std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now () + rpc_batch_timeout_k;
69316951 asio::steady_timer timer (context_, expiry);
69326952 timer.wait ();
69336953 if (remaining) socket_.cancel (); // Forcibly abort all ops on this socket
69346954 return result;
69356955 }
69366956};
69376957
6938- static void rpc_asio ( //
6939- bm::State &state, std::string const &address, //
6940- std::size_t batch_size, std::size_t packet_size, std::chrono::microseconds timeout) {
6941-
6942- constexpr std::uint16_t rpc_port_k = 12345 ;
6943-
6944- // Create server and client
6945- asio::io_context server_context;
6946- asio::io_context client_context;
6947-
6948- rpc_asio_server server (server_context, address, rpc_port_k, batch_size, timeout);
6949- rpc_asio_client client (client_context, address, rpc_port_k, batch_size, timeout);
6950-
6951- // The order of the following thread-initializations is important
6952- std::thread server_context_thread ([&]() { server_context.run (); });
6953- std::thread client_context_thread ([&]() { client_context.run (); });
6954- std::thread server_thread (std::ref (server));
6955- std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
6956-
6957- // Benchmark the round-trip time
6958- rpc_batch_result stats;
6959- for (auto _ : state) stats += client ();
6960-
6961- // Stop contexts and wait for threads to finish
6962- server.stop ();
6963- server_context.stop (); // Stops the server context
6964- client_context.stop (); // Stops the client context
6965- server_thread.join (); // Wait for the server itself to finish
6966- server_context_thread.join (); // Wait for the server thread to finish
6967- client_context_thread.join (); // Wait for the client thread to finish
6968-
6969- // Process and report stats
6970- stats.batch_latency =
6971- stats.received_packets ? stats.batch_latency / state.iterations () : std::chrono::nanoseconds::zero ();
6972- state.SetItemsProcessed (stats.sent_packets );
6973- state.SetBytesProcessed (stats.sent_packets * packet_size);
6974- state.counters [" drop,%" ] = 100.0 * (stats.sent_packets - stats.received_packets ) / stats.sent_packets ;
6975- state.counters [" batch_latency,ns" ] = stats.batch_latency .count ();
6976- state.counters [" max_packet_latency,ns" ] = stats.max_packet_latency .count ();
6958+ static void rpc_asio (bm::State &state, networking_route_t route, std::size_t batch_size, std::size_t packet_size) {
6959+ return rpc<rpc_asio_server, rpc_asio_client>(state, route, batch_size, packet_size);
69776960}
69786961
6979- BENCHMARK_CAPTURE (rpc_asio, local, " 127.0.0.1" , 32 , 1024 , std::chrono::microseconds(50'000 ))->MinTime(2 )->UseRealTime();
6962+ BENCHMARK_CAPTURE (rpc_asio, loopback, networking_route_t ::loopback_k, 256 /* messages per batch */ ,
6963+ 1024 /* bytes per packet */ )
6964+ ->MinTime(2 )
6965+ ->UseManualTime()
6966+ ->Unit(benchmark::kMicrosecond );
6967+
6968+ BENCHMARK_CAPTURE (rpc_asio, public, networking_route_t ::public_k, 256 /* messages per batch */ ,
6969+ 1024 /* bytes per packet */ )
6970+ ->MinTime(2 )
6971+ ->UseManualTime()
6972+ ->Unit(benchmark::kMicrosecond );
69806973
69816974#pragma endregion // ASIO
69826975
0 commit comments