Skip to content

Commit c6bec67

Browse files
committed
WIP
1 parent 0798224 commit c6bec67

File tree

6 files changed

+295
-11
lines changed

6 files changed

+295
-11
lines changed

include/ipfixprobe/inputPlugin.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,14 @@ class IPXP_API InputPlugin
6161
* @brief Sets the telemetry directories for this plugin.
6262
* @param plugin_dir Shared pointer to the plugin-specific telemetry directory.
6363
* @param queues_dir Shared pointer to the telemetry directory for queues.
64+
* @param summary_dir Shared pointer to the telemetry directory for summary statistics.
65+
* @param pipeline_dir Shared pointer to the telemetry directory for the pipeline.
6466
*/
6567
void set_telemetry_dirs(
6668
std::shared_ptr<telemetry::Directory> plugin_dir,
67-
std::shared_ptr<telemetry::Directory> queues_dir);
69+
std::shared_ptr<telemetry::Directory> queues_dir,
70+
std::shared_ptr<telemetry::Directory> summary_dir,
71+
std::shared_ptr<telemetry::Directory> pipeline_dir);
6872

6973
/// Number of packets seen by the plugin.
7074
uint64_t m_seen = 0;
@@ -95,7 +99,10 @@ class IPXP_API InputPlugin
9599
ParserStats m_parser_stats = {};
96100

97101
private:
98-
void create_parser_stats_telemetry(std::shared_ptr<telemetry::Directory> queues_dir);
102+
void create_parser_stats_telemetry(
103+
std::shared_ptr<telemetry::Directory> queues_dir,
104+
std::shared_ptr<telemetry::Directory> summaryDirectory,
105+
std::shared_ptr<telemetry::Directory> pipelineDirectory);
99106
};
100107

101108
/**

include/ipfixprobe/parser-stats.hpp

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,139 @@
2525

2626
#pragma once
2727

28+
#include <array>
2829
#include <cstdint>
30+
#include <string>
31+
32+
#include <ipfixprobe/packet.hpp>
33+
#include <telemetry.hpp>
2934

3035
namespace ipxp {
3136

37+
static constexpr std::size_t MAX_VLAN_ID = 4096;
38+
39+
class PacketSizeHistogram {
40+
public:
41+
static constexpr std::size_t HISTOGRAM_SIZE = 10;
42+
43+
struct Value {
44+
uint64_t packets = 0;
45+
uint64_t bytes = 0;
46+
};
47+
48+
PacketSizeHistogram()
49+
{
50+
for (uint16_t bucketID = 0; bucketID < 8192; ++bucketID) {
51+
if (bucketID <= 64) {
52+
m_size_to_bucket[bucketID] = 0;
53+
} else if (bucketID < 128) {
54+
m_size_to_bucket[bucketID] = 1;
55+
} else if (bucketID < 256) {
56+
m_size_to_bucket[bucketID] = 2;
57+
} else if (bucketID < 512) {
58+
m_size_to_bucket[bucketID] = 3;
59+
} else if (bucketID < 1024) {
60+
m_size_to_bucket[bucketID] = 4;
61+
} else if (bucketID < 1518) {
62+
m_size_to_bucket[bucketID] = 5;
63+
} else if (bucketID < 2048) {
64+
m_size_to_bucket[bucketID] = 6;
65+
} else if (bucketID < 4096) {
66+
m_size_to_bucket[bucketID] = 7;
67+
} else if (bucketID < 8192) {
68+
m_size_to_bucket[bucketID] = 8;
69+
} else {
70+
m_size_to_bucket[bucketID] = 9;
71+
}
72+
}
73+
}
74+
75+
void update(uint16_t size)
76+
{
77+
if (size < 8192) {
78+
const std::size_t bucket = m_size_to_bucket[size];
79+
m_histogram[bucket].packets++;
80+
m_histogram[bucket].bytes += size;
81+
} else {
82+
m_histogram[HISTOGRAM_SIZE - 1].packets++;
83+
m_histogram[HISTOGRAM_SIZE - 1].bytes += size;
84+
}
85+
}
86+
87+
Value get_bucket_value(std::size_t bucket) const
88+
{
89+
if (bucket < HISTOGRAM_SIZE) {
90+
return m_histogram[bucket];
91+
}
92+
return {};
93+
}
94+
95+
std::string get_bucket_name(std::size_t bucket) const
96+
{
97+
if (bucket == 0) {
98+
return "0-64";
99+
} else if (bucket == 1) {
100+
return "65-127";
101+
} else if (bucket == 2) {
102+
return "128-255";
103+
} else if (bucket == 3) {
104+
return "256-511";
105+
} else if (bucket == 4) {
106+
return "512-1023";
107+
} else if (bucket == 5) {
108+
return "1024-1518";
109+
} else if (bucket == 6) {
110+
return "1519-2047";
111+
} else if (bucket == 7) {
112+
return "2048-4095";
113+
} else if (bucket == 8) {
114+
return "4096-8191";
115+
}
116+
return "8192+";
117+
}
118+
119+
private:
120+
std::array<Value, HISTOGRAM_SIZE> m_histogram = {};
121+
std::array<uint8_t, 8192> m_size_to_bucket = {};
122+
};
123+
124+
struct VlanStats {
125+
void update(const Packet& pkt)
126+
{
127+
if (pkt.ip_version == IP::v4) {
128+
ipv4_packets++;
129+
ipv4_bytes += pkt.packet_len;
130+
} else if (pkt.ip_version == IP::v6) {
131+
ipv6_packets++;
132+
ipv6_bytes += pkt.packet_len;
133+
}
134+
135+
if (pkt.ip_proto == IPPROTO_TCP) {
136+
tcp_packets++;
137+
} else if (pkt.ip_proto == IPPROTO_UDP) {
138+
udp_packets++;
139+
}
140+
141+
total_packets++;
142+
total_bytes += pkt.packet_len;
143+
144+
size_histogram.update(pkt.packet_len);
145+
}
146+
147+
uint64_t ipv4_packets;
148+
uint64_t ipv6_packets;
149+
uint64_t ipv4_bytes;
150+
uint64_t ipv6_bytes;
151+
152+
uint64_t tcp_packets;
153+
uint64_t udp_packets;
154+
155+
uint64_t total_packets;
156+
uint64_t total_bytes;
157+
158+
PacketSizeHistogram size_histogram;
159+
};
160+
32161
/**
33162
* \brief Structure for storing parser statistics.
34163
*/
@@ -40,12 +169,16 @@ struct ParserStats {
40169

41170
uint64_t ipv4_packets;
42171
uint64_t ipv6_packets;
172+
uint64_t ipv4_bytes;
173+
uint64_t ipv6_bytes;
43174

44175
uint64_t tcp_packets;
45176
uint64_t udp_packets;
46177

47178
uint64_t seen_packets;
48179
uint64_t unknown_packets;
180+
181+
VlanStats vlan_stats[MAX_VLAN_ID];
49182
};
50183

51184
} // namespace ipxp

include/ipfixprobe/telemetry-utils.hpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,32 @@ class TelemetryUtils {
5656
m_holder.add(file);
5757
}
5858

59+
/**
60+
* @brief Register an aggregated file in the telemetry holder
61+
*
62+
* If the file is already registered, it will not be registered again.
63+
*
64+
* @param directory Directory to register the file in
65+
* @param name Name of the aggregated file
66+
* @param aggFilesPattern Pattern for aggregated files
67+
* @param aggOps Aggregation operations to perform on the files
68+
* @param patternRootDir Optional root directory for pattern matching
69+
*/
70+
void register_agg_file(
71+
std::shared_ptr<telemetry::Directory> directory,
72+
const std::string_view& name,
73+
const std::string& aggFilesPattern,
74+
const std::vector<telemetry::AggOperation>& aggOps,
75+
std::shared_ptr<telemetry::Directory> patternRootDir = nullptr)
76+
{
77+
if (directory->getEntry(name)) {
78+
return;
79+
}
80+
81+
auto file = directory->addAggFile(name, aggFilesPattern, aggOps, patternRootDir);
82+
m_holder.add(file);
83+
}
84+
5985
protected:
6086
telemetry::Holder m_holder;
6187
};

src/core/inputPlugin.cpp

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,139 @@ static telemetry::Content get_parser_stats_content(const ParserStats& parserStat
2727

2828
dict["ipv4_packets"] = parserStats.ipv4_packets;
2929
dict["ipv6_packets"] = parserStats.ipv6_packets;
30+
dict["ipv4_bytes"] = parserStats.ipv4_bytes;
31+
dict["ipv6_bytes"] = parserStats.ipv6_bytes;
3032

3133
dict["tcp_packets"] = parserStats.tcp_packets;
3234
dict["udp_packets"] = parserStats.udp_packets;
3335

3436
dict["seen_packets"] = parserStats.seen_packets;
3537
dict["unknown_packets"] = parserStats.unknown_packets;
38+
return dict;
39+
}
3640

41+
static telemetry::Content get_vlan_stats(const VlanStats& vlanStats)
42+
{
43+
telemetry::Dict dict;
44+
dict["ipv4_packets"] = vlanStats.ipv4_packets;
45+
dict["ipv4_bytes"] = vlanStats.ipv4_bytes;
46+
dict["ipv6_packets"] = vlanStats.ipv6_packets;
47+
dict["ipv6_bytes"] = vlanStats.ipv6_bytes;
48+
dict["tcp_packets"] = vlanStats.tcp_packets;
49+
dict["udp_packets"] = vlanStats.udp_packets;
50+
dict["total_packets"] = vlanStats.total_packets;
51+
dict["total_bytes"] = vlanStats.total_bytes;
52+
return dict;
53+
}
54+
55+
static telemetry::Content get_vlan_size_histogram_content(const PacketSizeHistogram& sizeHistogram)
56+
{
57+
telemetry::Dict dict;
58+
for (std::size_t bucket = 0; bucket < PacketSizeHistogram::HISTOGRAM_SIZE; ++bucket) {
59+
const PacketSizeHistogram::Value value = sizeHistogram.get_bucket_value(bucket);
60+
dict["etherPacketCount[" + sizeHistogram.get_bucket_name(bucket) + "]"]
61+
= telemetry::ScalarWithUnit {value.packets, "packets"};
62+
dict["etherPacketSize[" + sizeHistogram.get_bucket_name(bucket) + "]"]
63+
= telemetry::ScalarWithUnit {value.bytes, "bytes"};
64+
}
3765
return dict;
3866
}
3967

4068
void InputPlugin::create_parser_stats_telemetry(
41-
std::shared_ptr<telemetry::Directory> queueDirectory)
69+
std::shared_ptr<telemetry::Directory> queueDirectory,
70+
std::shared_ptr<telemetry::Directory> summaryDirectory,
71+
std::shared_ptr<telemetry::Directory> pipelineDirectory)
4272
{
73+
auto parserDir = queueDirectory->addDir("parser");
74+
auto summaryParserDir = summaryDirectory->addDir("parser");
75+
4376
telemetry::FileOps statsOps
4477
= {[this]() { return get_parser_stats_content(m_parser_stats); }, nullptr};
45-
register_file(queueDirectory, "parser-stats", statsOps);
78+
79+
auto vlanStatsDir = parserDir->addDir("vlan-stats");
80+
for (std::size_t vlan_id = 0; vlan_id < MAX_VLAN_ID; ++vlan_id) {
81+
telemetry::FileOps vlanStatsOps
82+
= {[this, vlan_id]() { return get_vlan_stats(m_parser_stats.vlan_stats[vlan_id]); },
83+
nullptr};
84+
telemetry::FileOps vlanHistogramOps
85+
= {[this, vlan_id]() {
86+
return get_vlan_size_histogram_content(
87+
m_parser_stats.vlan_stats[vlan_id].size_histogram);
88+
},
89+
nullptr};
90+
auto vlanIDDir = vlanStatsDir->addDir(std::to_string(vlan_id));
91+
auto vlanSummaryDir = summaryParserDir->addDirs("vlan-stats/" + std::to_string(vlan_id));
92+
register_file(vlanIDDir, "stats", vlanStatsOps);
93+
register_file(vlanIDDir, "histogram", vlanHistogramOps);
94+
95+
const std::vector<telemetry::AggOperation> aggOps {
96+
{telemetry::AggMethodType::SUM, "ipv4_packets"},
97+
{telemetry::AggMethodType::SUM, "ipv4_bytes"},
98+
{telemetry::AggMethodType::SUM, "ipv6_packets"},
99+
{telemetry::AggMethodType::SUM, "ipv6_bytes"},
100+
{telemetry::AggMethodType::SUM, "tcp_packets"},
101+
{telemetry::AggMethodType::SUM, "udp_packets"},
102+
{telemetry::AggMethodType::SUM, "total_packets"},
103+
{telemetry::AggMethodType::SUM, "total_bytes"},
104+
};
105+
106+
register_agg_file(
107+
vlanSummaryDir,
108+
"stats",
109+
R"(queues/\d+/parser/vlan-stats/)" + std::to_string(vlan_id) + R"(/stats)",
110+
aggOps,
111+
pipelineDirectory);
112+
113+
std::vector<telemetry::AggOperation> aggHistogramSummaryOps;
114+
for (std::size_t bucket = 0; bucket < PacketSizeHistogram::HISTOGRAM_SIZE; ++bucket) {
115+
auto histogram = m_parser_stats.vlan_stats[vlan_id].size_histogram;
116+
aggHistogramSummaryOps.push_back(
117+
{telemetry::AggMethodType::SUM,
118+
"etherPacketCount[" + histogram.get_bucket_name(bucket) + "]"});
119+
aggHistogramSummaryOps.push_back(
120+
{telemetry::AggMethodType::SUM,
121+
"etherPacketSize[" + histogram.get_bucket_name(bucket) + "]"});
122+
}
123+
register_agg_file(
124+
vlanSummaryDir,
125+
"histogram",
126+
R"(queues/\d+/parser/vlan-stats/)" + std::to_string(vlan_id) + R"(/histogram)",
127+
aggHistogramSummaryOps,
128+
pipelineDirectory);
129+
}
130+
131+
const std::vector<telemetry::AggOperation> aggOps {
132+
{telemetry::AggMethodType::SUM, "ipv4_bytes"},
133+
{telemetry::AggMethodType::SUM, "ipv4_packets"},
134+
{telemetry::AggMethodType::SUM, "ipv6_bytes"},
135+
{telemetry::AggMethodType::SUM, "ipv6_packets"},
136+
{telemetry::AggMethodType::SUM, "mpls_packets"},
137+
{telemetry::AggMethodType::SUM, "pppoe_packets"},
138+
{telemetry::AggMethodType::SUM, "seen_packets"},
139+
{telemetry::AggMethodType::SUM, "tcp_packets"},
140+
{telemetry::AggMethodType::SUM, "trill_packets"},
141+
{telemetry::AggMethodType::SUM, "udp_packets"},
142+
{telemetry::AggMethodType::SUM, "unknown_packets"},
143+
{telemetry::AggMethodType::SUM, "vlan_packets"},
144+
};
145+
146+
register_agg_file(
147+
summaryParserDir,
148+
"parser-stats",
149+
R"(queues/\d+/parser/parser-stats)",
150+
aggOps,
151+
pipelineDirectory);
152+
153+
register_file(parserDir, "parser-stats", statsOps);
46154
}
47155

48156
void InputPlugin::set_telemetry_dirs(
49157
std::shared_ptr<telemetry::Directory> plugin_dir,
50-
std::shared_ptr<telemetry::Directory> queues_dir)
158+
std::shared_ptr<telemetry::Directory> queues_dir,
159+
std::shared_ptr<telemetry::Directory> summary_dir,
160+
std::shared_ptr<telemetry::Directory> pipeline_dir)
51161
{
52-
create_parser_stats_telemetry(queues_dir);
162+
create_parser_stats_telemetry(queues_dir, summary_dir, pipeline_dir);
53163
configure_telemetry_dirs(plugin_dir, queues_dir);
54164
}
55165

src/core/ipfixprobe.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@
2626
*
2727
*/
2828

29-
#include "ipfixprobe.hpp"
30-
3129
#include "buildConfig.hpp"
30+
#include "ipfixprobe.hpp"
3231
#include "stacktrace.hpp"
3332
#include "stats.hpp"
3433

@@ -373,6 +372,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
373372
// Input
374373
auto input_dir = conf.telemetry_root_node->addDir("input");
375374
auto pipeline_dir = conf.telemetry_root_node->addDir("pipeline");
375+
auto summary_dir = pipeline_dir->addDir("summary");
376376
auto flowcache_dir = conf.telemetry_root_node->addDir("flowcache");
377377
size_t pipeline_idx = 0;
378378
for (auto& it : parser.m_input) {
@@ -393,7 +393,11 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
393393
if (inputPlugin == nullptr) {
394394
throw IPXPError("invalid input plugin " + input_name);
395395
}
396-
inputPlugin->set_telemetry_dirs(input_plugin_dir, pipeline_queue_dir);
396+
inputPlugin->set_telemetry_dirs(
397+
input_plugin_dir,
398+
pipeline_queue_dir,
399+
summary_dir,
400+
pipeline_dir);
397401
conf.inputPlugins.emplace_back(inputPlugin);
398402
} catch (PluginError& e) {
399403
throw IPXPError(input_name + std::string(": ") + e.what());

0 commit comments

Comments
 (0)