diff --git a/src/dp_port.c b/src/dp_port.c
index 6717e193f..8a8e12501 100644
--- a/src/dp_port.c
+++ b/src/dp_port.c
@@ -99,6 +99,8 @@ static int dp_port_init_ethdev(struct dp_port *port, struct rte_eth_dev_info *de
 	/* Default config */
 	port_conf.txmode.offloads &= dev_info->tx_offload_capa;
 
+	if (dev_info->rx_offload_capa & RTE_ETH_RX_OFFLOAD_TIMESTAMP)
+		port_conf.rxmode.offloads |= RTE_ETH_RX_OFFLOAD_TIMESTAMP;
 	if (dp_conf_get_nic_type() == DP_CONF_NIC_TYPE_TAP || dp_conf_is_multiport_eswitch())
 		nr_hairpin_queues = 0;
 
diff --git a/src/dpdk_layer.c b/src/dpdk_layer.c
index 129e38cc1..4dcc747c1 100644
--- a/src/dpdk_layer.c
+++ b/src/dpdk_layer.c
@@ -146,8 +146,10 @@ static int graph_main_loop(__rte_unused void *arg)
 		nanosleep(&standby_sleep, NULL);
 	}
 
-	DPS_LOG_INFO("Starting packet processing");
-	rx_node_start_processing();
+	if (!force_quit) {
+		DPS_LOG_INFO("Starting packet processing");
+		rx_node_start_processing();
+	}
 
 	while (!force_quit)
 		rte_graph_walk(graph);
diff --git a/src/nodes/rx_node.c b/src/nodes/rx_node.c
index 0212f2b01..3ab773203 100644
--- a/src/nodes/rx_node.c
+++ b/src/nodes/rx_node.c
@@ -21,19 +21,56 @@ DP_NODE_REGISTER_SOURCE(RX, rx, NEXT_NODES);
 struct rx_node_ctx {
 	const struct dp_port *port;
 	uint16_t queue_id;
+	bool flush_old_packets;
 };
 static_assert(sizeof(struct rx_node_ctx) <= RTE_NODE_CTX_SZ, "Rx node context will not fit into the node");
 
 // also some way to map ports to nodes is needed
 static rte_node_t rx_node_ids[DP_MAX_PORTS];
+static struct rx_node_ctx *rx_node_ctxs[DP_MAX_PORTS];
 
 // dpservice starts in "standby mode" (no processing of traffic)
 static volatile bool standing_by = true;
 
+static int rx_timestamp_offset;
+static uint64_t flush_timestamp;
+
+static uint64_t get_current_timestamp(void)
+{
+	const struct dp_ports *ports;
+	uint64_t timestamp = 0;
+	int ret;
+
+	// is timestamping of packets even supported?
+	if (rx_timestamp_offset <= 0)
+		return 0;
+
+	// PF0/PF1 are in isolated mode, which prevents rte_eth_read_clock() from working,
+	// so find the first VF that is up (the timestamp is the same for all ports on the NIC)
+	ports = dp_get_ports();
+	DP_FOREACH_PORT(ports, port) {
+		if (!port->allocated)
+			continue;
+		ret = rte_eth_read_clock(port->port_id, &timestamp);
+		if (!DP_FAILED(ret))
+			break;
+	}
+	return timestamp;
+}
+
 void rx_node_start_processing(void)
 {
+	// even though processing was stopped, buffers still contain old packets
+	// to flush them, we need to know which ones are old, hence the current timestamp
+	flush_timestamp = get_current_timestamp();
+	if (flush_timestamp > 0) {
+		// notify all Rx nodes that they need to flush
+		for (size_t i = 0; i < RTE_DIM(rx_node_ctxs); ++i)
+			if (rx_node_ctxs[i])
+				rx_node_ctxs[i]->flush_old_packets = true;
+	}
+
 	standing_by = false;
 }
@@ -65,6 +102,13 @@ static int rx_node_init(const struct rte_graph *graph, struct rte_node *node)
 	uint16_t port_id;
 	const struct dp_port *port;
 
+	if (rx_timestamp_offset == 0) {
+		if (DP_FAILED(rte_mbuf_dyn_rx_timestamp_register(&rx_timestamp_offset, NULL))) {
+			DPS_LOG_ERR("Cannot register Rx timestamp field", DP_LOG_RET(rte_errno));
+			return DP_ERROR;
+		}
+	}
+
 	// Find this node's dedicated port to be used in processing
 	for (port_id = 0; port_id < RTE_DIM(rx_node_ids); ++port_id)
 		if (rx_node_ids[port_id] == node->id)
@@ -84,10 +128,26 @@ static int rx_node_init(const struct rte_graph *graph, struct rte_node *node)
 	// save dp_port to this node's context for accessing its id and the status of allocation
 	ctx->port = port;
 	ctx->queue_id = graph->id;
+	rx_node_ctxs[port_id] = ctx;
 
 	DPNODE_LOG_INFO(node, "Initialized", DP_LOG_PORTID(ctx->port->port_id), DP_LOG_QUEUEID(ctx->queue_id));
 	return DP_OK;
 }
 
+static uint16_t rx_find_old_packets(void **objs, uint16_t n_pkts, uint64_t timestamp)
+{
+	rte_mbuf_timestamp_t *pkt_timestamp;
+	uint16_t old = 0;
+
+	for (uint16_t i = 0; i < n_pkts; ++i) {
+		pkt_timestamp = RTE_MBUF_DYNFIELD(objs[i], rx_timestamp_offset, rte_mbuf_timestamp_t *);
+		if (*pkt_timestamp >= timestamp)
+			break;
+		old++;
+	}
+	return old;
+}
+
 static uint16_t rx_node_process(struct rte_graph *graph,
 								struct rte_node *node,
 								void **objs,
@@ -95,6 +155,7 @@ static uint16_t rx_node_process(struct rte_graph *graph,
 {
 	struct rx_node_ctx *ctx = (struct rx_node_ctx *)node->ctx;
 	uint16_t n_pkts;
+	uint16_t old;
 
 	RTE_SET_USED(cnt);	// this is a source node, input data is not present yet
@@ -104,13 +165,25 @@ static uint16_t rx_node_process(struct rte_graph *graph,
 	if (unlikely(standing_by))
 		return 0;
 
-	n_pkts = rte_eth_rx_burst(ctx->port->port_id,
-							  ctx->queue_id,
-							  (struct rte_mbuf **)objs,
-							  RTE_GRAPH_BURST_SIZE);
+	n_pkts = rte_eth_rx_burst(ctx->port->port_id, ctx->queue_id, (struct rte_mbuf **)objs, RTE_GRAPH_BURST_SIZE);
 	if (unlikely(!n_pkts))
 		return 0;
 
+	if (unlikely(ctx->flush_old_packets)) {
+		DPS_LOG_INFO("Flushing old packets", DP_LOG_PORT(ctx->port));
+		old = rx_find_old_packets(objs, n_pkts, flush_timestamp);
+		if (old > 0) {
+			rte_pktmbuf_free_bulk((struct rte_mbuf **)objs, old);
+			objs += old;
+			n_pkts -= old;
+			DPS_LOG_INFO("Flushed old packets", DP_LOG_VALUE(old), DP_LOG_PORT(ctx->port));
+			// if all packets in the burst were old, keep flushing on the next call
+			if (n_pkts == 0)
+				return 0;
+		}
+		ctx->flush_old_packets = false;
+	}
+
 	node->idx = n_pkts;
 	// Rx node only ever leads to CLS node (can move all packets at once)
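
The flush logic above builds on two DPDK facilities: the Rx timestamp dynamic mbuf field and the device clock exposed via rte_eth_read_clock(). The sketch below is not part of the patch; it only illustrates, assuming a port with RTE_ETH_RX_OFFLOAD_TIMESTAMP enabled, how those APIs fit together without the dpservice wrappers (DP_FAILED, DPS_LOG_*, DP_FOREACH_PORT). The helper names (setup_rx_timestamping, read_cutoff, drop_stale_packets) are made up for illustration.

#include <stdint.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
#include <rte_mbuf_dyn.h>

static int timestamp_offset = -1;

/* Register the dynamic mbuf field that the PMD fills with the Rx timestamp
 * once RTE_ETH_RX_OFFLOAD_TIMESTAMP is enabled; returns 0 on success. */
static int setup_rx_timestamping(void)
{
	return rte_mbuf_dyn_rx_timestamp_register(&timestamp_offset, NULL);
}

/* Read a cutoff value from the device clock of a started port; the packet
 * timestamps come from the same clock domain, so they are comparable. */
static int read_cutoff(uint16_t port_id, uint64_t *cutoff)
{
	return rte_eth_read_clock(port_id, cutoff);
}

/* Free the leading packets of a burst that were timestamped before 'cutoff'
 * and return how many were freed; fresh packets start at pkts[return value]. */
static uint16_t drop_stale_packets(struct rte_mbuf **pkts, uint16_t n_pkts, uint64_t cutoff)
{
	uint16_t stale = 0;

	while (stale < n_pkts) {
		rte_mbuf_timestamp_t *ts =
			RTE_MBUF_DYNFIELD(pkts[stale], timestamp_offset, rte_mbuf_timestamp_t *);
		if (*ts >= cutoff)
			break;
		stale++;
	}
	if (stale > 0)
		rte_pktmbuf_free_bulk(pkts, stale);
	return stale;
}

Taking the cutoff once when processing starts is sufficient because all ports of the NIC share one clock; that is why get_current_timestamp() may read the clock from any VF that is up.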