xsk: fix immature cq descriptor production #5737
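This PR addresses what the title calls immature completion-queue (cq) descriptor production in the XSK generic (copy-mode) TX path. Judging from the diff: previously, xsk_cq_reserve_addr_locked() wrote a descriptor's umem address into the cq already at reserve time, while the entry was only submitted later from the skb destructor; in between, cancel/retry paths could leave stale or reordered addresses to be published to user space. The fix reserves the slot only (xsk_cq_reserve_locked()), records the addresses of all descriptors backing an skb in a kmem_cache-allocated struct xsk_addrs hung off skb_shinfo(skb)->destructor_arg (which previously carried just a descriptor count cast to a pointer), and writes and submits those addresses under cq_lock from the destructor via xsk_cq_submit_addr_locked(). The cache is shared per queue id, refcounted across sockets, allocated at bind time for copy mode, and destroyed by the last xsk_release().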

Closed
wants to merge 1 commit into from
1 change: 1 addition & 0 deletions include/net/xdp_sock.h
@@ -61,6 +61,7 @@ struct xdp_sock {
XSK_BOUND,
XSK_UNBOUND,
} state;
struct kmem_cache *generic_cache;

struct xsk_queue *tx ____cacheline_aligned_in_smp;
struct list_head tx_list;
140 changes: 122 additions & 18 deletions net/xdp/xsk.c
@@ -36,6 +36,18 @@
#define TX_BATCH_SIZE 32
#define MAX_PER_SOCKET_BUDGET 32

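/* Per-skb bookkeeping, hung off skb_shinfo(skb)->destructor_arg: the
 * umem addresses of every descriptor backing this skb. They are only
 * written to the completion queue once the skb is consumed.
 */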
struct xsk_addrs {
u32 num_descs;
u64 addrs[MAX_SKB_FRAGS + 1];
};

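/* Refcounted kmem_cache handing out struct xsk_addrs objects, shared
 * by all copy-mode sockets bound to the same queue id.
 */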
struct xsk_generic_cache {
struct kmem_cache *cache;
refcount_t users;
};

DEFINE_PER_CPU(struct xsk_generic_cache, system_xsk_generic_cache);

void xsk_set_rx_need_wakeup(struct xsk_buff_pool *pool)
{
if (pool->cached_need_wakeup & XDP_WAKEUP_RX)
@@ -532,25 +544,39 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
}

-static int xsk_cq_reserve_addr_locked(struct xsk_buff_pool *pool, u64 addr)
+static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
{
unsigned long flags;
int ret;

spin_lock_irqsave(&pool->cq_lock, flags);
-ret = xskq_prod_reserve_addr(pool->cq, addr);
+ret = xskq_prod_reserve(pool->cq);
spin_unlock_irqrestore(&pool->cq_lock, flags);

return ret;
}

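/* Completion time: write the addresses stashed in destructor_arg at the
 * current producer index, publish them, and free the container.
 */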
-static void xsk_cq_submit_locked(struct xsk_buff_pool *pool, u32 n)
+static void xsk_cq_submit_addr_locked(struct xdp_sock *xs,
+struct sk_buff *skb)
{
struct xsk_buff_pool *pool = xs->pool;
struct xsk_addrs *xsk_addrs;
unsigned long flags;
u32 num_desc, i;
u32 idx;

xsk_addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
num_desc = xsk_addrs->num_descs;

spin_lock_irqsave(&pool->cq_lock, flags);
-xskq_prod_submit_n(pool->cq, n);
idx = xskq_get_prod(pool->cq);

for (i = 0; i < num_desc; i++)
xskq_prod_write_addr(pool->cq, idx + i, xsk_addrs->addrs[i]);
xskq_prod_submit_n(pool->cq, num_desc);

spin_unlock_irqrestore(&pool->cq_lock, flags);
kmem_cache_free(xs->generic_cache, xsk_addrs);
}

static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
@@ -562,11 +588,6 @@ static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
spin_unlock_irqrestore(&pool->cq_lock, flags);
}

-static u32 xsk_get_num_desc(struct sk_buff *skb)
-{
-return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
-}

static void xsk_destruct_skb(struct sk_buff *skb)
{
struct xsk_tx_metadata_compl *compl = &skb_shinfo(skb)->xsk_meta;
@@ -576,21 +597,37 @@ static void xsk_destruct_skb(struct sk_buff *skb)
*compl->tx_timestamp = ktime_get_tai_fast_ns();
}

-xsk_cq_submit_locked(xdp_sk(skb->sk)->pool, xsk_get_num_desc(skb));
+xsk_cq_submit_addr_locked(xdp_sk(skb->sk), skb);
sock_wfree(skb);
}

-static void xsk_set_destructor_arg(struct sk_buff *skb)
+static u32 xsk_get_num_desc(struct sk_buff *skb)
{
-long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
+struct xsk_addrs *addrs;
+
+addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+return addrs->num_descs;
+}

-skb_shinfo(skb)->destructor_arg = (void *)num;
+static void xsk_set_destructor_arg(struct sk_buff *skb, struct xsk_addrs *addrs)
+{
+skb_shinfo(skb)->destructor_arg = (void *)addrs;
}

static void xsk_inc_skb_descs(struct sk_buff *skb)
{
struct xsk_addrs *addrs;

addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
addrs->num_descs++;
}

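/* Consuming an unsent skb returns its reserved cq slots and frees the
 * xsk_addrs object attached to it.
 */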
static void xsk_consume_skb(struct sk_buff *skb)
{
struct xdp_sock *xs = xdp_sk(skb->sk);

kmem_cache_free(xs->generic_cache,
(struct xsk_addrs *)skb_shinfo(skb)->destructor_arg);
skb->destructor = sock_wfree;
xsk_cq_cancel_locked(xs->pool, xsk_get_num_desc(skb));
/* Free skb without triggering the perf drop trace */
@@ -605,10 +642,12 @@ static void xsk_drop_skb(struct sk_buff *skb)
}

static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
-struct xdp_desc *desc)
+struct xdp_desc *desc,
+struct kmem_cache *cache)
{
struct xsk_buff_pool *pool = xs->pool;
u32 hr, len, ts, offset, copy, copied;
struct xsk_addrs *addrs = NULL;
struct sk_buff *skb = xs->skb;
struct page *page;
void *buffer;
@@ -623,6 +662,12 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
return ERR_PTR(err);

skb_reserve(skb, hr);

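/* First frag only: one xsk_addrs object tracks all descriptors of this skb. */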
addrs = kmem_cache_zalloc(cache, GFP_KERNEL);
if (!addrs)
return ERR_PTR(-ENOMEM);

xsk_set_destructor_arg(skb, addrs);
}

addr = desc->addr;
@@ -662,12 +707,13 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
{
struct xsk_tx_metadata *meta = NULL;
struct net_device *dev = xs->dev;
struct xsk_addrs *addrs = NULL;
struct sk_buff *skb = xs->skb;
bool first_frag = false;
int err;

if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
-skb = xsk_build_skb_zerocopy(xs, desc);
+skb = xsk_build_skb_zerocopy(xs, desc, xs->generic_cache);
if (IS_ERR(skb)) {
err = PTR_ERR(skb);
goto free_err;
@@ -694,6 +740,15 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
err = skb_store_bits(skb, 0, buffer, len);
if (unlikely(err))
goto free_err;

addrs = kmem_cache_zalloc(xs->generic_cache, GFP_KERNEL);
if (!addrs) {
err = -ENOMEM;
goto free_err;
}

xsk_set_destructor_arg(skb, addrs);

} else {
int nr_frags = skb_shinfo(skb)->nr_frags;
struct page *page;
@@ -759,7 +814,9 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
skb->mark = READ_ONCE(xs->sk.sk_mark);
skb->destructor = xsk_destruct_skb;
xsk_tx_metadata_to_compl(meta, &skb_shinfo(skb)->xsk_meta);
-xsk_set_destructor_arg(skb);
+
+addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+addrs->addrs[addrs->num_descs++] = desc->addr;

return skb;

@@ -769,7 +826,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,

if (err == -EOVERFLOW) {
/* Drop the packet */
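/* Count the overflowing frag too, so xsk_drop_skb() cancels every reserved cq slot. */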
-xsk_set_destructor_arg(xs->skb);
+xsk_inc_skb_descs(xs->skb);
xsk_drop_skb(xs->skb);
xskq_cons_release(xs->tx);
} else {
@@ -812,7 +869,7 @@ static int __xsk_generic_xmit(struct sock *sk)
* if there is space in it. This avoids having to implement
* any buffering in the Tx path.
*/
-err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr);
+err = xsk_cq_reserve_locked(xs->pool);
if (err) {
err = -EAGAIN;
goto out;
@@ -1095,6 +1152,7 @@ static void xsk_delete_from_maps(struct xdp_sock *xs)

static int xsk_release(struct socket *sock)
{
struct xsk_generic_cache *pcpu_cache;
struct sock *sk = sock->sk;
struct xdp_sock *xs = xdp_sk(sk);
struct net *net;
@@ -1123,6 +1181,15 @@ static int xsk_release(struct socket *sock)
xskq_destroy(xs->fq_tmp);
xskq_destroy(xs->cq_tmp);

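/* Drop this socket's reference on the shared xsk_addrs cache; the last
 * releaser destroys it.
 */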
pcpu_cache = per_cpu_ptr(&system_xsk_generic_cache, xs->queue_id);
if (pcpu_cache->cache) {
if (refcount_dec_and_test(&pcpu_cache->users)) {
kmem_cache_destroy(pcpu_cache->cache);
pcpu_cache->cache = NULL;
xs->generic_cache = NULL;
}
}

sock_orphan(sk);
sock->sk = NULL;

@@ -1153,6 +1220,33 @@ static bool xsk_validate_queues(struct xdp_sock *xs)
return xs->fq_tmp && xs->cq_tmp;
}

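/* Create the xsk_addrs cache for this queue id, or take a reference if
 * another copy-mode socket already created it.
 */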
static int xsk_alloc_generic_xmit_cache(struct xdp_sock *xs, u16 qid)
{
struct xsk_generic_cache *pcpu_cache =
per_cpu_ptr(&system_xsk_generic_cache, qid);
struct kmem_cache *cache;
char cache_name[32];

if (refcount_read(&pcpu_cache->users) > 0) {
refcount_inc(&pcpu_cache->users);
xs->generic_cache = pcpu_cache->cache;
return 0;
}

snprintf(cache_name, sizeof(cache_name),
"xsk_generic_xmit_cache%d", qid);
cache = kmem_cache_create(cache_name, sizeof(struct xsk_addrs), 0,
SLAB_HWCACHE_ALIGN, NULL);
if (!cache)
return -ENOMEM;

refcount_set(&pcpu_cache->users, 1);
pcpu_cache->cache = cache;
xs->generic_cache = pcpu_cache->cache;

return 0;
}

static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
{
struct sockaddr_xdp *sxdp = (struct sockaddr_xdp *)addr;
@@ -1306,6 +1400,16 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
xs->zc = xs->umem->zc;
xs->sg = !!(xs->umem->flags & XDP_UMEM_SG_FLAG);
xs->queue_id = qid;

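/* Copy mode only: zero-copy TX never goes through xsk_build_skb(). */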
if (!xs->zc) {
err = xsk_alloc_generic_xmit_cache(xs, qid);
if (err) {
xp_destroy(xs->pool);
xs->pool = NULL;
goto out_unlock;
}
}

xp_add_xsk(xs->pool, xs);

if (qid < dev->real_num_rx_queues) {
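Taken together, the copy-mode lifecycle of a completion entry now spans three points in time. The sketch below is a condensed paraphrase of the functions in this diff, not standalone compilable code; error handling and locking are elided:

/* 1. __xsk_generic_xmit(): reserve a cq slot, but publish no address yet. */
err = xsk_cq_reserve_locked(xs->pool);

/* 2. xsk_build_skb(): on the first frag, allocate the per-skb container,
 * stash it in destructor_arg, then record every descriptor's address. */
addrs = kmem_cache_zalloc(xs->generic_cache, GFP_KERNEL);
xsk_set_destructor_arg(skb, addrs);
addrs->addrs[addrs->num_descs++] = desc->addr;

/* 3. xsk_destruct_skb() -> xsk_cq_submit_addr_locked(): the skb has been
 * consumed, so only now write the stashed addresses at the producer index
 * and make them visible to user space in one submit. */
idx = xskq_get_prod(pool->cq);
for (i = 0; i < num_desc; i++)
        xskq_prod_write_addr(pool->cq, idx + i, xsk_addrs->addrs[i]);
xskq_prod_submit_n(pool->cq, num_desc);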
12 changes: 12 additions & 0 deletions net/xdp/xsk_queue.h
@@ -344,6 +344,11 @@ static inline u32 xskq_cons_present_entries(struct xsk_queue *q)

/* Functions for producers */

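/* Read the globally visible producer index (not the cached one). */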
static inline u32 xskq_get_prod(struct xsk_queue *q)
{
return READ_ONCE(q->ring->producer);
}

static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
{
u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
@@ -390,6 +395,13 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
return 0;
}

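/* Write an address at an absolute producer index, masked into the ring,
 * without moving the producer; publication happens via a later
 * xskq_prod_submit_n().
 */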
static inline void xskq_prod_write_addr(struct xsk_queue *q, u32 idx, u64 addr)
{
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

ring->desc[idx & q->ring_mask] = addr;
}

static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
u32 nb_entries)
{
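xskq_prod_write_addr() stores at an absolute index and relies on the ring size being a power of two: with ring_mask == nentries - 1, idx & q->ring_mask wraps the monotonically increasing producer index into a slot. A minimal, self-contained illustration of that arithmetic in plain user-space C (all values invented for the example):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
        uint32_t nentries = 8;             /* XSK ring sizes are powers of two */
        uint32_t ring_mask = nentries - 1; /* 7 == 0b111 */
        uint32_t idx;

        /* producer indices 12..15 wrap into slots 4..7 */
        for (idx = 12; idx < 16; idx++)
                printf("idx %u -> slot %u\n", idx, idx & ring_mask);

        return 0;
}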