@@ -310,9 +310,9 @@ void NHTFlowCache::try_to_add_flow_to_ctt(size_t flow_index) noexcept
310310}
311311#endif /* WITH_CTT */
312312
313- int NHTFlowCache::process_flow (Packet& packet, size_t flow_index, bool flow_is_waiting_for_export) noexcept
313+ int NHTFlowCache::update_flow (Packet& packet, size_t flow_index, bool flow_is_waiting_for_export) noexcept
314314{
315- if (is_tcp_connection_restart (packet, m_flow_table[flow_index]->m_flow ) && !flow_is_waiting_for_export ) {
315+ if (!flow_is_waiting_for_export && is_tcp_connection_restart (packet, m_flow_table[flow_index]->m_flow )) {
316316 if (try_to_export (flow_index, false , packet.ts , FLOW_END_EOF)) {
317317 put_pkt (packet);
318318 return 0 ;
@@ -361,9 +361,9 @@ bool NHTFlowCache::try_to_export_delayed_flow(const Packet& packet, size_t flow_
361361 ((packet.cttmeta_valid && !packet.cttmeta .ctt_rec_matched ) || packet.ts > m_flow_table[flow_index]->export_time )) {
362362 plugins_pre_export (m_flow_table[flow_index]->m_flow );
363363 export_flow (flow_index);
364- return false ;
364+ return true ;
365365 }
366- return m_flow_table[flow_index]-> is_waiting_for_export ;
366+ return false ;
367367}
368368#endif /* WITH_CTT */
369369
@@ -508,6 +508,7 @@ static bool check_ip_version(const Packet& pkt) noexcept
508508int NHTFlowCache::put_pkt (Packet& packet)
509509{
510510 plugins_pre_create (packet);
511+ packet.source_pkt = true ;
511512
512513 if (m_enable_fragmentation_cache) {
513514 try_to_fill_ports_to_fragmented_packet (packet);
@@ -524,64 +525,63 @@ int NHTFlowCache::put_pkt(Packet& packet)
524525 packet.src_port , packet.dst_port , packet.ip_proto , static_cast <IP>(packet.ip_version ));
525526
526527 auto [row, flow_id] = find_flow_index (direct_key, reversed_key, packet.vlan_id );
527- const bool flow_found = std::holds_alternative<std::pair<size_t , bool >>(flow_id);
528528
529+ if (std::holds_alternative<size_t >(flow_id)) {
530+ const size_t empty_place = get_empty_place (row, packet.ts );
531+ create_record (packet, empty_place, std::get<size_t >(flow_id));
532+ export_expired (packet.ts );
533+ return 0 ;
534+ }
535+
536+ const auto [flow_index, source_to_destination] = std::get<std::pair<size_t , bool >>(flow_id);
529537#ifdef WITH_CTT
530- const bool flow_is_waiting_for_export = flow_found && try_to_export_delayed_flow (packet, std::get<std::pair< size_t , bool >>(flow_id). first ) ;
538+ const bool flow_is_waiting_for_export = ! try_to_export_delayed_flow (packet, flow_index) && m_flow_table[flow_index]-> is_waiting_for_export ;
531539#else
532540 constexpr bool flow_is_waiting_for_export = false ;
533541#endif /* WITH_CTT */
534542
535- if (flow_found && !m_flow_table[std::get<std::pair<size_t , bool >>(flow_id).first ]->is_empty ()) {
536- /* Existing flow record was found, put flow record at the first index of flow line. */
537- const auto & [flow_index, source_to_destination] = std::get<std::pair<size_t , bool >>(flow_id);
538- packet.source_pkt = source_to_destination;
543+ if (m_flow_table[flow_index]->is_empty ()) {
544+ create_record (packet, flow_index, std::get<size_t >(flow_id));
545+ export_expired (packet.ts );
546+ return 0 ;
547+ }
548+
549+ packet.source_pkt = source_to_destination;
550+ /* Existing flow record was found, put flow record at the first index of flow line. */
539551
540- const size_t relative_flow_index = flow_index % m_line_size;
541- m_cache_stats.lookups += relative_flow_index + 1 ;
542- m_cache_stats.lookups2 += (relative_flow_index + 1 ) * (relative_flow_index + 1 );
543- m_cache_stats.hits ++;
552+ const size_t relative_flow_index = flow_index % m_line_size;
553+ m_cache_stats.lookups += relative_flow_index + 1 ;
554+ m_cache_stats.lookups2 += (relative_flow_index + 1 ) * (relative_flow_index + 1 );
555+ m_cache_stats.hits ++;
544556
545- row.advance_flow (relative_flow_index);
546- return process_flow (packet, flow_index - relative_flow_index, flow_is_waiting_for_export);
547- }
548- /* Existing flow record was not found. Find free place in flow line. */
549- packet.source_pkt = true ;
550- const std::optional<size_t > empty_index = flow_found
551- && m_flow_table[std::get<std::pair<size_t , bool >>(flow_id).first ]->is_empty ()
552- ? std::get<std::pair<size_t , bool >>(flow_id).first
553- : row.find_empty ();
554- const bool empty_found = empty_index.has_value ();
555-
556- size_t flow_index = -1 ;
557- const size_t hash_value = std::get<size_t >(flow_id);
558- const size_t row_begin = hash_value & m_line_mask;
559- if (empty_found) {
560- flow_index = row_begin + empty_index.value ();
557+ row.advance_flow (relative_flow_index);
558+ return update_flow (packet, flow_index - relative_flow_index, flow_is_waiting_for_export);
559+ }
560+
561+ size_t NHTFlowCache::get_empty_place (CacheRowSpan& row, const timeval& now) noexcept
562+ {
563+ if (const std::optional<size_t > empty_index = row.find_empty (); empty_index.has_value ()) {
561564 m_cache_stats.empty ++;
562- } else {
565+ return empty_index.value ();
566+ }
567+ m_cache_stats.not_empty ++;
568+
563569#ifdef WITH_CTT
564- const size_t victim_index = row.find_victim (packet.ts );
565- #else
566- const size_t victim_index = m_line_size - 1 ;
570+ const size_t victim_index = row.find_victim (packet.ts );
571+ #else /* WITH_CTT */
572+ const size_t victim_index = m_line_size - 1 ;
567573#endif /* WITH_CTT */
568- row.advance_flow_to (victim_index, m_new_flow_insert_index);
569- flow_index = row_begin + m_new_flow_insert_index;
574+ row.advance_flow_to (victim_index, m_new_flow_insert_index);
570575#ifdef WITH_CTT
571- if (m_flow_table[flow_index]->is_in_ctt && !m_flow_table[flow_index]->is_waiting_for_export ) {
572- m_flow_table[flow_index]->is_waiting_for_export = true ;
573- m_ctt_controller->remove_record_without_notification (m_flow_table[flow_index]->m_flow .flow_hash_ctt );
574- m_flow_table[flow_index]->export_time = {packet.ts .tv_sec + 1 , packet.ts .tv_usec };
575- }
576- #endif /* WITH_CTT */
577- plugins_pre_export (m_flow_table[flow_index]->m_flow );
578- export_flow (flow_index, FLOW_END_NO_RES);
579-
580- m_cache_stats.not_empty ++;
576+ if (row[m_new_flow_insert_index]->is_in_ctt && !row[m_new_flow_insert_index]->is_waiting_for_export ) {
577+ row[m_new_flow_insert_index]->is_waiting_for_export = true ;
578+ m_ctt_controller->remove_record_without_notification (row[m_new_flow_insert_index]->m_flow .flow_hash_ctt );
579+ row[m_new_flow_insert_index]->export_time = {now.tv_sec + 1 , now.tv_usec };
581580 }
582- create_record (packet, flow_index, hash_value);
583- export_expired (packet.ts );
584- return 0 ;
581+ #endif /* WITH_CTT */
582+ plugins_pre_export (row[m_new_flow_insert_index]->m_flow );
583+ export_flow (&row[m_new_flow_insert_index] - m_flow_table.data (), FLOW_END_NO_RES);
584+ return &row[m_new_flow_insert_index] + m_new_flow_insert_index - m_flow_table.data ();
585585}
586586
587587bool NHTFlowCache::try_to_export_on_active_timeout (size_t flow_index, const timeval& now) noexcept
0 commit comments