Skip to content

Commit 8cdf254

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
Process plugins - Introduce PacketStats process plugin
1 parent b79435a commit 8cdf254

File tree

11 files changed

+667
-411
lines changed

11 files changed

+667
-411
lines changed

src/plugins/process/pstats/CMakeLists.txt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
project(ipfixprobe-process-pstats VERSION 1.0.0 DESCRIPTION "ipfixprobe-process-pstats plugin")
22

33
add_library(ipfixprobe-process-pstats MODULE
4-
src/pstats.cpp
5-
src/pstats.hpp
4+
src/packetStats.cpp
5+
src/packetStats.hpp
6+
src/packetStatsContext.hpp
7+
src/packetStatsStorage.hpp
68
)
79

810
set_target_properties(ipfixprobe-process-pstats PROPERTIES
911
CXX_VISIBILITY_PRESET hidden
1012
VISIBILITY_INLINES_HIDDEN YES
1113
)
1214

13-
target_include_directories(ipfixprobe-process-pstats PRIVATE
15+
target_include_directories(ipfixprobe-process-pstats PRIVATE
1416
${CMAKE_SOURCE_DIR}/include/
17+
${CMAKE_SOURCE_DIR}/include/ipfixprobe/processPlugin
18+
${CMAKE_SOURCE_DIR}/include/ipfixprobe/pluginFactory
19+
${CMAKE_SOURCE_DIR}/src/plugins/process/common
20+
${adaptmon_SOURCE_DIR}/lib/include/public/
1521
)
1622

1723
target_link_libraries(ipfixprobe-process-pstats PRIVATE
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# PacketStats Plugin
2+
3+
The **PacketStats Plugin** collects and exports properties of packet sequences from network flows.
4+
5+
## Features
6+
7+
- Does not export flows considered to be TCP scans.
8+
- Uses a memory-efficient storage mechanism to reduce memory usage for short flows (under 5 packets).
9+
10+
## Parameters
11+
12+
| Long name | Short name | Type | Default | Description |
13+
| --------- | --------------- | ------ | ------- | --------------------------------------------------------------------------- |
14+
| `i` | `includezeroes` | `bool` | false | Whether to include zero-length packets in the analysis |
15+
| `s` | `skipdup` | `bool` | false | Whether to skip packet duplicates. Compares every packet length to previous |
16+
17+
## Output Fields
18+
19+
| Field Name | Data Type | Description |
20+
| -------------------- | --------------------- | ------------------------------------------------------- |
21+
| `PPI_PKT_LENGTHS` | `array of uint16_t` | Lengths of the processed packets |
22+
| `PPI_PKT_TIMES` | `array of timestamps` | Timestamps of the processed packets |
23+
| `PPI_PKT_FLAGS` | `array of uint8_t` | TCP flags of the processed packets |
24+
| `PPI_PKT_DIRECTIONS` | `array of int8_t` | 1 for source → destination, -1 for destination → source |
25+
26+
## Usage
27+
28+
### YAML Configuration
29+
30+
Add the plugin to your ipfixprobe YAML configuration:
31+
32+
```yaml
33+
process_plugins:
34+
- pstats
35+
```
36+
37+
### CLI Usage
38+
39+
You can also enable the plugin directly from the command line:
40+
41+
`ipfixprobe -p pstats ...`
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/**
2+
* @file
3+
* @brief Plugin for parsing pstats traffic.
4+
* @author Tomas Cejka <[email protected]>
5+
* @author Karel Hynek <[email protected]>
6+
* @author Pavel Siska <[email protected]>
7+
* @author Damir Zainullin <[email protected]>
8+
* @date 2025
9+
*
10+
* Provides a plugin that calculates packet statistics as flags, acknowledgments, and sequences
11+
* within flows, stores it in per-flow plugin data, and exposes that field via FieldManager.
12+
*
13+
* @copyright Copyright (c) 2025 CESNET, z.s.p.o.
14+
*/
15+
16+
#include "packetStats.hpp"
17+
18+
#include "packetStatsGetters.hpp"
19+
#include "packetStatsOptionsParser.hpp"
20+
21+
#include <iostream>
22+
23+
#include <amon/layers/TCP.hpp>
24+
#include <fieldGroup.hpp>
25+
#include <fieldManager.hpp>
26+
#include <flowRecord.hpp>
27+
#include <pluginFactory.hpp>
28+
#include <pluginManifest.hpp>
29+
#include <pluginRegistrar.hpp>
30+
#include <utils.hpp>
31+
#include <utils/spanUtils.hpp>
32+
33+
namespace ipxp::process::packet_stats {
34+
35+
static const PluginManifest packetStatsPluginManifest = {
36+
.name = "pstats",
37+
.description = "Pstats process plugin for computing packet bursts stats.",
38+
.pluginVersion = "1.0.0",
39+
.apiVersion = "1.0.0",
40+
.usage =
41+
[]() {
42+
PacketStatsOptionsParser parser;
43+
parser.usage(std::cout);
44+
},
45+
};
46+
47+
static void createPacketStatsSchema(
48+
FieldManager& fieldManager,
49+
FieldHandlers<PacketStatsFields>& handlers) noexcept
50+
{
51+
FieldGroup schema = fieldManager.createFieldGroup("pstats");
52+
53+
/*handlers.insert(
54+
PacketStatsFields::PPI_PKT_LENGTHS,
55+
schema.addVectorField("PPI_PKT_LENGTHS", getPacketLengthsField));
56+
handlers.insert(
57+
PacketStatsFields::PPI_PKT_FLAGS,
58+
schema.addVectorField("PPI_PKT_FLAGS", getPacketFlagsField));
59+
handlers.insert(
60+
PacketStatsFields::PPI_PKT_DIRECTIONS,
61+
schema.addVectorField("PPI_PKT_DIRECTIONS", getPacketDirectionsField));
62+
handlers.insert(
63+
PacketStatsFields::PPI_PKT_TIMES,
64+
schema.addVectorField("PPI_PKT_TIMES", getPacketTimestampsField));*/
65+
}
66+
67+
PacketStatsPlugin::PacketStatsPlugin(
68+
[[maybe_unused]] const std::string& params,
69+
FieldManager& manager)
70+
{
71+
createPacketStatsSchema(manager, m_fieldHandlers);
72+
}
73+
74+
OnInitResult PacketStatsPlugin::onInit(const FlowContext& flowContext, void* pluginContext)
75+
{
76+
auto& packetStatsContext
77+
= *std::construct_at(reinterpret_cast<PacketStatsContext*>(pluginContext));
78+
updatePacketsData(
79+
*flowContext.packetContext.packet,
80+
flowContext.packetDirection,
81+
packetStatsContext);
82+
83+
return OnInitResult::ConstructedNeedsUpdate;
84+
}
85+
86+
OnUpdateResult PacketStatsPlugin::onUpdate(const FlowContext& flowContext, void* pluginContext)
87+
{
88+
auto& packetStatsContext
89+
= *std::construct_at(reinterpret_cast<PacketStatsContext*>(pluginContext));
90+
updatePacketsData(
91+
*flowContext.packetContext.packet,
92+
flowContext.packetDirection,
93+
packetStatsContext);
94+
95+
return OnUpdateResult::NeedsUpdate;
96+
}
97+
98+
OnExportResult
99+
PacketStatsPlugin::onExport(const FlowRecord& flowRecord, [[maybe_unused]] void* pluginContext)
100+
{
101+
const std::size_t packetsTotal = flowRecord.directionalData[Direction::Forward].packets
102+
+ flowRecord.directionalData[Direction::Reverse].packets;
103+
104+
const TCPFlags flags = flowRecord.directionalData[Direction::Forward].tcpFlags
105+
| flowRecord.directionalData[Direction::Reverse].tcpFlags;
106+
107+
if (packetsTotal <= MIN_FLOW_LENGTH && flags.bitfields.synchronize) {
108+
return OnExportResult::Remove;
109+
}
110+
111+
m_fieldHandlers[PacketStatsFields::PPI_PKT_LENGTHS].setAsAvailable(flowRecord);
112+
m_fieldHandlers[PacketStatsFields::PPI_PKT_TIMES].setAsAvailable(flowRecord);
113+
m_fieldHandlers[PacketStatsFields::PPI_PKT_FLAGS].setAsAvailable(flowRecord);
114+
m_fieldHandlers[PacketStatsFields::PPI_PKT_DIRECTIONS].setAsAvailable(flowRecord);
115+
116+
return OnExportResult::NoAction;
117+
}
118+
119+
constexpr static bool isSequenceOverflowed(const uint32_t currentValue, const uint32_t prevValue)
120+
{
121+
constexpr int64_t MAX_DIFF
122+
= static_cast<int64_t>(static_cast<double>(std::numeric_limits<uint32_t>::max()) / 100);
123+
124+
return static_cast<int64_t>(prevValue) - static_cast<int64_t>(currentValue) > MAX_DIFF;
125+
}
126+
127+
static bool isDuplicate(
128+
const amon::Packet& packet,
129+
const amon::layers::TCPView& tcp,
130+
const Direction direction,
131+
const std::size_t ipPayloadLength,
132+
const PacketStatsContext& packetStatsContext) noexcept
133+
{
134+
// Current seq <= previous ack?
135+
const bool suspiciousSequence = tcp.header().sequenceNumber
136+
<= packetStatsContext.processingState.lastSequence[direction]
137+
&& !isSequenceOverflowed(tcp.header().sequenceNumber,
138+
packetStatsContext.processingState.lastSequence[direction]);
139+
140+
// Current ack <= previous ack?
141+
const bool suspiciousAcknowledgment = tcp.header().acknowledgeNumber
142+
<= packetStatsContext.processingState.lastAcknowledgment[direction]
143+
&& !isSequenceOverflowed(tcp.header().acknowledgeNumber,
144+
packetStatsContext.processingState.lastAcknowledgment[direction]);
145+
146+
if (suspiciousSequence && suspiciousAcknowledgment
147+
&& packetStatsContext.processingState.currentStorageSize != 0
148+
&& ipPayloadLength == packetStatsContext.processingState.lastLength[direction]
149+
&& TCPFlags(tcp.flags()) == packetStatsContext.processingState.lastFlags[direction]) {
150+
return true;
151+
}
152+
153+
return false;
154+
}
155+
156+
void PacketStatsPlugin::updatePacketsData(
157+
const amon::Packet& packet,
158+
const Direction direction,
159+
PacketStatsContext& packetStatsContext) noexcept
160+
{
161+
auto tcp = getLayerView<amon::layers::TCPView>(packet, packet.layout.l4);
162+
if (!tcp.has_value()) {
163+
return;
164+
}
165+
166+
const std::optional<std::size_t> ipPayloadLength = getIPPayloadLength(packet);
167+
if (!ipPayloadLength.has_value()) {
168+
return;
169+
}
170+
171+
if (m_skipDuplicates
172+
&& isDuplicate(packet, *tcp, direction, *ipPayloadLength, packetStatsContext)) {
173+
return;
174+
}
175+
176+
packetStatsContext.processingState.lastSequence[direction] = tcp->header().sequenceNumber;
177+
packetStatsContext.processingState.lastAcknowledgment[direction]
178+
= tcp->header().acknowledgeNumber;
179+
packetStatsContext.processingState.lastLength[direction] = *ipPayloadLength;
180+
packetStatsContext.processingState.lastFlags[direction] = TCPFlags(tcp->flags());
181+
182+
if (*ipPayloadLength == 0 && !m_countEmptyPackets) {
183+
return;
184+
}
185+
186+
if (packetStatsContext.processingState.currentStorageSize == PacketStatsContext::INITIAL_SIZE) {
187+
packetStatsContext.reserveMaxSize();
188+
}
189+
if (packetStatsContext.processingState.currentStorageSize == PacketStatsContext::MAX_SIZE) {
190+
return;
191+
}
192+
193+
std::visit(
194+
[&](auto& storage) {
195+
storage->set(
196+
packetStatsContext.processingState.currentStorageSize++,
197+
static_cast<uint16_t>(*ipPayloadLength),
198+
TCPFlags(tcp->flags()),
199+
packet.timestamp,
200+
direction ? 1 : -1);
201+
},
202+
packetStatsContext.storage);
203+
}
204+
205+
void PacketStatsPlugin::onDestroy(void* pluginContext) noexcept
206+
{
207+
std::destroy_at(reinterpret_cast<PacketStatsContext*>(pluginContext));
208+
}
209+
210+
PluginDataMemoryLayout PacketStatsPlugin::getDataMemoryLayout() const noexcept
211+
{
212+
return {
213+
.size = sizeof(PacketStatsContext),
214+
.alignment = alignof(PacketStatsContext),
215+
};
216+
}
217+
218+
static const PluginRegistrar<
219+
PacketStatsPlugin,
220+
PluginFactory<ProcessPlugin, const std::string&, FieldManager&>>
221+
packetStatsRegistrar(packetStatsPluginManifest);
222+
223+
} // namespace ipxp::process::packet_stats

0 commit comments

Comments
 (0)