@@ -6508,6 +6508,10 @@ BENCHMARK_CAPTURE(rpc_uring55, public, networking_route_t::public_k, 256 /* mess
65086508/* *
65096509 * @brief A minimal RPC @b server using @b `io_uring` functionality
65106510 * to setup the UDP socket, and process many requests concurrently.
6511+ *
6512+ * Unlike the `rpc_uring55_server`, this version:
6513+ * - registers buffers and off-loads buffer selection to the kernel
6514+ * - reduces the number of receive operations, using multi-shot receive
65116515 */
65126516class rpc_uring60_server {
65136517
@@ -6537,7 +6541,7 @@ class rpc_uring60_server {
65376541
65386542 // Initialize `io_uring` with one slot for each receive/send operation
65396543 // TODO: |= IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_SUBMIT_ALL
6540- auto io_uring_setup_flags = IORING_SETUP_SQPOLL ;
6544+ auto io_uring_setup_flags = 0 ;
65416545 if (io_uring_queue_init (max_concurrency * 2 , &ring_, io_uring_setup_flags) < 0 )
65426546 raise_system_error (" Failed to initialize io_uring 6.0 server" );
65436547 if (io_uring_register_files (&ring_, &socket_descriptor_, 1 ) < 0 )
@@ -6573,18 +6577,16 @@ class rpc_uring60_server {
65736577 void stop () noexcept { should_stop_.store (true , std::memory_order_seq_cst); }
65746578
65756579 void operator ()() noexcept {
6576- // Submit initial receive operations
6577- for (message_t &message : messages_) {
6578- message.status = status_t ::receiving_k;
6579- memset (&message.peer_address , 0 , sizeof (sockaddr_in));
6580+ // Submit the initial receive operation
6581+ {
6582+ message_t &message = *messages_.begin ();
65806583 struct io_uring_sqe *receive_entry = io_uring_get_sqe (&ring_);
6581- // TODO: Switch to multishot:
6582- // io_uring_prep_recvmsg_multishot( receive_entry, socket_descriptor_, &message.header,
6583- // IOSQE_BUFFER_SELECT | IOSQE_FIXED_FILE) ;
6584- io_uring_prep_recvmsg ( receive_entry, socket_descriptor_, &message. header , 0 ) ;
6584+ io_uring_prep_recvmsg_multishot (receive_entry, socket_descriptor_, &message. header , MSG_TRUNC);
6585+ receive_entry-> flags |= IOSQE_FIXED_FILE;
6586+ receive_entry-> flags |= IOSQE_BUFFER_SELECT ;
6587+ receive_entry-> buf_group = 0 ;
65856588 io_uring_sqe_set_data (receive_entry, &message);
65866589 }
6587-
65886590 io_uring_submit (&ring_);
65896591
65906592 while (!should_stop_.load (std::memory_order_seq_cst)) {
@@ -6600,6 +6602,7 @@ class rpc_uring60_server {
66006602 struct io_uring_sqe *send_entry = io_uring_get_sqe (&ring_);
66016603 message.status = status_t ::sending_k;
66026604 io_uring_prep_sendmsg_zc (send_entry, socket_descriptor_, &message.header , 0 );
6605+ send_entry->flags |= IOSQE_FIXED_FILE;
66036606 io_uring_sqe_set_data (send_entry, &message);
66046607 }
66056608
@@ -6609,6 +6612,7 @@ class rpc_uring60_server {
66096612 message.status = status_t ::receiving_k;
66106613 memset (&message.peer_address , 0 , sizeof (sockaddr_in));
66116614 io_uring_prep_recvmsg (receive_entry, socket_descriptor_, &message.header , 0 );
6615+ receive_entry->flags |= IOSQE_FIXED_FILE;
66126616 io_uring_sqe_set_data (receive_entry, &message);
66136617 }
66146618
@@ -6655,7 +6659,7 @@ class rpc_uring60_client {
66556659 // Initialize io_uring with one slot for each send/receive/timeout operation,
66566660 // as well as a batch-level timeout operation and a cancel operation for the
66576661 // batch-level timeout.
6658- auto io_uring_setup_flags = IORING_SETUP_SQPOLL ;
6662+ auto io_uring_setup_flags = 0 ;
66596663 if (io_uring_queue_init (concurrency * 3 + 1 + 1 , &ring_, io_uring_setup_flags) < 0 )
66606664 raise_system_error (" Failed to initialize io_uring 6.0 client" );
66616665 if (io_uring_register_files (&ring_, &socket_descriptor_, 1 ) < 0 )
0 commit comments