Skip to content

Commit 1d7f733

Browse files
committed
dpdk: new input plugin for reading via dpdk rings as secondary dpdk process
1 parent 32f0081 commit 1d7f733

File tree

4 files changed

+372
-1
lines changed

4 files changed

+372
-1
lines changed

Makefile.am

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,9 @@ endif
155155
if WITH_DPDK
156156
ipfixprobe_input_src+=\
157157
input/dpdk.cpp \
158-
input/dpdk.h
158+
input/dpdk.h \
159+
input/dpdk-ring.cpp \
160+
input/dpdk-ring.h
159161
endif
160162

161163
ipfixprobe_headers_src=\

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ Here are the examples of various plugins usage:
159159
# The following `dpdk` interfaces are given without parameters; their configuration is inherited from the first one.
160160
# Example for the queue of 3 DPDK input plugins (q=3):
161161
`./ipfixprobe -i "dpdk;p=0;q=3;e=-c 0x1 -a <[domain:]bus:devid.func>" -i dpdk -i dpdk -p http "-p" bstats -p tls -o "ipfix;h=127.0.0.1"`
162+
163+
# Read packets using DPDK input interface as secondary process with shared memory (DPDK rings) - in this case, 4 DPDK rings are used
164+
`./ipfixprobe -i 'dpdk-ring;r=rx_ipfixprobe_0;e= --proc-type=secondary' -i 'dpdk-ring;r=rx_ipfixprobe_1' -i 'dpdk-ring;r=rx_ipfixprobe_2' -i 'dpdk-ring;r=rx_ipfixprobe_3' -o 'text'`
162165
```
163166

164167
## Flow Data Extension - Processing Plugins

input/dpdk-ring.cpp

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/**
2+
* \file dpdk-ring.cpp
3+
* \brief DPDK ring input interface for ipfixprobe (secondary DPDK app).
4+
* \author Jaroslav Pesek <[email protected]>
5+
* \date 2023
6+
*/
7+
/*
8+
* Copyright (C) 2021 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*
25+
* ALTERNATIVELY, provided that this notice is retained in full, this
26+
* product may be distributed under the terms of the GNU General Public
27+
* License (GPL) version 2 or later, in which case the provisions
28+
* of the GPL apply INSTEAD OF those given above.
29+
*
30+
* This software is provided ``as is'', and any express or implied
31+
* warranties, including, but not limited to, the implied warranties of
32+
* merchantability and fitness for a particular purpose are disclaimed.
33+
* In no event shall the company or contributors be liable for any
34+
* direct, indirect, incidental, special, exemplary, or consequential
35+
* damages (including, but not limited to, procurement of substitute
36+
* goods or services; loss of use, data, or profits; or business
37+
* interruption) however caused and on any theory of liability, whether
38+
* in contract, strict liability, or tort (including negligence or
39+
* otherwise) arising in any way out of the use of this software, even
40+
* if advised of the possibility of such damage.
41+
*
42+
*/
43+
#include <cstring>
44+
#include <mutex>
45+
#include <rte_ethdev.h>
46+
#include <rte_version.h>
47+
#include <unistd.h>
48+
#include <rte_eal.h>
49+
#include <rte_errno.h>
50+
51+
#include "dpdk-ring.h"
52+
#include "parser.hpp"
53+
54+
namespace ipxp {
55+
__attribute__((constructor)) static void register_this_plugin()
56+
{
57+
static PluginRecord rec = PluginRecord("dpdk-ring", []() { return new DpdkRingReader(); });
58+
register_plugin(&rec);
59+
}
60+
61+
DpdkRingCore* DpdkRingCore::m_instance = nullptr;
62+
63+
DpdkRingCore& DpdkRingCore::getInstance()
64+
{
65+
if (!m_instance) {
66+
m_instance = new DpdkRingCore();
67+
}
68+
return *m_instance;
69+
}
70+
71+
DpdkRingCore::~DpdkRingCore()
72+
{
73+
rte_eal_cleanup();
74+
m_instance = nullptr;
75+
}
76+
77+
void DpdkRingCore::deinit()
78+
{
79+
if (m_instance) {
80+
delete m_instance;
81+
m_instance = nullptr;
82+
}
83+
}
84+
85+
void DpdkRingCore::configure(const char* params) {
86+
if (isConfigured) {
87+
return;
88+
}
89+
90+
try {
91+
parser.parse(params);
92+
} catch (ParserError& e) {
93+
throw PluginError(e.what());
94+
}
95+
96+
configureEal(parser.eal_params());
97+
isConfigured = true;
98+
}
99+
100+
std::vector<char *> DpdkRingCore::convertStringToArgvFormat(const std::string& ealParams)
101+
{
102+
// set first value as program name (argv[0])
103+
std::vector<char *> args = {"ipfixprobe"};
104+
std::istringstream iss(ealParams);
105+
std::string token;
106+
107+
while(iss >> token) {
108+
char *arg = new char[token.size() + 1];
109+
copy(token.begin(), token.end(), arg);
110+
arg[token.size()] = '\0';
111+
args.push_back(arg);
112+
}
113+
return args;
114+
}
115+
116+
void DpdkRingCore::configureEal(const std::string& ealParams)
117+
{
118+
std::vector<char *> args = convertStringToArgvFormat(ealParams);
119+
120+
if (rte_eal_init(args.size(), args.data()) < 0) {
121+
rte_exit(EXIT_FAILURE, "Cannot initialize RTE_EAL: %s\n", rte_strerror(rte_errno));
122+
}
123+
}
124+
125+
DpdkRingReader::DpdkRingReader()
126+
: m_dpdkRingCore(DpdkRingCore::getInstance())
127+
{
128+
pkts_read_ = 0;
129+
}
130+
131+
DpdkRingReader::~DpdkRingReader()
132+
{
133+
m_dpdkRingCore.deinit();
134+
}
135+
136+
void DpdkRingReader::createRteMbufs(uint16_t mbufsSize)
137+
{
138+
try {
139+
mbufs_.resize(mbufsSize);
140+
} catch (const std::exception& e) {
141+
throw PluginError(e.what());
142+
}
143+
}
144+
145+
void DpdkRingReader::init(const char* params)
146+
{
147+
m_dpdkRingCore.configure(params);
148+
DpdkRingOptParser parser;
149+
try {
150+
parser.parse(params);
151+
} catch (ParserError& e) {
152+
throw PluginError(e.what());
153+
}
154+
createRteMbufs(m_dpdkRingCore.parser.pkt_buffer_size());
155+
m_ring = rte_ring_lookup(parser.ring_name().c_str());
156+
if (!m_ring) {
157+
throw PluginError("Cannot find ring with name: " + parser.ring_name());
158+
} else {
159+
is_reader_ready = true;
160+
}
161+
}
162+
163+
struct timeval DpdkRingReader::getTimestamp(rte_mbuf* mbuf)
164+
{
165+
struct timeval tv;
166+
auto now = std::chrono::system_clock::now();
167+
auto now_t = std::chrono::system_clock::to_time_t(now);
168+
169+
auto dur = now - std::chrono::system_clock::from_time_t(now_t);
170+
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(dur).count();
171+
172+
tv.tv_sec = now_t;
173+
tv.tv_usec = micros;
174+
return tv;
175+
}
176+
177+
InputPlugin::Result DpdkRingReader::get(PacketBlock& packets)
178+
{
179+
while (is_reader_ready == false) {
180+
usleep(1000);
181+
}
182+
183+
parser_opt_t opt {&packets, false, false, 0};
184+
185+
packets.cnt = 0;
186+
for (auto i = 0; i < pkts_read_; i++) {
187+
rte_pktmbuf_free(mbufs_[i]);
188+
}
189+
pkts_read_ = rte_ring_dequeue_burst(
190+
m_ring,
191+
reinterpret_cast<void**>(mbufs_.data()),
192+
mbufs_.capacity(),
193+
nullptr);
194+
if (pkts_read_ == 0) {
195+
return Result::TIMEOUT;
196+
}
197+
for (auto i = 0; i < pkts_read_; i++) {
198+
parse_packet(&opt,
199+
getTimestamp(mbufs_[i]),
200+
rte_pktmbuf_mtod(mbufs_[i], const std::uint8_t*),
201+
rte_pktmbuf_data_len(mbufs_[i]),
202+
rte_pktmbuf_data_len(mbufs_[i]));
203+
m_seen++;
204+
m_parsed++;
205+
}
206+
return Result::PARSED;
207+
}
208+
} // namespace ipxp

input/dpdk-ring.h

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/**
2+
* \file dpdk-ring.h
3+
* \brief DPDK ring input interface for ipfixprobe (secondary DPDK app).
4+
* \author Jaroslav Pesek <[email protected]>
5+
* \date 2023
6+
*/
7+
/*
8+
* Copyright (C) 2021 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*
25+
* ALTERNATIVELY, provided that this notice is retained in full, this
26+
* product may be distributed under the terms of the GNU General Public
27+
* License (GPL) version 2 or later, in which case the provisions
28+
* of the GPL apply INSTEAD OF those given above.
29+
*
30+
* This software is provided ``as is'', and any express or implied
31+
* warranties, including, but not limited to, the implied warranties of
32+
* merchantability and fitness for a particular purpose are disclaimed.
33+
* In no event shall the company or contributors be liable for any
34+
* direct, indirect, incidental, special, exemplary, or consequential
35+
* damages (including, but not limited to, procurement of substitute
36+
* goods or services; loss of use, data, or profits; or business
37+
* interruption) however caused and on any theory of liability, whether
38+
* in contract, strict liability, or tort (including negligence or
39+
* otherwise) arising in any way out of the use of this software, even
40+
* if advised of the possibility of such damage.
41+
*
42+
*/
43+
#include <config.h>
44+
#ifdef WITH_DPDK
45+
46+
#ifndef IPXP_DPDK_RING_READER_H
47+
#define IPXP_DPDK_RING_READER_H
48+
49+
#include <ipfixprobe/input.hpp>
50+
#include <ipfixprobe/utils.hpp>
51+
52+
#include <memory>
53+
#include <rte_mbuf.h>
54+
#include <rte_ring.h>
55+
#include <sstream>
56+
57+
namespace ipxp {
58+
class DpdkRingOptParser : public OptionsParser {
59+
private:
60+
static constexpr size_t DEFAULT_MBUF_BURST_SIZE = 256;
61+
size_t pkt_buffer_size_;
62+
63+
std::string ring_name_;
64+
std::string eal_;
65+
public:
66+
DpdkRingOptParser()
67+
: OptionsParser("dpdk-ring", "DPDK ring input interface for ipfixprobe (secondary DPDK app).")
68+
, pkt_buffer_size_(DEFAULT_MBUF_BURST_SIZE)
69+
{
70+
register_option(
71+
"b",
72+
"bsize",
73+
"SIZE",
74+
"Size of the MBUF packet buffer. Default: " + std::to_string(DEFAULT_MBUF_BURST_SIZE),
75+
[this](const char* arg) {try{pkt_buffer_size_ = str2num<decltype(pkt_buffer_size_)>(arg);} catch (std::invalid_argument&){return false;} return true; },
76+
RequiredArgument);
77+
register_option(
78+
"r",
79+
"ring",
80+
"RING",
81+
"Name of the ring to read packets from. Need to be specified explicitly thus no default provided.",
82+
[this](const char* arg) {ring_name_ = arg; return true;},
83+
OptionFlags::RequiredArgument);
84+
register_option(
85+
"e",
86+
"eal",
87+
"EAL",
88+
"DPDK eal",
89+
[this](const char *arg){eal_ = arg; return true;},
90+
OptionFlags::RequiredArgument);
91+
92+
}
93+
size_t pkt_buffer_size() const { return pkt_buffer_size_; }
94+
95+
std::string ring_name() const { return ring_name_; }
96+
97+
std::string eal_params() const { return eal_; }
98+
};
99+
100+
class DpdkRingCore {
101+
public:
102+
/**
103+
* @brief Configure DPDK secondary process.
104+
*
105+
* @param eal_params DPDK EAL parameters.
106+
*/
107+
void configure(const char* params);
108+
109+
/**
110+
* @brief Get the singleton dpdk core instance
111+
*/
112+
static DpdkRingCore& getInstance();
113+
void deinit();
114+
115+
DpdkRingOptParser parser;
116+
117+
private:
118+
std::vector<char *> convertStringToArgvFormat(const std::string& ealParams);
119+
void configureEal(const std::string& ealParams);
120+
~DpdkRingCore();
121+
bool isConfigured = false;
122+
static DpdkRingCore* m_instance;
123+
};
124+
125+
class DpdkRingReader : public InputPlugin {
126+
public:
127+
Result get(PacketBlock& packets) override;
128+
129+
void init(const char* params) override;
130+
131+
OptionsParser* get_parser() const override
132+
{
133+
return new DpdkRingOptParser();
134+
}
135+
136+
std::string get_name() const override
137+
{
138+
return "dpdk-ring";
139+
}
140+
141+
~DpdkRingReader();
142+
DpdkRingReader();
143+
private:
144+
std::vector<rte_mbuf*> mbufs_;
145+
std::uint16_t pkts_read_;
146+
147+
void createRteMbufs(uint16_t mbufsSize);
148+
struct timeval getTimestamp(rte_mbuf* mbuf);
149+
DpdkRingCore& m_dpdkRingCore;
150+
rte_ring* m_ring;
151+
bool is_reader_ready = false;
152+
153+
154+
};
155+
} // namespace ipxp
156+
157+
#endif // IPXP_DPDK_RING_READER_H
158+
#endif

0 commit comments

Comments
 (0)