Skip to content

Commit 922adfd

Browse files
committed
++ cache
1 parent 6e22ef2 commit 922adfd

File tree

2 files changed

+31
-34
lines changed

2 files changed

+31
-34
lines changed

storage/cache.cpp

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ std::string NHTFlowCache::get_name() const noexcept
6666

6767
NHTFlowCache::NHTFlowCache()
6868
{
69-
m_hash_function = [](const uint8_t* data, size_t length) -> uint64_t {
70-
return XXH64(data, length, 0);
71-
};
7269
}
7370

7471
NHTFlowCache::~NHTFlowCache()
@@ -210,13 +207,13 @@ void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int return_flags)
210207
try_to_export(flow_index, false, pkt.ts, FLOW_END_FORCED);
211208
}
212209

213-
std::tuple<CacheRowSpan, std::optional<size_t>, size_t>
210+
NHTFlowCache::FlowSearch
214211
NHTFlowCache::find_row(const std::variant<FlowKeyv4, FlowKeyv6>& key, const std::optional<uint16_t>& vlan_id) noexcept
215212
{
216213
const auto [data, length] = std::visit([](const auto& key) {
217214
return std::make_pair(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
218215
}, key);
219-
const size_t hash_value = m_hash_function(data, length);
216+
const size_t hash_value = XXH64(data, length, 0);
220217
const size_t first_flow_in_row = hash_value & m_line_mask;
221218
const CacheRowSpan row(&m_flow_table[first_flow_in_row], m_line_size);
222219
if (const std::optional<size_t> flow_index = row.find_by_hash(hash_value, vlan_id); flow_index.has_value()) {
@@ -225,25 +222,21 @@ NHTFlowCache::find_row(const std::variant<FlowKeyv4, FlowKeyv6>& key, const std:
225222
return {row, std::nullopt, hash_value};
226223
}
227224

228-
std::pair<CacheRowSpan, std::variant<std::pair<size_t, bool>, size_t>>
225+
std::pair<NHTFlowCache::FlowSearch, bool>
229226
NHTFlowCache::find_flow_index(const std::variant<FlowKeyv4, FlowKeyv6>& key,
230-
const std::variant<FlowKeyv4, FlowKeyv6>& key_reversed, const std::optional<uint16_t>& vlan_id) noexcept
227+
const std::variant<FlowKeyv4, FlowKeyv6>& key_reversed, const std::optional<uint16_t>& vlan_id) noexcept
231228
{
232-
233-
const auto [direct_row, direct_flow_index, direct_hash_value] = find_row(key, vlan_id);
234-
if (direct_flow_index.has_value()) {
235-
return {direct_row, std::make_pair(*direct_flow_index, true)};
236-
}
237-
if (m_split_biflow) {
238-
return {direct_row, direct_hash_value};
229+
const FlowSearch direct_search = find_row(key, vlan_id);
230+
if (direct_search.flow_index.has_value() || m_split_biflow) {
231+
return {direct_search, true};
239232
}
240233

241-
const auto [reversed_row, reversed_flow_index, reversed_hash_value] = find_row(key_reversed, vlan_id);
242-
if (reversed_flow_index.has_value()) {
243-
return {reversed_row, std::make_pair(*reversed_flow_index, false)};
234+
const FlowSearch reverse_search = find_row(key_reversed, vlan_id);
235+
if (reverse_search.flow_index.has_value()) {
236+
return {reverse_search, false};
244237
}
245238

246-
return {direct_row, direct_hash_value};
239+
return {direct_search, true};
247240
}
248241

249242
static bool is_tcp_connection_restart(const Packet& packet, const Flow& flow) noexcept
@@ -509,7 +502,6 @@ static bool check_ip_version(const Packet& pkt) noexcept
509502
int NHTFlowCache::put_pkt(Packet& packet)
510503
{
511504
plugins_pre_create(packet);
512-
packet.source_pkt = true;
513505

514506
if (m_enable_fragmentation_cache) {
515507
try_to_fill_ports_to_fragmented_packet(packet);
@@ -524,44 +516,42 @@ int NHTFlowCache::put_pkt(Packet& packet)
524516
packet.src_port, packet.dst_port, packet.ip_proto, static_cast<IP>(packet.ip_version));
525517

526518
prefetch_export_expired();
527-
528-
auto [row, flow_identification] =
519+
520+
auto [flow_search, source_to_destination] =
529521
find_flow_index(direct_key, reversed_key, packet.vlan_id);
530522

531-
if (std::holds_alternative<size_t>(flow_identification)) {
532-
const size_t hash_value = std::get<size_t>(flow_identification);
533-
const size_t empty_place = get_empty_place(row, packet.ts) + (hash_value & m_line_mask);
534-
create_record(packet, empty_place, hash_value);
523+
packet.source_pkt = source_to_destination;
524+
525+
if (!flow_search.flow_index.has_value()) {
526+
const size_t empty_place = get_empty_place(flow_search.cache_row, packet.ts) + (flow_search.hash_value & m_line_mask);
527+
create_record(packet, empty_place, flow_search.hash_value);
535528
export_expired(packet.ts);
536529
return 0;
537530
}
538531

539-
const auto& [flow_index, source_to_destination] = std::get<std::pair<size_t, bool>>(flow_identification);
532+
size_t flow_index = *flow_search.flow_index;
540533

541534
#ifdef WITH_CTT
542-
const size_t hash_value = m_flow_table[flow_index]->m_flow.flow_hash;
543535
const bool flow_is_waiting_for_export = !try_to_export_delayed_flow(packet, flow_index) && m_flow_table[flow_index]->is_waiting_for_export;
544536
#else
545537
constexpr bool flow_is_waiting_for_export = false;
546538
#endif /* WITH_CTT */
547539

548540
#ifdef WITH_CTT
549541
if (m_flow_table[flow_index]->is_empty()) {
550-
create_record(packet, flow_index, hash_value);
542+
create_record(packet, flow_index, flow_search.hash_value);
551543
export_expired(packet.ts);
552544
return 0;
553545
}
554546
#endif /* WITH_CTT */
555547

556-
packet.source_pkt = source_to_destination;
557548
/* Existing flow record was found, put flow record at the first index of flow line. */
558-
559549
const size_t relative_flow_index = flow_index % m_line_size;
560550
m_cache_stats.lookups += relative_flow_index + 1;
561551
m_cache_stats.lookups2 += (relative_flow_index + 1) * (relative_flow_index + 1);
562552
m_cache_stats.hits++;
563553

564-
row.advance_flow(relative_flow_index);
554+
flow_search.cache_row.advance_flow(relative_flow_index);
565555
return update_flow(packet, flow_index - relative_flow_index, flow_is_waiting_for_export);
566556
}
567557

storage/cache.hpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
8989
bool m_enable_fragmentation_cache{true};
9090
std::vector<FlowRecord*> m_flow_table;
9191
std::vector<FlowRecord> m_flows;
92-
std::function<size_t(const uint8_t* data, size_t length)> m_hash_function;
9392

9493
FragmentationCache m_fragmentation_cache{0,0};
9594
FlowEndReasonStats m_flow_end_reason_stats = {};
@@ -116,10 +115,18 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
116115
void prefetch_export_expired() const;
117116
void get_parser_options(CacheOptParser& parser) noexcept;
118117
void push_to_export_queue(size_t flow_index) noexcept;
119-
std::pair<CacheRowSpan, std::variant<std::pair<size_t, bool>, size_t>>
118+
119+
struct FlowSearch {
120+
CacheRowSpan cache_row; // Cache row where the flow to which packet belongs must be stored
121+
std::optional<size_t> flow_index; // Index of the flow in the table, if found
122+
size_t hash_value; // Hash value of the flow
123+
};
124+
125+
std::pair<FlowSearch, bool>
120126
find_flow_index(const std::variant<FlowKeyv4, FlowKeyv6>& key,
121127
const std::variant<FlowKeyv4, FlowKeyv6>& key_reversed, const std::optional<uint16_t>& vlan_id = std::nullopt) noexcept;
122-
std::tuple<CacheRowSpan, std::optional<size_t>, size_t>
128+
129+
FlowSearch
123130
find_row(const std::variant<FlowKeyv4, FlowKeyv6>& key, const std::optional<uint16_t>& vlan_id = std::nullopt) noexcept;
124131
bool try_to_export_on_inactive_timeout(size_t flow_index, const timeval& now) noexcept;
125132
bool try_to_export_on_active_timeout(size_t flow_index, const timeval& now) noexcept;

0 commit comments

Comments
 (0)