22 * @file
33 * @brief Plugin for parsing Nettisa flow.
44 * @author Josef Koumar [email protected] 5+ * @author Damir Zainullin <[email protected] > 56 * @date 2025
67 *
7- * Copyright (c) 2025 CESNET
8+ * Provides a plugin that extracts advanced statistics based on packet lengths,
9+ * stores them in per-flow plugin data, and exposes fields via FieldManager.
810 *
9- * SPDX-License-Identifier: BSD-3-Clause
11+ * @copyright Copyright (c) 2025 CESNET, z.s.p.o.
1012 */
1113
1214#include " nettisa.hpp"
1315
16+ #include " nettisaGetters.hpp"
17+
1418#include < cmath>
1519#include < iostream>
1620
17- #include < ipfixprobe/pluginFactory/pluginManifest.hpp>
18- #include < ipfixprobe/pluginFactory/pluginRegistrar.hpp>
19- #include < ipfixprobe/utils.hpp>
21+ #include < fieldGroup.hpp>
22+ #include < fieldManager.hpp>
23+ #include < flowRecord.hpp>
24+ #include < ipfixprobe/options.hpp>
25+ #include < pluginFactory.hpp>
26+ #include < pluginManifest.hpp>
27+ #include < pluginRegistrar.hpp>
2028
21- namespace ipxp {
29+ namespace ipxp ::process::nettisa {
2230
2331static const PluginManifest nettisaPluginManifest = {
2432 .name = " nettisa" ,
@@ -32,103 +40,213 @@ static const PluginManifest nettisaPluginManifest = {
3240 },
3341};
3442
35- NETTISAPlugin::NETTISAPlugin (const std::string& params, int pluginID)
36- : ProcessPlugin(pluginID)
43+ static FieldGroup createNetTimeSeriesSchema (
44+ FieldManager& fieldManager,
45+ FieldHandlers<NetTimeSeriesFields>& handlers) noexcept
3746{
38- init (params.c_str ());
47+ FieldGroup schema = fieldManager.createFieldGroup (" nettisa" );
48+
49+ handlers.insert (
50+ NetTimeSeriesFields::NTS_MEAN,
51+ schema.addScalarField (" NTS_MEAN" , getNTSMeanField));
52+
53+ handlers.insert (NetTimeSeriesFields::NTS_MIN, schema.addScalarField (" NTS_MIN" , getNTSMinField));
54+
55+ handlers.insert (NetTimeSeriesFields::NTS_MAX, schema.addScalarField (" NTS_MAX" , getNTSMaxField));
56+
57+ handlers.insert (
58+ NetTimeSeriesFields::NTS_STDEV,
59+ schema.addScalarField (" NTS_STDEV" , getNTSStdevField));
60+
61+ handlers.insert (
62+ NetTimeSeriesFields::NTS_KURTOSIS,
63+ schema.addScalarField (" NTS_KURTOSIS" , getNTSKurtosisField));
64+
65+ handlers.insert (
66+ NetTimeSeriesFields::NTS_ROOT_MEAN_SQUARE,
67+ schema.addScalarField (" NTS_ROOT_MEAN_SQUARE" , getNTSRootMeanSquareField));
68+
69+ handlers.insert (
70+ NetTimeSeriesFields::NTS_AVERAGE_DISPERSION,
71+ schema.addScalarField (" NTS_AVERAGE_DISPERSION" , getNTSAverageDispersionField));
72+
73+ handlers.insert (
74+ NetTimeSeriesFields::NTS_MEAN_SCALED_TIME,
75+ schema.addScalarField (" NTS_MEAN_SCALED_TIME" , getNTSMeanScaledTimeField));
76+
77+ handlers.insert (
78+ NetTimeSeriesFields::NTS_MEAN_DIFFTIMES,
79+ schema.addScalarField (" NTS_MEAN_DIFFTIMES" , getNTSMeanDifftimesField));
80+
81+ handlers.insert (
82+ NetTimeSeriesFields::NTS_MAX_DIFFTIMES,
83+ schema.addScalarField (" NTS_MAX_DIFFTIMES" , getNTSMaxDifftimesField));
84+
85+ handlers.insert (
86+ NetTimeSeriesFields::NTS_MIN_DIFFTIMES,
87+ schema.addScalarField (" NTS_MIN_DIFFTIMES" , getNTSMinDifftimesField));
88+
89+ handlers.insert (
90+ NetTimeSeriesFields::NTS_TIME_DISTRIBUTION,
91+ schema.addScalarField (" NTS_TIME_DISTRIBUTION" , getNTSTimeDistributionField));
92+
93+ handlers.insert (
94+ NetTimeSeriesFields::NTS_SWITCHING_RATIO,
95+ schema.addScalarField (" NTS_SWITCHING_RATIO" , getNTSSwitchingRatioField));
96+
97+ return schema;
3998}
4099
41- ProcessPlugin* NETTISAPlugin::copy ()
100+ NetTimeSeriesPlugin::NetTimeSeriesPlugin (
101+ [[maybe_unused]] const std::string& params,
102+ FieldManager& manager)
42103{
43- return new NETTISAPlugin (* this );
104+ createNetTimeSeriesSchema (manager, m_fieldHandlers );
44105}
45106
46- void NETTISAPlugin::update_record (
47- RecordExtNETTISA* nettisa_data,
48- const Packet& pkt,
49- const Flow& rec)
107+ OnInitResult NetTimeSeriesPlugin::onInit (const FlowContext& flowContext, void * pluginContext)
50108{
51- float variation_from_mean = pkt.payload_len_wire - nettisa_data->mean ;
52- uint32_t n = rec.dst_packets + rec.src_packets ;
53- uint64_t packet_time = timeval2usec (pkt.ts );
54- uint64_t record_time = timeval2usec (rec.time_first );
55- float diff_time = fmax (packet_time - nettisa_data->prev_time , 0 );
56- nettisa_data->sum_payload += pkt.payload_len_wire ;
57- nettisa_data->prev_time = packet_time;
58- // MEAN
59- nettisa_data->mean += (variation_from_mean) / n;
60- // MIN
61- nettisa_data->min = std::min (nettisa_data->min , pkt.payload_len_wire );
62- // MAX
63- nettisa_data->max = std::max (nettisa_data->max , pkt.payload_len_wire );
64- // ROOT MEAN SQUARE
65- nettisa_data->root_mean_square += pow (pkt.payload_len_wire , 2 );
66- // AVERAGE DISPERSION
67- nettisa_data->average_dispersion += abs (variation_from_mean);
68- // KURTOSIS
69- nettisa_data->kurtosis += pow (variation_from_mean, 4 );
70- // MEAN SCALED TIME
71- nettisa_data->mean_scaled_time
72- += (packet_time - record_time - nettisa_data->mean_scaled_time ) / n;
73- // MEAN TIME DIFFERENCES
74- nettisa_data->mean_difftimes += (diff_time - nettisa_data->mean_difftimes ) / n;
75- // MIN
76- nettisa_data->min_difftimes = fmin (nettisa_data->min_difftimes , diff_time);
77- // MAX
78- nettisa_data->max_difftimes = fmax (nettisa_data->max_difftimes , diff_time);
79- // TIME DISTRIBUTION
80- nettisa_data->time_distribution += abs (nettisa_data->mean_difftimes - diff_time);
81- // SWITCHING RATIO
82- if (nettisa_data->prev_payload != pkt.packet_len_wire ) {
83- nettisa_data->switching_ratio += 1 ;
84- nettisa_data->prev_payload = pkt.packet_len_wire ;
109+ auto & nettisaContext
110+ = *std::construct_at (reinterpret_cast <NetTimeSeriesContext*>(pluginContext));
111+
112+ const std::optional<std::size_t > ipPayloadLength
113+ = getIPPayloadLength (*flowContext.packetContext .packet );
114+ if (!ipPayloadLength.has_value ()) {
115+ return OnInitResult::Irrelevant;
85116 }
117+
118+ updateNetTimeSeries (
119+ flowContext.flowRecord ,
120+ flowContext.packetContext .packet ->timestamp ,
121+ *ipPayloadLength,
122+ nettisaContext);
123+ return OnInitResult::ConstructedNeedsUpdate;
86124}
87125
88- int NETTISAPlugin::post_create (Flow& rec, const Packet& pkt )
126+ OnUpdateResult NetTimeSeriesPlugin::onUpdate ( const FlowContext& flowContext, void * pluginContext )
89127{
90- RecordExtNETTISA* nettisa_data = new RecordExtNETTISA (m_pluginID);
91- rec.add_extension (nettisa_data);
128+ auto & nettisaContext = *reinterpret_cast <NetTimeSeriesContext*>(pluginContext);
92129
93- nettisa_data->prev_time = timeval2usec (pkt.ts );
130+ const std::optional<std::size_t > ipPayloadLength
131+ = getIPPayloadLength (*flowContext.packetContext .packet );
132+ if (!ipPayloadLength.has_value ()) {
133+ return OnUpdateResult::NeedsUpdate;
134+ }
94135
95- update_record (nettisa_data, pkt, rec);
96- return 0 ;
136+ updateNetTimeSeries (
137+ flowContext.flowRecord ,
138+ flowContext.packetContext .packet ->timestamp ,
139+ *ipPayloadLength,
140+ nettisaContext);
141+ return OnUpdateResult::NeedsUpdate;
97142}
98143
99- int NETTISAPlugin::post_update (Flow& rec, const Packet& pkt)
144+ void NetTimeSeriesPlugin::updateNetTimeSeries (
145+ FlowRecord& flowRecord,
146+ const amon::types::Timestamp packetTimestamp,
147+ const std::size_t ipPayloadLength,
148+ NetTimeSeriesContext& nettisaContext) noexcept
100149{
101- RecordExtNETTISA* nettisa_data = (RecordExtNETTISA*) rec.get_extension (m_pluginID);
102-
103- update_record (nettisa_data, pkt, rec);
104- return 0 ;
150+ const float variationFromMean = static_cast <float >(ipPayloadLength) - nettisaContext.mean ;
151+ const float packetsTotal = static_cast <float >(
152+ flowRecord.directionalData [Direction::Forward].packets
153+ + flowRecord.directionalData [Direction::Reverse].packets + 1 );
154+ const float diff = std::max<float >(
155+ static_cast <float >(packetTimestamp.nanoseconds () - flowRecord.timeLastUpdate .nanoseconds ()),
156+ 0 );
157+ nettisaContext.processingState .sumPayload += ipPayloadLength;
158+ nettisaContext.processingState .prevTime = packetTimestamp;
159+ nettisaContext.mean += variationFromMean / packetsTotal;
160+ nettisaContext.min
161+ = std::min<uint16_t >(nettisaContext.min , static_cast <uint16_t >(ipPayloadLength));
162+ nettisaContext.max
163+ = std::max<uint16_t >(nettisaContext.max , static_cast <uint16_t >(ipPayloadLength));
164+ nettisaContext.rootMeanSquare += static_cast <float >(std::pow (ipPayloadLength, 2 ));
165+ nettisaContext.averageDispersion += std::abs (variationFromMean);
166+ nettisaContext.kurtosis += static_cast <float >(std::pow (variationFromMean, 4 ));
167+ nettisaContext.meanScaledTime
168+ += static_cast <float >(
169+ packetTimestamp.nanoseconds () - flowRecord.timeCreation .nanoseconds ()
170+ - nettisaContext.meanScaledTime )
171+ / packetsTotal;
172+ nettisaContext.meanDifftimes += (diff - nettisaContext.meanDifftimes ) / packetsTotal;
173+ nettisaContext.minDifftimes = std::min (nettisaContext.minDifftimes , diff);
174+ nettisaContext.maxDifftimes = std::max (nettisaContext.maxDifftimes , diff);
175+ nettisaContext.timeDistribution += std::abs (nettisaContext.meanDifftimes - diff);
176+ if (nettisaContext.processingState .prevPayload != ipPayloadLength) {
177+ nettisaContext.switchingRatio += 1 ;
178+ nettisaContext.processingState .prevPayload = static_cast <uint16_t >(ipPayloadLength);
179+ }
105180}
106181
107- void NETTISAPlugin::pre_export (Flow& rec )
182+ OnExportResult NetTimeSeriesPlugin::onExport ( const FlowRecord& flowRecord, void * pluginContext )
108183{
109- RecordExtNETTISA* nettisa_data = (RecordExtNETTISA*) rec.get_extension (m_pluginID);
110- uint32_t n = rec.src_packets + rec.dst_packets ;
111- if (n == 1 ) {
112- rec.remove_extension (m_pluginID);
113- return ;
184+ auto & nettisaContext = *reinterpret_cast <NetTimeSeriesContext*>(pluginContext);
185+
186+ const float packetsTotal = static_cast <float >(
187+ flowRecord.directionalData [Direction::Forward].packets
188+ + flowRecord.directionalData [Direction::Reverse].packets );
189+ if (packetsTotal == 1 ) {
190+ return OnExportResult::Remove;
191+ }
192+ nettisaContext.switchingRatio = nettisaContext.switchingRatio / packetsTotal;
193+ nettisaContext.standardDeviation = static_cast <float >(std::pow (
194+ (nettisaContext.rootMeanSquare / packetsTotal)
195+ - std::pow (
196+ static_cast <float >(nettisaContext.processingState .sumPayload ) / packetsTotal,
197+ 2 ),
198+ 0.5 ));
199+ if (nettisaContext.standardDeviation == 0 ) {
200+ nettisaContext.kurtosis = 0 ;
114201 } else {
115- nettisa_data->switching_ratio = nettisa_data->switching_ratio / n;
116- nettisa_data->stdev = pow (
117- (nettisa_data->root_mean_square / n) - pow (nettisa_data->sum_payload / n, 2 ),
118- 0.5 );
119- if (nettisa_data->stdev == 0 ) {
120- nettisa_data->kurtosis = 0 ;
121- } else {
122- nettisa_data->kurtosis = nettisa_data->kurtosis / (n * pow (nettisa_data->stdev , 4 ));
123- }
124- nettisa_data->time_distribution = (nettisa_data->time_distribution / (n - 1 ))
125- / (nettisa_data->max_difftimes - nettisa_data->min );
202+ nettisaContext.kurtosis = static_cast <float >(
203+ nettisaContext.kurtosis
204+ / (packetsTotal * std::pow (nettisaContext.standardDeviation , 4 )));
126205 }
127- nettisa_data->root_mean_square = pow (nettisa_data->root_mean_square / n, 0.5 );
128- nettisa_data->average_dispersion = nettisa_data->average_dispersion / n;
206+ nettisaContext.timeDistribution = (nettisaContext.timeDistribution / (packetsTotal - 1 ))
207+ / (nettisaContext.maxDifftimes - nettisaContext.minDifftimes );
208+
209+ nettisaContext.rootMeanSquare
210+ = static_cast <float >(std::pow (nettisaContext.rootMeanSquare / packetsTotal, 0.5 ));
211+ nettisaContext.averageDispersion = nettisaContext.averageDispersion / packetsTotal;
212+
213+ makeAllFieldsAvailable (flowRecord);
214+ return OnExportResult::NoAction;
215+ }
216+
217+ void NetTimeSeriesPlugin::makeAllFieldsAvailable (const FlowRecord& flowRecord) noexcept
218+ {
219+ m_fieldHandlers[NetTimeSeriesFields::NTS_MEAN].setAsAvailable (flowRecord);
220+ m_fieldHandlers[NetTimeSeriesFields::NTS_MIN].setAsAvailable (flowRecord);
221+ m_fieldHandlers[NetTimeSeriesFields::NTS_MAX].setAsAvailable (flowRecord);
222+ m_fieldHandlers[NetTimeSeriesFields::NTS_STDEV].setAsAvailable (flowRecord);
223+ m_fieldHandlers[NetTimeSeriesFields::NTS_KURTOSIS].setAsAvailable (flowRecord);
224+ m_fieldHandlers[NetTimeSeriesFields::NTS_ROOT_MEAN_SQUARE].setAsAvailable (flowRecord);
225+ m_fieldHandlers[NetTimeSeriesFields::NTS_AVERAGE_DISPERSION].setAsAvailable (flowRecord);
226+ m_fieldHandlers[NetTimeSeriesFields::NTS_MEAN_SCALED_TIME].setAsAvailable (flowRecord);
227+ m_fieldHandlers[NetTimeSeriesFields::NTS_MEAN_DIFFTIMES].setAsAvailable (flowRecord);
228+ m_fieldHandlers[NetTimeSeriesFields::NTS_MIN_DIFFTIMES].setAsAvailable (flowRecord);
229+ m_fieldHandlers[NetTimeSeriesFields::NTS_MAX_DIFFTIMES].setAsAvailable (flowRecord);
230+ m_fieldHandlers[NetTimeSeriesFields::NTS_TIME_DISTRIBUTION].setAsAvailable (flowRecord);
231+ m_fieldHandlers[NetTimeSeriesFields::NTS_SWITCHING_RATIO].setAsAvailable (flowRecord);
232+ }
233+
234+ void NetTimeSeriesPlugin::onDestroy (void * pluginContext) noexcept
235+ {
236+ std::destroy_at (reinterpret_cast <NetTimeSeriesContext*>(pluginContext));
237+ }
238+
239+ PluginDataMemoryLayout NetTimeSeriesPlugin::getDataMemoryLayout () const noexcept
240+ {
241+ return {
242+ .size = sizeof (NetTimeSeriesContext),
243+ .alignment = alignof (NetTimeSeriesContext),
244+ };
129245}
130246
131- static const PluginRegistrar<NETTISAPlugin, ProcessPluginFactory>
247+ static const PluginRegistrar<
248+ NetTimeSeriesPlugin,
249+ PluginFactory<ProcessPlugin, const std::string&, FieldManager&>>
132250 nettisaRegistrar (nettisaPluginManifest);
133251
134- } // namespace ipxp
252+ } // namespace ipxp::process::nettisa
0 commit comments