Skip to content

Commit ab26e24

Browse files
committed
CTT - add FlowRecord structure
1 parent 69ced11 commit ab26e24

File tree

2 files changed

+214
-0
lines changed

2 files changed

+214
-0
lines changed

storage/flowRecord.cpp

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/**
2+
* \file
3+
* \author Damir Zainullin <[email protected]>
4+
* \brief FlowRecord implementation.
5+
*/
6+
/*
7+
* Copyright (C) 2023 CESNET
8+
*
9+
* LICENSE TERMS
10+
*
11+
* Redistribution and use in source and binary forms, with or without
12+
* modification, are permitted provided that the following conditions
13+
* are met:
14+
* 1. Redistributions of source code must retain the above copyright
15+
* notice, this list of conditions and the following disclaimer.
16+
* 2. Redistributions in binary form must reproduce the above copyright
17+
* notice, this list of conditions and the following disclaimer in
18+
* the documentation and/or other materials provided with the
19+
* distribution.
20+
* 3. Neither the name of the Company nor the names of its contributors
21+
* may be used to endorse or promote products derived from this
22+
* software without specific prior written permission.
23+
*/
24+
25+
#include "flowRecord.hpp"
26+
27+
#include <ipfixprobe/flowifc.hpp>
28+
#include <ipfixprobe/packet.hpp>
29+
#include <cstring>
30+
31+
namespace ipxp {
32+
33+
FlowRecord::FlowRecord()
34+
{
35+
erase();
36+
};
37+
38+
FlowRecord::~FlowRecord()
39+
{
40+
erase();
41+
};
42+
43+
void FlowRecord::erase()
44+
{
45+
m_flow.remove_extensions();
46+
m_hash = 0;
47+
memset(&m_flow.time_first, 0, sizeof(m_flow.time_first));
48+
memset(&m_flow.time_last, 0, sizeof(m_flow.time_last));
49+
m_flow.ip_version = 0;
50+
m_flow.ip_proto = 0;
51+
memset(&m_flow.src_ip, 0, sizeof(m_flow.src_ip));
52+
memset(&m_flow.dst_ip, 0, sizeof(m_flow.dst_ip));
53+
m_flow.src_port = 0;
54+
m_flow.dst_port = 0;
55+
m_flow.src_packets = 0;
56+
m_flow.dst_packets = 0;
57+
m_flow.src_bytes = 0;
58+
m_flow.dst_bytes = 0;
59+
m_flow.src_tcp_flags = 0;
60+
m_flow.dst_tcp_flags = 0;
61+
#ifdef WITH_CTT
62+
is_waiting_for_export = false;
63+
is_in_ctt = false;
64+
#endif /* WITH_CTT */
65+
}
66+
void FlowRecord::reuse()
67+
{
68+
m_flow.remove_extensions();
69+
m_flow.time_first = m_flow.time_last;
70+
m_flow.src_packets = 0;
71+
m_flow.dst_packets = 0;
72+
m_flow.src_bytes = 0;
73+
m_flow.dst_bytes = 0;
74+
m_flow.src_tcp_flags = 0;
75+
m_flow.dst_tcp_flags = 0;
76+
#ifdef WITH_CTT
77+
is_waiting_for_export = false;
78+
is_in_ctt = false;
79+
#endif /* WITH_CTT */
80+
}
81+
82+
void FlowRecord::create(const Packet &pkt, uint64_t hash)
83+
{
84+
m_flow.src_packets = 1;
85+
86+
m_hash = hash;
87+
88+
m_flow.time_first = pkt.ts;
89+
m_flow.time_last = pkt.ts;
90+
m_flow.flow_hash = hash;
91+
92+
memcpy(m_flow.src_mac, pkt.src_mac, 6);
93+
memcpy(m_flow.dst_mac, pkt.dst_mac, 6);
94+
95+
if (pkt.ip_version == IP::v4) {
96+
m_flow.ip_version = pkt.ip_version;
97+
m_flow.ip_proto = pkt.ip_proto;
98+
m_flow.src_ip.v4 = pkt.src_ip.v4;
99+
m_flow.dst_ip.v4 = pkt.dst_ip.v4;
100+
m_flow.src_bytes = pkt.ip_len;
101+
} else if (pkt.ip_version == IP::v6) {
102+
m_flow.ip_version = pkt.ip_version;
103+
m_flow.ip_proto = pkt.ip_proto;
104+
memcpy(m_flow.src_ip.v6, pkt.src_ip.v6, 16);
105+
memcpy(m_flow.dst_ip.v6, pkt.dst_ip.v6, 16);
106+
m_flow.src_bytes = pkt.ip_len;
107+
}
108+
109+
if (pkt.ip_proto == IPPROTO_TCP) {
110+
m_flow.src_port = pkt.src_port;
111+
m_flow.dst_port = pkt.dst_port;
112+
m_flow.src_tcp_flags = pkt.tcp_flags;
113+
} else if (pkt.ip_proto == IPPROTO_UDP) {
114+
m_flow.src_port = pkt.src_port;
115+
m_flow.dst_port = pkt.dst_port;
116+
} else if (pkt.ip_proto == IPPROTO_ICMP ||
117+
pkt.ip_proto == IPPROTO_ICMPV6) {
118+
m_flow.src_port = pkt.src_port;
119+
m_flow.dst_port = pkt.dst_port;
120+
}
121+
#ifdef WITH_CTT
122+
is_waiting_for_export = false;
123+
is_in_ctt = false;
124+
#endif /* WITH_CTT */
125+
}
126+
127+
void FlowRecord::update(const Packet &pkt)
128+
{
129+
m_flow.time_last = pkt.ts;
130+
if (pkt.source_pkt) {
131+
m_flow.src_packets++;
132+
m_flow.src_bytes += pkt.ip_len;
133+
134+
if (pkt.ip_proto == IPPROTO_TCP) {
135+
m_flow.src_tcp_flags |= pkt.tcp_flags;
136+
}
137+
} else {
138+
m_flow.dst_packets++;
139+
m_flow.dst_bytes += pkt.ip_len;
140+
141+
if (pkt.ip_proto == IPPROTO_TCP) {
142+
m_flow.dst_tcp_flags |= pkt.tcp_flags;
143+
}
144+
}
145+
}
146+
147+
} // ipxp

storage/flowRecord.hpp

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* \file
3+
* \author Damir Zainullin <[email protected]>
4+
* \brief FlowRecord declaration.
5+
*/
6+
/*
7+
* Copyright (C) 2023 CESNET
8+
*
9+
* LICENSE TERMS
10+
*
11+
* Redistribution and use in source and binary forms, with or without
12+
* modification, are permitted provided that the following conditions
13+
* are met:
14+
* 1. Redistributions of source code must retain the above copyright
15+
* notice, this list of conditions and the following disclaimer.
16+
* 2. Redistributions in binary form must reproduce the above copyright
17+
* notice, this list of conditions and the following disclaimer in
18+
* the documentation and/or other materials provided with the
19+
* distribution.
20+
* 3. Neither the name of the Company nor the names of its contributors
21+
* may be used to endorse or promote products derived from this
22+
* software without specific prior written permission.
23+
*/
24+
25+
#pragma once
26+
27+
#include <config.h>
28+
#include <ipfixprobe/packet.hpp>
29+
#include <ipfixprobe/flowifc.hpp>
30+
#include <cstdint>
31+
32+
namespace ipxp {
33+
34+
class alignas(64) FlowRecord
35+
{
36+
uint64_t m_hash;
37+
public:
38+
Flow m_flow;
39+
#ifdef WITH_CTT
40+
bool is_in_ctt; /**< Flow is offloaded by CTT if set. */
41+
bool is_waiting_for_export; /**< Export request of flow was sent to ctt,
42+
but still has not been processed in ctt. */
43+
timeval export_time; /**< Time point when we sure that the export request has already been processed by ctt,
44+
and flow is not in ctt anymore. */
45+
#endif /* WITH_CTT */
46+
47+
FlowRecord();
48+
~FlowRecord();
49+
50+
void erase();
51+
void reuse();
52+
53+
__attribute__((always_inline)) bool is_empty() const noexcept
54+
{
55+
return m_hash == 0;
56+
}
57+
58+
__attribute__((always_inline)) bool belongs(uint64_t hash) const noexcept
59+
{
60+
return hash == m_hash;
61+
}
62+
63+
void create(const Packet &pkt, uint64_t pkt_hash);
64+
void update(const Packet &pkt);
65+
};
66+
67+
} // ipxp

0 commit comments

Comments
 (0)