3333#include < cstdlib>
3434#include < iostream>
3535#include < cstring>
36+ #include < ratio>
3637#include < sys/time.h>
3738
3839#include < ipfixprobe/ring.h>
@@ -138,10 +139,22 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash)
138139 m_flow.src_port = pkt.src_port ;
139140 m_flow.dst_port = pkt.dst_port ;
140141 }
142+ #ifdef WITH_CTT
143+ m_flow.is_delayed = false ;
144+ m_delayed_flow_waiting = false ;
145+ #endif /* WITH_CTT */
141146}
142147
143148void FlowRecord::update (const Packet &pkt, bool src)
144149{
150+ if (m_flow.is_delayed && !pkt.cttmeta .ctt_rec_matched ) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow
151+ auto flow_hash = m_hash;
152+ m_delayed_flow = m_flow;
153+ m_delayed_flow_waiting = true ;
154+ erase (); // erase the old flow, keeping the delayed flow
155+ create (pkt, flow_hash);
156+ return ;
157+ }
145158 m_flow.time_last = pkt.ts ;
146159 if (src) {
147160 m_flow.src_packets ++;
@@ -192,6 +205,9 @@ void NHTFlowCache::init(const char *params)
192205 m_timeout_idx = 0 ;
193206 m_line_mask = (m_cache_size - 1 ) & ~(m_line_size - 1 );
194207 m_line_new_idx = m_line_size / 2 ;
208+ #ifdef WITH_CTT
209+ m_ctt_controller.init (parser.m_dev , 0 );
210+ #endif /* WITH_CTT */
195211
196212 if (m_export_queue == nullptr ) {
197213 throw PluginError (" output queue must be set before init" );
@@ -256,6 +272,17 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue)
256272
257273void NHTFlowCache::export_flow (size_t index)
258274{
275+ if (m_flow_table[index]->m_flow .is_delayed ) {
276+ return ;
277+ }
278+ if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow .is_delayed ) {
279+ m_total_exported++;
280+ update_flow_end_reason_stats (m_flow_table[index]->m_delayed_flow .end_reason );
281+ update_flow_record_stats (
282+ m_flow_table[index]->m_delayed_flow .src_packets
283+ + m_flow_table[index]->m_delayed_flow .dst_packets );
284+ ipx_ring_push (m_export_queue, &m_flow_table[index]->m_delayed_flow );
285+ }
259286 m_total_exported++;
260287 update_flow_end_reason_stats (m_flow_table[index]->m_flow .end_reason );
261288 update_flow_record_stats (
@@ -502,6 +529,16 @@ void NHTFlowCache::export_expired(time_t ts)
502529 m_flow_table[i]->m_flow .end_reason = get_export_reason (m_flow_table[i]->m_flow );
503530 plugins_pre_export (m_flow_table[i]->m_flow );
504531 export_flow (i);
532+ if (!m_flow_table[i]->is_empty () && m_flow_table[i]->m_flow .is_delayed && m_flow_table[i]->m_flow .delay_time >= ts) {
533+ m_flow_table[i]->m_flow .is_delayed = false ;
534+ plugins_pre_export (m_flow_table[i]->m_flow );
535+ export_flow (i);
536+ }
537+ if (!m_flow_table[i]->is_empty () && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow .delay_time >= ts) {
538+ m_flow_table[i]->m_delayed_flow_waiting = false ;
539+ plugins_pre_export (m_flow_table[i]->m_delayed_flow );
540+ export_flow (i);
541+ }
505542#ifdef FLOW_CACHE_STATS
506543 m_expired++;
507544#endif /* FLOW_CACHE_STATS */
@@ -658,4 +695,61 @@ void NHTFlowCache::prefetch_export_expired() const
658695 __builtin_prefetch (m_flow_table[i], 0 , 1 );
659696 }
660697}
698+
699+ #ifdef WITH_CTT
700+
701+ void CttController::create_record (uint64_t flow_hash_ctt, const struct timeval & ts)
702+ {
703+ try {
704+ std::vector<std::byte> key = assemble_key (flow_hash_ctt);
705+ std::vector<std::byte> state = assemble_state (
706+ OffloadMode::PACKET_OFFLOAD,
707+ MetaType::FULL,
708+ ts);
709+ m_commander->write_record (std::move (key), std::move (state));
710+ }
711+ catch (const std::exception& e) {
712+ throw ;
713+ }
714+ }
715+
716+ void CttController::export_record (uint64_t flow_hash_ctt)
717+ {
718+ try {
719+ std::vector<std::byte> key = assemble_key (flow_hash_ctt);
720+ m_commander->export_and_delete_record (std::move (key));
721+ }
722+ catch (const std::exception& e) {
723+ throw ;
724+ }
725+ }
726+
727+ std::vector<std::byte> CttController::assemble_key (uint64_t flow_hash_ctt)
728+ {
729+ std::vector<std::byte> key (key_size_bytes, std::byte (0 ));
730+ for (size_t i = 0 ; i < sizeof (flow_hash_ctt) && i < key_size_bytes; ++i) {
731+ key[i] = static_cast <std::byte>((flow_hash_ctt >> (8 * i)) & 0xFF );
732+ }
733+ return key;
734+ }
735+
736+ std::vector<std::byte> CttController::assemble_state (
737+ OffloadMode offload_mode, MetaType meta_type, const struct timeval & ts)
738+ {
739+ std::vector<std::byte> state (state_size_bytes, std::byte (0 ));
740+ std::vector<std::byte> state_mask (state_mask_size_bytes, std::byte (0 ));
741+
742+ state[0 ] = static_cast <std::byte>(offload_mode);
743+ state[1 ] = static_cast <std::byte>(meta_type);
744+
745+ // timestamp in sec/ns format, 32+32 bits - 64 bits in total
746+ for (size_t i = 0 ; i < sizeof (ts.tv_sec ) && i < 4 ; ++i) {
747+ state[2 + i] = static_cast <std::byte>((ts.tv_sec >> (8 * i)) & 0xFF );
748+ }
749+ for (size_t i = 0 ; i < sizeof (ts.tv_usec ) && i < 4 ; ++i) {
750+ state[6 + i] = static_cast <std::byte>((ts.tv_usec >> (8 * i)) & 0xFF );
751+ }
752+ return state;
661753}
754+ #endif // WITH_CTT
755+ }
0 commit comments