2626 *
2727 */
2828
29+
2930#include < cstring>
3031#include < mutex>
3132#include < rte_ethdev.h>
@@ -121,9 +122,8 @@ DpdkCore& DpdkCore::getInstance()
121122
122123DpdkCore::~DpdkCore ()
123124{
124- rte_eth_dev_stop (m_portId);
125- rte_eth_dev_close (m_portId);
126- rte_eal_cleanup ();
125+ m_dpdkDevices.clear ();
126+ // rte_eal_cleanup(); // segfault?
127127 m_instance = nullptr ;
128128}
129129
@@ -135,103 +135,9 @@ void DpdkCore::deinit()
135135 }
136136}
137137
138- void DpdkCore::initInterface ()
139- {
140- validatePort ();
141- auto portConfig = createPortConfig ();
142- configurePort (portConfig);
143- }
144-
145- void DpdkCore::validatePort ()
146- {
147- if (!rte_eth_dev_is_valid_port (m_portId)) {
148- throw PluginError (" Invalid DPDK port specified" );
149- }
150- }
151-
152- struct rte_eth_conf DpdkCore::createPortConfig ()
153- {
154- if (m_rxQueueCount > 1 && !m_supportedRSS) {
155- std::cerr << " RSS is not supported by card, multiple queues will not work as expected." << std::endl;
156- throw PluginError (" Required RSS for q>1 is not supported." );
157- }
158-
159- #if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
160- rte_eth_conf portConfig {.rxmode = {.mtu = RTE_ETHER_MAX_LEN}};
161- #else
162- rte_eth_conf portConfig {.rxmode = {.max_rx_pkt_len = RTE_ETHER_MAX_LEN}};
163- #endif
164-
165- if (m_supportedRSS) {
166- #if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
167- portConfig.rxmode .mq_mode = RTE_ETH_MQ_RX_RSS;
168- #else
169- portConfig.rxmode .mq_mode = ETH_MQ_RX_RSS;
170- #endif
171- } else {
172- portConfig.rxmode .mq_mode = RTE_ETH_MQ_RX_NONE;
173- }
174-
175- if (m_supportedHWTimestamp) {
176- portConfig.rxmode .offloads |= RTE_ETH_RX_OFFLOAD_TIMESTAMP;
177- }
178- return portConfig;
179- }
180-
181- void DpdkCore::configurePort (const struct rte_eth_conf & portConfig)
182- {
183- if (rte_eth_dev_configure (m_portId, m_rxQueueCount, m_txQueueCount, &portConfig)) {
184- throw PluginError (" Unable to configure interface" );
185- }
186- }
187-
188- void DpdkCore::configureRSS ()
189- {
190- if (!m_supportedRSS) {
191- std::cerr << " SKipped RSS hash setting for port " << m_portId << " ." << std::endl;
192- return ;
193- }
194-
195- constexpr size_t RSS_KEY_LEN = 40 ;
196- // biflow hash key
197- static uint8_t rssKey[RSS_KEY_LEN] = {
198- 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A ,
199- 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A ,
200- 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A ,
201- 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A , 0x6D , 0x5A
202- };
203-
204- struct rte_eth_rss_conf rssConfig = {
205- .rss_key = rssKey,
206- .rss_key_len = RSS_KEY_LEN,
207- #if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
208- .rss_hf = RTE_ETH_RSS_IP,
209- #else
210- .rss_hf = ETH_RSS_IP,
211- #endif
212- };
213-
214- if (rte_eth_dev_rss_hash_update (m_portId, &rssConfig)) {
215- std::cerr << " Setting RSS hash for port " << m_portId << " ." << std::endl;
216- }
217- }
218-
219- void DpdkCore::enablePort ()
220- {
221- if (rte_eth_dev_start (m_portId) < 0 ) {
222- throw PluginError (" Unable to start DPDK port" );
223- }
224-
225- if (rte_eth_promiscuous_enable (m_portId)) {
226- throw PluginError (" Unable to set promiscuous mode" );
227- }
228- }
229-
230- void DpdkCore::registerRxTimestamp ()
138+ uint16_t DpdkCore::getMbufsCount () const noexcept
231139{
232- if (rte_mbuf_dyn_rx_timestamp_register (&m_rxTimestampOffset, NULL )) {
233- throw PluginError (" Unable to get Rx timestamp offset" );
234- }
140+ return m_mBufsCount;
235141}
236142
237143void DpdkCore::configure (const char * params)
@@ -240,55 +146,24 @@ void DpdkCore::configure(const char* params)
240146 return ;
241147 }
242148
243-
244149 try {
245150 parser.parse (params);
246151 } catch (ParserError& e) {
247152 throw PluginError (e.what ());
248153 }
249-
250- m_portId = parser.port_num ();
251- m_rxQueueCount = parser.rx_queues ();
252- configureEal (parser.eal_params ());
253-
254- /* recognize NIC driver and check capabilities */
255- recognizeDriver ();
256- registerRxTimestamp ();
257- initInterface ();
258- isConfigured = true ;
259- }
260-
261- void DpdkCore::recognizeDriver ()
262- {
263- rte_eth_dev_info rteDevInfo;
264- if (rte_eth_dev_info_get (m_portId, &rteDevInfo)) {
265- throw PluginError (" Unable to get rte dev info" );
266- }
267-
268- if (std::strcmp (rteDevInfo.driver_name , " net_nfb" ) == 0 ) {
269- m_isNfbDpdkDriver = true ;
270- }
271154
272- std::cerr << " Capabilities of the port " << m_portId << " with driver " << rteDevInfo. driver_name << " : " << std::endl ;
273- std::cerr << " \t RX offload: " << rteDevInfo. rx_offload_capa << std::endl ;
274- std::cerr << " \t flow type RSS offloads: " << rteDevInfo. flow_type_rss_offloads << std::endl ;
155+ uint16_t mempoolSize = parser. pkt_mempool_size () ;
156+ uint16_t rxQueueCount = parser. rx_queues () ;
157+ m_mBufsCount = parser. pkt_buffer_size () ;
275158
276- /* Check if RSS hashing is supported in NIC */
277- m_supportedRSS = (rteDevInfo.flow_type_rss_offloads & RTE_ETH_RSS_IP) != 0 ;
278- std::cerr << " \t Detected RSS offload capability: " << (m_supportedRSS ? " yes" : " no" ) << std::endl;
159+ configureEal (parser.eal_params ());
279160
280- /* Check if HW timestamps are supported, we support NFB cards only */
281- if (m_isNfbDpdkDriver) {
282- m_supportedHWTimestamp = (rteDevInfo.rx_offload_capa & RTE_ETH_RX_OFFLOAD_TIMESTAMP) != 0 ;
283- } else {
284- m_supportedHWTimestamp = false ;
161+ m_dpdkDevices.reserve (parser.port_numbers ().size ());
162+ for (auto portID : parser.port_numbers ()) {
163+ m_dpdkDevices.emplace_back (portID, rxQueueCount, mempoolSize, m_mBufsCount);
285164 }
286- std::cerr << " \t Detected HW timestamp capability: " << (m_supportedHWTimestamp ? " yes" : " no" ) << std::endl;
287- }
288165
289- bool DpdkCore::isNfbDpdkDriver ()
290- {
291- return m_isNfbDpdkDriver;
166+ isConfigured = true ;
292167}
293168
294169std::vector<char *> DpdkCore::convertStringToArgvFormat (const std::string& ealParams)
@@ -316,37 +191,14 @@ void DpdkCore::configureEal(const std::string& ealParams)
316191 }
317192}
318193
319- uint16_t DpdkCore::getRxQueueId ()
194+ uint16_t DpdkCore::getRxQueueId () noexcept
320195{
321196 return m_currentRxId++;
322197}
323198
324- void DpdkCore::startIfReady ()
325- {
326- if (m_rxQueueCount == m_currentRxId) {
327- configureRSS ();
328- enablePort ();
329- is_ifc_ready = true ;
330-
331- std::cerr << " DPDK input at port " << m_portId << " started." << std::endl;
332- }
333- }
334-
335- int DpdkCore::getRxTimestampOffset ()
336- {
337- return m_rxTimestampOffset;
338- }
339-
340- int DpdkCore::getRxTimestampDynflag ()
341- {
342- return RTE_BIT64 (rte_mbuf_dynflag_lookup (RTE_MBUF_DYNFLAG_RX_TIMESTAMP_NAME, NULL ));
343- }
344-
345199DpdkReader::DpdkReader ()
346200 : m_dpdkCore(DpdkCore::getInstance())
347201{
348- pkts_read_ = 0 ;
349- m_useHwRxTimestamp = false ;
350202}
351203
352204DpdkReader::~DpdkReader ()
@@ -358,104 +210,28 @@ void DpdkReader::init(const char* params)
358210{
359211 m_dpdkCore.configure (params);
360212 m_rxQueueId = m_dpdkCore.getRxQueueId ();
361- m_portId = m_dpdkCore.parser .port_num ();
362- m_rxTimestampOffset = m_dpdkCore.getRxTimestampOffset ();
363- m_rxTimestampDynflag = m_dpdkCore.getRxTimestampDynflag ();
364- m_useHwRxTimestamp = m_dpdkCore.isNfbDpdkDriver ();
365-
366- createRteMempool (m_dpdkCore.parser .pkt_mempool_size ());
367- createRteMbufs (m_dpdkCore.parser .pkt_buffer_size ());
368- setupRxQueue ();
369-
370- m_dpdkCore.startIfReady ();
371- }
372-
373- void DpdkReader::createRteMempool (uint16_t mempoolSize)
374- {
375- std::string mpool_name = " mbuf_pool_" + std::to_string (m_rxQueueId);
376- rteMempool = rte_pktmbuf_pool_create (
377- mpool_name.c_str (),
378- mempoolSize,
379- MEMPOOL_CACHE_SIZE,
380- 0 ,
381- RTE_MBUF_DEFAULT_BUF_SIZE,
382- rte_lcore_to_socket_id (m_rxQueueId));
383- if (!rteMempool) {
384- throw PluginError (" Unable to create memory pool. " + std::string (rte_strerror (rte_errno)));
385- }
386- }
387-
388- void DpdkReader::createRteMbufs (uint16_t mbufsSize)
389- {
390- try {
391- mbufs_.resize (mbufsSize);
392- } catch (const std::exception& e) {
393- throw PluginError (e.what ());
394- }
395- }
396-
397- void DpdkReader::setupRxQueue ()
398- {
399- int ret = rte_eth_rx_queue_setup (
400- m_portId,
401- m_rxQueueId,
402- mbufs_.size (),
403- rte_eth_dev_socket_id (m_portId),
404- nullptr ,
405- rteMempool);
406- if (ret < 0 ) {
407- throw PluginError (" Unable to set up RX queues" );
408- }
213+ m_dpdkDeviceCount = m_dpdkCore.getDpdkDeviceCount ();
214+ mBufs .resize (m_dpdkCore.getMbufsCount ());
409215}
410216
411- struct timeval DpdkReader::getTimestamp (rte_mbuf* mbuf)
412- {
413- struct timeval tv;
414- if (m_useHwRxTimestamp && (mbuf->ol_flags & m_rxTimestampDynflag)) {
415- static constexpr time_t nanosecInSec = 1000000000 ;
416- static constexpr time_t nsecInUsec = 1000 ;
417-
418- rte_mbuf_timestamp_t timestamp = *RTE_MBUF_DYNFIELD (mbuf, m_rxTimestampOffset, rte_mbuf_timestamp_t *);
419- tv.tv_sec = timestamp / nanosecInSec;
420- tv.tv_usec = (timestamp - ((tv.tv_sec ) * nanosecInSec)) / nsecInUsec;
421-
422- return tv;
423- } else {
424- auto now = std::chrono::system_clock::now ();
425- auto now_t = std::chrono::system_clock::to_time_t (now);
426-
427- auto dur = now - std::chrono::system_clock::from_time_t (now_t );
428- auto micros = std::chrono::duration_cast<std::chrono::microseconds>(dur).count ();
429-
430- tv.tv_sec = now_t ;
431- tv.tv_usec = micros;
432- return tv;
433- }
434-
435- }
436-
437217InputPlugin::Result DpdkReader::get (PacketBlock& packets)
438218{
439- while (m_dpdkCore.is_ifc_ready == false ) {
440- usleep (1000 );
441- }
442-
443219#ifndef WITH_FLEXPROBE
444220 parser_opt_t opt {&packets, false , false , 0 };
445221#endif
222+
446223 packets.cnt = 0 ;
447- for (auto i = 0 ; i < pkts_read_; i++) {
448- rte_pktmbuf_free (mbufs_[i]);
449- }
450- pkts_read_ = rte_eth_rx_burst (m_portId, m_rxQueueId, mbufs_.data (), mbufs_.size ());
451- if (pkts_read_ == 0 ) {
224+
225+ DpdkDevice& dpdkDevice = m_dpdkCore.getDpdkDevice (m_dpdkDeviceIndex++ % m_dpdkDeviceCount);
226+ uint16_t recivedPackets = dpdkDevice.receive (mBufs , m_rxQueueId);
227+ if (!recivedPackets) {
452228 return Result::TIMEOUT;
453229 }
454230
455- for (auto i = 0 ; i < pkts_read_; i ++) {
231+ for (auto packetID = 0 ; packetID < recivedPackets; packetID ++) {
456232#ifdef WITH_FLEXPROBE
457233 // Convert Flexprobe pre-parsed packet into IPFIXPROBE packet
458- auto conv_result = convert_from_flexprobe (mbufs_[i ], packets.pkts [packets.cnt ]);
234+ auto conv_result = convert_from_flexprobe (mBufs [packetID ], packets.pkts [packets.cnt ]);
459235 packets.bytes += packets.pkts [packets.cnt ].packet_len_wire ;
460236 m_seen++;
461237
@@ -466,10 +242,10 @@ InputPlugin::Result DpdkReader::get(PacketBlock& packets)
466242 packets.cnt ++;
467243#else
468244 parse_packet (&opt,
469- getTimestamp (mbufs_[i ]),
470- rte_pktmbuf_mtod (mbufs_[i ], const std::uint8_t *),
471- rte_pktmbuf_data_len (mbufs_[i ]),
472- rte_pktmbuf_data_len (mbufs_[i ]));
245+ dpdkDevice. getPacketTimestamp ( mBufs [packetID ]),
246+ rte_pktmbuf_mtod (mBufs [packetID ], const std::uint8_t *),
247+ rte_pktmbuf_data_len (mBufs [packetID ]),
248+ rte_pktmbuf_data_len (mBufs [packetID ]));
473249 m_seen++;
474250 m_parsed++;
475251#endif
0 commit comments