Skip to content

Commit 420f466

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
Process plugins - Introduce NetTimeSeries process plugin
1 parent 676329b commit 420f466

File tree

7 files changed

+477
-248
lines changed

7 files changed

+477
-248
lines changed

src/plugins/process/nettisa/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ project(ipfixprobe-process-nettisa VERSION 1.0.0 DESCRIPTION "ipfixprobe-process
33
add_library(ipfixprobe-process-nettisa MODULE
44
src/nettisa.cpp
55
src/nettisa.hpp
6+
src/nettisaContext.hpp
7+
src/nettisaFields.hpp
68
)
79

810
set_target_properties(ipfixprobe-process-nettisa PROPERTIES
@@ -12,6 +14,10 @@ set_target_properties(ipfixprobe-process-nettisa PROPERTIES
1214

1315
target_include_directories(ipfixprobe-process-nettisa PRIVATE
1416
${CMAKE_SOURCE_DIR}/include/
17+
${CMAKE_SOURCE_DIR}/include/ipfixprobe/processPlugin
18+
${CMAKE_SOURCE_DIR}/include/ipfixprobe/pluginFactory
19+
${CMAKE_SOURCE_DIR}/src/plugins/process/common
20+
${adaptmon_SOURCE_DIR}/lib/include/public/
1521
)
1622

1723
target_link_libraries(ipfixprobe-process-nettisa PRIVATE
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# NetTimeSeries Plugin
2+
3+
This plugin analyzes network data as time series, enabling more comprehensive and insightful analysis.
4+
5+
## Features
6+
7+
- Calculates and exports statistical properties of the flow based on packet lengths.
8+
9+
## Output Fields
10+
11+
| Field Name | Data Type | Description |
12+
| ------------------------ | ---------- | ------------------------------------------------------------------------------ |
13+
| `NTS_MEAN` | `float` | Mean packet length over the flow duration. |
14+
| `NTS_MIN` | `uint16_t` | Minimum packet length over the flow duration. |
15+
| `NTS_MAX` | `uint16_t` | Maximum packet length over the flow duration. |
16+
| `NTS_STDEV` | `float` | Standard deviation of packet lengths over the flow duration. |
17+
| `NTS_KURTOSIS` | `float` | Kurtosis of packet lengths over the flow duration. |
18+
| `NTS_ROOT_MEAN_SQUARE` | `float` | Root mean square of packet lengths over the flow duration. |
19+
| `NTS_AVERAGE_DISPERSION` | `float` | Average dispersion of packet lengths over the flow duration. |
20+
| `NTS_MEAN_SCALED_TIME` | `float` | Mean of packet lengths scaled by time over the flow duration. |
21+
| `NTS_MEAN_DIFFTIMES` | `float` | Mean of time differences between packets over the flow duration. |
22+
| `NTS_MIN_DIFFTIMES` | `float` | Minimum of time differences between packets over the flow duration. |
23+
| `NTS_MAX_DIFFTIMES` | `float` | Maximum of time differences between packets over the flow duration. |
24+
| `NTS_TIME_DISTRIBUTION` | `float` | Sum of deviations from mean interpacket arrival times. |
25+
| `NTS_SWITCHING_RATIO` | `float` | Ratio of packets when payload length changed in comparison to previous packet. |
26+
27+
## Usage
28+
29+
### YAML Configuration
30+
31+
Add the plugin to your ipfixprobe YAML configuration:
32+
33+
```yaml
34+
process_plugins:
35+
- nettisa
36+
```
37+
38+
### CLI Usage
39+
40+
You can also enable the plugin directly from the command line:
41+
42+
`ipfixprobe -p nettisa ...`

src/plugins/process/nettisa/src/nettisa.cpp

Lines changed: 189 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,31 @@
22
* @file
33
* @brief Plugin for parsing Nettisa flow.
44
* @author Josef Koumar [email protected]
5+
* @author Damir Zainullin <[email protected]>
56
* @date 2025
67
*
7-
* Copyright (c) 2025 CESNET
8+
* Provides a plugin that extracts advanced statistics based on packet lengths,
9+
* stores them in per-flow plugin data, and exposes fields via FieldManager.
810
*
9-
* SPDX-License-Identifier: BSD-3-Clause
11+
* @copyright Copyright (c) 2025 CESNET, z.s.p.o.
1012
*/
1113

1214
#include "nettisa.hpp"
1315

16+
#include "nettisaGetters.hpp"
17+
1418
#include <cmath>
1519
#include <iostream>
1620

17-
#include <ipfixprobe/pluginFactory/pluginManifest.hpp>
18-
#include <ipfixprobe/pluginFactory/pluginRegistrar.hpp>
19-
#include <ipfixprobe/utils.hpp>
21+
#include <fieldGroup.hpp>
22+
#include <fieldManager.hpp>
23+
#include <flowRecord.hpp>
24+
#include <ipfixprobe/options.hpp>
25+
#include <pluginFactory.hpp>
26+
#include <pluginManifest.hpp>
27+
#include <pluginRegistrar.hpp>
2028

21-
namespace ipxp {
29+
namespace ipxp::process::nettisa {
2230

2331
static const PluginManifest nettisaPluginManifest = {
2432
.name = "nettisa",
@@ -32,103 +40,202 @@ static const PluginManifest nettisaPluginManifest = {
3240
},
3341
};
3442

35-
NETTISAPlugin::NETTISAPlugin(const std::string& params, int pluginID)
36-
: ProcessPlugin(pluginID)
43+
static FieldGroup createNetTimeSeriesSchema(
44+
FieldManager& fieldManager,
45+
FieldHandlers<NetTimeSeriesFields>& handlers) noexcept
3746
{
38-
init(params.c_str());
47+
FieldGroup schema = fieldManager.createFieldGroup("nettisa");
48+
49+
handlers.insert(
50+
NetTimeSeriesFields::NTS_MEAN,
51+
schema.addScalarField("NTS_MEAN", getNTSMeanField));
52+
53+
handlers.insert(NetTimeSeriesFields::NTS_MIN, schema.addScalarField("NTS_MIN", getNTSMinField));
54+
55+
handlers.insert(NetTimeSeriesFields::NTS_MAX, schema.addScalarField("NTS_MAX", getNTSMaxField));
56+
57+
handlers.insert(
58+
NetTimeSeriesFields::NTS_STDEV,
59+
schema.addScalarField("NTS_STDEV", getNTSStdevField));
60+
61+
handlers.insert(
62+
NetTimeSeriesFields::NTS_KURTOSIS,
63+
schema.addScalarField("NTS_KURTOSIS", getNTSKurtosisField));
64+
65+
handlers.insert(
66+
NetTimeSeriesFields::NTS_ROOT_MEAN_SQUARE,
67+
schema.addScalarField("NTS_ROOT_MEAN_SQUARE", getNTSRootMeanSquareField));
68+
69+
handlers.insert(
70+
NetTimeSeriesFields::NTS_AVERAGE_DISPERSION,
71+
schema.addScalarField("NTS_AVERAGE_DISPERSION", getNTSAverageDispersionField));
72+
73+
handlers.insert(
74+
NetTimeSeriesFields::NTS_MEAN_SCALED_TIME,
75+
schema.addScalarField("NTS_MEAN_SCALED_TIME", getNTSMeanScaledTimeField));
76+
77+
handlers.insert(
78+
NetTimeSeriesFields::NTS_MEAN_DIFFTIMES,
79+
schema.addScalarField("NTS_MEAN_DIFFTIMES", getNTSMeanDifftimesField));
80+
81+
handlers.insert(
82+
NetTimeSeriesFields::NTS_MAX_DIFFTIMES,
83+
schema.addScalarField("NTS_MAX_DIFFTIMES", getNTSMaxDifftimesField));
84+
85+
handlers.insert(
86+
NetTimeSeriesFields::NTS_MIN_DIFFTIMES,
87+
schema.addScalarField("NTS_MIN_DIFFTIMES", getNTSMinDifftimesField));
88+
89+
handlers.insert(
90+
NetTimeSeriesFields::NTS_TIME_DISTRIBUTION,
91+
schema.addScalarField("NTS_TIME_DISTRIBUTION", getNTSTimeDistributionField));
92+
93+
handlers.insert(
94+
NetTimeSeriesFields::NTS_SWITCHING_RATIO,
95+
schema.addScalarField("NTS_SWITCHING_RATIO", getNTSSwitchingRatioField));
96+
97+
return schema;
3998
}
4099

41-
ProcessPlugin* NETTISAPlugin::copy()
100+
NetTimeSeriesPlugin::NetTimeSeriesPlugin(
101+
[[maybe_unused]] const std::string& params,
102+
FieldManager& manager)
42103
{
43-
return new NETTISAPlugin(*this);
104+
createNetTimeSeriesSchema(manager, m_fieldHandlers);
44105
}
45106

46-
void NETTISAPlugin::update_record(
47-
RecordExtNETTISA* nettisa_data,
48-
const Packet& pkt,
49-
const Flow& rec)
107+
OnInitResult NetTimeSeriesPlugin::onInit(const FlowContext& flowContext, void* pluginContext)
50108
{
51-
float variation_from_mean = pkt.payload_len_wire - nettisa_data->mean;
52-
uint32_t n = rec.dst_packets + rec.src_packets;
53-
uint64_t packet_time = timeval2usec(pkt.ts);
54-
uint64_t record_time = timeval2usec(rec.time_first);
55-
float diff_time = fmax(packet_time - nettisa_data->prev_time, 0);
56-
nettisa_data->sum_payload += pkt.payload_len_wire;
57-
nettisa_data->prev_time = packet_time;
58-
// MEAN
59-
nettisa_data->mean += (variation_from_mean) / n;
60-
// MIN
61-
nettisa_data->min = std::min(nettisa_data->min, pkt.payload_len_wire);
62-
// MAX
63-
nettisa_data->max = std::max(nettisa_data->max, pkt.payload_len_wire);
64-
// ROOT MEAN SQUARE
65-
nettisa_data->root_mean_square += pow(pkt.payload_len_wire, 2);
66-
// AVERAGE DISPERSION
67-
nettisa_data->average_dispersion += abs(variation_from_mean);
68-
// KURTOSIS
69-
nettisa_data->kurtosis += pow(variation_from_mean, 4);
70-
// MEAN SCALED TIME
71-
nettisa_data->mean_scaled_time
72-
+= (packet_time - record_time - nettisa_data->mean_scaled_time) / n;
73-
// MEAN TIME DIFFERENCES
74-
nettisa_data->mean_difftimes += (diff_time - nettisa_data->mean_difftimes) / n;
75-
// MIN
76-
nettisa_data->min_difftimes = fmin(nettisa_data->min_difftimes, diff_time);
77-
// MAX
78-
nettisa_data->max_difftimes = fmax(nettisa_data->max_difftimes, diff_time);
79-
// TIME DISTRIBUTION
80-
nettisa_data->time_distribution += abs(nettisa_data->mean_difftimes - diff_time);
81-
// SWITCHING RATIO
82-
if (nettisa_data->prev_payload != pkt.packet_len_wire) {
83-
nettisa_data->switching_ratio += 1;
84-
nettisa_data->prev_payload = pkt.packet_len_wire;
109+
auto& nettisaContext
110+
= *std::construct_at(reinterpret_cast<NetTimeSeriesContext*>(pluginContext));
111+
updateNetTimeSeries(
112+
flowContext.flowRecord,
113+
*flowContext.packetContext.packet,
114+
*flowContext.packetContext.features,
115+
nettisaContext);
116+
return OnInitResult::ConstructedNeedsUpdate;
117+
}
118+
119+
OnUpdateResult NetTimeSeriesPlugin::onUpdate(const FlowContext& flowContext, void* pluginContext)
120+
{
121+
auto& nettisaContext = *reinterpret_cast<NetTimeSeriesContext*>(pluginContext);
122+
updateNetTimeSeries(
123+
flowContext.flowRecord,
124+
*flowContext.packetContext.packet,
125+
*flowContext.packetContext.features,
126+
nettisaContext);
127+
return OnUpdateResult::NeedsUpdate;
128+
}
129+
130+
void NetTimeSeriesPlugin::updateNetTimeSeries(
131+
FlowRecord& flowRecord,
132+
const amon::Packet& packet,
133+
const PacketFeatures& features,
134+
NetTimeSeriesContext& nettisaContext) noexcept
135+
{
136+
const float variationFromMean
137+
= static_cast<float>(features.ipPayloadLength) - nettisaContext.mean;
138+
const float packetsTotal = static_cast<float>(
139+
flowRecord.directionalData[Direction::Forward].packets
140+
+ flowRecord.directionalData[Direction::Reverse].packets + 1);
141+
const float diff = std::max<float>(
142+
static_cast<float>(
143+
packet.timestamp.nanoseconds() - flowRecord.timeLastUpdate.nanoseconds()),
144+
0);
145+
nettisaContext.processingState.sumPayload += features.ipPayloadLength;
146+
nettisaContext.processingState.prevTime = packet.timestamp;
147+
nettisaContext.mean += variationFromMean / packetsTotal;
148+
nettisaContext.min
149+
= std::min<uint16_t>(nettisaContext.min, static_cast<uint16_t>(features.ipPayloadLength));
150+
nettisaContext.max
151+
= std::max<uint16_t>(nettisaContext.max, static_cast<uint16_t>(features.ipPayloadLength));
152+
nettisaContext.rootMeanSquare += static_cast<float>(std::pow(features.ipPayloadLength, 2));
153+
nettisaContext.averageDispersion += std::abs(variationFromMean);
154+
nettisaContext.kurtosis += static_cast<float>(std::pow(variationFromMean, 4));
155+
nettisaContext.meanScaledTime
156+
+= static_cast<float>(
157+
packet.timestamp.nanoseconds() - flowRecord.timeCreation.nanoseconds()
158+
- nettisaContext.meanScaledTime)
159+
/ packetsTotal;
160+
nettisaContext.meanDifftimes += (diff - nettisaContext.meanDifftimes) / packetsTotal;
161+
nettisaContext.minDifftimes = std::min(nettisaContext.minDifftimes, diff);
162+
nettisaContext.maxDifftimes = std::max(nettisaContext.maxDifftimes, diff);
163+
nettisaContext.timeDistribution += std::abs(nettisaContext.meanDifftimes - diff);
164+
if (nettisaContext.processingState.prevPayload != features.ipPayloadLength) {
165+
nettisaContext.switchingRatio += 1;
166+
nettisaContext.processingState.prevPayload
167+
= static_cast<uint16_t>(features.ipPayloadLength);
85168
}
86169
}
87170

88-
int NETTISAPlugin::post_create(Flow& rec, const Packet& pkt)
171+
OnExportResult NetTimeSeriesPlugin::onExport(const FlowRecord& flowRecord, void* pluginContext)
89172
{
90-
RecordExtNETTISA* nettisa_data = new RecordExtNETTISA(m_pluginID);
91-
rec.add_extension(nettisa_data);
173+
auto& nettisaContext = *reinterpret_cast<NetTimeSeriesContext*>(pluginContext);
174+
175+
const float packetsTotal = static_cast<float>(
176+
flowRecord.directionalData[Direction::Forward].packets
177+
+ flowRecord.directionalData[Direction::Reverse].packets);
178+
if (packetsTotal == 1) {
179+
return OnExportResult::Remove;
180+
}
181+
nettisaContext.switchingRatio = nettisaContext.switchingRatio / packetsTotal;
182+
nettisaContext.standardDeviation = static_cast<float>(std::pow(
183+
(nettisaContext.rootMeanSquare / packetsTotal)
184+
- std::pow(
185+
static_cast<float>(nettisaContext.processingState.sumPayload) / packetsTotal,
186+
2),
187+
0.5));
188+
if (nettisaContext.standardDeviation == 0) {
189+
nettisaContext.kurtosis = 0;
190+
} else {
191+
nettisaContext.kurtosis = static_cast<float>(
192+
nettisaContext.kurtosis
193+
/ (packetsTotal * std::pow(nettisaContext.standardDeviation, 4)));
194+
}
195+
nettisaContext.timeDistribution = (nettisaContext.timeDistribution / (packetsTotal - 1))
196+
/ (nettisaContext.maxDifftimes - nettisaContext.minDifftimes);
92197

93-
nettisa_data->prev_time = timeval2usec(pkt.ts);
198+
nettisaContext.rootMeanSquare
199+
= static_cast<float>(std::pow(nettisaContext.rootMeanSquare / packetsTotal, 0.5));
200+
nettisaContext.averageDispersion = nettisaContext.averageDispersion / packetsTotal;
94201

95-
update_record(nettisa_data, pkt, rec);
96-
return 0;
202+
makeAllFieldsAvailable(flowRecord);
203+
return OnExportResult::NoAction;
97204
}
98205

99-
int NETTISAPlugin::post_update(Flow& rec, const Packet& pkt)
206+
void NetTimeSeriesPlugin::makeAllFieldsAvailable(const FlowRecord& flowRecord) noexcept
100207
{
101-
RecordExtNETTISA* nettisa_data = (RecordExtNETTISA*) rec.get_extension(m_pluginID);
208+
m_fieldHandlers[NetTimeSeriesFields::NTS_MEAN].setAsAvailable(flowRecord);
209+
m_fieldHandlers[NetTimeSeriesFields::NTS_MIN].setAsAvailable(flowRecord);
210+
m_fieldHandlers[NetTimeSeriesFields::NTS_MAX].setAsAvailable(flowRecord);
211+
m_fieldHandlers[NetTimeSeriesFields::NTS_STDEV].setAsAvailable(flowRecord);
212+
m_fieldHandlers[NetTimeSeriesFields::NTS_KURTOSIS].setAsAvailable(flowRecord);
213+
m_fieldHandlers[NetTimeSeriesFields::NTS_ROOT_MEAN_SQUARE].setAsAvailable(flowRecord);
214+
m_fieldHandlers[NetTimeSeriesFields::NTS_AVERAGE_DISPERSION].setAsAvailable(flowRecord);
215+
m_fieldHandlers[NetTimeSeriesFields::NTS_MEAN_SCALED_TIME].setAsAvailable(flowRecord);
216+
m_fieldHandlers[NetTimeSeriesFields::NTS_MEAN_DIFFTIMES].setAsAvailable(flowRecord);
217+
m_fieldHandlers[NetTimeSeriesFields::NTS_MIN_DIFFTIMES].setAsAvailable(flowRecord);
218+
m_fieldHandlers[NetTimeSeriesFields::NTS_MAX_DIFFTIMES].setAsAvailable(flowRecord);
219+
m_fieldHandlers[NetTimeSeriesFields::NTS_TIME_DISTRIBUTION].setAsAvailable(flowRecord);
220+
m_fieldHandlers[NetTimeSeriesFields::NTS_SWITCHING_RATIO].setAsAvailable(flowRecord);
221+
}
102222

103-
update_record(nettisa_data, pkt, rec);
104-
return 0;
223+
void NetTimeSeriesPlugin::onDestroy(void* pluginContext) noexcept
224+
{
225+
std::destroy_at(reinterpret_cast<NetTimeSeriesContext*>(pluginContext));
105226
}
106227

107-
void NETTISAPlugin::pre_export(Flow& rec)
228+
PluginDataMemoryLayout NetTimeSeriesPlugin::getDataMemoryLayout() const noexcept
108229
{
109-
RecordExtNETTISA* nettisa_data = (RecordExtNETTISA*) rec.get_extension(m_pluginID);
110-
uint32_t n = rec.src_packets + rec.dst_packets;
111-
if (n == 1) {
112-
rec.remove_extension(m_pluginID);
113-
return;
114-
} else {
115-
nettisa_data->switching_ratio = nettisa_data->switching_ratio / n;
116-
nettisa_data->stdev = pow(
117-
(nettisa_data->root_mean_square / n) - pow(nettisa_data->sum_payload / n, 2),
118-
0.5);
119-
if (nettisa_data->stdev == 0) {
120-
nettisa_data->kurtosis = 0;
121-
} else {
122-
nettisa_data->kurtosis = nettisa_data->kurtosis / (n * pow(nettisa_data->stdev, 4));
123-
}
124-
nettisa_data->time_distribution = (nettisa_data->time_distribution / (n - 1))
125-
/ (nettisa_data->max_difftimes - nettisa_data->min);
126-
}
127-
nettisa_data->root_mean_square = pow(nettisa_data->root_mean_square / n, 0.5);
128-
nettisa_data->average_dispersion = nettisa_data->average_dispersion / n;
230+
return {
231+
.size = sizeof(NetTimeSeriesContext),
232+
.alignment = alignof(NetTimeSeriesContext),
233+
};
129234
}
130235

131-
static const PluginRegistrar<NETTISAPlugin, ProcessPluginFactory>
236+
static const PluginRegistrar<
237+
NetTimeSeriesPlugin,
238+
PluginFactory<ProcessPlugin, const std::string&, FieldManager&>>
132239
nettisaRegistrar(nettisaPluginManifest);
133240

134-
} // namespace ipxp
241+
} // namespace ipxp::process::nettisa

0 commit comments

Comments
 (0)