Skip to content

Commit 721fd83

Browse files
committed
Dpdk - introduce DpdkDevice class
1 parent dfaa896 commit 721fd83

File tree

3 files changed

+385
-0
lines changed

3 files changed

+385
-0
lines changed

Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ if WITH_DPDK
158158
ipfixprobe_input_src+=\
159159
input/dpdk/dpdkMbuf.hpp \
160160
input/dpdk/dpdkMbuf.cpp \
161+
input/dpdk/dpdkDevice.hpp \
162+
input/dpdk/dpdkDevice.cpp \
161163
input/dpdk.cpp \
162164
input/dpdk.h \
163165
input/dpdk-ring.cpp \

input/dpdk/dpdkDevice.cpp

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/**
2+
* \file
3+
* \brief Implementation of the DpdkDevice class.
4+
* \author Pavel Siska <[email protected]>
5+
* \date 2023
6+
*/
7+
/*
8+
* Copyright (C) 2023 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*/
25+
26+
#include "dpdkDevice.hpp"
27+
28+
#include <chrono>
29+
#include <cstring>
30+
#include <iostream>
31+
#include <ipfixprobe/input.hpp>
32+
#include <rte_errno.h>
33+
#include <rte_version.h>
34+
#include <unistd.h>
35+
36+
namespace ipxp {
37+
38+
DpdkDevice::DpdkDevice(
39+
uint16_t portID,
40+
uint16_t rxQueueCount,
41+
uint16_t memPoolSize,
42+
uint16_t mbufsCount)
43+
: m_portID(portID)
44+
, m_rxQueueCount(rxQueueCount)
45+
, m_txQueueCount(0)
46+
, m_mBufsCount(mbufsCount)
47+
, m_isNfbDpdkDriver(false)
48+
, m_supportedRSS(false)
49+
, m_supportedHWTimestamp(false)
50+
{
51+
validatePort();
52+
recognizeDriver();
53+
configurePort();
54+
initMemPools(memPoolSize);
55+
setupRxQueues();
56+
configureRSS();
57+
enablePort();
58+
}
59+
60+
DpdkDevice::~DpdkDevice()
61+
{
62+
rte_eth_dev_stop(m_portID);
63+
rte_eth_dev_close(m_portID);
64+
}
65+
66+
void DpdkDevice::validatePort()
67+
{
68+
if (!rte_eth_dev_is_valid_port(m_portID)) {
69+
throw PluginError(
70+
"DpdkDevice::validatePort() has failed. Invalid DPDK port [" + std::to_string(m_portID)
71+
+ "] specified");
72+
}
73+
}
74+
75+
void DpdkDevice::recognizeDriver()
76+
{
77+
rte_eth_dev_info rteDevInfo;
78+
if (rte_eth_dev_info_get(m_portID, &rteDevInfo)) {
79+
throw PluginError("DpdkDevice::recognizeDriver() has failed. Unable to get rte dev info");
80+
}
81+
82+
if (std::strcmp(rteDevInfo.driver_name, "net_nfb") == 0) {
83+
m_isNfbDpdkDriver = true;
84+
registerRxTimestamp();
85+
setRxTimestampDynflag();
86+
}
87+
88+
std::cerr << "Capabilities of the port " << m_portID << " with driver "
89+
<< rteDevInfo.driver_name << ":" << std::endl;
90+
std::cerr << "\tRX offload: " << rteDevInfo.rx_offload_capa << std::endl;
91+
std::cerr << "\tflow type RSS offloads: " << rteDevInfo.flow_type_rss_offloads << std::endl;
92+
93+
/* Check if RSS hashing is supported in NIC */
94+
m_supportedRSS = (rteDevInfo.flow_type_rss_offloads & RTE_ETH_RSS_IP) != 0;
95+
std::cerr << "\tDetected RSS offload capability: " << (m_supportedRSS ? "yes" : "no")
96+
<< std::endl;
97+
98+
/* Check if HW timestamps are supported, we support NFB cards only */
99+
if (m_isNfbDpdkDriver) {
100+
m_supportedHWTimestamp = (rteDevInfo.rx_offload_capa & RTE_ETH_RX_OFFLOAD_TIMESTAMP) != 0;
101+
} else {
102+
m_supportedHWTimestamp = false;
103+
}
104+
std::cerr << "\tDetected HW timestamp capability: " << (m_supportedHWTimestamp ? "yes" : "no")
105+
<< std::endl;
106+
}
107+
108+
void DpdkDevice::registerRxTimestamp()
109+
{
110+
if (rte_mbuf_dyn_rx_timestamp_register(&m_rxTimestampOffset, NULL)) {
111+
throw PluginError(
112+
"DpdkDevice::registerRxTimestamp() has failed. Unable to get Rx timestamp offset");
113+
}
114+
}
115+
116+
void DpdkDevice::setRxTimestampDynflag()
117+
{
118+
m_rxTimestampDynflag
119+
= RTE_BIT64(rte_mbuf_dynflag_lookup(RTE_MBUF_DYNFLAG_RX_TIMESTAMP_NAME, NULL));
120+
}
121+
122+
void DpdkDevice::configurePort()
123+
{
124+
auto portConfig = createPortConfig();
125+
if (rte_eth_dev_configure(m_portID, m_rxQueueCount, m_txQueueCount, &portConfig)) {
126+
throw PluginError("DpdkDevice::configurePort() has failed. Unable to configure interface");
127+
}
128+
}
129+
130+
rte_eth_conf DpdkDevice::createPortConfig()
131+
{
132+
if (m_rxQueueCount > 1 && !m_supportedRSS) {
133+
std::cerr << "RSS is not supported by card, multiple queues will not work as expected."
134+
<< std::endl;
135+
throw PluginError(
136+
"DpdkDevice::createPortConfig() has failed. Required RSS for q>1 is not supported.");
137+
}
138+
139+
#if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
140+
rte_eth_conf portConfig {.rxmode = {.mtu = RTE_ETHER_MAX_LEN}};
141+
#else
142+
rte_eth_conf portConfig {.rxmode = {.max_rx_pkt_len = RTE_ETHER_MAX_LEN}};
143+
#endif
144+
145+
if (m_supportedRSS) {
146+
#if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
147+
portConfig.rxmode.mq_mode = RTE_ETH_MQ_RX_RSS;
148+
#else
149+
portConfig.rxmode.mq_mode = ETH_MQ_RX_RSS;
150+
#endif
151+
} else {
152+
portConfig.rxmode.mq_mode = RTE_ETH_MQ_RX_NONE;
153+
}
154+
155+
if (m_supportedHWTimestamp) {
156+
portConfig.rxmode.offloads |= RTE_ETH_RX_OFFLOAD_TIMESTAMP;
157+
}
158+
return portConfig;
159+
}
160+
161+
void DpdkDevice::initMemPools(uint16_t memPoolSize)
162+
{
163+
constexpr int MEMPOOL_CACHE_SIZE = 256;
164+
165+
m_memPools.reserve(m_rxQueueCount);
166+
167+
for (uint16_t rxQueueID = 0; rxQueueID < m_rxQueueCount; rxQueueID++) {
168+
std::string memPoolName
169+
= "mbuf_pool_" + std::to_string(m_portID) + "_" + std::to_string(rxQueueID);
170+
rte_mempool* memPool = rte_pktmbuf_pool_create(
171+
memPoolName.c_str(),
172+
memPoolSize,
173+
MEMPOOL_CACHE_SIZE,
174+
0,
175+
RTE_MBUF_DEFAULT_BUF_SIZE,
176+
rte_lcore_to_socket_id(rxQueueID));
177+
if (!memPool) {
178+
throw PluginError(
179+
"DpdkDevice::initMemPool() has failed. Failed to create packets memory pool for "
180+
"port "
181+
+ std::to_string(m_portID) + ", pool name: " + memPoolName + ". Error was: '"
182+
+ std::string(rte_strerror(rte_errno))
183+
+ "' [Error code: " + std::to_string(rte_errno) + "]");
184+
}
185+
186+
m_memPools.emplace_back(memPool);
187+
}
188+
}
189+
190+
void DpdkDevice::setupRxQueues()
191+
{
192+
for (uint16_t rxQueueID = 0; rxQueueID < m_rxQueueCount; rxQueueID++) {
193+
int ret = rte_eth_rx_queue_setup(
194+
m_portID,
195+
rxQueueID,
196+
m_mBufsCount,
197+
rte_eth_dev_socket_id(m_portID),
198+
nullptr,
199+
m_memPools[rxQueueID]);
200+
if (ret < 0) {
201+
throw PluginError(
202+
"DpdkDevice::setupRxQueues() has failed. Failed to set up RX queue(s) for port "
203+
+ std::to_string(m_portID));
204+
}
205+
}
206+
}
207+
208+
void DpdkDevice::configureRSS()
209+
{
210+
if (!m_supportedRSS) {
211+
std::cerr << "Skipped RSS hash setting for port " << m_portID << "." << std::endl;
212+
return;
213+
}
214+
215+
constexpr size_t RSS_KEY_LEN = 40;
216+
// biflow hash key
217+
static uint8_t rssKey[RSS_KEY_LEN]
218+
= {0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A,
219+
0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A,
220+
0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A};
221+
222+
struct rte_eth_rss_conf rssConfig
223+
= {.rss_key = rssKey,
224+
.rss_key_len = RSS_KEY_LEN,
225+
#if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
226+
.rss_hf = RTE_ETH_RSS_IP,
227+
#else
228+
.rss_hf = ETH_RSS_IP,
229+
#endif
230+
};
231+
232+
if (rte_eth_dev_rss_hash_update(m_portID, &rssConfig)) {
233+
std::cerr << "Setting RSS hash for port " << m_portID << "." << std::endl;
234+
}
235+
}
236+
237+
void DpdkDevice::enablePort()
238+
{
239+
if (rte_eth_dev_start(m_portID) < 0) {
240+
throw PluginError("DpdkDevice::enablePort() has failed. Failed to start DPDK port");
241+
}
242+
243+
if (rte_eth_promiscuous_enable(m_portID)) {
244+
throw PluginError("DpdkDevice::enablePort() has failed. Failed to set promiscuous mode");
245+
}
246+
247+
std::cerr << "DPDK input at port " << m_portID << " started." << std::endl;
248+
}
249+
250+
uint16_t DpdkDevice::receive(DpdkMbuf& dpdkMuf, uint16_t rxQueueID)
251+
{
252+
dpdkMuf.releaseMbufs();
253+
uint16_t receivedPackets
254+
= rte_eth_rx_burst(m_portID, rxQueueID, dpdkMuf.data(), dpdkMuf.maxSize());
255+
dpdkMuf.setMbufsInUse(receivedPackets);
256+
return receivedPackets;
257+
}
258+
259+
timeval DpdkDevice::getPacketTimestamp(rte_mbuf* mbuf)
260+
{
261+
timeval tv;
262+
if (m_isNfbDpdkDriver && (mbuf->ol_flags & m_rxTimestampDynflag)) {
263+
static constexpr time_t nanosecInSec = 1000000000;
264+
static constexpr time_t nsecInUsec = 1000;
265+
266+
rte_mbuf_timestamp_t timestamp
267+
= *RTE_MBUF_DYNFIELD(mbuf, m_rxTimestampOffset, rte_mbuf_timestamp_t*);
268+
tv.tv_sec = timestamp / nanosecInSec;
269+
tv.tv_usec = (timestamp - ((tv.tv_sec) * nanosecInSec)) / nsecInUsec;
270+
271+
return tv;
272+
} else {
273+
auto now = std::chrono::system_clock::now();
274+
auto now_t = std::chrono::system_clock::to_time_t(now);
275+
276+
auto dur = now - std::chrono::system_clock::from_time_t(now_t);
277+
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(dur).count();
278+
279+
tv.tv_sec = now_t;
280+
tv.tv_usec = micros;
281+
return tv;
282+
}
283+
}
284+
285+
} // namespace ipxp

input/dpdk/dpdkDevice.hpp

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* \file
3+
* \brief Declaration of the DpdkDevice class.
4+
* \author Pavel Siska <[email protected]>
5+
* \date 2023
6+
*/
7+
/*
8+
* Copyright (C) 2023 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*/
25+
26+
#pragma once
27+
28+
#include "dpdkMbuf.hpp"
29+
30+
#include <rte_ethdev.h>
31+
#include <rte_mempool.h>
32+
#include <vector>
33+
34+
namespace ipxp {
35+
36+
/**
37+
* @brief The DpdkDevice class represents a DPDK (Data Plane Development Kit) device.
38+
* It provides functionality to manage and interact with the DPDK device.
39+
*/
40+
class DpdkDevice {
41+
public:
42+
/**
43+
* @brief Constructs a DpdkDevice object with the specified parameters.
44+
* @param portID The ID of the DPDK port to be used.
45+
* @param rxQueueCount The number of receive queues to be configured.
46+
* @param memPoolSize The size of the memory pool for packet buffers.
47+
* @param mbufsCount The number of mbufs (packet buffers) to be allocated.
48+
*/
49+
DpdkDevice(uint16_t portID, uint16_t rxQueueCount, uint16_t memPoolSize, uint16_t mbufsCount);
50+
51+
/**
52+
* @brief Receives packets from the specified receive queue of the DPDK device.
53+
* @param dpdkMuf A reference to a DpdkMbuf object to store the received packets.
54+
* @param rxQueueID The ID of the receive queue from which to receive packets.
55+
* @return The number of packets received.
56+
*/
57+
uint16_t receive(DpdkMbuf& dpdkMuf, uint16_t rxQueueID);
58+
59+
/**
60+
* @brief Retrieves the packet timestamp from the given mbuf.
61+
* @param mbuf The rte_mbuf structure representing the received packet.
62+
* @return The timestamp of the packet.
63+
*/
64+
timeval getPacketTimestamp(rte_mbuf* mbuf);
65+
66+
/**
67+
* @brief Destructs the DpdkDevice object.
68+
* Stops and closes the DPDK port associated with the device.
69+
*/
70+
~DpdkDevice();
71+
72+
private:
73+
void validatePort();
74+
void recognizeDriver();
75+
void configurePort();
76+
rte_eth_conf createPortConfig();
77+
void initMemPools(uint16_t memPoolSize);
78+
void setupRxQueues();
79+
void configureRSS();
80+
void enablePort();
81+
void createRteMempool(uint16_t mempoolSize);
82+
void setRxTimestampDynflag();
83+
void registerRxTimestamp();
84+
85+
std::vector<rte_mempool*> m_memPools;
86+
uint16_t m_portID;
87+
uint16_t m_rxQueueCount;
88+
uint16_t m_txQueueCount;
89+
uint16_t m_mBufsCount;
90+
bool m_isNfbDpdkDriver;
91+
bool m_supportedRSS;
92+
bool m_supportedHWTimestamp;
93+
int m_rxTimestampOffset;
94+
int m_rxTimestampDynflag;
95+
96+
};
97+
98+
} // namespace ipxp

0 commit comments

Comments
 (0)