4242
4343#include " fragmentationCache/fragmentationCache.hpp"
4444
45+ #ifdef WITH_CTT
46+ #include < sys/time.h>
47+ #include < ctt_async.hpp>
48+ #include < ctt_factory.hpp>
49+ #include < ctt_exceptions.hpp>
50+ #include < ctt_modes.hpp>
51+ #include < ctt.hpp>
52+ #endif /* WITH_CTT */
53+
4554namespace ipxp {
4655
56+ #ifdef WITH_CTT
57+
58+ class CttController {
59+ public:
60+ enum class OffloadMode : uint8_t {
61+ NO_OFFLOAD = 0x0 ,
62+ PACKET_OFFLOAD = 0x1 ,
63+ META_EXPORT = 0x2 ,
64+ PACKET_OFFLOAD_WITH_EXPORT = 0x3
65+ };
66+ enum class MetaType : uint8_t {
67+ FULL = 0x0 ,
68+ HALF = 0x1 ,
69+ TS_ONLY = 0x2 ,
70+ NO_META = 0x3
71+ };
72+ /* *
73+ * @brief init the CTT.
74+ *
75+ * @param nfb_dev The NFB device file (e.g., "/dev/nfb0").
76+ * @param ctt_comp_index The index of the CTT component.
77+ */
78+ void init (const std::string& nfb_dev, unsigned ctt_comp_index) {
79+ m_commander = std::make_unique<ctt::AsyncCommander>(ctt::NfbParams{nfb_dev, ctt_comp_index});
80+ try {
81+ // Get UserInfo to determine key, state, and state_mask sizes
82+ ctt::UserInfo user_info = m_commander->get_user_info ();
83+ key_size_bytes = (user_info.key_bit_width + 7 ) / 8 ;
84+ state_size_bytes = (user_info.state_bit_width + 7 ) / 8 ;
85+ state_mask_size_bytes = (user_info.state_mask_bit_width + 7 ) / 8 ;
86+
87+ // Enable the CTT
88+ std::future<void > enable_future = m_commander->enable (true );
89+ enable_future.wait ();
90+ }
91+ catch (const std::exception& e) {
92+ throw ;
93+ }
94+ }
95+
96+ /* *
97+ * @brief Command: mark a flow for offload.
98+ *
99+ * @param flow_hash_ctt The flow hash to be offloaded.
100+ */
101+ void create_record (uint64_t flow_hash_ctt, const struct timeval & timestamp_first);
102+
103+ /* *
104+ * @brief Command: export a flow from the CTT.
105+ *
106+ * @param flow_hash_ctt The flow hash to be exported.
107+ */
108+ void export_record (uint64_t flow_hash_ctt);
109+
110+ private:
111+ std::unique_ptr<ctt::AsyncCommander> m_commander;
112+ size_t key_size_bytes;
113+ size_t state_size_bytes;
114+ size_t state_mask_size_bytes;
115+
116+ /* *
117+ * @brief Assembles the state vector from the given values.
118+ *
119+ * @param offload_mode The offload mode.
120+ * @param meta_type The metadata type.
121+ * @param timestamp_first The first timestamp of the flow.
122+ * @return A byte vector representing the assembled state vector.
123+ */
124+ std::vector<std::byte> assemble_state (
125+ OffloadMode offload_mode, MetaType meta_type,
126+ const struct timeval & timestamp_first);
127+
128+ /* *
129+ * @brief Assembles the key vector from the given flow hash.
130+ *
131+ * @param flow_hash_ctt The flow hash.
132+ * @return A byte vector representing the assembled key vector.
133+ */
134+ std::vector<std::byte> assemble_key (uint64_t flow_hash_ctt);
135+ };
136+ #endif /* WITH_CTT */
137+
47138struct __attribute__ ((packed)) flow_key_v4_t {
48139 uint16_t src_port;
49140 uint16_t dst_port;
@@ -99,6 +190,9 @@ class CacheOptParser : public OptionsParser
99190 bool m_enable_fragmentation_cache;
100191 std::size_t m_frag_cache_size;
101192 time_t m_frag_cache_timeout;
193+ #ifdef WITH_CTT
194+ std::string m_dev;
195+ #endif /* WITH_CTT */
102196
103197 CacheOptParser () : OptionsParser(" cache" , " Storage plugin implemented as a hash table" ),
104198 m_cache_size (1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE),
@@ -156,6 +250,16 @@ class CacheOptParser : public OptionsParser
156250 }
157251 return true ;
158252 });
253+
254+ #ifdef WITH_CTT
255+ register_option (" d" , " dev" , " DEV" , " Device name" ,
256+ [this ](const char *arg) {
257+ m_dev = arg;
258+ return true ;
259+ },
260+ OptionFlags::RequiredArgument);
261+ #endif /* WITH_CTT */
262+
159263 }
160264};
161265
@@ -214,6 +318,35 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
214318 */
215319 void set_telemetry_dir (std::shared_ptr<telemetry::Directory> dir) override ;
216320
321+ #ifdef WITH_CTT
322+
323+ int plugins_post_create (Flow &rec, Packet &pkt) {
324+ int ret = StoragePlugin::plugins_post_create (rec, pkt);
325+ rec.ctt_state = static_cast <int >(CttController::OffloadMode::NO_OFFLOAD);
326+ if (no_data_required (rec)) {
327+ m_ctt_controller.create_record (rec.flow_hash_ctt , rec.time_first );
328+ rec.ctt_state = static_cast <int >(CttController::OffloadMode::PACKET_OFFLOAD);
329+ }
330+ return ret;
331+ }
332+
333+ // override post_update method
334+ int plugins_post_update (Flow &rec, Packet &pkt) {
335+ int ret = StoragePlugin::plugins_post_update (rec, pkt);
336+ if (no_data_required (rec) && (rec.ctt_state == static_cast <int >(CttController::OffloadMode::NO_OFFLOAD))) {
337+ m_ctt_controller.create_record (rec.flow_hash_ctt , rec.time_first );
338+ rec.ctt_state = static_cast <int >(CttController::OffloadMode::PACKET_OFFLOAD);
339+ }
340+ return ret;
341+ }
342+
343+ // override pre_export method
344+ void plugins_pre_export (Flow &rec) {
345+ StoragePlugin::plugins_pre_export (rec);
346+ m_ctt_controller.export_record (rec.flow_hash_ctt );
347+ }
348+ #endif /* WITH_CTT */
349+
217350private:
218351 uint32_t m_cache_size;
219352 uint32_t m_line_size;
@@ -242,7 +375,9 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
242375 char m_key_inv[MAX_KEY_LENGTH];
243376 FlowRecord **m_flow_table;
244377 FlowRecord *m_flow_records;
245-
378+ #ifdef WITH_CTT
379+ CttController m_ctt_controller;
380+ #endif /* WITH_CTT */
246381 FragmentationCache m_fragmentation_cache;
247382 FlowEndReasonStats m_flow_end_reason_stats = {};
248383 FlowRecordStats m_flow_record_stats = {};
@@ -265,4 +400,4 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
265400};
266401
267402}
268- #endif /* IPXP_STORAGE_CACHE_HPP */
403+ #endif /* IPXP_STORAGE_CACHE_HPP */
0 commit comments