
Commit 612a8c8

kot-begemot-uk authored and richardweinberger committed
um: vector: Replace locks guarding queue depth with atomics
UML vector drivers use ring buffer structures which map preallocated skbs onto mmsg vectors for use with sendmmsg and recvmmsg. They are designed around a single consumer, single producer pattern allowing simultaneous enqueue and dequeue.

Lock debugging with preemption showed possible races when locking the queue depth. This patch addresses this by removing extra locks, adding barriers and making queue depth inc/dec and access atomic.

Signed-off-by: Anton Ivanov <[email protected]>
Signed-off-by: Richard Weinberger <[email protected]>
1 parent ec24b98 commit 612a8c8
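For readers unfamiliar with the pattern the commit message describes, here is a minimal userspace sketch of a single-producer/single-consumer ring whose shared depth counter is kept in an atomic rather than under a lock. It uses C11 atomics and invented names (ring, MAX_DEPTH); it is an illustration of the idea, not the driver code itself.

/* Illustrative SPSC ring: each side owns its own index, only the
 * depth counter is shared, so enqueue and dequeue may run concurrently.
 */
#include <stdatomic.h>
#include <stdio.h>

#define MAX_DEPTH 64

struct ring {
	int head;            /* consumer-only index */
	int tail;            /* producer-only index */
	atomic_int depth;    /* shared: number of filled slots */
	int slot[MAX_DEPTH];
};

/* Producer: fill the slot first, then publish it by bumping depth.
 * The release ordering plays the role of the wmb() added by this patch.
 */
static int enqueue(struct ring *r, int value)
{
	if (atomic_load_explicit(&r->depth, memory_order_acquire) == MAX_DEPTH)
		return -1;                         /* queue full */
	r->slot[r->tail] = value;
	r->tail = (r->tail + 1) % MAX_DEPTH;
	atomic_fetch_add_explicit(&r->depth, 1, memory_order_release);
	return 0;
}

/* Consumer: observe a non-zero depth, read the slot, then retire it. */
static int dequeue(struct ring *r, int *value)
{
	if (atomic_load_explicit(&r->depth, memory_order_acquire) == 0)
		return -1;                         /* queue empty */
	*value = r->slot[r->head];
	r->head = (r->head + 1) % MAX_DEPTH;
	atomic_fetch_sub_explicit(&r->depth, 1, memory_order_release);
	return 0;
}

int main(void)
{
	struct ring r = { .depth = 0 };
	int v;

	enqueue(&r, 42);
	if (dequeue(&r, &v) == 0)
		printf("dequeued %d, depth now %d\n", v, atomic_load(&r.depth));
	return 0;
}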

File tree

2 files changed: +110 -102 lines changed


arch/um/drivers/vector_kern.c

Lines changed: 107 additions & 101 deletions
@@ -22,6 +22,7 @@
 #include <linux/interrupt.h>
 #include <linux/firmware.h>
 #include <linux/fs.h>
+#include <asm/atomic.h>
 #include <uapi/linux/filter.h>
 #include <init.h>
 #include <irq_kern.h>
@@ -102,18 +103,33 @@ static const struct {
 
 static void vector_reset_stats(struct vector_private *vp)
 {
+	/* We reuse the existing queue locks for stats */
+
+	/* RX stats are modified with RX head_lock held
+	 * in vector_poll.
+	 */
+
+	spin_lock(&vp->rx_queue->head_lock);
 	vp->estats.rx_queue_max = 0;
 	vp->estats.rx_queue_running_average = 0;
-	vp->estats.tx_queue_max = 0;
-	vp->estats.tx_queue_running_average = 0;
 	vp->estats.rx_encaps_errors = 0;
+	vp->estats.sg_ok = 0;
+	vp->estats.sg_linearized = 0;
+	spin_unlock(&vp->rx_queue->head_lock);
+
+	/* TX stats are modified with TX head_lock held
+	 * in vector_send.
+	 */
+
+	spin_lock(&vp->tx_queue->head_lock);
 	vp->estats.tx_timeout_count = 0;
 	vp->estats.tx_restart_queue = 0;
 	vp->estats.tx_kicks = 0;
 	vp->estats.tx_flow_control_xon = 0;
 	vp->estats.tx_flow_control_xoff = 0;
-	vp->estats.sg_ok = 0;
-	vp->estats.sg_linearized = 0;
+	vp->estats.tx_queue_max = 0;
+	vp->estats.tx_queue_running_average = 0;
+	spin_unlock(&vp->tx_queue->head_lock);
 }
 
 static int get_mtu(struct arglist *def)
@@ -232,12 +248,6 @@ static int get_transport_options(struct arglist *def)
 
 static char *drop_buffer;
 
-/* Array backed queues optimized for bulk enqueue/dequeue and
- * 1:N (small values of N) or 1:1 enqueuer/dequeuer ratios.
- * For more details and full design rationale see
- * http://foswiki.cambridgegreys.com/Main/EatYourTailAndEnjoyIt
- */
-
 
 /*
  * Advance the mmsg queue head by n = advance. Resets the queue to
@@ -247,27 +257,13 @@ static char *drop_buffer;
 
 static int vector_advancehead(struct vector_queue *qi, int advance)
 {
-	int queue_depth;
-
 	qi->head =
 		(qi->head + advance)
 			% qi->max_depth;
 
 
-	spin_lock(&qi->tail_lock);
-	qi->queue_depth -= advance;
-
-	/* we are at 0, use this to
-	 * reset head and tail so we can use max size vectors
-	 */
-
-	if (qi->queue_depth == 0) {
-		qi->head = 0;
-		qi->tail = 0;
-	}
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->tail_lock);
-	return queue_depth;
+	atomic_sub(advance, &qi->queue_depth);
+	return atomic_read(&qi->queue_depth);
 }
 
 /* Advance the queue tail by n = advance.
@@ -277,16 +273,11 @@ static int vector_advancehead(struct vector_queue *qi, int advance)
 
 static int vector_advancetail(struct vector_queue *qi, int advance)
 {
-	int queue_depth;
-
 	qi->tail =
 		(qi->tail + advance)
 			% qi->max_depth;
-	spin_lock(&qi->head_lock);
-	qi->queue_depth += advance;
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->head_lock);
-	return queue_depth;
+	atomic_add(advance, &qi->queue_depth);
+	return atomic_read(&qi->queue_depth);
 }
 
 static int prep_msg(struct vector_private *vp,
@@ -339,9 +330,7 @@ static int vector_enqueue(struct vector_queue *qi, struct sk_buff *skb)
 	int iov_count;
 
 	spin_lock(&qi->tail_lock);
-	spin_lock(&qi->head_lock);
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->head_lock);
+	queue_depth = atomic_read(&qi->queue_depth);
 
 	if (skb)
 		packet_len = skb->len;
@@ -360,6 +349,7 @@ static int vector_enqueue(struct vector_queue *qi, struct sk_buff *skb)
 		mmsg_vector->msg_hdr.msg_iovlen = iov_count;
 		mmsg_vector->msg_hdr.msg_name = vp->fds->remote_addr;
 		mmsg_vector->msg_hdr.msg_namelen = vp->fds->remote_addr_size;
+		wmb(); /* Make the packet visible to the NAPI poll thread */
 		queue_depth = vector_advancetail(qi, 1);
 	} else
 		goto drop;
@@ -398,7 +388,7 @@ static int consume_vector_skbs(struct vector_queue *qi, int count)
 }
 
 /*
- * Generic vector deque via sendmmsg with support for forming headers
+ * Generic vector dequeue via sendmmsg with support for forming headers
  * using transport specific callback. Allows GRE, L2TPv3, RAW and
  * other transports to use a common dequeue procedure in vector mode
  */
@@ -408,69 +398,64 @@ static int vector_send(struct vector_queue *qi)
 {
 	struct vector_private *vp = netdev_priv(qi->dev);
 	struct mmsghdr *send_from;
-	int result = 0, send_len, queue_depth = qi->max_depth;
+	int result = 0, send_len;
 
 	if (spin_trylock(&qi->head_lock)) {
-		if (spin_trylock(&qi->tail_lock)) {
-			/* update queue_depth to current value */
-			queue_depth = qi->queue_depth;
-			spin_unlock(&qi->tail_lock);
-			while (queue_depth > 0) {
-				/* Calculate the start of the vector */
-				send_len = queue_depth;
-				send_from = qi->mmsg_vector;
-				send_from += qi->head;
-				/* Adjust vector size if wraparound */
-				if (send_len + qi->head > qi->max_depth)
-					send_len = qi->max_depth - qi->head;
-				/* Try to TX as many packets as possible */
-				if (send_len > 0) {
-					result = uml_vector_sendmmsg(
-						 vp->fds->tx_fd,
-						 send_from,
-						 send_len,
-						 0
-					);
-					vp->in_write_poll =
-						(result != send_len);
-				}
-				/* For some of the sendmmsg error scenarios
-				 * we may end being unsure in the TX success
-				 * for all packets. It is safer to declare
-				 * them all TX-ed and blame the network.
-				 */
-				if (result < 0) {
-					if (net_ratelimit())
-						netdev_err(vp->dev, "sendmmsg err=%i\n",
-							result);
-					vp->in_error = true;
-					result = send_len;
-				}
-				if (result > 0) {
-					queue_depth =
-						consume_vector_skbs(qi, result);
-					/* This is equivalent to an TX IRQ.
-					 * Restart the upper layers to feed us
-					 * more packets.
-					 */
-					if (result > vp->estats.tx_queue_max)
-						vp->estats.tx_queue_max = result;
-					vp->estats.tx_queue_running_average =
-						(vp->estats.tx_queue_running_average + result) >> 1;
-				}
-				netif_wake_queue(qi->dev);
-				/* if TX is busy, break out of the send loop,
-				 * poll write IRQ will reschedule xmit for us
+		/* update queue_depth to current value */
+		while (atomic_read(&qi->queue_depth) > 0) {
+			/* Calculate the start of the vector */
+			send_len = atomic_read(&qi->queue_depth);
+			send_from = qi->mmsg_vector;
+			send_from += qi->head;
+			/* Adjust vector size if wraparound */
+			if (send_len + qi->head > qi->max_depth)
+				send_len = qi->max_depth - qi->head;
+			/* Try to TX as many packets as possible */
+			if (send_len > 0) {
+				result = uml_vector_sendmmsg(
+					 vp->fds->tx_fd,
+					 send_from,
+					 send_len,
+					 0
+				);
+				vp->in_write_poll =
+					(result != send_len);
+			}
+			/* For some of the sendmmsg error scenarios
+			 * we may end being unsure in the TX success
+			 * for all packets. It is safer to declare
+			 * them all TX-ed and blame the network.
+			 */
+			if (result < 0) {
+				if (net_ratelimit())
+					netdev_err(vp->dev, "sendmmsg err=%i\n",
+						result);
+				vp->in_error = true;
+				result = send_len;
+			}
+			if (result > 0) {
+				consume_vector_skbs(qi, result);
+				/* This is equivalent to an TX IRQ.
+				 * Restart the upper layers to feed us
+				 * more packets.
 				 */
-				if (result != send_len) {
-					vp->estats.tx_restart_queue++;
-					break;
-				}
+				if (result > vp->estats.tx_queue_max)
+					vp->estats.tx_queue_max = result;
+				vp->estats.tx_queue_running_average =
+					(vp->estats.tx_queue_running_average + result) >> 1;
+			}
+			netif_wake_queue(qi->dev);
+			/* if TX is busy, break out of the send loop,
+			 * poll write IRQ will reschedule xmit for us.
+			 */
+			if (result != send_len) {
+				vp->estats.tx_restart_queue++;
+				break;
 			}
 		}
 		spin_unlock(&qi->head_lock);
 	}
-	return queue_depth;
+	return atomic_read(&qi->queue_depth);
 }
 
 /* Queue destructor. Deliberately stateless so we can use
@@ -589,7 +574,7 @@ static struct vector_queue *create_queue(
 	}
 	spin_lock_init(&result->head_lock);
 	spin_lock_init(&result->tail_lock);
-	result->queue_depth = 0;
+	atomic_set(&result->queue_depth, 0);
 	result->head = 0;
 	result->tail = 0;
 	return result;
@@ -668,18 +653,27 @@ static struct sk_buff *prep_skb(
 }
 
 
-/* Prepare queue for recvmmsg one-shot rx - fill with fresh sk_buffs*/
+/* Prepare queue for recvmmsg one-shot rx - fill with fresh sk_buffs */
 
 static void prep_queue_for_rx(struct vector_queue *qi)
 {
 	struct vector_private *vp = netdev_priv(qi->dev);
 	struct mmsghdr *mmsg_vector = qi->mmsg_vector;
 	void **skbuff_vector = qi->skbuff_vector;
-	int i;
+	int i, queue_depth;
+
+	queue_depth = atomic_read(&qi->queue_depth);
 
-	if (qi->queue_depth == 0)
+	if (queue_depth == 0)
 		return;
-	for (i = 0; i < qi->queue_depth; i++) {
+
+	/* RX is always emptied 100% during each cycle, so we do not
+	 * have to do the tail wraparound math for it.
+	 */
+
+	qi->head = qi->tail = 0;
+
+	for (i = 0; i < queue_depth; i++) {
 		/* it is OK if allocation fails - recvmmsg with NULL data in
 		 * iov argument still performs an RX, just drops the packet
 		 * This allows us stop faffing around with a "drop buffer"
@@ -689,7 +683,7 @@ static void prep_queue_for_rx(struct vector_queue *qi)
 		skbuff_vector++;
 		mmsg_vector++;
 	}
-	qi->queue_depth = 0;
+	atomic_set(&qi->queue_depth, 0);
 }
 
 static struct vector_device *find_device(int n)
@@ -985,7 +979,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget)
 	 * many do we need to prep the next time prep_queue_for_rx() is called.
 	 */
 
-	qi->queue_depth = packet_count;
+	atomic_add(packet_count, &qi->queue_depth);
 
 	for (i = 0; i < packet_count; i++) {
 		skb = (*skbuff_vector);
@@ -1172,13 +1166,15 @@ static int vector_poll(struct napi_struct *napi, int budget)
 
 	if ((vp->options & VECTOR_TX) != 0)
 		tx_enqueued = (vector_send(vp->tx_queue) > 0);
+	spin_lock(&vp->rx_queue->head_lock);
 	if ((vp->options & VECTOR_RX) > 0)
 		err = vector_mmsg_rx(vp, budget);
 	else {
 		err = vector_legacy_rx(vp);
 		if (err > 0)
 			err = 1;
 	}
+	spin_unlock(&vp->rx_queue->head_lock);
 	if (err > 0)
 		work_done += err;
 
@@ -1225,7 +1221,7 @@ static int vector_net_open(struct net_device *dev)
 			vp->rx_header_size,
 			MAX_IOV_SIZE
 		);
-		vp->rx_queue->queue_depth = get_depth(vp->parsed);
+		atomic_set(&vp->rx_queue->queue_depth, get_depth(vp->parsed));
 	} else {
 		vp->header_rxbuffer = kmalloc(
 			vp->rx_header_size,
@@ -1467,7 +1463,17 @@ static void vector_get_ethtool_stats(struct net_device *dev,
 {
 	struct vector_private *vp = netdev_priv(dev);
 
+	/* Stats are modified in the dequeue portions of
+	 * rx/tx which are protected by the head locks
+	 * grabbing these locks here ensures they are up
+	 * to date.
+	 */
+
+	spin_lock(&vp->tx_queue->head_lock);
+	spin_lock(&vp->rx_queue->head_lock);
 	memcpy(tmp_stats, &vp->estats, sizeof(struct vector_estats));
+	spin_unlock(&vp->rx_queue->head_lock);
+	spin_unlock(&vp->tx_queue->head_lock);
 }
 
 static int vector_get_coalesce(struct net_device *netdev,
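The wmb() added in vector_enqueue() pairs with the poll thread's read of the queue depth: the mmsg slot must be fully written before the depth increment makes it visible to the consumer. The userspace sketch below shows that publish/consume ordering with C11 release/acquire atomics; "entry" and "depth" are illustrative stand-ins for the mmsg vector slot and qi->queue_depth, not the driver's own code.

/* Illustrative only: the ordering the wmb() guarantees, in userspace form.
 * Build with -pthread.
 */
#include <stdatomic.h>
#include <pthread.h>
#include <stdio.h>

static int entry;            /* payload written by the producer   */
static atomic_int depth;     /* published count, read by consumer */

static void *producer(void *arg)
{
	entry = 1234;                       /* fill the slot first */
	/* release = "write barrier before publishing", like wmb() before
	 * vector_advancetail() bumps the depth.
	 */
	atomic_store_explicit(&depth, 1, memory_order_release);
	return NULL;
}

static void *consumer(void *arg)
{
	/* acquire pairs with the release above: once depth is seen as 1,
	 * the write to entry is guaranteed to be visible too.
	 */
	while (atomic_load_explicit(&depth, memory_order_acquire) == 0)
		;
	printf("consumed %d\n", entry);
	return NULL;
}

int main(void)
{
	pthread_t p, c;

	pthread_create(&p, NULL, producer, NULL);
	pthread_create(&c, NULL, consumer, NULL);
	pthread_join(p, NULL);
	pthread_join(c, NULL);
	return 0;
}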

arch/um/drivers/vector_kern.h

Lines changed: 3 additions & 1 deletion
@@ -14,6 +14,7 @@
 #include <linux/ctype.h>
 #include <linux/workqueue.h>
 #include <linux/interrupt.h>
+#include <asm/atomic.h>
 
 #include "vector_user.h"
 
@@ -44,7 +45,8 @@ struct vector_queue {
 	struct net_device *dev;
 	spinlock_t head_lock;
 	spinlock_t tail_lock;
-	int queue_depth, head, tail, max_depth, max_iov_frags;
+	atomic_t queue_depth;
+	int head, tail, max_depth, max_iov_frags;
 	short options;
 };
 
0 commit comments
