Skip to content

Commit 0178a55

Browse files
authored
Merge pull request #141 from CESNET/dpdk-ring-reader
DPDK ring reader
2 parents 1445d7f + bdbaa44 commit 0178a55

File tree

4 files changed

+338
-1
lines changed

4 files changed

+338
-1
lines changed

Makefile.am

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,9 @@ endif
157157
if WITH_DPDK
158158
ipfixprobe_input_src+=\
159159
input/dpdk.cpp \
160-
input/dpdk.h
160+
input/dpdk.h \
161+
input/dpdk-ring.cpp \
162+
input/dpdk-ring.h
161163
endif
162164

163165
ipfixprobe_headers_src=\

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ Here are the examples of various plugins usage:
159159
# The following `dpdk` interfaces are given without parameters; their configuration is inherited from the first one.
160160
# Example for the queue of 3 DPDK input plugins (q=3):
161161
`./ipfixprobe -i "dpdk;p=0;q=3;e=-c 0x1 -a <[domain:]bus:devid.func>" -i dpdk -i dpdk -p http "-p" bstats -p tls -o "ipfix;h=127.0.0.1"`
162+
163+
# Read packets using DPDK input interface as secondary process with shared memory (DPDK rings) - in this case, 4 DPDK rings are used
164+
`./ipfixprobe -i 'dpdk-ring;r=rx_ipfixprobe_0;e= --proc-type=secondary' -i 'dpdk-ring;r=rx_ipfixprobe_1' -i 'dpdk-ring;r=rx_ipfixprobe_2' -i 'dpdk-ring;r=rx_ipfixprobe_3' -o 'text'`
162165
```
163166

164167
## Flow Data Extension - Processing Plugins

input/dpdk-ring.cpp

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/**
2+
* \file dpdk-ring.cpp
3+
* \brief DPDK ring input interface for ipfixprobe (secondary DPDK app).
4+
* \author Jaroslav Pesek <[email protected]>
5+
* \date 2023
6+
*/
7+
/*
8+
* Copyright (C) 2023 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*
25+
*/
26+
#include <cstring>
27+
#include <mutex>
28+
#include <rte_ethdev.h>
29+
#include <rte_version.h>
30+
#include <unistd.h>
31+
#include <rte_eal.h>
32+
#include <rte_errno.h>
33+
34+
#include "dpdk-ring.h"
35+
#include "parser.hpp"
36+
37+
namespace ipxp {
38+
__attribute__((constructor)) static void register_this_plugin()
39+
{
40+
static PluginRecord rec = PluginRecord("dpdk-ring", []() { return new DpdkRingReader(); });
41+
register_plugin(&rec);
42+
}
43+
44+
DpdkRingCore *DpdkRingCore::m_instance = nullptr;
45+
46+
DpdkRingCore &DpdkRingCore::getInstance()
47+
{
48+
if (!m_instance) {
49+
m_instance = new DpdkRingCore();
50+
}
51+
return *m_instance;
52+
}
53+
54+
DpdkRingCore::~DpdkRingCore()
55+
{
56+
rte_eal_cleanup();
57+
m_instance = nullptr;
58+
}
59+
60+
void DpdkRingCore::deinit()
61+
{
62+
if (m_instance) {
63+
delete m_instance;
64+
m_instance = nullptr;
65+
}
66+
}
67+
68+
void DpdkRingCore::configure(const char* params) {
69+
if (isConfigured) {
70+
return;
71+
}
72+
73+
try {
74+
parser.parse(params);
75+
} catch (ParserError& e) {
76+
throw PluginError(e.what());
77+
}
78+
79+
configureEal(parser.eal_params());
80+
isConfigured = true;
81+
}
82+
83+
std::vector<char *> DpdkRingCore::convertStringToArgvFormat(const std::string& ealParams)
84+
{
85+
// set first value as program name (argv[0])
86+
std::vector<char *> args = {"ipfixprobe"};
87+
std::istringstream iss(ealParams);
88+
std::string token;
89+
90+
while(iss >> token) {
91+
char *arg = new char[token.size() + 1];
92+
copy(token.begin(), token.end(), arg);
93+
arg[token.size()] = '\0';
94+
args.push_back(arg);
95+
}
96+
return args;
97+
}
98+
99+
void DpdkRingCore::configureEal(const std::string& ealParams)
100+
{
101+
std::vector<char *> args = convertStringToArgvFormat(ealParams);
102+
103+
if (rte_eal_init(args.size(), args.data()) < 0) {
104+
rte_exit(EXIT_FAILURE, "Cannot initialize RTE_EAL: %s\n", rte_strerror(rte_errno));
105+
}
106+
}
107+
108+
DpdkRingReader::DpdkRingReader()
109+
: m_dpdkRingCore(DpdkRingCore::getInstance())
110+
{
111+
pkts_read_ = 0;
112+
}
113+
114+
DpdkRingReader::~DpdkRingReader()
115+
{
116+
m_dpdkRingCore.deinit();
117+
}
118+
119+
void DpdkRingReader::createRteMbufs(uint16_t mbufsSize)
120+
{
121+
try {
122+
mbufs_.resize(mbufsSize);
123+
} catch (const std::exception& e) {
124+
throw PluginError(e.what());
125+
}
126+
}
127+
128+
void DpdkRingReader::init(const char* params)
129+
{
130+
m_dpdkRingCore.configure(params);
131+
DpdkRingOptParser parser;
132+
try {
133+
parser.parse(params);
134+
} catch (ParserError& e) {
135+
throw PluginError(e.what());
136+
}
137+
createRteMbufs(m_dpdkRingCore.parser.pkt_buffer_size());
138+
m_ring = rte_ring_lookup(parser.ring_name().c_str());
139+
if (!m_ring) {
140+
throw PluginError("Cannot find ring with name: " + parser.ring_name());
141+
} else {
142+
is_reader_ready = true;
143+
}
144+
}
145+
146+
struct timeval DpdkRingReader::getTimestamp(rte_mbuf* mbuf)
147+
{
148+
struct timeval tv;
149+
auto now = std::chrono::system_clock::now();
150+
auto now_t = std::chrono::system_clock::to_time_t(now);
151+
152+
auto dur = now - std::chrono::system_clock::from_time_t(now_t);
153+
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(dur).count();
154+
155+
tv.tv_sec = now_t;
156+
tv.tv_usec = micros;
157+
return tv;
158+
}
159+
160+
InputPlugin::Result DpdkRingReader::get(PacketBlock& packets)
161+
{
162+
while (is_reader_ready == false) {
163+
usleep(1000);
164+
}
165+
166+
parser_opt_t opt {&packets, false, false, 0};
167+
168+
packets.cnt = 0;
169+
for (auto i = 0; i < pkts_read_; i++) {
170+
rte_pktmbuf_free(mbufs_[i]);
171+
}
172+
pkts_read_ = rte_ring_dequeue_burst(
173+
m_ring,
174+
reinterpret_cast<void**>(mbufs_.data()),
175+
mbufs_.capacity(),
176+
nullptr);
177+
if (pkts_read_ == 0) {
178+
return Result::TIMEOUT;
179+
}
180+
for (auto i = 0; i < pkts_read_; i++) {
181+
parse_packet(&opt,
182+
getTimestamp(mbufs_[i]),
183+
rte_pktmbuf_mtod(mbufs_[i], const std::uint8_t*),
184+
rte_pktmbuf_data_len(mbufs_[i]),
185+
rte_pktmbuf_data_len(mbufs_[i]));
186+
m_seen++;
187+
m_parsed++;
188+
}
189+
return Result::PARSED;
190+
}
191+
} // namespace ipxp

input/dpdk-ring.h

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* \file dpdk-ring.h
3+
* \brief DPDK ring input interface for ipfixprobe (secondary DPDK app).
4+
* \author Jaroslav Pesek <[email protected]>
5+
* \date 2023
6+
*/
7+
/*
8+
* Copyright (C) 2023 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*
25+
*/
26+
#include <config.h>
27+
#ifdef WITH_DPDK
28+
29+
#ifndef IPXP_DPDK_RING_READER_H
30+
#define IPXP_DPDK_RING_READER_H
31+
32+
#include <ipfixprobe/input.hpp>
33+
#include <ipfixprobe/utils.hpp>
34+
35+
#include <memory>
36+
#include <rte_mbuf.h>
37+
#include <rte_ring.h>
38+
#include <sstream>
39+
40+
namespace ipxp {
41+
class DpdkRingOptParser : public OptionsParser {
42+
private:
43+
static constexpr size_t DEFAULT_MBUF_BURST_SIZE = 256;
44+
size_t pkt_buffer_size_;
45+
46+
std::string ring_name_;
47+
std::string eal_;
48+
public:
49+
DpdkRingOptParser()
50+
: OptionsParser("dpdk-ring", "DPDK ring input interface for ipfixprobe (secondary DPDK app).")
51+
, pkt_buffer_size_(DEFAULT_MBUF_BURST_SIZE)
52+
{
53+
register_option(
54+
"b",
55+
"bsize",
56+
"SIZE",
57+
"Size of the MBUF packet buffer. Default: " + std::to_string(DEFAULT_MBUF_BURST_SIZE),
58+
[this](const char* arg) {try{pkt_buffer_size_ = str2num<decltype(pkt_buffer_size_)>(arg);} catch (std::invalid_argument&){return false;} return true; },
59+
RequiredArgument);
60+
register_option(
61+
"r",
62+
"ring",
63+
"RING",
64+
"Name of the ring to read packets from. Need to be specified explicitly thus no default provided.",
65+
[this](const char* arg) {ring_name_ = arg; return true;},
66+
OptionFlags::RequiredArgument);
67+
register_option(
68+
"e",
69+
"eal",
70+
"EAL",
71+
"DPDK eal",
72+
[this](const char *arg){eal_ = arg; return true;},
73+
OptionFlags::RequiredArgument);
74+
75+
}
76+
size_t pkt_buffer_size() const { return pkt_buffer_size_; }
77+
78+
std::string ring_name() const { return ring_name_; }
79+
80+
std::string eal_params() const { return eal_; }
81+
};
82+
83+
class DpdkRingCore {
84+
public:
85+
/**
86+
* @brief Configure DPDK secondary process.
87+
*
88+
* @param eal_params DPDK EAL parameters.
89+
*/
90+
void configure(const char *params);
91+
92+
/**
93+
* @brief Get the singleton dpdk core instance
94+
*/
95+
static DpdkRingCore &getInstance();
96+
void deinit();
97+
98+
DpdkRingOptParser parser;
99+
100+
private:
101+
std::vector<char *> convertStringToArgvFormat(const std::string &ealParams);
102+
void configureEal(const std::string &ealParams);
103+
~DpdkRingCore();
104+
bool isConfigured = false;
105+
static DpdkRingCore *m_instance;
106+
};
107+
108+
class DpdkRingReader : public InputPlugin {
109+
public:
110+
Result get(PacketBlock &packets) override;
111+
112+
void init(const char* params) override;
113+
114+
OptionsParser* get_parser() const override
115+
{
116+
return new DpdkRingOptParser();
117+
}
118+
119+
std::string get_name() const override
120+
{
121+
return "dpdk-ring";
122+
}
123+
124+
~DpdkRingReader();
125+
DpdkRingReader();
126+
private:
127+
std::vector<rte_mbuf *> mbufs_;
128+
std::uint16_t pkts_read_;
129+
130+
void createRteMbufs(uint16_t mbufsSize);
131+
struct timeval getTimestamp(rte_mbuf *mbuf);
132+
DpdkRingCore &m_dpdkRingCore;
133+
rte_ring *m_ring;
134+
bool is_reader_ready = false;
135+
136+
137+
};
138+
} // namespace ipxp
139+
140+
#endif // IPXP_DPDK_RING_READER_H
141+
#endif

0 commit comments

Comments
 (0)