77
88use std:: mem;
99use std:: net:: Ipv4Addr ;
10+ use std:: num:: NonZeroUsize ;
1011use std:: sync:: { Arc , Mutex } ;
1112
1213use libc:: EAGAIN ;
@@ -107,9 +108,9 @@ pub struct Net {
107108 pub ( crate ) rx_rate_limiter : RateLimiter ,
108109 pub ( crate ) tx_rate_limiter : RateLimiter ,
109110
110- pub ( crate ) rx_deferred_frame : bool ,
111-
112- rx_bytes_read : usize ,
111+ /// Used to store last RX packet size and
112+ /// rate limit RX queue.
113+ deferred_rx_bytes : Option < NonZeroUsize > ,
113114 rx_frame_buf : [ u8 ; MAX_BUFFER_SIZE ] ,
114115
115116 tx_frame_headers : [ u8 ; frame_hdr_len ( ) ] ,
@@ -175,8 +176,7 @@ impl Net {
175176 queue_evts,
176177 rx_rate_limiter,
177178 tx_rate_limiter,
178- rx_deferred_frame : false ,
179- rx_bytes_read : 0 ,
179+ deferred_rx_bytes : None ,
180180 rx_frame_buf : [ 0u8 ; MAX_BUFFER_SIZE ] ,
181181 tx_frame_headers : [ 0u8 ; frame_hdr_len ( ) ] ,
182182 irq_trigger : IrqTrigger :: new ( ) . map_err ( NetError :: EventFd ) ?,
@@ -298,16 +298,22 @@ impl Net {
298298 // Attempts to copy a single frame into the guest if there is enough
299299 // rate limiting budget.
300300 // Returns true on successful frame delivery.
301- fn rate_limited_rx_single_frame ( & mut self ) -> bool {
302- let rx_queue = & mut self . queues [ RX_INDEX ] ;
303- if !Self :: rate_limiter_consume_op ( & mut self . rx_rate_limiter , self . rx_bytes_read as u64 ) {
304- self . metrics . rx_rate_limiter_throttled . inc ( ) ;
305- return false ;
301+ fn send_deferred_rx_bytes ( & mut self ) -> bool {
302+ match self . deferred_rx_bytes {
303+ Some ( bytes) => {
304+ if Self :: rate_limiter_consume_op ( & mut self . rx_rate_limiter , bytes. get ( ) as u64 ) {
305+ // The packet is good to go, reset `deferred_rx_bytes`.
306+ self . deferred_rx_bytes = None ;
307+ // Attempt frame delivery.
308+ self . rx_buffer . notify_queue ( & mut self . queues [ RX_INDEX ] ) ;
309+ true
310+ } else {
311+ self . metrics . rx_rate_limiter_throttled . inc ( ) ;
312+ false
313+ }
314+ }
315+ None => true ,
306316 }
307-
308- // Attempt frame delivery.
309- self . rx_buffer . notify_queue ( rx_queue) ;
310- true
311317 }
312318
313319 /// Parse available RX `DescriptorChains` from the queue and
@@ -456,6 +462,10 @@ impl Net {
456462
457463 /// Read as many frames as possible.
458464 fn process_rx ( & mut self ) -> Result < ( ) , DeviceError > {
465+ if !self . send_deferred_rx_bytes ( ) {
466+ return Ok ( ( ) ) ;
467+ }
468+
459469 self . parse_rx_descriptors ( ) ;
460470
461471 loop {
@@ -465,11 +475,10 @@ impl Net {
465475 break ;
466476 }
467477 Ok ( count) => {
468- self . rx_bytes_read = count;
478+ self . deferred_rx_bytes = NonZeroUsize :: new ( count) ;
469479 self . metrics . rx_count . inc ( ) ;
470480 self . metrics . rx_packets_count . inc ( ) ;
471- if !self . rate_limited_rx_single_frame ( ) {
472- self . rx_deferred_frame = true ;
481+ if !self . send_deferred_rx_bytes ( ) {
473482 break ;
474483 }
475484 }
@@ -495,26 +504,6 @@ impl Net {
495504 self . try_signal_queue ( NetQueue :: Rx )
496505 }
497506
498- // Process the deferred frame first, then continue reading from tap.
499- fn handle_deferred_frame ( & mut self ) -> Result < ( ) , DeviceError > {
500- if self . rate_limited_rx_single_frame ( ) {
501- self . rx_deferred_frame = false ;
502- // process_rx() was interrupted possibly before consuming all
503- // packets in the tap; try continuing now.
504- return self . process_rx ( ) ;
505- }
506-
507- self . try_signal_queue ( NetQueue :: Rx )
508- }
509-
510- fn resume_rx ( & mut self ) -> Result < ( ) , DeviceError > {
511- if self . rx_deferred_frame {
512- self . handle_deferred_frame ( )
513- } else {
514- self . process_rx ( )
515- }
516- }
517-
518507 fn process_tx ( & mut self ) -> Result < ( ) , DeviceError > {
519508 // This is safe since we checked in the event handler that the device is activated.
520509 let mem = self . device_state . mem ( ) . unwrap ( ) ;
@@ -573,7 +562,7 @@ impl Net {
573562 & self . metrics ,
574563 )
575564 . unwrap_or ( false ) ;
576- if frame_consumed_by_mmds && ! self . rx_deferred_frame {
565+ if frame_consumed_by_mmds {
577566 // MMDS consumed this frame/request, let's also try to process the response.
578567 process_rx_for_mmds = true ;
579568 }
@@ -680,7 +669,7 @@ impl Net {
680669 self . metrics . rx_rate_limiter_throttled . inc ( ) ;
681670 } else {
682671 // If the limiter is not blocked, resume the receiving of bytes.
683- self . resume_rx ( )
672+ self . process_rx ( )
684673 . unwrap_or_else ( |err| report_net_event_fail ( & self . metrics , err) ) ;
685674 }
686675 }
@@ -689,31 +678,14 @@ impl Net {
689678 // This is safe since we checked in the event handler that the device is activated.
690679 self . metrics . rx_tap_event_count . inc ( ) ;
691680
692- // While there are no available RX queue buffers and there's a deferred_frame
693- // don't process any more incoming. Otherwise start processing a frame. In the
694- // process the deferred_frame flag will be set in order to avoid freezing the
695- // RX queue.
696- if self . queues [ RX_INDEX ] . is_empty ( ) && self . rx_deferred_frame {
697- self . metrics . no_rx_avail_buffer . inc ( ) ;
698- return ;
699- }
700-
701681 // While limiter is blocked, don't process any more incoming.
702682 if self . rx_rate_limiter . is_blocked ( ) {
703683 self . metrics . rx_rate_limiter_throttled . inc ( ) ;
704684 return ;
705685 }
706686
707- if self . rx_deferred_frame
708- // Process a deferred frame first if available. Don't read from tap again
709- // until we manage to receive this deferred frame.
710- {
711- self . handle_deferred_frame ( )
712- . unwrap_or_else ( |err| report_net_event_fail ( & self . metrics , err) ) ;
713- } else {
714- self . process_rx ( )
715- . unwrap_or_else ( |err| report_net_event_fail ( & self . metrics , err) ) ;
716- }
687+ self . process_rx ( )
688+ . unwrap_or_else ( |err| report_net_event_fail ( & self . metrics , err) ) ;
717689 }
718690
719691 /// Process a single TX queue event.
@@ -743,7 +715,7 @@ impl Net {
743715 match self . rx_rate_limiter . event_handler ( ) {
744716 Ok ( _) => {
745717 // There might be enough budget now to receive the frame.
746- self . resume_rx ( )
718+ self . process_rx ( )
747719 . unwrap_or_else ( |err| report_net_event_fail ( & self . metrics , err) ) ;
748720 }
749721 Err ( err) => {
@@ -772,7 +744,7 @@ impl Net {
772744
773745 /// Process device virtio queue(s).
774746 pub fn process_virtio_queues ( & mut self ) {
775- let _ = self . resume_rx ( ) ;
747+ let _ = self . process_rx ( ) ;
776748 let _ = self . process_tx ( ) ;
777749 }
778750}
@@ -1164,7 +1136,7 @@ pub mod tests {
11641136 th. rxq . check_used_elem ( 0 , 0 , 0 ) ;
11651137 th. rxq . check_used_elem ( 1 , 3 , 0 ) ;
11661138 // Check that the frame wasn't deferred.
1167- assert ! ( ! th. net( ) . rx_deferred_frame ) ;
1139+ assert ! ( th. net( ) . deferred_rx_bytes . is_none ( ) ) ;
11681140 // Check that the frame has been written successfully to the valid Rx descriptor chain.
11691141 th. rxq
11701142 . check_used_elem ( 2 , 4 , frame. len ( ) . try_into ( ) . unwrap ( ) ) ;
@@ -1198,7 +1170,7 @@ pub mod tests {
11981170 ) ;
11991171
12001172 // Check that the frame wasn't deferred.
1201- assert ! ( ! th. net( ) . rx_deferred_frame ) ;
1173+ assert ! ( th. net( ) . deferred_rx_bytes . is_none ( ) ) ;
12021174 // Check that the used queue has advanced.
12031175 assert_eq ! ( th. rxq. used. idx. get( ) , 1 ) ;
12041176 assert ! ( & th. net( ) . irq_trigger. has_pending_irq( IrqType :: Vring ) ) ;
@@ -1238,7 +1210,7 @@ pub mod tests {
12381210 ) ;
12391211
12401212 // Check that the frames weren't deferred.
1241- assert ! ( ! th. net( ) . rx_deferred_frame ) ;
1213+ assert ! ( th. net( ) . deferred_rx_bytes . is_none ( ) ) ;
12421214 // Check that the used queue has advanced.
12431215 assert_eq ! ( th. rxq. used. idx. get( ) , 2 ) ;
12441216 assert ! ( & th. net( ) . irq_trigger. has_pending_irq( IrqType :: Vring ) ) ;
@@ -1666,20 +1638,7 @@ pub mod tests {
16661638 // SAFETY: its a valid fd
16671639 unsafe { libc:: close ( th. net . lock ( ) . unwrap ( ) . tap . as_raw_fd ( ) ) } ;
16681640
1669- // The RX queue is empty and rx_deffered_frame is set.
1670- th. net ( ) . rx_deferred_frame = true ;
1671- check_metric_after_block ! (
1672- th. net( ) . metrics. no_rx_avail_buffer,
1673- 1 ,
1674- th. simulate_event( NetEvent :: Tap )
1675- ) ;
1676-
1677- // We need to set this here to false, otherwise the device will try to
1678- // handle a deferred frame, it will fail and will never try to read from
1679- // the tap.
1680- th. net ( ) . rx_deferred_frame = false ;
1681-
1682- // Fake an avail buffer; this time, tap reading should error out.
1641+ // Fake an avail buffer; tap reading should error out.
16831642 th. add_desc_chain ( NetQueue :: Rx , 0 , & [ ( 0 , 4096 , VIRTQ_DESC_F_WRITE ) ] ) ;
16841643 check_metric_after_block ! (
16851644 th. net( ) . metrics. tap_read_fails,
@@ -1688,59 +1647,6 @@ pub mod tests {
16881647 ) ;
16891648 }
16901649
1691- #[ test]
1692- fn test_deferred_frame ( ) {
1693- let mem = single_region_mem ( 2 * MAX_BUFFER_SIZE ) ;
1694- let mut th = TestHelper :: get_default ( & mem) ;
1695- th. activate_net ( ) ;
1696-
1697- let rx_packets_count = th. net ( ) . metrics . rx_packets_count . count ( ) ;
1698- let _ = inject_tap_tx_frame ( & th. net ( ) , 1000 ) ;
1699- // Trigger a Tap event that. This should fail since there
1700- // are not any available descriptors in the queue
1701- check_metric_after_block ! (
1702- th. net( ) . metrics. no_rx_avail_buffer,
1703- 1 ,
1704- th. simulate_event( NetEvent :: Tap )
1705- ) ;
1706- // The frame we read from the tap should be deferred now and
1707- // no frames should have been transmitted
1708- assert ! ( th. net( ) . rx_deferred_frame) ;
1709- assert_eq ! ( th. net( ) . metrics. rx_packets_count. count( ) , rx_packets_count) ;
1710-
1711- // Let's add a second frame, which should really have the same
1712- // fate.
1713- let _ = inject_tap_tx_frame ( & th. net ( ) , 1000 ) ;
1714-
1715- // Adding a descriptor in the queue. This should handle the first deferred
1716- // frame. However, this should try to handle the second tap as well and fail
1717- // since there's only one Descriptor Chain in the queue.
1718- th. add_desc_chain ( NetQueue :: Rx , 0 , & [ ( 0 , 4096 , VIRTQ_DESC_F_WRITE ) ] ) ;
1719- check_metric_after_block ! (
1720- th. net( ) . metrics. no_rx_avail_buffer,
1721- 1 ,
1722- th. simulate_event( NetEvent :: Tap )
1723- ) ;
1724- // We should still have a deferred frame
1725- assert ! ( th. net( ) . rx_deferred_frame) ;
1726- // However, we should have delivered the first frame
1727- assert_eq ! (
1728- th. net( ) . metrics. rx_packets_count. count( ) ,
1729- rx_packets_count + 1
1730- ) ;
1731-
1732- // Let's add one more descriptor and try to handle the last frame as well.
1733- th. add_desc_chain ( NetQueue :: Rx , 0 , & [ ( 0 , 4096 , VIRTQ_DESC_F_WRITE ) ] ) ;
1734- check_metric_after_block ! (
1735- th. net( ) . metrics. rx_packets_count,
1736- 1 ,
1737- th. simulate_event( NetEvent :: RxQueue )
1738- ) ;
1739-
1740- // We should be done with any deferred frame
1741- assert ! ( !th. net( ) . rx_deferred_frame) ;
1742- }
1743-
17441650 #[ test]
17451651 fn test_rx_rate_limiter_handling ( ) {
17461652 let mem = single_region_mem ( 2 * MAX_BUFFER_SIZE ) ;
@@ -1853,7 +1759,7 @@ pub mod tests {
18531759 let mut rl = RateLimiter :: new ( 1000 , 0 , 500 , 0 , 0 , 0 ) . unwrap ( ) ;
18541760
18551761 // set up RX
1856- assert ! ( ! th. net( ) . rx_deferred_frame ) ;
1762+ assert ! ( th. net( ) . deferred_rx_bytes . is_none ( ) ) ;
18571763 th. add_desc_chain ( NetQueue :: Rx , 0 , & [ ( 0 , 4096 , VIRTQ_DESC_F_WRITE ) ] ) ;
18581764
18591765 let frame = inject_tap_tx_frame ( & th. net ( ) , 1000 ) ;
@@ -1873,7 +1779,7 @@ pub mod tests {
18731779 // assert that limiter is blocked
18741780 assert ! ( th. net( ) . rx_rate_limiter. is_blocked( ) ) ;
18751781 assert_eq ! ( th. net( ) . metrics. rx_rate_limiter_throttled. count( ) , 1 ) ;
1876- assert ! ( th. net( ) . rx_deferred_frame ) ;
1782+ assert ! ( th. net( ) . deferred_rx_bytes . is_some ( ) ) ;
18771783 // assert that no operation actually completed (limiter blocked it)
18781784 assert ! ( & th. net( ) . irq_trigger. has_pending_irq( IrqType :: Vring ) ) ;
18791785 // make sure the data is still queued for processing
@@ -1971,7 +1877,7 @@ pub mod tests {
19711877 let mut rl = RateLimiter :: new ( 0 , 0 , 0 , 1 , 0 , 500 ) . unwrap ( ) ;
19721878
19731879 // set up RX
1974- assert ! ( ! th. net( ) . rx_deferred_frame ) ;
1880+ assert ! ( th. net( ) . deferred_rx_bytes . is_none ( ) ) ;
19751881 th. add_desc_chain ( NetQueue :: Rx , 0 , & [ ( 0 , 4096 , VIRTQ_DESC_F_WRITE ) ] ) ;
19761882 let frame = inject_tap_tx_frame ( & th. net ( ) , 1234 ) ;
19771883
@@ -1993,7 +1899,7 @@ pub mod tests {
19931899 // assert that limiter is blocked
19941900 assert ! ( th. net( ) . rx_rate_limiter. is_blocked( ) ) ;
19951901 assert ! ( th. net( ) . metrics. rx_rate_limiter_throttled. count( ) >= 1 ) ;
1996- assert ! ( th. net( ) . rx_deferred_frame ) ;
1902+ assert ! ( th. net( ) . deferred_rx_bytes . is_some ( ) ) ;
19971903 // assert that no operation actually completed (limiter blocked it)
19981904 assert ! ( & th. net( ) . irq_trigger. has_pending_irq( IrqType :: Vring ) ) ;
19991905 // make sure the data is still queued for processing
0 commit comments