 #include <linux/interrupt.h>
 #include <linux/firmware.h>
 #include <linux/fs.h>
+#include <asm/atomic.h>
 #include <uapi/linux/filter.h>
 #include <init.h>
 #include <irq_kern.h>
@@ -102,18 +103,33 @@ static const struct {
 
 static void vector_reset_stats(struct vector_private *vp)
 {
+	/* We reuse the existing queue locks for stats */
+
+	/* RX stats are modified with RX head_lock held
+	 * in vector_poll.
+	 */
+
+	spin_lock(&vp->rx_queue->head_lock);
 	vp->estats.rx_queue_max = 0;
 	vp->estats.rx_queue_running_average = 0;
-	vp->estats.tx_queue_max = 0;
-	vp->estats.tx_queue_running_average = 0;
 	vp->estats.rx_encaps_errors = 0;
+	vp->estats.sg_ok = 0;
+	vp->estats.sg_linearized = 0;
+	spin_unlock(&vp->rx_queue->head_lock);
+
+	/* TX stats are modified with TX head_lock held
+	 * in vector_send.
+	 */
+
+	spin_lock(&vp->tx_queue->head_lock);
 	vp->estats.tx_timeout_count = 0;
 	vp->estats.tx_restart_queue = 0;
 	vp->estats.tx_kicks = 0;
 	vp->estats.tx_flow_control_xon = 0;
 	vp->estats.tx_flow_control_xoff = 0;
-	vp->estats.sg_ok = 0;
-	vp->estats.sg_linearized = 0;
+	vp->estats.tx_queue_max = 0;
+	vp->estats.tx_queue_running_average = 0;
+	spin_unlock(&vp->tx_queue->head_lock);
 }
 
 static int get_mtu(struct arglist *def)
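Reviewer note, not part of the patch: the reset path above can give up its single critical section because each statistic now has exactly one writer context, protected by the matching queue's head_lock. A minimal sketch of the writer-side contract this relies on; the helper name is hypothetical, the real updates happen inline in the RX/TX paths:

/* Hypothetical illustration: any path that bumps a TX statistic does so
 * while the TX head_lock is held, so vector_reset_stats() can never
 * observe (or produce) a half-updated counter.
 */
static void demo_account_tx_kick(struct vector_private *vp)
{
	spin_lock(&vp->tx_queue->head_lock);
	vp->estats.tx_kicks++;
	spin_unlock(&vp->tx_queue->head_lock);
}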
@@ -232,12 +248,6 @@ static int get_transport_options(struct arglist *def)
 
 static char *drop_buffer;
 
-/* Array backed queues optimized for bulk enqueue/dequeue and
- * 1:N (small values of N) or 1:1 enqueuer/dequeuer ratios.
- * For more details and full design rationale see
- * http://foswiki.cambridgegreys.com/Main/EatYourTailAndEnjoyIt
- */
-
 
 /*
  * Advance the mmsg queue head by n = advance. Resets the queue to
@@ -247,27 +257,13 @@ static char *drop_buffer;
 
 static int vector_advancehead(struct vector_queue *qi, int advance)
 {
-	int queue_depth;
-
 	qi->head =
 		(qi->head + advance)
 			% qi->max_depth;
 
 
-	spin_lock(&qi->tail_lock);
-	qi->queue_depth -= advance;
-
-	/* we are at 0, use this to
-	 * reset head and tail so we can use max size vectors
-	 */
-
-	if (qi->queue_depth == 0) {
-		qi->head = 0;
-		qi->tail = 0;
-	}
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->tail_lock);
-	return queue_depth;
+	atomic_sub(advance, &qi->queue_depth);
+	return atomic_read(&qi->queue_depth);
 }
 
 /* Advance the queue tail by n = advance.
@@ -277,16 +273,11 @@ static int vector_advancehead(struct vector_queue *qi, int advance)
 
 static int vector_advancetail(struct vector_queue *qi, int advance)
 {
-	int queue_depth;
-
 	qi->tail =
 		(qi->tail + advance)
 			% qi->max_depth;
-	spin_lock(&qi->head_lock);
-	qi->queue_depth += advance;
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->head_lock);
-	return queue_depth;
+	atomic_add(advance, &qi->queue_depth);
+	return atomic_read(&qi->queue_depth);
 }
 
 static int prep_msg(struct vector_private *vp,
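Reviewer note, not part of the patch: with queue_depth converted to atomic_t, the head stays under head_lock, the tail stays under tail_lock, and the depth counter is the only state the two ends of the ring share, so advancing one end no longer has to take the other end's lock. A self-contained sketch of the same single-producer/single-consumer layout (all names here are illustrative, not the driver's):

#include <linux/atomic.h>
#include <linux/spinlock.h>

struct demo_queue {
	spinlock_t head_lock;	/* serialises dequeuers, owns head */
	spinlock_t tail_lock;	/* serialises enqueuers, owns tail */
	atomic_t queue_depth;	/* only field shared by both ends */
	int head, tail, max_depth;
};

static int demo_enqueue_one(struct demo_queue *q)
{
	int depth;

	spin_lock(&q->tail_lock);
	q->tail = (q->tail + 1) % q->max_depth;
	/* no head_lock needed: the counter update itself is atomic */
	depth = atomic_add_return(1, &q->queue_depth);
	spin_unlock(&q->tail_lock);
	return depth;
}

static int demo_dequeue_one(struct demo_queue *q)
{
	int depth;

	spin_lock(&q->head_lock);
	q->head = (q->head + 1) % q->max_depth;
	depth = atomic_sub_return(1, &q->queue_depth);
	spin_unlock(&q->head_lock);
	return depth;
}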
@@ -339,9 +330,7 @@ static int vector_enqueue(struct vector_queue *qi, struct sk_buff *skb)
 	int iov_count;
 
 	spin_lock(&qi->tail_lock);
-	spin_lock(&qi->head_lock);
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->head_lock);
+	queue_depth = atomic_read(&qi->queue_depth);
 
 	if (skb)
 		packet_len = skb->len;
@@ -360,6 +349,7 @@ static int vector_enqueue(struct vector_queue *qi, struct sk_buff *skb)
 		mmsg_vector->msg_hdr.msg_iovlen = iov_count;
 		mmsg_vector->msg_hdr.msg_name = vp->fds->remote_addr;
 		mmsg_vector->msg_hdr.msg_namelen = vp->fds->remote_addr_size;
+		wmb(); /* Make the packet visible to the NAPI poll thread */
 		queue_depth = vector_advancetail(qi, 1);
 	} else
 		goto drop;
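Reviewer note, not part of the patch: the wmb() orders the writes that fill the mmsghdr slot before the queue_depth increment done in vector_advancetail(), so a consumer that observes the new depth also observes a fully formed slot. A generic sketch of that publish/consume pairing, using demo names and smp_* barriers purely for illustration:

#include <linux/atomic.h>
#include <asm/barrier.h>

struct demo_slot { void *payload; };

struct demo_ring {
	struct demo_slot slot[64];
	atomic_t depth;
	int head, tail;
};

static void demo_publish(struct demo_ring *r, void *p)
{
	r->slot[r->tail].payload = p;	/* fill the slot first */
	r->tail = (r->tail + 1) % 64;
	smp_wmb();			/* order slot contents before depth */
	atomic_add(1, &r->depth);	/* consumer may now look at the slot */
}

static void *demo_consume(struct demo_ring *r)
{
	void *p;

	if (atomic_read(&r->depth) == 0)
		return NULL;
	smp_rmb();			/* pairs with smp_wmb() above */
	p = r->slot[r->head].payload;
	r->head = (r->head + 1) % 64;
	atomic_sub(1, &r->depth);
	return p;
}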
@@ -398,7 +388,7 @@ static int consume_vector_skbs(struct vector_queue *qi, int count)
 }
 
 /*
- * Generic vector deque via sendmmsg with support for forming headers
+ * Generic vector dequeue via sendmmsg with support for forming headers
  * using transport specific callback. Allows GRE, L2TPv3, RAW and
  * other transports to use a common dequeue procedure in vector mode
  */
@@ -408,69 +398,64 @@ static int vector_send(struct vector_queue *qi)
 {
 	struct vector_private *vp = netdev_priv(qi->dev);
 	struct mmsghdr *send_from;
-	int result = 0, send_len, queue_depth = qi->max_depth;
+	int result = 0, send_len;
 
 	if (spin_trylock(&qi->head_lock)) {
-		if (spin_trylock(&qi->tail_lock)) {
-			/* update queue_depth to current value */
-			queue_depth = qi->queue_depth;
-			spin_unlock(&qi->tail_lock);
-			while (queue_depth > 0) {
-				/* Calculate the start of the vector */
-				send_len = queue_depth;
-				send_from = qi->mmsg_vector;
-				send_from += qi->head;
-				/* Adjust vector size if wraparound */
-				if (send_len + qi->head > qi->max_depth)
-					send_len = qi->max_depth - qi->head;
-				/* Try to TX as many packets as possible */
-				if (send_len > 0) {
-					result = uml_vector_sendmmsg(
-						vp->fds->tx_fd,
-						send_from,
-						send_len,
-						0
-					);
-					vp->in_write_poll =
-						(result != send_len);
-				}
-				/* For some of the sendmmsg error scenarios
-				 * we may end being unsure in the TX success
-				 * for all packets. It is safer to declare
-				 * them all TX-ed and blame the network.
-				 */
-				if (result < 0) {
-					if (net_ratelimit())
-						netdev_err(vp->dev, "sendmmsg err=%i\n",
-							result);
-					vp->in_error = true;
-					result = send_len;
-				}
-				if (result > 0) {
-					queue_depth =
-						consume_vector_skbs(qi, result);
-					/* This is equivalent to an TX IRQ.
-					 * Restart the upper layers to feed us
-					 * more packets.
-					 */
-					if (result > vp->estats.tx_queue_max)
-						vp->estats.tx_queue_max = result;
-					vp->estats.tx_queue_running_average =
-						(vp->estats.tx_queue_running_average + result) >> 1;
-				}
-				netif_wake_queue(qi->dev);
-				/* if TX is busy, break out of the send loop,
-				 * poll write IRQ will reschedule xmit for us
+		/* update queue_depth to current value */
+		while (atomic_read(&qi->queue_depth) > 0) {
+			/* Calculate the start of the vector */
+			send_len = atomic_read(&qi->queue_depth);
+			send_from = qi->mmsg_vector;
+			send_from += qi->head;
+			/* Adjust vector size if wraparound */
+			if (send_len + qi->head > qi->max_depth)
+				send_len = qi->max_depth - qi->head;
+			/* Try to TX as many packets as possible */
+			if (send_len > 0) {
+				result = uml_vector_sendmmsg(
+					vp->fds->tx_fd,
+					send_from,
+					send_len,
+					0
+				);
+				vp->in_write_poll =
+					(result != send_len);
+			}
+			/* For some of the sendmmsg error scenarios
+			 * we may end being unsure in the TX success
+			 * for all packets. It is safer to declare
+			 * them all TX-ed and blame the network.
+			 */
+			if (result < 0) {
+				if (net_ratelimit())
+					netdev_err(vp->dev, "sendmmsg err=%i\n",
+						result);
+				vp->in_error = true;
+				result = send_len;
+			}
+			if (result > 0) {
+				consume_vector_skbs(qi, result);
+				/* This is equivalent to an TX IRQ.
+				 * Restart the upper layers to feed us
+				 * more packets.
 				 */
-				if (result != send_len) {
-					vp->estats.tx_restart_queue++;
-					break;
-				}
+				if (result > vp->estats.tx_queue_max)
+					vp->estats.tx_queue_max = result;
+				vp->estats.tx_queue_running_average =
+					(vp->estats.tx_queue_running_average + result) >> 1;
+			}
+			netif_wake_queue(qi->dev);
+			/* if TX is busy, break out of the send loop,
+			 * poll write IRQ will reschedule xmit for us.
+			 */
+			if (result != send_len) {
+				vp->estats.tx_restart_queue++;
+				break;
 			}
 		}
 		spin_unlock(&qi->head_lock);
 	}
-	return queue_depth;
+	return atomic_read(&qi->queue_depth);
 }
 
 /* Queue destructor. Deliberately stateless so we can use
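Reviewer note, not part of the patch: since the depth is now read atomically, vector_send() no longer needs tail_lock at all; spin_trylock() on head_lock keeps it a single-dequeuer path, and a caller that loses the race can simply return because the current holder keeps draining until the depth reaches zero. A compressed sketch of that shape, reusing the hypothetical demo_queue from the earlier note:

static void demo_try_drain(struct demo_queue *q)
{
	if (!spin_trylock(&q->head_lock))
		return;		/* someone else is already draining */

	while (atomic_read(&q->queue_depth) > 0) {
		/* transmit one entry starting at q->head ... */
		q->head = (q->head + 1) % q->max_depth;
		atomic_sub(1, &q->queue_depth);
	}
	spin_unlock(&q->head_lock);
}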
@@ -589,7 +574,7 @@ static struct vector_queue *create_queue(
 	}
 	spin_lock_init(&result->head_lock);
 	spin_lock_init(&result->tail_lock);
-	result->queue_depth = 0;
+	atomic_set(&result->queue_depth, 0);
 	result->head = 0;
 	result->tail = 0;
 	return result;
@@ -668,18 +653,27 @@ static struct sk_buff *prep_skb(
 }
 
 
-/* Prepare queue for recvmmsg one-shot rx - fill with fresh sk_buffs*/
+/* Prepare queue for recvmmsg one-shot rx - fill with fresh sk_buffs */
 
 static void prep_queue_for_rx(struct vector_queue *qi)
 {
 	struct vector_private *vp = netdev_priv(qi->dev);
 	struct mmsghdr *mmsg_vector = qi->mmsg_vector;
 	void **skbuff_vector = qi->skbuff_vector;
-	int i;
+	int i, queue_depth;
+
+	queue_depth = atomic_read(&qi->queue_depth);
 
-	if (qi->queue_depth == 0)
+	if (queue_depth == 0)
 		return;
-	for (i = 0; i < qi->queue_depth; i++) {
+
+	/* RX is always emptied 100% during each cycle, so we do not
+	 * have to do the tail wraparound math for it.
+	 */
+
+	qi->head = qi->tail = 0;
+
+	for (i = 0; i < queue_depth; i++) {
 		/* it is OK if allocation fails - recvmmsg with NULL data in
 		 * iov argument still performs an RX, just drops the packet
 		 * This allows us stop faffing around with a "drop buffer"
@@ -689,7 +683,7 @@ static void prep_queue_for_rx(struct vector_queue *qi)
 		skbuff_vector++;
 		mmsg_vector++;
 	}
-	qi->queue_depth = 0;
+	atomic_set(&qi->queue_depth, 0);
 }
 
 static struct vector_device *find_device(int n)
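Reviewer note, not part of the patch: because the RX ring is fully drained on every NAPI cycle, resetting head and tail to zero keeps each recvmmsg() batch contiguous in the array; only the TX path, whose head can sit anywhere, needs the wraparound clamp seen in vector_send(). A hypothetical helper showing that clamp in isolation:

/* Largest batch that stays contiguous in an array-backed ring. */
static int demo_contig_batch(int head, int depth, int max_depth)
{
	int len = depth;

	if (len + head > max_depth)
		len = max_depth - head;	/* stop at the wrap point */
	return len;
}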
@@ -985,7 +979,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget)
 	 * many do we need to prep the next time prep_queue_for_rx() is called.
 	 */
 
-	qi->queue_depth = packet_count;
+	atomic_add(packet_count, &qi->queue_depth);
 
 	for (i = 0; i < packet_count; i++) {
 		skb = (*skbuff_vector);
@@ -1172,13 +1166,15 @@ static int vector_poll(struct napi_struct *napi, int budget)
 
 	if ((vp->options & VECTOR_TX) != 0)
 		tx_enqueued = (vector_send(vp->tx_queue) > 0);
+	spin_lock(&vp->rx_queue->head_lock);
 	if ((vp->options & VECTOR_RX) > 0)
 		err = vector_mmsg_rx(vp, budget);
 	else {
 		err = vector_legacy_rx(vp);
 		if (err > 0)
 			err = 1;
 	}
+	spin_unlock(&vp->rx_queue->head_lock);
 	if (err > 0)
 		work_done += err;
 
@@ -1225,7 +1221,7 @@ static int vector_net_open(struct net_device *dev)
 			vp->rx_header_size,
 			MAX_IOV_SIZE
 		);
-		vp->rx_queue->queue_depth = get_depth(vp->parsed);
+		atomic_set(&vp->rx_queue->queue_depth, get_depth(vp->parsed));
 	} else {
 		vp->header_rxbuffer = kmalloc(
 			vp->rx_header_size,
@@ -1467,7 +1463,17 @@ static void vector_get_ethtool_stats(struct net_device *dev,
 {
 	struct vector_private *vp = netdev_priv(dev);
 
+	/* Stats are modified in the dequeue portions of
+	 * rx/tx which are protected by the head locks
+	 * grabbing these locks here ensures they are up
+	 * to date.
+	 */
+
+	spin_lock(&vp->tx_queue->head_lock);
+	spin_lock(&vp->rx_queue->head_lock);
 	memcpy(tmp_stats, &vp->estats, sizeof(struct vector_estats));
+	spin_unlock(&vp->rx_queue->head_lock);
+	spin_unlock(&vp->tx_queue->head_lock);
 }
 
 static int vector_get_coalesce(struct net_device *netdev,
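Reviewer note, not part of the patch: taking tx_queue->head_lock and then rx_queue->head_lock here gives the ethtool stats read a snapshot that cannot interleave with the in-flight updates done under the same locks in vector_send() and the NAPI poll path, and it matches the per-queue locking used by vector_reset_stats() earlier in this diff. At least within the paths this diff touches, the datapath never holds both head locks at once (TX stats are updated under the TX head_lock only, RX stats under the RX head_lock only, and vector_poll() releases the TX lock inside vector_send() before taking the RX lock), so the tx-then-rx ordering here should not be able to invert against it.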