1- use std:: {
2- cell:: Cell ,
3- collections:: { BinaryHeap , VecDeque } ,
4- time:: Duration ,
5- } ;
1+ use std:: { collections:: VecDeque , time:: Duration } ;
62
73use crate :: clock:: Timestamp ;
84
95pub struct Connection < T > {
106 bandwidth_bps : Option < u64 > ,
117 latency : Duration ,
12- // the messages with the fewest bytes left will be received sooner
13- bandwidth_queue : BinaryHeap < Envelope < T > > ,
8+ // messages go across the channel one-at-a-time, so this is FIFO
9+ bandwidth_queue : VecDeque < ( T , u64 ) > ,
1410 // every message has the same latency, so this is FIFO
1511 latency_queue : VecDeque < ( T , Timestamp ) > ,
16- next_id : u64 ,
1712 last_event : Timestamp ,
1813}
1914
@@ -22,9 +17,8 @@ impl<T> Connection<T> {
2217 Self {
2318 bandwidth_bps,
2419 latency,
25- bandwidth_queue : BinaryHeap :: new ( ) ,
20+ bandwidth_queue : VecDeque :: new ( ) ,
2621 latency_queue : VecDeque :: new ( ) ,
27- next_id : 0 ,
2822 last_event : Timestamp :: zero ( ) ,
2923 }
3024 }
@@ -34,25 +28,18 @@ impl<T> Connection<T> {
3428 self . latency_queue . push_back ( ( message, now + self . latency ) ) ;
3529 } else {
3630 self . update_bandwidth_queue ( now) ;
37- let id = self . next_id ;
38- self . next_id += 1 ;
39- self . bandwidth_queue . push ( Envelope {
40- id,
41- body : message,
42- bytes_left : Cell :: new ( bytes) ,
43- } ) ;
31+ self . bandwidth_queue . push_back ( ( message, bytes) ) ;
4432 }
4533 }
4634
4735 pub fn next_arrival_time ( & self ) -> Option < Timestamp > {
4836 if let Some ( ( _, timestamp) ) = self . latency_queue . front ( ) {
4937 return Some ( * timestamp) ;
5038 }
51- let next_bandwidther = self . bandwidth_queue . peek ( ) ?;
52- let bps = self . bandwidth_bps ? / self . bandwidth_queue . len ( ) as u64 ;
39+ let ( _, bytes_left) = self . bandwidth_queue . front ( ) ?;
5340 Some (
5441 self . last_event
55- + compute_bandwidth_delay ( bps , next_bandwidther . bytes_left . get ( ) )
42+ + compute_bandwidth_delay ( self . bandwidth_bps ? , * bytes_left)
5643 + self . latency ,
5744 )
5845 }
@@ -69,79 +56,36 @@ impl<T> Connection<T> {
6956 }
7057
7158 fn update_bandwidth_queue ( & mut self , now : Timestamp ) {
72- let Some ( total_bandwidth ) = self . bandwidth_bps else {
59+ let Some ( bps ) = self . bandwidth_bps else {
7360 return ;
7461 } ;
7562
7663 if self . last_event == now {
7764 return ;
7865 }
7966
80- let mut bytes_consumed = 0 ;
81- while let Some ( envelope) = self . bandwidth_queue . peek ( ) {
82- let bps = total_bandwidth / self . bandwidth_queue . len ( ) as u64 ;
83- let next_event = self . last_event
84- + compute_bandwidth_delay ( bps, envelope. bytes_left . get ( ) - bytes_consumed) ;
67+ while let Some ( ( _, bytes) ) = self . bandwidth_queue . front_mut ( ) {
68+ let next_event = self . last_event + compute_bandwidth_delay ( bps, * bytes) ;
8569 if next_event <= now {
86- let envelope = self . bandwidth_queue . pop ( ) . unwrap ( ) ;
87- bytes_consumed = envelope. bytes_left . get ( ) ;
70+ let ( message, _) = self . bandwidth_queue . pop_front ( ) . unwrap ( ) ;
8871 let arrival_time = next_event + self . latency ;
89- self . latency_queue . push_back ( ( envelope . body , arrival_time) ) ;
72+ self . latency_queue . push_back ( ( message , arrival_time) ) ;
9073 self . last_event = next_event;
9174 } else {
9275 let elapsed = now - self . last_event ;
93- bytes_consumed + = elapsed. as_micros ( ) as u64 * bps / 1_000_000 ;
76+ * bytes - = elapsed. as_micros ( ) as u64 * bps / 1_000_000 ;
9477 break ;
9578 }
9679 }
9780
9881 self . last_event = now;
99-
100- // update how many bytes are left for remaining queue items
101- // this updates every item by the same amount,
102- // so it doesn't violate BinaryHeap invariants.
103- if bytes_consumed == 0 {
104- return ;
105- }
106- for envelope in self . bandwidth_queue . iter ( ) {
107- let bytes_left = envelope. bytes_left . get ( ) ;
108- envelope. bytes_left . set ( bytes_left - bytes_consumed) ;
109- }
11082 }
11183}
11284
11385fn compute_bandwidth_delay ( bps : u64 , bytes : u64 ) -> Duration {
11486 Duration :: from_micros ( ( bytes * 1_000_000 ) / bps)
11587}
11688
117- // Ordering is by fewest bytes left, then by lowest id.
118- struct Envelope < T > {
119- id : u64 ,
120- body : T ,
121- bytes_left : Cell < u64 > ,
122- }
123-
124- impl < T > PartialEq for Envelope < T > {
125- fn eq ( & self , other : & Self ) -> bool {
126- self . id . eq ( & other. id )
127- }
128- }
129- impl < T > Eq for Envelope < T > { }
130-
131- impl < T > PartialOrd for Envelope < T > {
132- fn partial_cmp ( & self , other : & Self ) -> Option < std:: cmp:: Ordering > {
133- Some ( self . cmp ( other) )
134- }
135- }
136- impl < T > Ord for Envelope < T > {
137- fn cmp ( & self , other : & Self ) -> std:: cmp:: Ordering {
138- self . bytes_left
139- . cmp ( & other. bytes_left )
140- . then ( self . id . cmp ( & other. id ) )
141- . reverse ( )
142- }
143- }
144-
14589#[ cfg( test) ]
14690mod tests {
14791 use std:: time:: Duration ;
@@ -214,7 +158,7 @@ mod tests {
214158 }
215159
216160 #[ test]
217- fn should_split_bandwidth_between_scheduled_messages ( ) {
161+ fn should_use_all_bandwidth_for_one_message_at_a_time ( ) {
218162 let latency = Duration :: ZERO ;
219163 let bandwidth_bps = Some ( 1000 ) ;
220164 let mut conn = Connection :: new ( latency, bandwidth_bps) ;
@@ -224,26 +168,7 @@ mod tests {
224168 conn. send ( "message 1" , 1000 , start) ;
225169 conn. send ( "message 2" , 1000 , start) ;
226170
227- let arrival_time = start + Duration :: from_secs ( 2 ) ;
228- assert_eq ! ( conn. next_arrival_time( ) , Some ( arrival_time) ) ;
229- assert_eq ! ( conn. recv( arrival_time) , "message 1" ) ;
230- assert_eq ! ( conn. next_arrival_time( ) , Some ( arrival_time) ) ;
231- assert_eq ! ( conn. recv( arrival_time) , "message 2" ) ;
232- assert_eq ! ( conn. next_arrival_time( ) , None ) ;
233- }
234-
235- #[ test]
236- fn should_stop_splitting_bandwidth_when_one_message_goes_through ( ) {
237- let latency = Duration :: ZERO ;
238- let bandwidth_bps = Some ( 1000 ) ;
239- let mut conn = Connection :: new ( latency, bandwidth_bps) ;
240- assert_eq ! ( conn. next_arrival_time( ) , None ) ;
241-
242- let start = Timestamp :: zero ( ) + Duration :: from_secs ( 1 ) ;
243- conn. send ( "message 1" , 1000 , start) ;
244- conn. send ( "message 2" , 2000 , start) ;
245-
246- let first_arrival_time = start + Duration :: from_secs ( 2 ) ;
171+ let first_arrival_time = start + Duration :: from_secs ( 1 ) ;
247172 assert_eq ! ( conn. next_arrival_time( ) , Some ( first_arrival_time) ) ;
248173 assert_eq ! ( conn. recv( first_arrival_time) , "message 1" ) ;
249174 let second_arrival_time = first_arrival_time + Duration :: from_secs ( 1 ) ;
@@ -253,24 +178,23 @@ mod tests {
253178 }
254179
255180 #[ test]
256- fn should_start_splitting_bandwidth_when_second_message_is_sent ( ) {
181+ fn should_delay_second_message_if_first_one_is_in_flight ( ) {
257182 let latency = Duration :: ZERO ;
258183 let bandwidth_bps = Some ( 1000 ) ;
259184 let mut conn = Connection :: new ( latency, bandwidth_bps) ;
260185 assert_eq ! ( conn. next_arrival_time( ) , None ) ;
261186
262187 let start = Timestamp :: zero ( ) + Duration :: from_secs ( 1 ) ;
263- let og_first_arrival_time = start + Duration :: from_secs ( 1 ) ;
188+ let first_arrival_time = start + Duration :: from_secs ( 1 ) ;
264189 conn. send ( "message 1" , 1000 , start) ;
265- assert_eq ! ( conn. next_arrival_time( ) , Some ( og_first_arrival_time ) ) ;
190+ assert_eq ! ( conn. next_arrival_time( ) , Some ( first_arrival_time ) ) ;
266191
267192 let second_start = start + Duration :: from_millis ( 500 ) ;
268193 conn. send ( "message 2" , 1000 , second_start) ;
269194
270- let first_arrival_time = start + Duration :: from_millis ( 1500 ) ;
271195 assert_eq ! ( conn. next_arrival_time( ) , Some ( first_arrival_time) ) ;
272196 assert_eq ! ( conn. recv( first_arrival_time) , "message 1" ) ;
273- let second_arrival_time = first_arrival_time + Duration :: from_millis ( 500 ) ;
197+ let second_arrival_time = first_arrival_time + Duration :: from_millis ( 1000 ) ;
274198 assert_eq ! ( conn. next_arrival_time( ) , Some ( second_arrival_time) ) ;
275199 assert_eq ! ( conn. recv( second_arrival_time) , "message 2" ) ;
276200 assert_eq ! ( conn. next_arrival_time( ) , None ) ;
0 commit comments