Skip to content

Commit c00c446

Browse files
mfijalko authored and borkmann committed
xsk: Fix backpressure mechanism on Tx
Commit d678cbd ("xsk: Fix handling of invalid descriptors in XSK TX batching API") fixed batch API usage against set of descriptors with invalid ones but introduced a problem when AF_XDP SW rings are smaller than HW ones. Mismatch of reported Tx'ed frames between HW generator and user space app was observed. It turned out that backpressure mechanism became a bottleneck when the amount of produced descriptors to CQ is lower than what we grabbed from XSK Tx ring. Say that 512 entries had been taken from XSK Tx ring but we had only 490 free entries in CQ. Then callsite (ZC driver) will produce only 490 entries onto HW Tx ring but 512 entries will be released from Tx ring and this is what will be seen by the user space. In order to fix this case, mix XSK Tx/CQ ring interactions by moving around internal functions and changing call order: * pull out xskq_prod_nb_free() from xskq_prod_reserve_addr_batch() up to xsk_tx_peek_release_desc_batch(); ** move xskq_cons_release_n() into xskq_cons_read_desc_batch() After doing so, algorithm can be described as follows: 1. lookup Tx entries 2. use value from 1. to reserve space in CQ (*) 3. Read from Tx ring as many descriptors as value from 2 3a. release descriptors from XSK Tx ring (**) 4. Finally produce addresses to CQ Fixes: d678cbd ("xsk: Fix handling of invalid descriptors in XSK TX batching API") Signed-off-by: Magnus Karlsson <[email protected]> Signed-off-by: Maciej Fijalkowski <[email protected]> Signed-off-by: Daniel Borkmann <[email protected]> Link: https://lore.kernel.org/bpf/[email protected]
1 parent c829dba commit c00c446

File tree

2 files changed

+21
-23
lines changed

2 files changed

+21
-23
lines changed

net/xdp/xsk.c

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -355,16 +355,15 @@ static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, u32 max_entr
355355
return nb_pkts;
356356
}
357357

358-
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 max_entries)
358+
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts)
359359
{
360360
struct xdp_sock *xs;
361-
u32 nb_pkts;
362361

363362
rcu_read_lock();
364363
if (!list_is_singular(&pool->xsk_tx_list)) {
365364
/* Fallback to the non-batched version */
366365
rcu_read_unlock();
367-
return xsk_tx_peek_release_fallback(pool, max_entries);
366+
return xsk_tx_peek_release_fallback(pool, nb_pkts);
368367
}
369368

370369
xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list);
@@ -373,25 +372,26 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 max_entries)
373372
goto out;
374373
}
375374

376-
max_entries = xskq_cons_nb_entries(xs->tx, max_entries);
377-
nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, max_entries);
378-
if (!nb_pkts) {
379-
xs->tx->queue_empty_descs++;
380-
goto out;
381-
}
375+
nb_pkts = xskq_cons_nb_entries(xs->tx, nb_pkts);
382376

383377
/* This is the backpressure mechanism for the Tx path. Try to
384378
* reserve space in the completion queue for all packets, but
385379
* if there are fewer slots available, just process that many
386380
* packets. This avoids having to implement any buffering in
387381
* the Tx path.
388382
*/
389-
nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, pool->tx_descs, nb_pkts);
383+
nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts);
390384
if (!nb_pkts)
391385
goto out;
392386

393-
xskq_cons_release_n(xs->tx, max_entries);
387+
nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, nb_pkts);
388+
if (!nb_pkts) {
389+
xs->tx->queue_empty_descs++;
390+
goto out;
391+
}
392+
394393
__xskq_cons_release(xs->tx);
394+
xskq_prod_write_addr_batch(pool->cq, pool->tx_descs, nb_pkts);
395395
xs->sk.sk_write_space(&xs->sk);
396396

397397
out:

net/xdp/xsk_queue.h

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
205205
return false;
206206
}
207207

208+
static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
209+
{
210+
q->cached_cons += cnt;
211+
}
212+
208213
static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
209214
u32 max)
210215
{
@@ -226,6 +231,8 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff
226231
cached_cons++;
227232
}
228233

234+
/* Release valid plus any invalid entries */
235+
xskq_cons_release_n(q, cached_cons - q->cached_cons);
229236
return nb_entries;
230237
}
231238

@@ -291,11 +298,6 @@ static inline void xskq_cons_release(struct xsk_queue *q)
291298
q->cached_cons++;
292299
}
293300

294-
static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
295-
{
296-
q->cached_cons += cnt;
297-
}
298-
299301
static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
300302
{
301303
/* No barriers needed since data is not accessed */
@@ -350,21 +352,17 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
350352
return 0;
351353
}
352354

353-
static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
354-
u32 max)
355+
static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
356+
u32 nb_entries)
355357
{
356358
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
357-
u32 nb_entries, i, cached_prod;
358-
359-
nb_entries = xskq_prod_nb_free(q, max);
359+
u32 i, cached_prod;
360360

361361
/* A, matches D */
362362
cached_prod = q->cached_prod;
363363
for (i = 0; i < nb_entries; i++)
364364
ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
365365
q->cached_prod = cached_prod;
366-
367-
return nb_entries;
368366
}
369367

370368
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,

0 commit comments

Comments
 (0)