
Commit 30f241f

mfijalko authored and Alexei Starovoitov committed
xsk: Fix immature cq descriptor production
Eryk reported an issue (see the Closes: tag) about umem addrs being prematurely produced onto the pool's completion queue. Make the skb's destructor responsible for producing all addrs that a given skb used.

The commit from the Fixes: tag introduced the buggy behavior; it was not broken from day 1, but rather when xsk multi-buffer support got introduced.

In order to mitigate the performance impact as much as possible, mimic the linear and frag parts within the skb by storing the first address from the XSK descriptor at sk_buff::destructor_arg. For fragments, store them at ::cb via a list whose nodes are allocated from a kmem_cache. xsk_destruct_skb() consumes the address stored at ::destructor_arg and, if the count of descriptors associated with this particular skb is bigger than 1, also walks the list from ::cb.

The previous approach, where a whole array for storing UMEM addresses from XSK descriptors was pre-allocated during first-fragment processing, yielded too big a performance regression for 64b traffic. With the current approach the impact is much reduced; on my tests with jumbo frames I observed traffic being slower by at most 9%.

Magnus suggested special-casing this processing for XDP_SHARED_UMEM, so that we would identify it during bind and set different hooks for the 'backpressure mechanism' on the CQ and for the skb destructor, but given that the results looked promising on my side I decided to keep a single data path for XSK generic Tx. I suppose other auxiliary changes would have to land as well in order to make that work.

Fixes: b7f72a3 ("xsk: introduce wrappers and helpers for supporting multi-buffer in Tx path")
Reported-by: Eryk Kubanski <[email protected]>
Closes: https://lore.kernel.org/netdev/[email protected]/
Acked-by: Stanislav Fomichev <[email protected]>
Signed-off-by: Maciej Fijalkowski <[email protected]>
Tested-by: Jason Xing <[email protected]>
Reviewed-by: Jason Xing <[email protected]>
Link: https://lore.kernel.org/r/[email protected]
Signed-off-by: Alexei Starovoitov <[email protected]>
1 parent 6c6f5c1 commit 30f241f
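
To make the scheme above easier to follow before diving into the diff, here is a condensed sketch of the bookkeeping it introduces. The struct names and the XSKCB() macro are taken from the patch itself; xsk_cq_flush_skb_addrs() is a simplified stand-in for the patch's xsk_cq_submit_addr_locked(), with the cq_lock handling and the single-descriptor fast path trimmed.

/* Condensed sketch of the per-skb address bookkeeping added by this patch.
 * Simplified stand-in for xsk_cq_submit_addr_locked(); cq_lock handling
 * and the num_descs == 1 fast path are omitted here.
 */
struct xsk_addr_node {
	u64 addr;			/* UMEM address of one extra descriptor */
	struct list_head addr_node;
};

struct xsk_addr_head {
	u32 num_descs;			/* descriptors backing this skb */
	struct list_head addrs_list;	/* addresses of descriptors 2..n */
};

/* The head lives in the skb control block; the first descriptor's address
 * is stashed in skb_shinfo(skb)->destructor_arg.
 */
#define XSKCB(skb) ((struct xsk_addr_head *)((skb)->cb))

static void xsk_cq_flush_skb_addrs(struct xsk_buff_pool *pool, struct sk_buff *skb)
{
	struct xsk_addr_node *pos, *tmp;
	u32 idx = xskq_get_prod(pool->cq);
	u32 n = 0;

	/* First descriptor: its address rides in destructor_arg. */
	xskq_prod_write_addr(pool->cq, idx + n++,
			     (u64)(uintptr_t)skb_shinfo(skb)->destructor_arg);

	/* Remaining descriptors: their addresses ride in the ::cb list. */
	list_for_each_entry_safe(pos, tmp, &XSKCB(skb)->addrs_list, addr_node) {
		xskq_prod_write_addr(pool->cq, idx + n++, pos->addr);
		list_del(&pos->addr_node);
		kmem_cache_free(xsk_tx_generic_cache, pos);
	}

	/* Only now do the addresses become visible to the CQ consumer. */
	xskq_prod_submit_n(pool->cq, n);
}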

File tree: net/xdp/xsk.c, net/xdp/xsk_queue.h

2 files changed: 111 additions, 14 deletions


net/xdp/xsk.c

Lines changed: 99 additions & 14 deletions
@@ -36,6 +36,20 @@
 #define TX_BATCH_SIZE 32
 #define MAX_PER_SOCKET_BUDGET 32
 
+struct xsk_addr_node {
+	u64 addr;
+	struct list_head addr_node;
+};
+
+struct xsk_addr_head {
+	u32 num_descs;
+	struct list_head addrs_list;
+};
+
+static struct kmem_cache *xsk_tx_generic_cache;
+
+#define XSKCB(skb) ((struct xsk_addr_head *)((skb)->cb))
+
 void xsk_set_rx_need_wakeup(struct xsk_buff_pool *pool)
 {
 	if (pool->cached_need_wakeup & XDP_WAKEUP_RX)
@@ -532,24 +546,43 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 	return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }
 
-static int xsk_cq_reserve_addr_locked(struct xsk_buff_pool *pool, u64 addr)
+static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
 {
 	unsigned long flags;
 	int ret;
 
 	spin_lock_irqsave(&pool->cq_lock, flags);
-	ret = xskq_prod_reserve_addr(pool->cq, addr);
+	ret = xskq_prod_reserve(pool->cq);
 	spin_unlock_irqrestore(&pool->cq_lock, flags);
 
 	return ret;
 }
 
-static void xsk_cq_submit_locked(struct xsk_buff_pool *pool, u32 n)
+static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool,
+				      struct sk_buff *skb)
 {
+	struct xsk_addr_node *pos, *tmp;
+	u32 descs_processed = 0;
 	unsigned long flags;
+	u32 idx;
 
 	spin_lock_irqsave(&pool->cq_lock, flags);
-	xskq_prod_submit_n(pool->cq, n);
+	idx = xskq_get_prod(pool->cq);
+
+	xskq_prod_write_addr(pool->cq, idx,
+			     (u64)(uintptr_t)skb_shinfo(skb)->destructor_arg);
+	descs_processed++;
+
+	if (unlikely(XSKCB(skb)->num_descs > 1)) {
+		list_for_each_entry_safe(pos, tmp, &XSKCB(skb)->addrs_list, addr_node) {
+			xskq_prod_write_addr(pool->cq, idx + descs_processed,
+					     pos->addr);
+			descs_processed++;
+			list_del(&pos->addr_node);
+			kmem_cache_free(xsk_tx_generic_cache, pos);
+		}
+	}
+	xskq_prod_submit_n(pool->cq, descs_processed);
 	spin_unlock_irqrestore(&pool->cq_lock, flags);
 }
 
@@ -562,9 +595,14 @@ static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
 	spin_unlock_irqrestore(&pool->cq_lock, flags);
 }
 
+static void xsk_inc_num_desc(struct sk_buff *skb)
+{
+	XSKCB(skb)->num_descs++;
+}
+
 static u32 xsk_get_num_desc(struct sk_buff *skb)
 {
-	return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
+	return XSKCB(skb)->num_descs;
 }
 
 static void xsk_destruct_skb(struct sk_buff *skb)
@@ -576,23 +614,33 @@ static void xsk_destruct_skb(struct sk_buff *skb)
 		*compl->tx_timestamp = ktime_get_tai_fast_ns();
 	}
 
-	xsk_cq_submit_locked(xdp_sk(skb->sk)->pool, xsk_get_num_desc(skb));
+	xsk_cq_submit_addr_locked(xdp_sk(skb->sk)->pool, skb);
 	sock_wfree(skb);
 }
 
-static void xsk_set_destructor_arg(struct sk_buff *skb)
+static void xsk_set_destructor_arg(struct sk_buff *skb, u64 addr)
 {
-	long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
-
-	skb_shinfo(skb)->destructor_arg = (void *)num;
+	BUILD_BUG_ON(sizeof(struct xsk_addr_head) > sizeof(skb->cb));
+	INIT_LIST_HEAD(&XSKCB(skb)->addrs_list);
+	XSKCB(skb)->num_descs = 0;
+	skb_shinfo(skb)->destructor_arg = (void *)(uintptr_t)addr;
 }
 
 static void xsk_consume_skb(struct sk_buff *skb)
 {
 	struct xdp_sock *xs = xdp_sk(skb->sk);
+	u32 num_descs = xsk_get_num_desc(skb);
+	struct xsk_addr_node *pos, *tmp;
+
+	if (unlikely(num_descs > 1)) {
+		list_for_each_entry_safe(pos, tmp, &XSKCB(skb)->addrs_list, addr_node) {
+			list_del(&pos->addr_node);
+			kmem_cache_free(xsk_tx_generic_cache, pos);
+		}
+	}
 
 	skb->destructor = sock_wfree;
-	xsk_cq_cancel_locked(xs->pool, xsk_get_num_desc(skb));
+	xsk_cq_cancel_locked(xs->pool, num_descs);
 	/* Free skb without triggering the perf drop trace */
 	consume_skb(skb);
 	xs->skb = NULL;
@@ -609,6 +657,7 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 {
 	struct xsk_buff_pool *pool = xs->pool;
 	u32 hr, len, ts, offset, copy, copied;
+	struct xsk_addr_node *xsk_addr;
 	struct sk_buff *skb = xs->skb;
 	struct page *page;
 	void *buffer;
@@ -623,6 +672,19 @@
 			return ERR_PTR(err);
 
 		skb_reserve(skb, hr);
+
+		xsk_set_destructor_arg(skb, desc->addr);
+	} else {
+		xsk_addr = kmem_cache_zalloc(xsk_tx_generic_cache, GFP_KERNEL);
+		if (!xsk_addr)
+			return ERR_PTR(-ENOMEM);
+
+		/* in case of -EOVERFLOW that could happen below,
+		 * xsk_consume_skb() will release this node as whole skb
+		 * would be dropped, which implies freeing all list elements
+		 */
+		xsk_addr->addr = desc->addr;
+		list_add_tail(&xsk_addr->addr_node, &XSKCB(skb)->addrs_list);
 	}
 
 	addr = desc->addr;
@@ -694,8 +756,11 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 		err = skb_store_bits(skb, 0, buffer, len);
 		if (unlikely(err))
 			goto free_err;
+
+		xsk_set_destructor_arg(skb, desc->addr);
 	} else {
 		int nr_frags = skb_shinfo(skb)->nr_frags;
+		struct xsk_addr_node *xsk_addr;
 		struct page *page;
 		u8 *vaddr;
 
@@ -710,12 +775,22 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 			goto free_err;
 		}
 
+		xsk_addr = kmem_cache_zalloc(xsk_tx_generic_cache, GFP_KERNEL);
+		if (!xsk_addr) {
+			__free_page(page);
+			err = -ENOMEM;
+			goto free_err;
+		}
+
 		vaddr = kmap_local_page(page);
 		memcpy(vaddr, buffer, len);
 		kunmap_local(vaddr);
 
 		skb_add_rx_frag(skb, nr_frags, page, 0, len, PAGE_SIZE);
 		refcount_add(PAGE_SIZE, &xs->sk.sk_wmem_alloc);
+
+		xsk_addr->addr = desc->addr;
+		list_add_tail(&xsk_addr->addr_node, &XSKCB(skb)->addrs_list);
 	}
 
 	if (first_frag && desc->options & XDP_TX_METADATA) {
@@ -759,7 +834,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 	skb->mark = READ_ONCE(xs->sk.sk_mark);
 	skb->destructor = xsk_destruct_skb;
 	xsk_tx_metadata_to_compl(meta, &skb_shinfo(skb)->xsk_meta);
-	xsk_set_destructor_arg(skb);
+	xsk_inc_num_desc(skb);
 
 	return skb;
 
@@ -769,7 +844,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 
 	if (err == -EOVERFLOW) {
 		/* Drop the packet */
-		xsk_set_destructor_arg(xs->skb);
+		xsk_inc_num_desc(xs->skb);
 		xsk_drop_skb(xs->skb);
 		xskq_cons_release(xs->tx);
 	} else {
@@ -812,7 +887,7 @@ static int __xsk_generic_xmit(struct sock *sk)
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr);
+		err = xsk_cq_reserve_locked(xs->pool);
 		if (err) {
 			err = -EAGAIN;
 			goto out;
@@ -1815,8 +1890,18 @@ static int __init xsk_init(void)
 	if (err)
 		goto out_pernet;
 
+	xsk_tx_generic_cache = kmem_cache_create("xsk_generic_xmit_cache",
+						 sizeof(struct xsk_addr_node),
+						 0, SLAB_HWCACHE_ALIGN, NULL);
+	if (!xsk_tx_generic_cache) {
+		err = -ENOMEM;
+		goto out_unreg_notif;
+	}
+
 	return 0;
 
+out_unreg_notif:
+	unregister_netdevice_notifier(&xsk_netdev_notifier);
 out_pernet:
 	unregister_pernet_subsys(&xsk_net_ops);
 out_sk:
net/xdp/xsk_queue.h

Lines changed: 12 additions & 0 deletions
@@ -344,6 +344,11 @@ static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
 
 /* Functions for producers */
 
+static inline u32 xskq_get_prod(struct xsk_queue *q)
+{
+	return READ_ONCE(q->ring->producer);
+}
+
 static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
 {
 	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
@@ -390,6 +395,13 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
 	return 0;
 }
 
+static inline void xskq_prod_write_addr(struct xsk_queue *q, u32 idx, u64 addr)
+{
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+	ring->desc[idx & q->ring_mask] = addr;
+}
+
 static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
 					      u32 nb_entries)
 {