Skip to content

Commit 356ecad

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
Process plugins - Introduce PacketStats process plugin
1 parent 2cc2661 commit 356ecad

File tree

11 files changed

+669
-411
lines changed

11 files changed

+669
-411
lines changed

src/plugins/process/pstats/CMakeLists.txt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
project(ipfixprobe-process-pstats VERSION 1.0.0 DESCRIPTION "ipfixprobe-process-pstats plugin")
22

33
add_library(ipfixprobe-process-pstats MODULE
4-
src/pstats.cpp
5-
src/pstats.hpp
4+
src/packetStats.cpp
5+
src/packetStats.hpp
6+
src/packetStatsContext.hpp
7+
src/packetStatsStorage.hpp
68
)
79

810
set_target_properties(ipfixprobe-process-pstats PROPERTIES
911
CXX_VISIBILITY_PRESET hidden
1012
VISIBILITY_INLINES_HIDDEN YES
1113
)
1214

13-
target_include_directories(ipfixprobe-process-pstats PRIVATE
15+
target_include_directories(ipfixprobe-process-pstats PRIVATE
1416
${CMAKE_SOURCE_DIR}/include/
17+
${CMAKE_SOURCE_DIR}/include/ipfixprobe/processPlugin
18+
${CMAKE_SOURCE_DIR}/include/ipfixprobe/pluginFactory
19+
${CMAKE_SOURCE_DIR}/src/plugins/process/common
20+
${adaptmon_SOURCE_DIR}/lib/include/public/
1521
)
1622

1723
target_link_libraries(ipfixprobe-process-pstats PRIVATE
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# PacketStats Plugin
2+
3+
The **PacketStats Plugin** collects and exports properties of packet sequences from network flows.
4+
5+
## Features
6+
7+
- Does not export flows considered to be TCP scans.
8+
- Uses a memory-efficient storage mechanism to reduce memory usage for short flows (under 5 packets).
9+
10+
## Parameters
11+
12+
| Long name | Short name | Type | Default | Description |
13+
| --------- | --------------- | ------ | ------- | --------------------------------------------------------------------------- |
14+
| `i` | `includezeroes` | `bool` | false | Whether to include zero-length packets in the analysis |
15+
| `s` | `skipdup` | `bool` | false | Whether to skip packet duplicates. Compares every packet length to previous |
16+
17+
## Output Fields
18+
19+
| Field Name | Data Type | Description |
20+
| -------------------- | --------------------- | ------------------------------------------------------- |
21+
| `PPI_PKT_LENGTHS` | `array of uint16_t` | Lengths of the processed packets |
22+
| `PPI_PKT_TIMES` | `array of timestamps` | Timestamps of the processed packets |
23+
| `PPI_PKT_FLAGS` | `array of uint8_t` | TCP flags of the processed packets |
24+
| `PPI_PKT_DIRECTIONS` | `array of int8_t` | 1 for source → destination, -1 for destination → source |
25+
26+
## Usage
27+
28+
### YAML Configuration
29+
30+
Add the plugin to your ipfixprobe YAML configuration:
31+
32+
```yaml
33+
process_plugins:
34+
- pstats
35+
```
36+
37+
### CLI Usage
38+
39+
You can also enable the plugin directly from the command line:
40+
41+
`ipfixprobe -p pstats ...`
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/**
2+
* @file
3+
* @brief Plugin for parsing pstats traffic.
4+
* @author Tomas Cejka <[email protected]>
5+
* @author Karel Hynek <[email protected]>
6+
* @author Pavel Siska <[email protected]>
7+
* @author Damir Zainullin <[email protected]>
8+
* @date 2025
9+
*
10+
* Provides a plugin that calculates packet statistics as flags, acknowledgments, and sequences
11+
* within flows, stores it in per-flow plugin data, and exposes that field via FieldManager.
12+
*
13+
* @copyright Copyright (c) 2025 CESNET, z.s.p.o.
14+
*/
15+
16+
#include "packetStats.hpp"
17+
18+
#include "packetStatsGetters.hpp"
19+
#include "packetStatsOptionsParser.hpp"
20+
21+
#include <iostream>
22+
23+
#include <fieldGroup.hpp>
24+
#include <fieldManager.hpp>
25+
#include <flowRecord.hpp>
26+
#include <pluginFactory.hpp>
27+
#include <pluginManifest.hpp>
28+
#include <pluginRegistrar.hpp>
29+
#include <utils.hpp>
30+
#include <utils/spanUtils.hpp>
31+
32+
namespace ipxp::process::packet_stats {
33+
34+
static const PluginManifest packetStatsPluginManifest = {
35+
.name = "pstats",
36+
.description = "Pstats process plugin for computing packet bursts stats.",
37+
.pluginVersion = "1.0.0",
38+
.apiVersion = "1.0.0",
39+
.usage =
40+
[]() {
41+
PacketStatsOptionsParser parser;
42+
parser.usage(std::cout);
43+
},
44+
};
45+
46+
static void createPacketStatsSchema(
47+
FieldManager& fieldManager,
48+
FieldHandlers<PacketStatsFields>& handlers) noexcept
49+
{
50+
FieldGroup schema = fieldManager.createFieldGroup("pstats");
51+
52+
/*handlers.insert(
53+
PacketStatsFields::PPI_PKT_LENGTHS,
54+
schema.addVectorField("PPI_PKT_LENGTHS", getPacketLengthsField));
55+
handlers.insert(
56+
PacketStatsFields::PPI_PKT_FLAGS,
57+
schema.addVectorField("PPI_PKT_FLAGS", getPacketFlagsField));
58+
handlers.insert(
59+
PacketStatsFields::PPI_PKT_DIRECTIONS,
60+
schema.addVectorField("PPI_PKT_DIRECTIONS", getPacketDirectionsField));
61+
handlers.insert(
62+
PacketStatsFields::PPI_PKT_TIMES,
63+
schema.addVectorField("PPI_PKT_TIMES", getPacketTimestampsField));*/
64+
}
65+
66+
PacketStatsPlugin::PacketStatsPlugin(
67+
[[maybe_unused]] const std::string& params,
68+
FieldManager& manager)
69+
{
70+
createPacketStatsSchema(manager, m_fieldHandlers);
71+
}
72+
73+
OnInitResult PacketStatsPlugin::onInit(const FlowContext& flowContext, void* pluginContext)
74+
{
75+
auto& packetStatsContext
76+
= *std::construct_at(reinterpret_cast<PacketStatsContext*>(pluginContext));
77+
updatePacketsData(
78+
*flowContext.packetContext.packet,
79+
*flowContext.packetContext.features,
80+
packetStatsContext);
81+
82+
return OnInitResult::ConstructedNeedsUpdate;
83+
}
84+
85+
OnUpdateResult PacketStatsPlugin::onUpdate(const FlowContext& flowContext, void* pluginContext)
86+
{
87+
auto& packetStatsContext
88+
= *std::construct_at(reinterpret_cast<PacketStatsContext*>(pluginContext));
89+
updatePacketsData(
90+
*flowContext.packetContext.packet,
91+
*flowContext.packetContext.features,
92+
packetStatsContext);
93+
94+
return OnUpdateResult::NeedsUpdate;
95+
}
96+
97+
OnExportResult
98+
PacketStatsPlugin::onExport(const FlowRecord& flowRecord, [[maybe_unused]] void* pluginContext)
99+
{
100+
const std::size_t packetsTotal = flowRecord.directionalData[Direction::Forward].packets
101+
+ flowRecord.directionalData[Direction::Reverse].packets;
102+
103+
const TCPFlags flags = flowRecord.directionalData[Direction::Forward].tcpFlags
104+
| flowRecord.directionalData[Direction::Reverse].tcpFlags;
105+
106+
if (packetsTotal <= MIN_FLOW_LENGTH && flags.bitfields.synchronize) {
107+
return OnExportResult::Remove;
108+
}
109+
110+
m_fieldHandlers[PacketStatsFields::PPI_PKT_LENGTHS].setAsAvailable(flowRecord);
111+
m_fieldHandlers[PacketStatsFields::PPI_PKT_TIMES].setAsAvailable(flowRecord);
112+
m_fieldHandlers[PacketStatsFields::PPI_PKT_FLAGS].setAsAvailable(flowRecord);
113+
m_fieldHandlers[PacketStatsFields::PPI_PKT_DIRECTIONS].setAsAvailable(flowRecord);
114+
115+
return OnExportResult::NoAction;
116+
}
117+
118+
constexpr static bool isSequenceOverflowed(const uint32_t currentValue, const uint32_t prevValue)
119+
{
120+
constexpr int64_t MAX_DIFF
121+
= static_cast<int64_t>(static_cast<double>(std::numeric_limits<uint32_t>::max()) / 100);
122+
123+
return static_cast<int64_t>(prevValue) - static_cast<int64_t>(currentValue) > MAX_DIFF;
124+
}
125+
126+
static bool isDuplicate(
127+
const amon::Packet& packet,
128+
const PacketFeatures& features,
129+
const PacketStatsContext& packetStatsContext) noexcept
130+
{
131+
if (features.tcp.has_value()) {
132+
return false;
133+
}
134+
135+
// Current seq <= previous ack?
136+
const bool suspiciousSequence
137+
= features.tcp->header().sequenceNumber
138+
<= packetStatsContext.processingState.lastSequence[features.direction]
139+
&& !isSequenceOverflowed(
140+
features.tcp->header().sequenceNumber,
141+
packetStatsContext.processingState.lastSequence[features.direction]);
142+
143+
// Current ack <= previous ack?
144+
const bool suspiciousAcknowledgment
145+
= features.tcp->header().acknowledgeNumber
146+
<= packetStatsContext.processingState.lastAcknowledgment[features.direction]
147+
&& !isSequenceOverflowed(
148+
features.tcp->header().acknowledgeNumber,
149+
packetStatsContext.processingState.lastAcknowledgment[features.direction]);
150+
151+
if (suspiciousSequence && suspiciousAcknowledgment
152+
&& packetStatsContext.processingState.currentStorageSize != 0
153+
&& features.ipPayloadLength
154+
== packetStatsContext.processingState.lastLength[features.direction]
155+
&& TCPFlags(features.tcp->flags())
156+
== packetStatsContext.processingState.lastFlags[features.direction]) {
157+
return true;
158+
}
159+
160+
return false;
161+
}
162+
163+
void PacketStatsPlugin::updatePacketsData(
164+
const amon::Packet& packet,
165+
const PacketFeatures& features,
166+
PacketStatsContext& packetStatsContext) noexcept
167+
{
168+
if (!features.tcp.has_value()) {
169+
return;
170+
}
171+
172+
if (m_skipDuplicates && isDuplicate(packet, features, packetStatsContext)) {
173+
return;
174+
}
175+
176+
packetStatsContext.processingState.lastSequence[features.direction]
177+
= features.tcp->header().sequenceNumber;
178+
packetStatsContext.processingState.lastAcknowledgment[features.direction]
179+
= features.tcp->header().acknowledgeNumber;
180+
packetStatsContext.processingState.lastLength[features.direction] = features.ipPayloadLength;
181+
packetStatsContext.processingState.lastFlags[features.direction]
182+
= TCPFlags(features.tcp->flags());
183+
184+
if (features.ipPayloadLength == 0 && !m_countEmptyPackets) {
185+
return;
186+
}
187+
188+
if (packetStatsContext.processingState.currentStorageSize == PacketStatsContext::INITIAL_SIZE) {
189+
packetStatsContext.reserveMaxSize();
190+
}
191+
if (packetStatsContext.processingState.currentStorageSize == PacketStatsContext::MAX_SIZE) {
192+
return;
193+
}
194+
195+
std::visit(
196+
[&](auto& storage) {
197+
storage->set(
198+
packetStatsContext.processingState.currentStorageSize++,
199+
static_cast<uint16_t>(features.ipPayloadLength),
200+
TCPFlags(features.tcp->flags()),
201+
packet.timestamp,
202+
features.direction ? 1 : -1);
203+
},
204+
packetStatsContext.storage);
205+
}
206+
207+
void PacketStatsPlugin::onDestroy(void* pluginContext) noexcept
208+
{
209+
std::destroy_at(reinterpret_cast<PacketStatsContext*>(pluginContext));
210+
}
211+
212+
PluginDataMemoryLayout PacketStatsPlugin::getDataMemoryLayout() const noexcept
213+
{
214+
return {
215+
.size = sizeof(PacketStatsContext),
216+
.alignment = alignof(PacketStatsContext),
217+
};
218+
}
219+
220+
static const PluginRegistrar<
221+
PacketStatsPlugin,
222+
PluginFactory<ProcessPlugin, const std::string&, FieldManager&>>
223+
packetStatsRegistrar(packetStatsPluginManifest);
224+
225+
} // namespace ipxp::process::packet_stats

0 commit comments

Comments
 (0)