Skip to content

Commit 83da2eb

Browse files
committed
dpdk: add DPDK port telemetry
1 parent 174321f commit 83da2eb

File tree

5 files changed

+400
-0
lines changed

5 files changed

+400
-0
lines changed

Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ ipfixprobe_input_src+=\
181181
input/dpdk/dpdkDevice.hpp \
182182
input/dpdk/dpdkDevice.cpp \
183183
input/dpdk/dpdkCompat.hpp \
184+
input/dpdk/dpdkPortTelemetry.hpp \
185+
input/dpdk/dpdkPortTelemetry.cpp \
184186
input/dpdk.cpp \
185187
input/dpdk.h \
186188
input/dpdk-ring.cpp \

input/dpdk.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ void DpdkReader::configure_telemetry_dirs(
237237
auto port_dir = ports_dir->addDir(std::to_string(portID));
238238
telemetry::FileOps statsOps = {[=]() { return get_port_telemetry(portID); }, nullptr};
239239
register_file(port_dir, "stats", statsOps);
240+
m_portsTelemetry.emplace_back(portID, port_dir);
240241
}
241242

242243
telemetry::FileOps statsOps = {[=]() { return get_queue_telemetry(); }, nullptr};

input/dpdk.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#define IPXP_DPDK_READER_H
3232

3333
#include "dpdk/dpdkDevice.hpp"
34+
#include "dpdk/dpdkPortTelemetry.hpp"
3435

3536
#include <ipfixprobe/input.hpp>
3637
#include <ipfixprobe/utils.hpp>
@@ -226,6 +227,8 @@ class DpdkReader : public InputPlugin {
226227
telemetry::Content get_queue_telemetry();
227228
telemetry::Content get_port_telemetry(uint16_t portNumber);
228229

230+
std::vector<DpdkPortTelemetry> m_portsTelemetry;
231+
229232
struct DpdkRxStats {
230233
uint64_t receivedPackets;
231234
uint64_t receivedBytes;

input/dpdk/dpdkPortTelemetry.cpp

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
/**
2+
* \file
3+
* \brief Implementation of the DpdkPortTelemetry class and related helper functions
4+
* \author Pavel Siska <[email protected]>
5+
* \date 2024
6+
*/
7+
/*
8+
* Copyright (C) 2024 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*/
25+
26+
#include "dpdkPortTelemetry.hpp"
27+
#include "dpdkCompat.hpp"
28+
29+
#include <algorithm>
30+
#include <array>
31+
#include <iomanip>
32+
#include <limits>
33+
#include <numeric>
34+
#include <sstream>
35+
#include <stdexcept>
36+
#include <string>
37+
#include <vector>
38+
39+
#include <rte_ethdev.h>
40+
#include <rte_version.h>
41+
42+
namespace ipxp {
43+
44+
static struct rte_eth_dev_info getDeviceInfo(uint16_t portId)
45+
{
46+
struct rte_eth_dev_info devInfo;
47+
48+
const int ret = rte_eth_dev_info_get(portId, &devInfo);
49+
if (ret < 0) {
50+
throw std::runtime_error("getDeviceInfo() has failed");
51+
}
52+
53+
return devInfo;
54+
}
55+
56+
static std::string getDeviceNameByPortId(uint16_t portId)
57+
{
58+
std::array<char, RTE_ETH_NAME_MAX_LEN> deviceName;
59+
60+
const int ret = rte_eth_dev_get_name_by_port(portId, deviceName.data());
61+
if (ret < 0) {
62+
return "";
63+
}
64+
65+
return {deviceName.data()};
66+
}
67+
68+
static std::string getRssHashKeyByPortId(uint16_t portId)
69+
{
70+
uint8_t rssHashKeySize = 0;
71+
try {
72+
rssHashKeySize = getDeviceInfo(portId).hash_key_size;
73+
} catch (const std::exception& ex) {
74+
return "";
75+
}
76+
77+
std::vector<uint8_t> rssHashKey(rssHashKeySize);
78+
79+
struct rte_eth_rss_conf rssConf = {};
80+
rssConf.rss_key = rssHashKey.data();
81+
rssConf.rss_key_len = rssHashKeySize;
82+
83+
const int ret = rte_eth_dev_rss_hash_conf_get(portId, &rssConf);
84+
if (ret < 0) {
85+
return "";
86+
}
87+
88+
std::ostringstream oss;
89+
for (const auto& byte : rssHashKey) {
90+
oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(byte);
91+
}
92+
return oss.str();
93+
}
94+
95+
static std::string getRssHashByPortId(uint16_t portId)
96+
{
97+
struct rte_eth_rss_conf rssConf = {};
98+
rssConf.rss_key = nullptr;
99+
rssConf.rss_key_len = 0;
100+
101+
const int ret = rte_eth_dev_rss_hash_conf_get(portId, &rssConf);
102+
if (ret < 0) {
103+
return "";
104+
}
105+
106+
std::vector<std::string> rssHashes;
107+
108+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV4) != 0U) {
109+
rssHashes.emplace_back("IPV4");
110+
}
111+
if ((rssConf.rss_hf & RTE_ETH_RSS_FRAG_IPV4) != 0U) {
112+
rssHashes.emplace_back("FRAG_IPV4");
113+
}
114+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV4_TCP) != 0U) {
115+
rssHashes.emplace_back("NONFRAG_IPV4_TCP");
116+
}
117+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV4_UDP) != 0U) {
118+
rssHashes.emplace_back("NONFRAG_IPV4_UDP");
119+
}
120+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV4_SCTP) != 0U) {
121+
rssHashes.emplace_back("NONFRAG_IPV4_SCTP");
122+
}
123+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV4_OTHER) != 0U) {
124+
rssHashes.emplace_back("NONFRAG_IPV4_OTHER");
125+
}
126+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV6) != 0U) {
127+
rssHashes.emplace_back("IPV6");
128+
}
129+
if ((rssConf.rss_hf & RTE_ETH_RSS_FRAG_IPV6) != 0U) {
130+
rssHashes.emplace_back("FRAG_IPV6");
131+
}
132+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV6_TCP) != 0U) {
133+
rssHashes.emplace_back("NONFRAG_IPV6_TCP");
134+
}
135+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV6_UDP) != 0U) {
136+
rssHashes.emplace_back("NONFRAG_IPV6_UDP");
137+
}
138+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV6_SCTP) != 0U) {
139+
rssHashes.emplace_back("NONFRAG_IPV6_SCTP");
140+
}
141+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV6_OTHER) != 0U) {
142+
rssHashes.emplace_back("NONFRAG_IPV6_OTHER");
143+
}
144+
if ((rssConf.rss_hf & RTE_ETH_RSS_L2_PAYLOAD) != 0U) {
145+
rssHashes.emplace_back("L2_PAYLOAD");
146+
}
147+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV6_EX) != 0U) {
148+
rssHashes.emplace_back("IPV6_EX");
149+
}
150+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV6_TCP_EX) != 0U) {
151+
rssHashes.emplace_back("IPV6_TCP_EX");
152+
}
153+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV6_UDP_EX) != 0U) {
154+
rssHashes.emplace_back("IPV6_UDP_EX");
155+
}
156+
if ((rssConf.rss_hf & RTE_ETH_RSS_PORT) != 0U) {
157+
rssHashes.emplace_back("PORT");
158+
}
159+
if ((rssConf.rss_hf & RTE_ETH_RSS_VXLAN) != 0U) {
160+
rssHashes.emplace_back("VXLAN");
161+
}
162+
if ((rssConf.rss_hf & RTE_ETH_RSS_GENEVE) != 0U) {
163+
rssHashes.emplace_back("GENEVE");
164+
}
165+
if ((rssConf.rss_hf & RTE_ETH_RSS_NVGRE) != 0U) {
166+
rssHashes.emplace_back("NVGRE");
167+
}
168+
#if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
169+
if ((rssConf.rss_hf & RTE_ETH_RSS_MPLS) != 0U) {
170+
rssHashes.emplace_back("MPLS");
171+
}
172+
#endif
173+
174+
const std::string concatenatedRssHash = std::accumulate(
175+
rssHashes.begin(),
176+
rssHashes.end(),
177+
std::string {},
178+
[](const std::string& str1, const std::string& str2) {
179+
return str1.empty() ? str2 : str1 + ", " + str2;
180+
});
181+
182+
return concatenatedRssHash;
183+
}
184+
185+
static telemetry::Dict getDeviceStatsByPortId(uint16_t portId)
186+
{
187+
struct rte_eth_stats stats;
188+
const int ret = rte_eth_stats_get(portId, &stats);
189+
if (ret < 0) {
190+
return {};
191+
}
192+
193+
telemetry::Dict statsDict = {
194+
{"rx-ipackets", stats.ipackets},
195+
{"rx-ibytes", stats.ibytes},
196+
{"rx-imissed", stats.imissed},
197+
{"rx-ierrors", stats.ierrors},
198+
{"rx-nombuf", stats.rx_nombuf},
199+
{"tx-opackets", stats.opackets},
200+
{"tx-obytes", stats.obytes},
201+
{"tx-oerrors", stats.oerrors},
202+
};
203+
204+
return statsDict;
205+
}
206+
207+
static telemetry::Dict getDeviceQueueStatsByPortId(uint16_t portId)
208+
{
209+
struct rte_eth_stats stats;
210+
const int ret = rte_eth_stats_get(portId, &stats);
211+
if (ret < 0) {
212+
return {};
213+
}
214+
215+
const rte_eth_dev_info devInfo = getDeviceInfo(portId);
216+
217+
uint16_t maxQueuesCount;
218+
if (RTE_ETHDEV_QUEUE_STAT_CNTRS > std::numeric_limits<uint16_t>::max()) {
219+
maxQueuesCount = std::numeric_limits<uint16_t>::max();
220+
} else {
221+
maxQueuesCount = static_cast<uint16_t>(RTE_ETHDEV_QUEUE_STAT_CNTRS);
222+
}
223+
224+
const uint16_t rxQueuesCount = std::min(maxQueuesCount, devInfo.nb_rx_queues);
225+
const uint16_t txQueuesCount = std::min(maxQueuesCount, devInfo.nb_tx_queues);
226+
227+
telemetry::Dict dict;
228+
229+
for (uint16_t queueId = 0; queueId < rxQueuesCount; queueId++) {
230+
const std::string queueIdName = std::to_string(queueId);
231+
dict[queueIdName + "-rx-ipackets"] = stats.q_ipackets[queueId];
232+
dict[queueIdName + "-rx-ibytes"] = stats.q_ibytes[queueId];
233+
dict[queueIdName + "-rx-ierrors"] = stats.q_errors[queueId];
234+
}
235+
236+
for (uint16_t queueId = 0; queueId < txQueuesCount; queueId++) {
237+
const std::string queueIdName = std::to_string(queueId);
238+
dict[queueIdName + "-tx-opackets"] = stats.q_opackets[queueId];
239+
dict[queueIdName + "-tx-obytes"] = stats.q_obytes[queueId];
240+
}
241+
242+
return dict;
243+
}
244+
245+
static telemetry::Dict getDeviceXStatsByPortId(uint16_t portId)
246+
{
247+
int ret;
248+
ret = rte_eth_xstats_get_names(portId, nullptr, 0);
249+
if (ret < 0) {
250+
return {};
251+
}
252+
253+
const auto count = static_cast<unsigned int>(ret);
254+
255+
std::vector<rte_eth_xstat_name> xstatsNames(count);
256+
std::vector<rte_eth_xstat> xstats(count);
257+
258+
ret = rte_eth_xstats_get_names(portId, xstatsNames.data(), count);
259+
if (ret < 0) {
260+
return {};
261+
}
262+
263+
ret = rte_eth_xstats_get(portId, xstats.data(), count);
264+
if (ret < 0) {
265+
return {};
266+
}
267+
268+
telemetry::Dict dict;
269+
for (unsigned int idx = 0; idx < count; idx++) {
270+
dict[xstatsNames[idx].name] = xstats[idx].value;
271+
}
272+
273+
return dict;
274+
}
275+
276+
struct AppFsFile {
277+
std::string name;
278+
telemetry::FileOps ops;
279+
};
280+
281+
static std::vector<AppFsFile> getAppFsFiles(uint16_t portId)
282+
{
283+
std::vector<AppFsFile> files = {
284+
{
285+
.name = "devname",
286+
.ops = {
287+
.read = [portId]() { return getDeviceNameByPortId(portId); },
288+
},
289+
},
290+
{
291+
.name = "rss_hash_key",
292+
.ops = {
293+
.read = [portId]() { return getRssHashKeyByPortId(portId); },
294+
},
295+
},
296+
{
297+
.name = "rss_hash",
298+
.ops = {
299+
.read = [portId]() { return getRssHashByPortId(portId); },
300+
},
301+
},
302+
{
303+
.name = "devstats",
304+
.ops = {
305+
.read = [portId]() { return getDeviceStatsByPortId(portId); },
306+
},
307+
},
308+
{
309+
.name = "devstats_queues",
310+
.ops = {
311+
.read = [portId]() { return getDeviceQueueStatsByPortId(portId); },
312+
},
313+
},
314+
{
315+
.name = "devxstats",
316+
.ops = {
317+
.read = [portId]() { return getDeviceXStatsByPortId(portId); },
318+
},
319+
},
320+
321+
};
322+
return files;
323+
}
324+
325+
DpdkPortTelemetry::DpdkPortTelemetry(
326+
uint16_t portId,
327+
const std::shared_ptr<telemetry::Directory>& dir)
328+
: M_PORT_ID(portId)
329+
{
330+
for (auto [name, ops] : getAppFsFiles(M_PORT_ID)) {
331+
auto file = dir->addFile(name, ops);
332+
m_holder.add(file);
333+
}
334+
}
335+
336+
} // namespace ct

0 commit comments

Comments
 (0)