Skip to content

Commit fa5340f

Browse files
committed
WIP - extends stats
1 parent 6c5b48f commit fa5340f

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

output/ipfix.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,16 @@ const char *basic_tmplt_v6[] = {
134134
nullptr
135135
};
136136

137+
static telemetry::Content getExtendedAtomicStats(const IpfixExtendedAtomicStats& stats)
138+
{
139+
telemetry::Dict dict;
140+
141+
dict["eagain_problem"] = stats.eagain_problem;
142+
dict["lz4_compression_error"] = stats.lz4_compression_error;
143+
144+
return dict;
145+
}
146+
137147
IPFIXExporter::IPFIXExporter() :
138148
extensions(nullptr), extension_cnt(0),
139149
templates(nullptr), templatesDataSize(0),
@@ -264,7 +274,13 @@ void IPFIXExporter::init_plugin(const char *params, Plugins &plugins, const std:
264274
{.read = [this]() { return get_ipfix_config(); },
265275
.clear = nullptr});
266276

277+
auto plugin_stats_file = dir->addFile(
278+
"plugin_stats",
279+
{.read = [this]() { return getExtendedAtomicStats(ext_atomic_stats); },
280+
.clear = nullptr});
281+
267282
m_holder.add(plugin_config_file);
283+
m_holder.add(plugin_stats_file);
268284

269285
}
270286

@@ -880,8 +896,16 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
880896
}
881897

882898
auto dataLen = packetDataBuffer.compress();
899+
if (dataLen < 0) {
900+
ext_stats.lz4_compression_error++;
901+
return -2;
902+
}
883903
auto data = packetDataBuffer.getCompressed();
884904

905+
static constexpr size_t eagain_limit = 3;
906+
907+
size_t eagain_count = 0;
908+
885909
/* sendto() does not guarantee that everything will be send in one piece */
886910
while (sent < dataLen) {
887911
/* Send data to collector (TCP and SCTP ignores last two arguments) */
@@ -924,6 +948,11 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
924948
/* Say that we should try to connect and send data again */
925949
return 1;
926950
case EAGAIN:
951+
eagain_count++;
952+
if (eagain_count == eagain_limit) {
953+
ext_stats.eagain_problem++;
954+
}
955+
usleep(1000);
927956
// EAGAIN is returned when the socket is non-blocking and the send buffer is full
928957
// possible wait and stop flag check
929958
continue;
@@ -1179,9 +1208,14 @@ telemetry::Content IPFIXExporter::get_ipfix_config()
11791208
return dict;
11801209
}
11811210

1211+
11821212
void IPFIXExporter::update_plugin_stats(uint64_t timestamp)
11831213
{
1214+
ext_atomic_stats.eagain_problem.fetch_add(ext_stats.eagain_problem, std::memory_order_relaxed);
1215+
ext_atomic_stats.lz4_compression_error.fetch_add(ext_stats.lz4_compression_error, std::memory_order_relaxed);
11841216

1217+
ext_stats.eagain_problem = 0;
1218+
ext_stats.lz4_compression_error = 0;
11851219
}
11861220

11871221
// compress buffer implementation

output/ipfix.hpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -457,14 +457,14 @@ class CompressBuffer {
457457
LZ4_stream_t *lz4Stream;
458458
};
459459

460-
struct IpfixStats {
461-
std::atomic<uint64_t> sent_bytes;
462-
std::atomic<uint64_t> sent_flows;
463-
std::atomic<uint64_t> dropped_flows;
464-
std::atomic<uint64_t> sent_mbps;
465-
std::atomic<uint64_t> sent_fps;
466-
std::atomic<uint64_t> dropped_fps;
467-
std::atomic<uint64_t> timestamp;
460+
struct IpfixExtendedStats {
461+
uint64_t eagain_problem = 0;
462+
uint64_t lz4_compression_error = 0;
463+
};
464+
465+
struct IpfixExtendedAtomicStats {
466+
std::atomic<uint64_t> eagain_problem = 0;
467+
std::atomic<uint64_t> lz4_compression_error = 0;
468468
};
469469

470470
class IPFIXExporter : public OutputPlugin
@@ -519,6 +519,9 @@ class IPFIXExporter : public OutputPlugin
519519
uint16_t mtu; /**< Max size of packet payload sent */
520520
uint16_t tmpltMaxBufferSize; /**< Size of template buffer, tmpltBufferSize < packetDataBuffer */
521521

522+
IpfixExtendedStats ext_stats;
523+
IpfixExtendedAtomicStats ext_atomic_stats;
524+
522525
void init_template_buffer(template_t *tmpl);
523526
int fill_template_set_header(uint8_t *ptr, uint16_t size);
524527
void check_template_lifetime(template_t *tmpl);

0 commit comments

Comments
 (0)