@@ -5,14 +5,22 @@ use tokio::sync::mpsc::{self};
55use crate :: core:: error:: BluefinError ;
66use crate :: core:: header:: PacketType ;
77use crate :: core:: packet:: BluefinPacket ;
8+ use crate :: core:: Extract ;
89use crate :: net:: ack_handler:: AckBuffer ;
910use crate :: net:: connection:: ConnectionBuffer ;
1011use crate :: net:: { ConnectionManagedBuffers , MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM } ;
1112use crate :: utils:: common:: BluefinResult ;
1213use std:: sync:: { Arc , MutexGuard } ;
1314
/// Arbitrary number of worker tasks to spawn when we cannot decide how many worker tasks
/// to use for the current platform.
const DEFAULT_NUMBER_OF_TASKS_TO_SPAWN: usize = 3;
1518
/// [ConnReaderHandler] is a handle to network read-related functionalities. As the name
/// suggests, this handler is specific to *connection* reads. That is, this handler can only be used
21+ /// when a Bluefin connection has been established. This reader is fundamentally different from that
22+ /// of the [crate::worker::reader::ReaderRxChannel] as this will only read packets from the wire
23+ /// intended for the connection.
1624pub ( crate ) struct ConnReaderHandler {
1725 socket : Arc < UdpSocket > ,
1826 conn_bufs : Arc < ConnectionManagedBuffers > ,
@@ -23,26 +31,44 @@ impl ConnReaderHandler {
2331 Self { socket, conn_bufs }
2432 }
2533
34+ /// Starts the handler worker jobs. This starts the worker tasks, which busy-polls a connected
35+ /// UDP socket for packets. Upon receiving bytes, these workers will send them to another
36+ /// channel for processing. Then second kind of worker is the processing channel, which receives
37+ /// bytes, attempts to deserialise them into bluefin packets and buffer them in the correct
38+ /// buffer.
2639 pub ( crate ) fn start ( & self ) -> BluefinResult < ( ) > {
2740 let ( tx, rx) = mpsc:: channel :: < Vec < BluefinPacket > > ( 1024 ) ;
28- for _ in 0 ..Self :: get_num_cpu_cores ( ) {
41+
42+ // Spawn n-number of UDP-recv tasks.
43+ for _ in 0 ..Self :: get_number_of_tx_tasks ( ) {
2944 let tx_cloned = tx. clone ( ) ;
3045 let socket_cloned = self . socket . clone ( ) ;
3146 spawn ( async move {
3247 let _ = ConnReaderHandler :: tx_impl ( socket_cloned, tx_cloned) . await ;
3348 } ) ;
3449 }
3550
51+ // Spawn the corresponding rx channel which receives bytes from the tx channel and processes
52+ // bytes and buffers them.
3653 let conn_bufs = self . conn_bufs . clone ( ) ;
3754 spawn ( async move {
3855 let _ = ConnReaderHandler :: rx_impl ( rx, & * conn_bufs) . await ;
3956 } ) ;
4057 Ok ( ( ) )
4158 }
4259
60+ /// For linux, we return the expected number of CPU cores. This lets us take advantage of
61+ /// parallelism. For (silicon) macos, we return one. Experiments on Apple Silicon have shown
62+ /// that SO_REUSEPORT does not behave the same way as it does on Linux
63+ /// (see: https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux)
64+ /// and so we cannot take advantage of running the rx-tasks on multiple threads. For now, running
65+ /// one instance of it is performant enough.
66+ ///
/// For all other operating systems (which are currently unsupported by Bluefin anyway), we
68+ /// return an arbitrary default value.
4369 #[ allow( unreachable_code) ]
4470 #[ inline]
45- fn get_num_cpu_cores ( ) -> usize {
71+ fn get_number_of_tx_tasks ( ) -> usize {
4672 // For linux, let's use all the cpu cores available.
4773 #[ cfg( target_os = "linux" ) ]
4874 {
@@ -63,6 +89,10 @@ impl ConnReaderHandler {
6389 DEFAULT_NUMBER_OF_TASKS_TO_SPAWN
6490 }
6591
92+ /// This represents one tx task or one of the multiple producers in the mpsc channel. This
93+ /// function is a hot-loop; it continuously reads from a connected socket. When bytes are
94+ /// received, we attempt to deserialise them into bluefin packets. If valid packets are
95+ /// produced, them we send them to the consumer channel for processing.
6696 #[ inline]
6797 async fn tx_impl (
6898 socket : Arc < UdpSocket > ,
@@ -73,14 +103,12 @@ impl ConnReaderHandler {
73103 let size = socket. recv ( & mut buf) . await ?;
74104 let packets = BluefinPacket :: from_bytes ( & buf[ ..size] ) ?;
75105
76- if packets. len ( ) == 0 {
77- continue ;
78- }
79-
80106 let _ = tx. send ( packets) . await ;
81107 }
82108 }
83109
110+ /// This is the single consumer in the mpsc channel. This receives bluefin packets from
111+ /// n-producers. We place the packets into the relevant buffer.
84112 #[ inline]
85113 async fn rx_impl (
86114 mut rx : mpsc:: Receiver < Vec < BluefinPacket > > ,
@@ -103,7 +131,14 @@ impl ConnReaderHandler {
103131 return Ok ( ( ) ) ;
104132 }
105133
106- // Peek at the first packet and acquire the buffer.
134+ // Peek at the first packet and acquire the buffer. The assumptions here are:
135+ // 1. An udp datagram contains one or more bluefin packets. However, all the packets
136+ // in the datagram are for the same connection (no mix and matching different connection
137+ // packets in the same datagram).
138+ // 2. An udp datagram contains the same type of packets. This means a udp datagram either
139+ // contains all data-type packets or ack-packets.
140+ // Therefore, with these assumptions, we can just peek at the first packet in the datagram
141+ // and then acquire the appropriate lock before processing.
107142 let p = packets. first ( ) . unwrap ( ) ;
108143 match p. header . type_field {
109144 PacketType :: Ack => {
@@ -142,8 +177,8 @@ impl ConnReaderHandler {
142177 packets : Vec < BluefinPacket > ,
143178 ) -> BluefinResult < ( ) > {
144179 let mut e: Option < BluefinError > = None ;
145- for p in packets {
146- if let Err ( err) = guard. buffer_in_bytes ( p) {
180+ for mut p in packets {
181+ if let Err ( err) = guard. buffer_in_bytes ( p. extract ( ) ) {
147182 e = Some ( err) ;
148183 }
149184 }
0 commit comments