Skip to content

Commit 6c5b48f

Browse files
committed
WIP - output telemetry
1 parent 70e7cc2 commit 6c5b48f

File tree

9 files changed

+219
-16
lines changed

9 files changed

+219
-16
lines changed

include/ipfixprobe/output.hpp

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,24 +34,91 @@
3434
#include "process.hpp"
3535
#include "flowifc.hpp"
3636

37+
#include <telemetry.hpp>
38+
#include <atomic>
39+
#include <cstdint>
40+
3741
namespace ipxp {
3842

3943
#define DEFAULT_EXPORTER_ID 1
4044

45+
struct OutputPluginStats {
46+
uint64_t flows_seen = 0;
47+
uint64_t flows_dropped = 0;
48+
uint64_t flows_exported = 0;
49+
uint64_t bytes_exported = 0;
50+
51+
void clear()
52+
{
53+
flows_seen = 0;
54+
flows_dropped = 0;
55+
flows_exported = 0;
56+
bytes_exported = 0;
57+
}
58+
};
59+
60+
struct OutputPluginAtomicStats {
61+
std::atomic<uint64_t> flows_seen = 0;
62+
std::atomic<uint64_t> flows_dropped = 0;
63+
std::atomic<uint64_t> flows_exported = 0;
64+
std::atomic<uint64_t> bytes_exported = 0;
65+
std::atomic<uint64_t> fps_exported = 0;
66+
std::atomic<uint64_t> mbps_exported = 0;
67+
68+
69+
void updatePktBurstStats(const OutputPluginStats& basicStats, uint64_t timestamp)
70+
{
71+
flows_seen.fetch_add(basicStats.flows_seen, std::memory_order_relaxed);
72+
flows_dropped.fetch_add(basicStats.flows_dropped, std::memory_order_relaxed);
73+
flows_exported.fetch_add(basicStats.flows_exported, std::memory_order_relaxed);
74+
bytes_exported.fetch_add(basicStats.bytes_exported, std::memory_order_relaxed);
75+
76+
if (timestamp == 0) {
77+
fps_exported.store(0);
78+
mbps_exported.store(0);
79+
return;
80+
}
81+
82+
fps_exported.store(basicStats.flows_exported * 1000000000 / timestamp);
83+
mbps_exported.store(basicStats.bytes_exported * 8 * 1000000000 / timestamp / 1024 / 1024);
84+
}
85+
};
86+
87+
static telemetry::Dict createOutputPluginStatsDict(const OutputPluginAtomicStats& burstStats)
88+
{
89+
telemetry::Dict dict;
90+
dict["flows_seen"] = burstStats.flows_seen;
91+
dict["flows_dropped"] = burstStats.flows_dropped;
92+
dict["flows_exported"] = burstStats.flows_exported;
93+
dict["bytes_exported"] = burstStats.bytes_exported;
94+
dict["fps_exported"] = burstStats.fps_exported;
95+
dict["mbps_exported"] = burstStats.mbps_exported;
96+
return dict;
97+
}
98+
4199
/**
42100
* \brief Base class for flow exporters.
43101
*/
44102
class OutputPlugin : public Plugin
45103
{
46104
public:
47105
typedef std::vector<std::pair<std::string, ProcessPlugin *>> Plugins;
48-
uint64_t m_flows_seen; /**< Number of flows received to export. */
49-
uint64_t m_flows_dropped; /**< Number of flows that could not be exported. */
50106

51-
OutputPlugin() : m_flows_seen(0), m_flows_dropped(0) {}
107+
OutputPlugin() = default;
108+
52109
virtual ~OutputPlugin() {}
53110

54-
virtual void init(const char *params, Plugins &plugins) = 0;
111+
void init(const char *params, Plugins &plugins, const std::shared_ptr<telemetry::Directory>& dir)
112+
{
113+
const auto statsFile = dir->addFile(
114+
"basic_stats",
115+
{.read = [this]() { return createOutputPluginStatsDict(m_atomicStats); },
116+
.clear = nullptr});
117+
118+
m_holder.add(statsFile);
119+
120+
init_plugin(params, plugins, dir);
121+
}
55122

56123
enum class Result {
57124
EXPORTED = 0,
@@ -70,6 +137,34 @@ class OutputPlugin : public Plugin
70137
virtual void flush()
71138
{
72139
}
140+
141+
void update_stats(uint64_t timestamp) override final
142+
{
143+
m_atomicStats.updatePktBurstStats(m_stats, timestamp);
144+
m_stats.clear();
145+
146+
update_plugin_stats(timestamp);
147+
}
148+
149+
const OutputPluginAtomicStats& get_stats() const
150+
{
151+
return m_atomicStats;
152+
}
153+
154+
155+
protected:
156+
virtual void init_plugin(const char *params, Plugins &plugins, const std::shared_ptr<telemetry::Directory>& dir) = 0;
157+
158+
159+
virtual void update_plugin_stats(uint64_t timestamp)
160+
{
161+
(void) timestamp;
162+
}
163+
164+
OutputPluginStats m_stats = {};
165+
telemetry::Holder m_holder;
166+
private:
167+
OutputPluginAtomicStats m_atomicStats = {};
73168
};
74169

75170
}

include/ipfixprobe/plugin.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ class Plugin
6060
virtual void init(const char *params) {}
6161
virtual void close() {}
6262

63+
virtual void update_stats(uint64_t timestamp)
64+
{
65+
(void) timestamp;
66+
}
67+
6368
virtual OptionsParser *get_parser() const = 0;
6469
virtual std::string get_name() const = 0;
6570
};

ipfixprobe.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,9 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
287287
throw IPXPError("invalid output plugin " + output_name);
288288
}
289289

290-
output_plugin->init(output_params.c_str(), *process_plugins);
290+
auto output_plugin_dir = output_dir->addDir(output_name);
291+
292+
output_plugin->init(output_params.c_str(), *process_plugins, output_plugin_dir);
291293
conf.active.output.push_back(output_plugin);
292294
conf.active.all.push_back(output_plugin);
293295
} catch (PluginError &e) {

output/ipfix.cpp

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ void IPFIXExporter::init(const char *params)
232232
signal(SIGPIPE, SIG_IGN);
233233
}
234234

235-
void IPFIXExporter::init(const char *params, Plugins &plugins)
235+
void IPFIXExporter::init_plugin(const char *params, Plugins &plugins, const std::shared_ptr<telemetry::Directory>& dir)
236236
{
237237
init(params);
238238

@@ -258,6 +258,14 @@ void IPFIXExporter::init(const char *params, Plugins &plugins)
258258
}
259259
delete ext;
260260
}
261+
262+
auto plugin_config_file = dir->addFile(
263+
"config",
264+
{.read = [this]() { return get_ipfix_config(); },
265+
.clear = nullptr});
266+
267+
m_holder.add(plugin_config_file);
268+
261269
}
262270

263271
void IPFIXExporter::close()
@@ -398,13 +406,13 @@ bool IPFIXExporter::fill_template(const Flow &flow, template_t *tmplt)
398406

399407
int IPFIXExporter::export_flow(const Flow &flow)
400408
{
401-
m_flows_seen++;
409+
m_stats.flows_seen++;
402410
template_t *tmplt = get_template(flow);
403411
if (!fill_template(flow, tmplt)) {
404412
flush();
405413

406414
if (!fill_template(flow, tmplt)) {
407-
m_flows_dropped++;
415+
m_stats.flows_dropped++;
408416
return 1;
409417
}
410418
}
@@ -833,7 +841,7 @@ void IPFIXExporter::send_data()
833841
ret = send_packet(&pkt);
834842
}
835843
if (ret != 0) {
836-
m_flows_dropped += pkt.flows;
844+
m_stats.flows_dropped += pkt.flows;
837845
}
838846
}
839847
}
@@ -930,10 +938,12 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
930938

931939
/* No error from sendto(), add sent data count to total */
932940
sent += ret;
941+
m_stats.bytes_exported += ret;
933942
}
934943

935944
/* Update sequence number for next packet */
936945
sequenceNum += packet->flows;
946+
m_stats.flows_exported += packet->flows;
937947

938948
/* Increase packet counter */
939949
exportedPackets++;
@@ -1154,6 +1164,26 @@ int IPFIXExporter::reconnect()
11541164
return 0;
11551165
}
11561166

1167+
telemetry::Content IPFIXExporter::get_ipfix_config()
1168+
{
1169+
telemetry::Dict dict;
1170+
1171+
dict["protocol"] = (protocol == IPPROTO_UDP ? "UDP" : "TCP");
1172+
dict["host"] = host;
1173+
dict["port"] = static_cast<uint64_t>(port);
1174+
dict["odid"] = static_cast<uint64_t>(odid);
1175+
dict["dir_bit_field"] = static_cast<uint64_t>(dir_bit_field);
1176+
dict["mtu"] = static_cast<uint64_t>(mtu);
1177+
dict["LZ4_compression"] = (packetDataBuffer.isCompressing() ? "enabled" : "disabled");
1178+
1179+
return dict;
1180+
}
1181+
1182+
void IPFIXExporter::update_plugin_stats(uint64_t timestamp)
1183+
{
1184+
1185+
}
1186+
11571187
// compress buffer implementation
11581188

11591189
CompressBuffer::CompressBuffer() :

output/ipfix.hpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,12 @@
4040

4141
#include <vector>
4242
#include <map>
43+
#include <atomic>
44+
#include <cstdint>
4345
#include <cstdint>
4446

4547
#include <lz4.h>
48+
#include <telemetry.hpp>
4649

4750
#include <ipfixprobe/output.hpp>
4851
#include <ipfixprobe/process.hpp>
@@ -414,6 +417,8 @@ class CompressBuffer {
414417
*/
415418
void close();
416419

420+
bool isCompressing() const noexcept { return shouldCompress; }
421+
417422
// the maximum aditional size required to send metadata that are needed to
418423
// to decompress the data, the +4 is there for four 0 bytes that identify
419424
// ipfix_start_compress_header_t
@@ -452,13 +457,23 @@ class CompressBuffer {
452457
LZ4_stream_t *lz4Stream;
453458
};
454459

460+
struct IpfixStats {
461+
std::atomic<uint64_t> sent_bytes;
462+
std::atomic<uint64_t> sent_flows;
463+
std::atomic<uint64_t> dropped_flows;
464+
std::atomic<uint64_t> sent_mbps;
465+
std::atomic<uint64_t> sent_fps;
466+
std::atomic<uint64_t> dropped_fps;
467+
std::atomic<uint64_t> timestamp;
468+
};
469+
455470
class IPFIXExporter : public OutputPlugin
456471
{
457472
public:
458473
IPFIXExporter();
459474
~IPFIXExporter();
460475
void init(const char *params);
461-
void init(const char *params, Plugins &plugins);
476+
void init_plugin(const char *params, Plugins &plugins, const std::shared_ptr<telemetry::Directory>& dir) override;
462477
void close();
463478
OptionsParser *get_parser() const { return new IpfixOptParser(); }
464479
std::string get_name() const { return "ipfix"; }
@@ -526,6 +541,9 @@ class IPFIXExporter : public OutputPlugin
526541
bool fill_template(const Flow &flow, template_t *tmplt);
527542
void flush();
528543
void shutdown();
544+
545+
telemetry::Content get_ipfix_config();
546+
void update_plugin_stats(uint64_t timestamp) override;
529547
};
530548

531549
}

output/text.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ void TextExporter::init(const char *params)
7777
*m_out << "conversation packets bytes tcp-flags time extensions" << std::endl;
7878
}
7979

80-
void TextExporter::init(const char *params, Plugins &plugins)
80+
void TextExporter::init_plugin(const char *params, Plugins &plugins, const std::shared_ptr<telemetry::Directory>& dir)
8181
{
8282
init(params);
8383
}
@@ -94,7 +94,7 @@ int TextExporter::export_flow(const Flow &flow)
9494
{
9595
RecordExt *ext = flow.m_exts;
9696

97-
m_flows_seen++;
97+
//m_flows_seen++;
9898
print_basic_flow(flow);
9999
while (ext != nullptr) {
100100
*m_out << " " << ext->get_text();

output/text.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class TextExporter : public OutputPlugin
6464
TextExporter();
6565
~TextExporter();
6666
void init(const char *params);
67-
void init(const char *params, Plugins &plugins);
67+
void init_plugin(const char *params, Plugins &plugins, const std::shared_ptr<telemetry::Directory>& dir) override;
6868
void close();
6969
OptionsParser *get_parser() const { return new TextOptParser(); }
7070
std::string get_name() const { return "text"; }

output/unirec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ int UnirecExporter::export_flow(const Flow &flow)
346346
trap_send(m_basic_idx, record_ptr, ur_rec_fixlen_size(tmplt_ptr) + ur_rec_varlen_size(tmplt_ptr, record_ptr));
347347
}
348348

349-
m_flows_seen++;
349+
//m_flows_seen++;
350350
uint64_t tmplt_dbits = 0; // templates dirty bits
351351
memset(m_ext_id_flgs, 0, sizeof(int) * m_ext_cnt); // in case one flow has multiple extension of same type
352352
int ext_processed_cnd = 0;

0 commit comments

Comments
 (0)