Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/dp_port.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ static int dp_port_init_ethdev(struct dp_port *port, struct rte_eth_dev_info *de

/* Default config */
port_conf.txmode.offloads &= dev_info->tx_offload_capa;
if (dev_info->rx_offload_capa & RTE_ETH_RX_OFFLOAD_TIMESTAMP)
port_conf.rxmode.offloads |= RTE_ETH_RX_OFFLOAD_TIMESTAMP;

if (dp_conf_get_nic_type() == DP_CONF_NIC_TYPE_TAP || dp_conf_is_multiport_eswitch())
nr_hairpin_queues = 0;
Expand Down
6 changes: 4 additions & 2 deletions src/dpdk_layer.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,10 @@ static int graph_main_loop(__rte_unused void *arg)
nanosleep(&standby_sleep, NULL);
}

DPS_LOG_INFO("Starting packet processing");
rx_node_start_processing();
if (!force_quit) {
DPS_LOG_INFO("Starting packet processing");
rx_node_start_processing();
}

while (!force_quit)
rte_graph_walk(graph);
Expand Down
81 changes: 77 additions & 4 deletions src/nodes/rx_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,56 @@ DP_NODE_REGISTER_SOURCE(RX, rx, NEXT_NODES);
struct rx_node_ctx {
const struct dp_port *port;
uint16_t queue_id;
bool flush_old_packets;
};
static_assert(sizeof(struct rx_node_ctx) <= RTE_NODE_CTX_SZ,
"Rx node context will not fit into the node");

// also some way to map ports to nodes is needed
static rte_node_t rx_node_ids[DP_MAX_PORTS];
static struct rx_node_ctx *rx_node_ctxs[DP_MAX_PORTS];

// dpservice starts in "standby mode" (no processing of traffic)
static volatile bool standing_by = true;
static int rx_timestamp_offset;
static uint64_t flush_timestamp;


static uint64_t get_current_timestamp(void)
{
const struct dp_ports *ports;
uint64_t timestamp = 0;
int ret;

// is timestamping of packets even supported?
if (rx_timestamp_offset <= 0)
return 0;

// PF0/PF1 are in isolated mode, which prevents rte_eth_read_clock() from working
// thus find the first VF that is up (timestamp is the same for all ports on the NIC)
ports = dp_get_ports();
DP_FOREACH_PORT(ports, port) {
if (!port->allocated)
continue;
ret = rte_eth_read_clock(port->port_id, &timestamp);
if (!DP_FAILED(ret))
break;
}
return timestamp;
}

void rx_node_start_processing(void)
{
// even though processing was stopped, buffers still contain old packets
// to flush them, need to know which ones are old, need current timestamp
flush_timestamp = get_current_timestamp();
if (flush_timestamp > 0) {
// notify all Rx nodes that they need to flush
for (size_t i = 0; i < RTE_DIM(rx_node_ctxs); ++i)
if (rx_node_ctxs[i])
rx_node_ctxs[i]->flush_old_packets = true;
}

standing_by = false;
}

Expand Down Expand Up @@ -65,6 +102,13 @@ static int rx_node_init(const struct rte_graph *graph, struct rte_node *node)
uint16_t port_id;
const struct dp_port *port;

if (rx_timestamp_offset == 0) {
if (DP_FAILED(rte_mbuf_dyn_rx_timestamp_register(&rx_timestamp_offset, NULL))) {
DPS_LOG_ERR("Cannot register Rx timestamp field", DP_LOG_RET(rte_errno));
return DP_ERROR;
}
}

// Find this node's dedicated port to be used in processing
for (port_id = 0; port_id < RTE_DIM(rx_node_ids); ++port_id)
if (rx_node_ids[port_id] == node->id)
Expand All @@ -84,17 +128,34 @@ static int rx_node_init(const struct rte_graph *graph, struct rte_node *node)
// save dp_port to this node's context for accessing its id and the status of allocation
ctx->port = port;
ctx->queue_id = graph->id;
rx_node_ctxs[port_id] = ctx;
DPNODE_LOG_INFO(node, "Initialized", DP_LOG_PORTID(ctx->port->port_id), DP_LOG_QUEUEID(ctx->queue_id));
return DP_OK;
}


static uint16_t rx_find_old_packets(void **objs, uint16_t n_pkts, uint64_t timestamp)
{
rte_mbuf_timestamp_t *pkt_timestamp;
uint16_t old = 0;

for (uint16_t i = 0; i < n_pkts; ++i) {
pkt_timestamp = RTE_MBUF_DYNFIELD(objs[i], rx_timestamp_offset, rte_mbuf_timestamp_t *);
if (*pkt_timestamp >= timestamp)
break;
old++;
}
return old;
}

static uint16_t rx_node_process(struct rte_graph *graph,
struct rte_node *node,
void **objs,
uint16_t cnt)
{
struct rx_node_ctx *ctx = (struct rx_node_ctx *)node->ctx;
uint16_t n_pkts;
uint16_t old;

RTE_SET_USED(cnt); // this is a source node, input data is not present yet

Expand All @@ -104,13 +165,25 @@ static uint16_t rx_node_process(struct rte_graph *graph,
if (unlikely(standing_by))
return 0;

n_pkts = rte_eth_rx_burst(ctx->port->port_id,
ctx->queue_id,
(struct rte_mbuf **)objs,
RTE_GRAPH_BURST_SIZE);
n_pkts = rte_eth_rx_burst(ctx->port->port_id, ctx->queue_id, (struct rte_mbuf **)objs, RTE_GRAPH_BURST_SIZE);
if (unlikely(!n_pkts))
return 0;

if (unlikely(ctx->flush_old_packets)) {
DPS_LOG_INFO("Flushing old packets", DP_LOG_PORT(ctx->port));
old = rx_find_old_packets(objs, n_pkts, flush_timestamp);
if (old > 0) {
rte_pktmbuf_free_bulk((struct rte_mbuf **)objs, old);
objs += old;
n_pkts -= old;
DPS_LOG_INFO("Flushed old packets", DP_LOG_VALUE(old), DP_LOG_PORT(ctx->port));
// if all packets were old, continue flushing
if (old == n_pkts)
return 0;
}
ctx->flush_old_packets = false;
}

node->idx = n_pkts;

// Rx node only ever leads to CLS node (can move all packets at once)
Expand Down
Loading