@@ -134,6 +134,17 @@ const char *basic_tmplt_v6[] = {
134134 nullptr
135135};
136136
137+ static telemetry::Content getExtendedAtomicStats (const IpfixExtendedAtomicStats& stats)
138+ {
139+ telemetry::Dict dict;
140+
141+ dict[" flows_dropped_eagain" ] = stats.flows_dropped_eagain ;
142+ dict[" fps_dropped_eagain" ] = stats.fps_dropped_eagain ;
143+ dict[" lz4_compression_error" ] = stats.lz4_compression_error ;
144+
145+ return dict;
146+ }
147+
137148IPFIXExporter::IPFIXExporter () :
138149 extensions (nullptr ), extension_cnt(0 ),
139150 templates (nullptr ), templatesDataSize(0 ),
@@ -264,7 +275,13 @@ void IPFIXExporter::init_plugin(const char *params, Plugins &plugins, const std:
264275 {.read = [this ]() { return get_ipfix_config (); },
265276 .clear = nullptr });
266277
278+ auto plugin_stats_file = dir->addFile (
279+ " plugin_stats" ,
280+ {.read = [this ]() { return getExtendedAtomicStats (ext_atomic_stats); },
281+ .clear = nullptr });
282+
267283 m_holder.add (plugin_config_file);
284+ m_holder.add (plugin_stats_file);
268285
269286}
270287
@@ -866,7 +883,7 @@ void IPFIXExporter::flush()
866883 * When the collector disconnects, tries to reconnect and resend the data
867884 *
868885 * \param packet Packet to send
869- * \return 0 on success, -1 on socket error, -2 on compress error,
886+ * \return 0 on success, -1 on socket error, -2 on compress error, -3 eagain error
870887 * 1 when data needs to be resent (after reconnect)
871888 */
872889int IPFIXExporter::send_packet (ipfix_packet_t *packet)
@@ -880,8 +897,16 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
880897 }
881898
882899 auto dataLen = packetDataBuffer.compress ();
900+ if (dataLen < 0 ) {
901+ ext_stats.lz4_compression_error ++;
902+ return -2 ;
903+ }
883904 auto data = packetDataBuffer.getCompressed ();
884905
906+ static constexpr size_t eagain_limit = 3 ;
907+
908+ size_t eagain_count = 0 ;
909+
885910 /* sendto() does not guarantee that everything will be send in one piece */
886911 while (sent < dataLen) {
887912 /* Send data to collector (TCP and SCTP ignores last two arguments) */
@@ -924,6 +949,12 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
924949 /* Say that we should try to connect and send data again */
925950 return 1 ;
926951 case EAGAIN:
952+ eagain_count++;
953+ if (eagain_count == eagain_limit) {
954+ ext_stats.flows_dropped_eagain += packet->flows ;
955+ return -1 ;
956+ }
957+ usleep (1000 );
927958 // EAGAIN is returned when the socket is non-blocking and the send buffer is full
928959 // possible wait and stop flag check
929960 continue ;
@@ -1179,9 +1210,21 @@ telemetry::Content IPFIXExporter::get_ipfix_config()
11791210 return dict;
11801211}
11811212
1213+
11821214void IPFIXExporter::update_plugin_stats (uint64_t timestamp)
11831215{
1216+ ext_atomic_stats.flows_dropped_eagain .fetch_add (ext_stats.flows_dropped_eagain , std::memory_order_relaxed);
1217+ ext_atomic_stats.lz4_compression_error .fetch_add (ext_stats.lz4_compression_error , std::memory_order_relaxed);
1218+
1219+ if (timestamp == 0 ) {
1220+ ext_atomic_stats.fps_dropped_eagain .store (0 );
1221+ return ;
1222+ }
1223+
1224+ ext_atomic_stats.fps_dropped_eagain .store (ext_stats.flows_dropped_eagain * 1000000000 / timestamp);
11841225
1226+ ext_stats.flows_dropped_eagain = 0 ;
1227+ ext_stats.lz4_compression_error = 0 ;
11851228}
11861229
11871230// compress buffer implementation
0 commit comments