Skip to content

Commit 816a7cd

Browse files
committed
Dpdk: add DPDK port telemetry
1 parent 10fea0b commit 816a7cd

File tree

5 files changed

+396
-0
lines changed

5 files changed

+396
-0
lines changed

Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ ipfixprobe_input_src+=\
180180
input/dpdk/dpdkMbuf.cpp \
181181
input/dpdk/dpdkDevice.hpp \
182182
input/dpdk/dpdkDevice.cpp \
183+
input/dpdk/dpdkPortTelemetry.hpp \
184+
input/dpdk/dpdkPortTelemetry.cpp \
183185
input/dpdk.cpp \
184186
input/dpdk.h \
185187
input/dpdk-ring.cpp \

input/dpdk.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ void DpdkReader::configure_telemetry_dirs(
237237
auto port_dir = ports_dir->addDir(std::to_string(portID));
238238
telemetry::FileOps statsOps = {[=]() { return get_port_telemetry(portID); }, nullptr};
239239
register_file(port_dir, "stats", statsOps);
240+
m_portsTelemetry.emplace_back(portID, port_dir);
240241
}
241242

242243
telemetry::FileOps statsOps = {[=]() { return get_queue_telemetry(); }, nullptr};

input/dpdk.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#define IPXP_DPDK_READER_H
3232

3333
#include "dpdk/dpdkDevice.hpp"
34+
#include "dpdk/dpdkPortTelemetry.hpp"
3435

3536
#include <ipfixprobe/input.hpp>
3637
#include <ipfixprobe/utils.hpp>
@@ -226,6 +227,8 @@ class DpdkReader : public InputPlugin {
226227
telemetry::Content get_queue_telemetry();
227228
telemetry::Content get_port_telemetry(uint16_t portNumber);
228229

230+
std::vector<DpdkPortTelemetry> m_portsTelemetry;
231+
229232
struct DpdkRxStats {
230233
uint64_t receivedPackets;
231234
uint64_t receivedBytes;

input/dpdk/dpdkPortTelemetry.cpp

Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
/**
2+
* \file
3+
* \brief Implementation of the DpdkPortTelemetry class and related helper functions
4+
* \author Pavel Siska <[email protected]>
5+
* \date 2024
6+
*/
7+
/*
8+
* Copyright (C) 2024 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*/
25+
26+
#include "dpdkPortTelemetry.hpp"
27+
28+
#include <algorithm>
29+
#include <array>
30+
#include <iomanip>
31+
#include <limits>
32+
#include <numeric>
33+
#include <sstream>
34+
#include <stdexcept>
35+
#include <string>
36+
#include <vector>
37+
38+
#include <rte_ethdev.h>
39+
40+
namespace ipxp {
41+
42+
static struct rte_eth_dev_info getDeviceInfo(uint16_t portId)
43+
{
44+
struct rte_eth_dev_info devInfo;
45+
46+
const int ret = rte_eth_dev_info_get(portId, &devInfo);
47+
if (ret < 0) {
48+
throw std::runtime_error("PortManager::getDeviceInfo() has failed");
49+
}
50+
51+
return devInfo;
52+
}
53+
54+
static std::string getDeviceNameByPortId(uint16_t portId)
55+
{
56+
std::array<char, RTE_ETH_NAME_MAX_LEN> deviceName;
57+
58+
const int ret = rte_eth_dev_get_name_by_port(portId, deviceName.data());
59+
if (ret < 0) {
60+
return "";
61+
}
62+
63+
return {deviceName.data()};
64+
}
65+
66+
static std::string getRssHashKeyByPortId(uint16_t portId)
67+
{
68+
uint8_t rssHashKeySize = 0;
69+
try {
70+
rssHashKeySize = getDeviceInfo(portId).hash_key_size;
71+
} catch (const std::exception& ex) {
72+
return "";
73+
}
74+
75+
std::vector<uint8_t> rssHashKey(rssHashKeySize);
76+
77+
struct rte_eth_rss_conf rssConf = {};
78+
rssConf.rss_key = rssHashKey.data();
79+
rssConf.rss_key_len = rssHashKeySize;
80+
81+
const int ret = rte_eth_dev_rss_hash_conf_get(portId, &rssConf);
82+
if (ret < 0) {
83+
return "";
84+
}
85+
86+
std::ostringstream oss;
87+
for (const auto& byte : rssHashKey) {
88+
oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(byte);
89+
}
90+
return oss.str();
91+
}
92+
93+
static std::string getRssHashByPortId(uint16_t portId)
94+
{
95+
struct rte_eth_rss_conf rssConf = {};
96+
rssConf.rss_key = nullptr;
97+
rssConf.rss_key_len = 0;
98+
99+
const int ret = rte_eth_dev_rss_hash_conf_get(portId, &rssConf);
100+
if (ret < 0) {
101+
return "";
102+
}
103+
104+
std::vector<std::string> rssHashes;
105+
106+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV4) != 0U) {
107+
rssHashes.emplace_back("IPV4");
108+
}
109+
if ((rssConf.rss_hf & RTE_ETH_RSS_FRAG_IPV4) != 0U) {
110+
rssHashes.emplace_back("FRAG_IPV4");
111+
}
112+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV4_TCP) != 0U) {
113+
rssHashes.emplace_back("NONFRAG_IPV4_TCP");
114+
}
115+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV4_UDP) != 0U) {
116+
rssHashes.emplace_back("NONFRAG_IPV4_UDP");
117+
}
118+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV4_SCTP) != 0U) {
119+
rssHashes.emplace_back("NONFRAG_IPV4_SCTP");
120+
}
121+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV4_OTHER) != 0U) {
122+
rssHashes.emplace_back("NONFRAG_IPV4_OTHER");
123+
}
124+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV6) != 0U) {
125+
rssHashes.emplace_back("IPV6");
126+
}
127+
if ((rssConf.rss_hf & RTE_ETH_RSS_FRAG_IPV6) != 0U) {
128+
rssHashes.emplace_back("FRAG_IPV6");
129+
}
130+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV6_TCP) != 0U) {
131+
rssHashes.emplace_back("NONFRAG_IPV6_TCP");
132+
}
133+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV6_UDP) != 0U) {
134+
rssHashes.emplace_back("NONFRAG_IPV6_UDP");
135+
}
136+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV6_SCTP) != 0U) {
137+
rssHashes.emplace_back("NONFRAG_IPV6_SCTP");
138+
}
139+
if ((rssConf.rss_hf & RTE_ETH_RSS_NONFRAG_IPV6_OTHER) != 0U) {
140+
rssHashes.emplace_back("NONFRAG_IPV6_OTHER");
141+
}
142+
if ((rssConf.rss_hf & RTE_ETH_RSS_L2_PAYLOAD) != 0U) {
143+
rssHashes.emplace_back("L2_PAYLOAD");
144+
}
145+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV6_EX) != 0U) {
146+
rssHashes.emplace_back("IPV6_EX");
147+
}
148+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV6_TCP_EX) != 0U) {
149+
rssHashes.emplace_back("IPV6_TCP_EX");
150+
}
151+
if ((rssConf.rss_hf & RTE_ETH_RSS_IPV6_UDP_EX) != 0U) {
152+
rssHashes.emplace_back("IPV6_UDP_EX");
153+
}
154+
if ((rssConf.rss_hf & RTE_ETH_RSS_PORT) != 0U) {
155+
rssHashes.emplace_back("PORT");
156+
}
157+
if ((rssConf.rss_hf & RTE_ETH_RSS_VXLAN) != 0U) {
158+
rssHashes.emplace_back("VXLAN");
159+
}
160+
if ((rssConf.rss_hf & RTE_ETH_RSS_GENEVE) != 0U) {
161+
rssHashes.emplace_back("GENEVE");
162+
}
163+
if ((rssConf.rss_hf & RTE_ETH_RSS_NVGRE) != 0U) {
164+
rssHashes.emplace_back("NVGRE");
165+
}
166+
if ((rssConf.rss_hf & RTE_ETH_RSS_MPLS) != 0U) {
167+
rssHashes.emplace_back("MPLS");
168+
}
169+
170+
const std::string concatenatedRssHash = std::accumulate(
171+
rssHashes.begin(),
172+
rssHashes.end(),
173+
std::string {},
174+
[](const std::string& str1, const std::string& str2) {
175+
return str1.empty() ? str2 : str1 + ", " + str2;
176+
});
177+
178+
return concatenatedRssHash;
179+
}
180+
181+
static telemetry::Dict getDeviceStatsByPortId(uint16_t portId)
182+
{
183+
struct rte_eth_stats stats;
184+
const int ret = rte_eth_stats_get(portId, &stats);
185+
if (ret < 0) {
186+
return {};
187+
}
188+
189+
telemetry::Dict statsDict = {
190+
{"rx-ipackets", stats.ipackets},
191+
{"rx-ibytes", stats.ibytes},
192+
{"rx-imissed", stats.imissed},
193+
{"rx-ierrors", stats.ierrors},
194+
{"rx-nombuf", stats.rx_nombuf},
195+
{"tx-opackets", stats.opackets},
196+
{"tx-obytes", stats.obytes},
197+
{"tx-oerrors", stats.oerrors},
198+
};
199+
200+
return statsDict;
201+
}
202+
203+
static telemetry::Dict getDeviceQueueStatsByPortId(uint16_t portId)
204+
{
205+
struct rte_eth_stats stats;
206+
const int ret = rte_eth_stats_get(portId, &stats);
207+
if (ret < 0) {
208+
return {};
209+
}
210+
211+
const rte_eth_dev_info devInfo = getDeviceInfo(portId);
212+
213+
uint16_t maxQueuesCount;
214+
if (RTE_ETHDEV_QUEUE_STAT_CNTRS > std::numeric_limits<uint16_t>::max()) {
215+
maxQueuesCount = std::numeric_limits<uint16_t>::max();
216+
} else {
217+
maxQueuesCount = static_cast<uint16_t>(RTE_ETHDEV_QUEUE_STAT_CNTRS);
218+
}
219+
220+
const uint16_t rxQueuesCount = std::min(maxQueuesCount, devInfo.nb_rx_queues);
221+
const uint16_t txQueuesCount = std::min(maxQueuesCount, devInfo.nb_tx_queues);
222+
223+
telemetry::Dict dict;
224+
225+
for (uint16_t queueId = 0; queueId < rxQueuesCount; queueId++) {
226+
const std::string queueIdName = std::to_string(queueId);
227+
dict[queueIdName + "-rx-ipackets"] = stats.q_ipackets[queueId];
228+
dict[queueIdName + "-rx-ibytes"] = stats.q_ibytes[queueId];
229+
dict[queueIdName + "-rx-ierrors"] = stats.q_errors[queueId];
230+
}
231+
232+
for (uint16_t queueId = 0; queueId < txQueuesCount; queueId++) {
233+
const std::string queueIdName = std::to_string(queueId);
234+
dict[queueIdName + "-tx-opackets"] = stats.q_opackets[queueId];
235+
dict[queueIdName + "-tx-obytes"] = stats.q_obytes[queueId];
236+
}
237+
238+
return dict;
239+
}
240+
241+
static telemetry::Dict getDeviceXStatsByPortId(uint16_t portId)
242+
{
243+
int ret;
244+
ret = rte_eth_xstats_get_names(portId, nullptr, 0);
245+
if (ret < 0) {
246+
return {};
247+
}
248+
249+
const auto count = static_cast<unsigned int>(ret);
250+
251+
std::vector<rte_eth_xstat_name> xstatsNames(count);
252+
std::vector<rte_eth_xstat> xstats(count);
253+
254+
ret = rte_eth_xstats_get_names(portId, xstatsNames.data(), count);
255+
if (ret < 0) {
256+
return {};
257+
}
258+
259+
ret = rte_eth_xstats_get(portId, xstats.data(), count);
260+
if (ret < 0) {
261+
return {};
262+
}
263+
264+
telemetry::Dict dict;
265+
for (unsigned int idx = 0; idx < count; idx++) {
266+
dict[xstatsNames[idx].name] = xstats[idx].value;
267+
}
268+
269+
return dict;
270+
}
271+
272+
struct AppFsFile {
273+
std::string name;
274+
telemetry::FileOps ops;
275+
};
276+
277+
static std::vector<AppFsFile> getAppFsFiles(uint16_t portId)
278+
{
279+
std::vector<AppFsFile> files = {
280+
{
281+
.name = "devname",
282+
.ops = {
283+
.read = [portId]() { return getDeviceNameByPortId(portId); },
284+
},
285+
},
286+
{
287+
.name = "rss_hash_key",
288+
.ops = {
289+
.read = [portId]() { return getRssHashKeyByPortId(portId); },
290+
},
291+
},
292+
{
293+
.name = "rss_hash",
294+
.ops = {
295+
.read = [portId]() { return getRssHashByPortId(portId); },
296+
},
297+
},
298+
{
299+
.name = "devstats",
300+
.ops = {
301+
.read = [portId]() { return getDeviceStatsByPortId(portId); },
302+
},
303+
},
304+
{
305+
.name = "devstats_queues",
306+
.ops = {
307+
.read = [portId]() { return getDeviceQueueStatsByPortId(portId); },
308+
},
309+
},
310+
{
311+
.name = "devxstats",
312+
.ops = {
313+
.read = [portId]() { return getDeviceXStatsByPortId(portId); },
314+
},
315+
},
316+
317+
};
318+
return files;
319+
}
320+
321+
DpdkPortTelemetry::DpdkPortTelemetry(
322+
uint16_t portId,
323+
const std::shared_ptr<telemetry::Directory>& dir)
324+
: M_PORT_ID(portId)
325+
{
326+
for (auto [name, ops] : getAppFsFiles(M_PORT_ID)) {
327+
auto file = dir->addFile(name, ops);
328+
m_holder.add(file);
329+
}
330+
}
331+
332+
} // namespace ct

0 commit comments

Comments
 (0)