Skip to content

Commit e781c4d

Browse files
jaroslavpesekSiskaPavel
authored andcommitted
ndp ctt controller - ctt cond compilation
1 parent 6e4793a commit e781c4d

File tree

9 files changed

+236
-292
lines changed

9 files changed

+236
-292
lines changed

configure.ac

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,32 @@ if [[ -z "$WITH_NDP_TRUE" ]]; then
226226
RPM_BUILDREQ+=" netcope-common-devel"
227227
fi
228228

229+
AC_ARG_WITH([ctt],
230+
AC_HELP_STRING([--with-ctt],[Compile ipfixprobe with ctt plugin for using Connection Tracking Table]),
231+
[
232+
if test "$withval" = "yes"; then
233+
withctt="yes"
234+
else
235+
withctt="no"
236+
fi
237+
], [withctt="no"]
238+
)
239+
240+
if test x${withctt} = xyes; then
241+
AC_LANG_PUSH([C++])
242+
CXXFLAGS="$CXXFLAGS -std=c++17"
243+
AC_CHECK_HEADERS([ctt.hpp], [libctt=yes], AC_MSG_ERROR([ctt.hpp not found. Try installing libctt-devel]))
244+
AC_LANG_POP([C++])
245+
fi
246+
247+
AM_CONDITIONAL(WITH_CTT, test x${libctt} = xyes && test x${withctt} = xyes)
248+
if [[ -z "$WITH_CTT_TRUE" ]]; then
249+
AC_DEFINE([WITH_CTT], [1], [Define to 1 if the ctt is available])
250+
LIBS="-lctt $LIBS"
251+
RPM_REQUIRES+=" libctt"
252+
RPM_BUILDREQ+=" libctt-devel"
253+
fi
254+
229255
AC_ARG_WITH([pcap],
230256
AC_HELP_STRING([--with-pcap],[Compile ipfixprobe with pcap plugin for capturing using libpcap library]),
231257
[

include/ipfixprobe/flowifc.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,13 @@ struct Flow : public Record {
263263
};
264264

265265
uint64_t flow_hash;
266+
267+
#ifdef WITH_CTT
266268
uint64_t flow_hash_ctt; /**< Flow hash for CTT. */
267269
bool ctt_valid; /**< CTT validity flag. */
268270
int ctt_state; /**< CTT - offload or not. */
271+
#endif
272+
269273
PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check
270274
if the flow process plugins requires all available data, only
271275
metadata or nothing of this. */

input/ndp.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <cstdint>
3131
#include <cstdio>
3232
#include <cstring>
33+
#include <iostream>
3334
#include <netinet/in.h>
3435
#include <sys/types.h>
3536
#include <cstdint>

storage/cache-ctt.cpp

Whitespace-only changes.

storage/cache-ctt.hpp

Lines changed: 0 additions & 72 deletions
This file was deleted.

storage/cache.cpp

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <cstdlib>
3434
#include <iostream>
3535
#include <cstring>
36+
#include <ratio>
3637
#include <sys/time.h>
3738

3839
#include <ipfixprobe/ring.h>
@@ -192,6 +193,9 @@ void NHTFlowCache::init(const char *params)
192193
m_timeout_idx = 0;
193194
m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1);
194195
m_line_new_idx = m_line_size / 2;
196+
#ifdef WITH_CTT
197+
m_ctt_controller.init(parser.m_dev, 0);
198+
#endif /* WITH_CTT */
195199

196200
if (m_export_queue == nullptr) {
197201
throw PluginError("output queue must be set before init");
@@ -658,4 +662,68 @@ void NHTFlowCache::prefetch_export_expired() const
658662
__builtin_prefetch(m_flow_table[i], 0, 1);
659663
}
660664
}
665+
666+
#ifdef WITH_CTT
667+
668+
void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts)
669+
{
670+
try {
671+
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
672+
std::vector<std::byte> state = assemble_state(
673+
OffloadMode::PACKET_OFFLOAD,
674+
MetaType::FULL,
675+
ts);
676+
677+
std::cout << "Created record\n\tkey: " << flow_hash_ctt << "\n\tstate: ";
678+
for (auto& byte : state) {
679+
std::cout << std::hex << static_cast<int>(byte) << " ";
680+
}
681+
std::cout << std::endl;
682+
m_commander->write_record(std::move(key), std::move(state));
683+
}
684+
catch (const std::exception& e) {
685+
throw;
686+
}
687+
}
688+
689+
void CttController::export_record(uint64_t flow_hash_ctt)
690+
{
691+
try {
692+
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
693+
m_commander->export_and_delete_record(std::move(key));
694+
std::cout << "Exported record with key: " << flow_hash_ctt << std::endl;
695+
}
696+
catch (const std::exception& e) {
697+
throw;
698+
}
699+
}
700+
701+
std::vector<std::byte> CttController::assemble_key(uint64_t flow_hash_ctt)
702+
{
703+
std::vector<std::byte> key(key_size_bytes, std::byte(0));
704+
for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) {
705+
key[i] = static_cast<std::byte>((flow_hash_ctt >> (8 * i)) & 0xFF);
706+
}
707+
return key;
708+
}
709+
710+
std::vector<std::byte> CttController::assemble_state(
711+
OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts)
712+
{
713+
std::vector<std::byte> state(state_size_bytes, std::byte(0));
714+
std::vector<std::byte> state_mask(state_mask_size_bytes, std::byte(0));
715+
716+
state[0] = static_cast<std::byte>(offload_mode);
717+
state[1] = static_cast<std::byte>(meta_type);
718+
719+
// timestamp in sec/ns format, 32+32 bits - 64 bits in total
720+
for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) {
721+
state[2 + i] = static_cast<std::byte>((ts.tv_sec >> (8 * i)) & 0xFF);
722+
}
723+
for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) {
724+
state[6 + i] = static_cast<std::byte>((ts.tv_usec >> (8 * i)) & 0xFF);
725+
}
726+
return state;
661727
}
728+
#endif // WITH_CTT
729+
}

storage/cache.hpp

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,99 @@
4242

4343
#include "fragmentationCache/fragmentationCache.hpp"
4444

45+
#ifdef WITH_CTT
46+
#include <sys/time.h>
47+
#include <ctt_async.hpp>
48+
#include <ctt_factory.hpp>
49+
#include <ctt_exceptions.hpp>
50+
#include <ctt_modes.hpp>
51+
#include <ctt.hpp>
52+
#endif /* WITH_CTT */
53+
4554
namespace ipxp {
4655

56+
#ifdef WITH_CTT
57+
58+
class CttController {
59+
public:
60+
enum class OffloadMode : uint8_t {
61+
NO_OFFLOAD = 0x0,
62+
PACKET_OFFLOAD = 0x1,
63+
META_EXPORT = 0x2,
64+
PACKET_OFFLOAD_WITH_EXPORT = 0x3
65+
};
66+
enum class MetaType : uint8_t {
67+
FULL = 0x0,
68+
HALF = 0x1,
69+
TS_ONLY = 0x2,
70+
NO_META = 0x3
71+
};
72+
/**
73+
* @brief init the CTT.
74+
*
75+
* @param nfb_dev The NFB device file (e.g., "/dev/nfb0").
76+
* @param ctt_comp_index The index of the CTT component.
77+
*/
78+
void init(const std::string& nfb_dev, unsigned ctt_comp_index) {
79+
m_commander = std::make_unique<ctt::AsyncCommander>(ctt::NfbParams{nfb_dev, ctt_comp_index});
80+
try {
81+
// Get UserInfo to determine key, state, and state_mask sizes
82+
ctt::UserInfo user_info = m_commander->get_user_info();
83+
key_size_bytes = (user_info.key_bit_width + 7) / 8;
84+
state_size_bytes = (user_info.state_bit_width + 7) / 8;
85+
state_mask_size_bytes = (user_info.state_mask_bit_width + 7) / 8;
86+
87+
// Enable the CTT
88+
std::future<void> enable_future = m_commander->enable(true);
89+
enable_future.wait();
90+
}
91+
catch (const std::exception& e) {
92+
throw;
93+
}
94+
}
95+
96+
/**
97+
* @brief Command: mark a flow for offload.
98+
*
99+
* @param flow_hash_ctt The flow hash to be offloaded.
100+
*/
101+
void create_record(uint64_t flow_hash_ctt, const struct timeval& timestamp_first);
102+
103+
/**
104+
* @brief Command: export a flow from the CTT.
105+
*
106+
* @param flow_hash_ctt The flow hash to be exported.
107+
*/
108+
void export_record(uint64_t flow_hash_ctt);
109+
110+
private:
111+
std::unique_ptr<ctt::AsyncCommander> m_commander;
112+
size_t key_size_bytes;
113+
size_t state_size_bytes;
114+
size_t state_mask_size_bytes;
115+
116+
/**
117+
* @brief Assembles the state vector from the given values.
118+
*
119+
* @param offload_mode The offload mode.
120+
* @param meta_type The metadata type.
121+
* @param timestamp_first The first timestamp of the flow.
122+
* @return A byte vector representing the assembled state vector.
123+
*/
124+
std::vector<std::byte> assemble_state(
125+
OffloadMode offload_mode, MetaType meta_type,
126+
const struct timeval& timestamp_first);
127+
128+
/**
129+
* @brief Assembles the key vector from the given flow hash.
130+
*
131+
* @param flow_hash_ctt The flow hash.
132+
* @return A byte vector representing the assembled key vector.
133+
*/
134+
std::vector<std::byte> assemble_key(uint64_t flow_hash_ctt);
135+
};
136+
#endif /* WITH_CTT */
137+
47138
struct __attribute__((packed)) flow_key_v4_t {
48139
uint16_t src_port;
49140
uint16_t dst_port;
@@ -99,6 +190,9 @@ class CacheOptParser : public OptionsParser
99190
bool m_enable_fragmentation_cache;
100191
std::size_t m_frag_cache_size;
101192
time_t m_frag_cache_timeout;
193+
#ifdef WITH_CTT
194+
std::string m_dev;
195+
#endif /* WITH_CTT */
102196

103197
CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table"),
104198
m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE),
@@ -156,6 +250,16 @@ class CacheOptParser : public OptionsParser
156250
}
157251
return true;
158252
});
253+
254+
#ifdef WITH_CTT
255+
register_option("d", "dev", "DEV", "Device name",
256+
[this](const char *arg) {
257+
m_dev = arg;
258+
return true;
259+
},
260+
OptionFlags::RequiredArgument);
261+
#endif /* WITH_CTT */
262+
159263
}
160264
};
161265

@@ -214,6 +318,35 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
214318
*/
215319
void set_telemetry_dir(std::shared_ptr<telemetry::Directory> dir) override;
216320

321+
#ifdef WITH_CTT
322+
323+
int plugins_post_create(Flow &rec, Packet &pkt) {
324+
int ret = StoragePlugin::plugins_post_create(rec, pkt);
325+
rec.ctt_state = static_cast<int>(CttController::OffloadMode::NO_OFFLOAD);
326+
if (no_data_required(rec)) {
327+
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
328+
rec.ctt_state = static_cast<int>(CttController::OffloadMode::PACKET_OFFLOAD);
329+
}
330+
return ret;
331+
}
332+
333+
// override post_update method
334+
int plugins_post_update(Flow &rec, Packet &pkt) {
335+
int ret = StoragePlugin::plugins_post_update(rec, pkt);
336+
if (no_data_required(rec) && (rec.ctt_state == static_cast<int>(CttController::OffloadMode::NO_OFFLOAD))) {
337+
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
338+
rec.ctt_state = static_cast<int>(CttController::OffloadMode::PACKET_OFFLOAD);
339+
}
340+
return ret;
341+
}
342+
343+
// override pre_export method
344+
void plugins_pre_export(Flow &rec) {
345+
StoragePlugin::plugins_pre_export(rec);
346+
m_ctt_controller.export_record(rec.flow_hash_ctt);
347+
}
348+
#endif /* WITH_CTT */
349+
217350
private:
218351
uint32_t m_cache_size;
219352
uint32_t m_line_size;
@@ -242,7 +375,9 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
242375
char m_key_inv[MAX_KEY_LENGTH];
243376
FlowRecord **m_flow_table;
244377
FlowRecord *m_flow_records;
245-
378+
#ifdef WITH_CTT
379+
CttController m_ctt_controller;
380+
#endif /* WITH_CTT */
246381
FragmentationCache m_fragmentation_cache;
247382
FlowEndReasonStats m_flow_end_reason_stats = {};
248383
FlowRecordStats m_flow_record_stats = {};
@@ -265,4 +400,4 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
265400
};
266401

267402
}
268-
#endif /* IPXP_STORAGE_CACHE_HPP */
403+
#endif /* IPXP_STORAGE_CACHE_HPP */

0 commit comments

Comments
 (0)