
Commit dd9de52

mfijalko authored and borkmann committed
xsk: Fix immature cq descriptor production
Eryk reported an issue, referenced in the Closes: tag below, where umem addrs were prematurely produced onto the pool's completion queue. Let us make the skb's destructor responsible for producing all addrs that a given skb used.

Introduce struct xsk_addrs, which carries the descriptor count and an array of addresses taken from the processed descriptors, and is attached to the skb via skb_shared_info::destructor_arg. This way we can refer to it within xsk_destruct_skb(). To mitigate the overhead of the additional memory allocations, introduce a kmem_cache of xsk_addrs; there is a single kmem_cache for xsk generic xmit on the system.

The commit in the Fixes: tag introduced the buggy behavior; it was not broken from day one, but rather when xsk multi-buffer support was introduced.

Fixes: b7f72a3 ("xsk: introduce wrappers and helpers for supporting multi-buffer in Tx path")
Reported-by: Eryk Kubanski <[email protected]>
Signed-off-by: Maciej Fijalkowski <[email protected]>
Signed-off-by: Daniel Borkmann <[email protected]>
Acked-by: Magnus Karlsson <[email protected]>
Acked-by: Stanislav Fomichev <[email protected]>
Closes: https://lore.kernel.org/netdev/[email protected]
Link: https://lore.kernel.org/bpf/[email protected]
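The gist of the fix, as a minimal userspace C model (not the kernel code itself; names mirror the patch, while the ring size and addresses are made up for the example): completion-queue slots are reserved while the skb is built, but the umem addresses are only written and published once the skb destructor runs, i.e. once the data is no longer referenced.

/* Toy model of deferred completion-queue production. */
#include <stdint.h>
#include <stdio.h>

#define RING_SIZE 8                     /* power of two; mask is size - 1 */
#define MAX_FRAGS 4

struct cq {                             /* stand-in for the completion ring */
        uint64_t desc[RING_SIZE];
        uint32_t producer;              /* visible to the consumer */
        uint32_t cached_prod;           /* reserved but not yet published */
};

struct xsk_addrs {                      /* carried via destructor_arg in the patch */
        uint32_t num_descs;
        uint64_t addrs[MAX_FRAGS + 1];
};

static int cq_reserve(struct cq *q)     /* Tx path: reserve space only */
{
        if (q->cached_prod - q->producer >= RING_SIZE)
                return -1;              /* ring full */
        q->cached_prod++;
        return 0;
}

/* "Destructor" path: write the remembered addrs, then publish them. */
static void cq_submit_addrs(struct cq *q, const struct xsk_addrs *xa)
{
        uint32_t idx = q->producer;

        for (uint32_t i = 0; i < xa->num_descs; i++)
                q->desc[(idx + i) & (RING_SIZE - 1)] = xa->addrs[i];
        q->producer += xa->num_descs;   /* only now visible to the consumer */
}

int main(void)
{
        struct cq q = {0};
        struct xsk_addrs xa = { .num_descs = 2, .addrs = { 0x1000, 0x2000 } };

        for (uint32_t i = 0; i < xa.num_descs; i++)
                if (cq_reserve(&q))     /* reserve while "building the skb" */
                        return 1;
        cq_submit_addrs(&q, &xa);       /* publish from the "destructor" */
        printf("completed %u addrs, first = 0x%llx\n",
               (unsigned)q.producer, (unsigned long long)q.desc[0]);
        return 0;
}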
1 parent 27861fc commit dd9de52

File tree

2 files changed, 91 insertions(+), 16 deletions(-)

net/xdp/xsk.c

Lines changed: 79 additions & 16 deletions
@@ -36,6 +36,13 @@
 #define TX_BATCH_SIZE 32
 #define MAX_PER_SOCKET_BUDGET 32
 
+struct xsk_addrs {
+        u32 num_descs;
+        u64 addrs[MAX_SKB_FRAGS + 1];
+};
+
+static struct kmem_cache *xsk_tx_generic_cache;
+
 void xsk_set_rx_need_wakeup(struct xsk_buff_pool *pool)
 {
         if (pool->cached_need_wakeup & XDP_WAKEUP_RX)
@@ -532,25 +539,39 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
         return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }
 
-static int xsk_cq_reserve_addr_locked(struct xsk_buff_pool *pool, u64 addr)
+static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
 {
         unsigned long flags;
         int ret;
 
         spin_lock_irqsave(&pool->cq_lock, flags);
-        ret = xskq_prod_reserve_addr(pool->cq, addr);
+        ret = xskq_prod_reserve(pool->cq);
         spin_unlock_irqrestore(&pool->cq_lock, flags);
 
         return ret;
 }
 
-static void xsk_cq_submit_locked(struct xsk_buff_pool *pool, u32 n)
+static void xsk_cq_submit_addr_locked(struct xdp_sock *xs,
+                                      struct sk_buff *skb)
 {
+        struct xsk_buff_pool *pool = xs->pool;
+        struct xsk_addrs *xsk_addrs;
         unsigned long flags;
+        u32 num_desc, i;
+        u32 idx;
+
+        xsk_addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+        num_desc = xsk_addrs->num_descs;
 
         spin_lock_irqsave(&pool->cq_lock, flags);
-        xskq_prod_submit_n(pool->cq, n);
+        idx = xskq_get_prod(pool->cq);
+
+        for (i = 0; i < num_desc; i++)
+                xskq_prod_write_addr(pool->cq, idx + i, xsk_addrs->addrs[i]);
+        xskq_prod_submit_n(pool->cq, num_desc);
+
         spin_unlock_irqrestore(&pool->cq_lock, flags);
+        kmem_cache_free(xsk_tx_generic_cache, xsk_addrs);
 }
 
 static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
@@ -562,11 +583,6 @@ static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
         spin_unlock_irqrestore(&pool->cq_lock, flags);
 }
 
-static u32 xsk_get_num_desc(struct sk_buff *skb)
-{
-        return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
-}
-
 static void xsk_destruct_skb(struct sk_buff *skb)
 {
         struct xsk_tx_metadata_compl *compl = &skb_shinfo(skb)->xsk_meta;
@@ -576,21 +592,37 @@ static void xsk_destruct_skb(struct sk_buff *skb)
                 *compl->tx_timestamp = ktime_get_tai_fast_ns();
         }
 
-        xsk_cq_submit_locked(xdp_sk(skb->sk)->pool, xsk_get_num_desc(skb));
+        xsk_cq_submit_addr_locked(xdp_sk(skb->sk), skb);
         sock_wfree(skb);
 }
 
-static void xsk_set_destructor_arg(struct sk_buff *skb)
+static u32 xsk_get_num_desc(struct sk_buff *skb)
 {
-        long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
+        struct xsk_addrs *addrs;
 
-        skb_shinfo(skb)->destructor_arg = (void *)num;
+        addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+        return addrs->num_descs;
+}
+
+static void xsk_set_destructor_arg(struct sk_buff *skb, struct xsk_addrs *addrs)
+{
+        skb_shinfo(skb)->destructor_arg = (void *)addrs;
+}
+
+static void xsk_inc_skb_descs(struct sk_buff *skb)
+{
+        struct xsk_addrs *addrs;
+
+        addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+        addrs->num_descs++;
 }
 
 static void xsk_consume_skb(struct sk_buff *skb)
 {
         struct xdp_sock *xs = xdp_sk(skb->sk);
 
+        kmem_cache_free(xsk_tx_generic_cache,
+                        (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg);
         skb->destructor = sock_wfree;
         xsk_cq_cancel_locked(xs->pool, xsk_get_num_desc(skb));
         /* Free skb without triggering the perf drop trace */
@@ -609,6 +641,7 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 {
         struct xsk_buff_pool *pool = xs->pool;
         u32 hr, len, ts, offset, copy, copied;
+        struct xsk_addrs *addrs = NULL;
         struct sk_buff *skb = xs->skb;
         struct page *page;
         void *buffer;
@@ -623,6 +656,14 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
                         return ERR_PTR(err);
 
                 skb_reserve(skb, hr);
+
+                addrs = kmem_cache_zalloc(xsk_tx_generic_cache, GFP_KERNEL);
+                if (!addrs) {
+                        kfree(skb);
+                        return ERR_PTR(-ENOMEM);
+                }
+
+                xsk_set_destructor_arg(skb, addrs);
         }
 
         addr = desc->addr;
@@ -662,6 +703,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 {
         struct xsk_tx_metadata *meta = NULL;
         struct net_device *dev = xs->dev;
+        struct xsk_addrs *addrs = NULL;
         struct sk_buff *skb = xs->skb;
         bool first_frag = false;
         int err;
@@ -694,6 +736,15 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
                 err = skb_store_bits(skb, 0, buffer, len);
                 if (unlikely(err))
                         goto free_err;
+
+                addrs = kmem_cache_zalloc(xsk_tx_generic_cache, GFP_KERNEL);
+                if (!addrs) {
+                        err = -ENOMEM;
+                        goto free_err;
+                }
+
+                xsk_set_destructor_arg(skb, addrs);
+
         } else {
                 int nr_frags = skb_shinfo(skb)->nr_frags;
                 struct page *page;
@@ -759,7 +810,9 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
         skb->mark = READ_ONCE(xs->sk.sk_mark);
         skb->destructor = xsk_destruct_skb;
         xsk_tx_metadata_to_compl(meta, &skb_shinfo(skb)->xsk_meta);
-        xsk_set_destructor_arg(skb);
+
+        addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+        addrs->addrs[addrs->num_descs++] = desc->addr;
 
         return skb;
 
@@ -769,7 +822,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 
         if (err == -EOVERFLOW) {
                 /* Drop the packet */
-                xsk_set_destructor_arg(xs->skb);
+                xsk_inc_skb_descs(xs->skb);
                 xsk_drop_skb(xs->skb);
                 xskq_cons_release(xs->tx);
         } else {
@@ -812,7 +865,7 @@ static int __xsk_generic_xmit(struct sock *sk)
                  * if there is space in it. This avoids having to implement
                  * any buffering in the Tx path.
                  */
-                err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr);
+                err = xsk_cq_reserve_locked(xs->pool);
                 if (err) {
                         err = -EAGAIN;
                         goto out;
@@ -1815,8 +1868,18 @@ static int __init xsk_init(void)
         if (err)
                 goto out_pernet;
 
+        xsk_tx_generic_cache = kmem_cache_create("xsk_generic_xmit_cache",
+                                                 sizeof(struct xsk_addrs), 0,
+                                                 SLAB_HWCACHE_ALIGN, NULL);
+        if (!xsk_tx_generic_cache) {
+                err = -ENOMEM;
+                goto out_unreg_notif;
+        }
+
         return 0;
 
+out_unreg_notif:
+        unregister_netdevice_notifier(&xsk_netdev_notifier);
 out_pernet:
         unregister_pernet_subsys(&xsk_net_ops);
 out_sk:

net/xdp/xsk_queue.h

Lines changed: 12 additions & 0 deletions
@@ -344,6 +344,11 @@ static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
 
 /* Functions for producers */
 
+static inline u32 xskq_get_prod(struct xsk_queue *q)
+{
+        return READ_ONCE(q->ring->producer);
+}
+
 static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
 {
         u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
@@ -390,6 +395,13 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
         return 0;
 }
 
+static inline void xskq_prod_write_addr(struct xsk_queue *q, u32 idx, u64 addr)
+{
+        struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+        ring->desc[idx & q->ring_mask] = addr;
+}
+
 static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
                                               u32 nb_entries)
 {
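For context (not part of this patch): the consumer of these completion-queue entries is userspace. With libxdp's <xdp/xsk.h> helpers, reclaiming completed Tx addresses looks roughly like the sketch below; the ring helper names are libxdp's, while reclaim_completions and the budget parameter are illustrative. Before this fix, an address could appear on this ring while the generic-xmit skb still referenced the underlying umem data.

/* Illustrative libxdp-based consumer of the Tx completion ring. */
#include <xdp/xsk.h>

static void reclaim_completions(struct xsk_ring_cons *cq, __u32 budget)
{
        __u32 idx;
        __u32 done = xsk_ring_cons__peek(cq, budget, &idx);

        for (__u32 i = 0; i < done; i++) {
                __u64 addr = *xsk_ring_cons__comp_addr(cq, idx + i);

                /* addr may now be reused for a new Tx frame; the fix above
                 * ensures the kernel is really done with it by the time it
                 * shows up here. */
                (void)addr;
        }
        xsk_ring_cons__release(cq, done);
}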
