Skip to content

Commit 29a1d6a

Browse files
rjarry and maxime-leroy
authored and committed
infra: add TX queue sharing support
Some network devices report fewer TX queues than the number of datapath workers. Cap the TX queue count to the device maximum and distribute workers across available queues using modulo assignment.

When multiple workers share the same TX queue, protect rte_eth_tx_burst() with a per-queue spinlock stored in iface_info_port. The lock pointer in tx_node_ctx is only set for shared queues, to avoid locking overhead when a worker has exclusive access to its queue.

Update unit tests to set max_tx_queues in the mock device info and adjust expected TX queue assignments for modulo wrapping.

Signed-off-by: Robin Jarry <rjarry@redhat.com>
1 parent 4bbfb62 commit 29a1d6a

File tree

7 files changed

+58
-14
lines changed

7 files changed

+58
-14
lines changed

modules/infra/control/gr_port.h

Lines changed: 2 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -9,6 +9,7 @@
99
#include <rte_ethdev.h>
1010
#include <rte_ether.h>
1111
#include <rte_mempool.h>
12+
#include <rte_spinlock.h>
1213

1314
#include <stdint.h>
1415
#include <sys/queue.h>
@@ -32,6 +33,7 @@ GR_IFACE_INFO(GR_IFACE_TYPE_PORT, iface_info_port, {
3233
struct rte_mempool *pool;
3334
char *devargs;
3435
uint32_t pool_size;
36+
rte_spinlock_t txq_locks[RTE_MAX_QUEUES_PER_PORT];
3537
struct {
3638
mac_filter_flags_t flags;
3739
unsigned hw_limit;

modules/infra/control/graph.c

Lines changed: 21 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -164,6 +164,7 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_
164164
struct tx_node_ctx *ctx = tx_node_ctx(node);
165165
ctx->txq.port_id = UINT16_MAX;
166166
ctx->txq.queue_id = UINT16_MAX;
167+
ctx->lock = NULL;
167168
}
168169

169170
// initialize the port_output node context to point to invalid edges
@@ -191,6 +192,26 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_
191192
ctx->txq.port_id = qmap->port_id;
192193
ctx->txq.queue_id = qmap->queue_id;
193194

195+
// set spinlock only if multiple workers share this TX queue
196+
unsigned txq_users = 0;
197+
struct worker *w = NULL;
198+
STAILQ_FOREACH (w, &workers, next) {
199+
gr_vec_foreach_ref (struct queue_map *q, w->txqs) {
200+
if (q->port_id == qmap->port_id && q->queue_id == qmap->queue_id)
201+
txq_users++;
202+
}
203+
}
204+
if (txq_users > 1) {
205+
struct iface_info_port *port = find_port(ports, qmap->port_id);
206+
ctx->lock = &port->txq_locks[qmap->queue_id];
207+
LOG(WARNING,
208+
"[CPU %d] port %s txq %u shared by %u workers",
209+
worker->cpu_id,
210+
port->devargs,
211+
qmap->queue_id,
212+
txq_users);
213+
}
214+
194215
for (rte_edge_t edge = 0; edge < gr_vec_len(tx_node_names); edge++) {
195216
if (strcmp(tx_node_names[edge], node_name) == 0) {
196217
// update the port_output context data to map this port to the

modules/infra/control/port.c

Lines changed: 11 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -91,19 +91,26 @@ int port_configure(struct iface_info_port *p, uint16_t n_txq_min) {
9191
if (numa_available() != -1)
9292
socket_id = rte_eth_dev_socket_id(p->port_id);
9393

94-
// FIXME: deal with drivers that do not support more than 1 (or N) tx queues
95-
p->n_txq = n_txq_min;
94+
if ((ret = rte_eth_dev_info_get(p->port_id, &info)) < 0)
95+
return errno_log(-ret, "rte_eth_dev_info_get");
96+
9697
if (p->n_rxq == 0)
9798
p->n_rxq = 1;
9899

99-
if ((ret = rte_eth_dev_info_get(p->port_id, &info)) < 0)
100-
return errno_log(-ret, "rte_eth_dev_info_get");
100+
// cap number of queues to device maximum
101+
p->n_txq = RTE_MIN(n_txq_min, info.max_tx_queues);
101102

102103
if (strcmp(info.driver_name, "net_tap") == 0) {
103104
p->n_txq = RTE_MAX(p->n_txq, p->n_rxq);
104105
p->n_rxq = p->n_txq;
105106
}
106107

108+
if (p->n_txq < n_txq_min)
109+
LOG(NOTICE, "port %s TX queues limited to %u", p->devargs, p->n_txq);
110+
111+
for (uint16_t q = 0; q < p->n_txq; q++)
112+
rte_spinlock_init(&p->txq_locks[q]);
113+
107114
rxq_size = get_rxq_size(p, &info);
108115
txq_size = get_txq_size(p, &info);
109116

modules/infra/control/worker.c

Lines changed: 2 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -236,9 +236,10 @@ static void worker_txq_distribute(gr_vec struct iface_info_port **ports) {
236236
STAILQ_FOREACH (worker, &workers, next) {
237237
if (gr_vec_len(worker->rxqs) == 0)
238238
continue;
239+
assert(port->n_txq > 0);
239240
struct queue_map txq = {
240241
.port_id = port->port_id,
241-
.queue_id = txq_idx,
242+
.queue_id = txq_idx % port->n_txq,
242243
.enabled = port->started,
243244
};
244245
gr_vec_add(worker->txqs, txq);

modules/infra/control/worker_test.c

Lines changed: 8 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -26,7 +26,12 @@ static struct worker w2 = {.cpu_id = 2};
2626
static struct worker w3 = {.cpu_id = 3};
2727
static struct worker w4 = {.cpu_id = 4};
2828
static struct worker w5 = {.cpu_id = 5};
29-
static struct rte_eth_dev_info dev_info = {.driver_name = "net_null", .nb_rx_queues = 2};
29+
static struct rte_eth_dev_info dev_info = {
30+
.driver_name = "net_null",
31+
.nb_rx_queues = 2,
32+
.max_rx_queues = 4,
33+
.max_tx_queues = 4,
34+
};
3035

3136
// mocked types/functions
3237
int gr_rte_log_type;
@@ -167,6 +172,7 @@ static int setup(void **) {
167172
port->started = true;
168173
port->port_id = i;
169174
port->n_rxq = 2;
175+
port->n_txq = 3;
170176
ifaces[i] = iface;
171177
}
172178
STAILQ_INSERT_TAIL(&workers, &w1, next);
@@ -401,14 +407,9 @@ static void queue_distribute_increase(void **) {
401407
assert_qmaps(w5.rxqs, q(2, 0));
402408
assert_qmaps(w1.txqs, q(0, 2), q(1, 2), q(2, 2));
403409
assert_qmaps(w2.txqs, q(0, 3), q(1, 3), q(2, 3));
404-
assert_qmaps(w3.txqs, q(0, 4), q(1, 4), q(2, 4));
410+
assert_qmaps(w3.txqs, q(0, 0), q(1, 0), q(2, 0));
405411
assert_qmaps(w4.txqs, q(0, 0), q(1, 0), q(2, 0));
406412
assert_qmaps(w5.txqs, q(0, 1), q(1, 1), q(2, 1));
407-
408-
for (unsigned i = 0; i < ARRAY_DIM(ifaces); i++) {
409-
struct iface_info_port *p = iface_info_port(ifaces[i]);
410-
assert_int_equal(p->n_txq, CPU_COUNT(&affinity));
411-
}
412413
}
413414

414415
int main(void) {

modules/infra/datapath/gr_rxtx.h

Lines changed: 5 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -8,6 +8,7 @@
88

99
#include <rte_build_config.h>
1010
#include <rte_graph.h>
11+
#include <rte_spinlock.h>
1112

1213
#include <stddef.h>
1314
#include <stdint.h>
@@ -28,7 +29,10 @@ GR_NODE_CTX_TYPE(rx_node_ctx, {
2829
uint16_t burst_size;
2930
});
3031

31-
GR_NODE_CTX_TYPE(tx_node_ctx, { struct port_queue txq; });
32+
GR_NODE_CTX_TYPE(tx_node_ctx, {
33+
struct port_queue txq;
34+
rte_spinlock_t *lock;
35+
});
3236

3337
struct port_output_edges {
3438
rte_edge_t edges[RTE_MAX_ETHPORTS];

modules/infra/datapath/port_tx.c

Lines changed: 9 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -13,6 +13,7 @@
1313
#include <rte_build_config.h>
1414
#include <rte_ethdev.h>
1515
#include <rte_malloc.h>
16+
#include <rte_spinlock.h>
1617

1718
#include <stdint.h>
1819

@@ -53,7 +54,14 @@ tx_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t
5354
}
5455
}
5556

56-
tx_ok = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, nb_objs);
57+
if (likely(ctx->lock == NULL)) {
58+
tx_ok = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, nb_objs);
59+
} else {
60+
rte_spinlock_lock(ctx->lock);
61+
tx_ok = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, nb_objs);
62+
rte_spinlock_unlock(ctx->lock);
63+
}
64+
5765
if (tx_ok < nb_objs)
5866
rte_node_enqueue(graph, node, TX_ERROR, &objs[tx_ok], nb_objs - tx_ok);
5967

Comments (0)