Skip to content

Commit 82ea4af

Browse files
committed
++ cache
1 parent 1938819 commit 82ea4af

File tree

6 files changed

+125
-104
lines changed

6 files changed

+125
-104
lines changed

storage/cache.cpp

Lines changed: 39 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -169,24 +169,17 @@ void NHTFlowCache::push_to_export_queue(size_t flow_index) noexcept
169169

170170
void NHTFlowCache::finish()
171171
{
172-
/*auto it = std::find_if(m_hashes_in_ctt.begin(), m_hashes_in_ctt.end(), [](const auto& pair) {
173-
return pair.second <= 0;
174-
});*/
175-
for (decltype(m_cache_size) i = 0; i < m_cache_size; i++) {
176-
if (!m_flow_table[i]->is_empty()) {
172+
std::for_each_n(m_flow_table.begin(), m_cache_size, [this](FlowRecord*& flow_record) {
173+
if (!flow_record->is_empty()) {
177174
#ifdef WITH_CTT
178-
if (m_flow_table[i]->is_in_ctt && !m_flow_table[i]->is_waiting_for_export) {
179-
send_export_request_to_ctt(m_flow_table[i]->m_flow.flow_hash_ctt);
175+
if (flow_record->is_in_ctt && !flow_record->is_waiting_for_export) {
176+
m_ctt_controller->remove_record_without_notification(flow_record->m_flow.flow_hash_ctt);
180177
}
181178
#endif /* WITH_CTT */
182-
plugins_pre_export(m_flow_table[i]->m_flow);
183-
export_flow(i, FLOW_END_FORCED);
179+
plugins_pre_export(flow_record->m_flow);
180+
export_flow(flow_record->m_flow.flow_hash, FLOW_END_FORCED);
184181
}
185-
}
186-
/*if (m_hashes_in_ctt.size() > 0){
187-
throw "bad CTT size";
188-
}
189-
std::cout << "CTT hash collisions: " << m_ctt_hash_collision << std::endl;*/
182+
});
190183
}
191184

192185
void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int return_flags)
@@ -195,10 +188,10 @@ void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int return_flags)
195188

196189
if (return_flags == ProcessPlugin::FlowAction::FLUSH_WITH_REINSERT) {
197190
#ifdef WITH_CTT
198-
if (m_flow_table[flow_index]->is_in_ctt && !m_flow_table[flow_index]->is_waiting_for_export) {
191+
/*if (m_flow_table[flow_index]->is_in_ctt && !m_flow_table[flow_index]->is_waiting_for_export) {
199192
m_flow_table[flow_index]->is_waiting_for_export = true;
200-
send_export_request_to_ctt(m_flow_table[flow_index]->m_flow.flow_hash_ctt);
201-
}
193+
m_ctt_controller->remove_record_without_notification(m_flow_table[flow_index]->m_flow.flow_hash_ctt);
194+
}*/
202195
#endif /* WITH_CTT */
203196
push_to_export_queue(flow_index);
204197
m_flow_table[flow_index]->m_flow.remove_extensions();
@@ -293,19 +286,10 @@ void NHTFlowCache::create_record(const Packet& packet, size_t flow_index, size_t
293286
}
294287
m_flow_table[flow_index]->m_flow.flow_hash_ctt = packet.cttmeta.flow_hash;
295288
if (needs_to_be_offloaded(flow_index)) {
296-
/*m_hashes_in_ctt[m_flow_table[flow_index]->m_flow.flow_hash_ctt]++;
297-
if (m_hashes_in_ctt[m_flow_table[flow_index]->m_flow.flow_hash_ctt] >= 2) {
298-
m_ctt_hash_collision++;
299-
std::vector<FlowRecord*> filtered;
300-
301-
std::copy_if(m_flow_table.begin(), m_flow_table.end(), std::back_inserter(filtered),
302-
[&](FlowRecord* flow) { return flow->m_flow.flow_hash_ctt == m_flow_table[flow_index]->m_flow.flow_hash_ctt; });
303-
filtered.size();
304-
}
305-
auto x = m_hashes_in_ctt[m_flow_table[flow_index]->m_flow.flow_hash_ctt];*/
306-
m_ctt_controller->create_record(m_flow_table[flow_index]->m_flow, m_dma_channel);
289+
m_ctt_controller->create_record(m_flow_table[flow_index]->m_flow, m_dma_channel, OffloadMode::TRIMMED_PACKET_WITH_METADATA_AND_EXPORT);
307290
m_ctt_stats.flows_offloaded++;
308291
m_flow_table[flow_index]->is_in_ctt = true;
292+
m_flow_table[flow_index]->offload_mode = OffloadMode::TRIMMED_PACKET_WITH_METADATA_AND_EXPORT;
309293
}
310294
#endif /* WITH_CTT */
311295
}
@@ -317,19 +301,10 @@ void NHTFlowCache::try_to_add_flow_to_ctt(size_t flow_index) noexcept
317301
return;
318302
}
319303
if (needs_to_be_offloaded(flow_index)) {
320-
/*m_hashes_in_ctt[m_flow_table[flow_index]->m_flow.flow_hash_ctt]++;
321-
auto x = m_hashes_in_ctt[m_flow_table[flow_index]->m_flow.flow_hash_ctt];
322-
if (m_hashes_in_ctt[m_flow_table[flow_index]->m_flow.flow_hash_ctt] >= 2) {
323-
m_ctt_hash_collision++;
324-
std::vector<FlowRecord*> filtered;
325-
326-
std::copy_if(m_flow_table.begin(), m_flow_table.end(), std::back_inserter(filtered),
327-
[&](FlowRecord* flow) { return flow->m_flow.flow_hash_ctt == m_flow_table[flow_index]->m_flow.flow_hash_ctt; });
328-
filtered.size();
329-
}*/
330-
m_ctt_controller->create_record(m_flow_table[flow_index]->m_flow, m_dma_channel);
304+
m_ctt_controller->create_record(m_flow_table[flow_index]->m_flow, m_dma_channel, OffloadMode::TRIMMED_PACKET_WITH_METADATA_AND_EXPORT);
331305
m_ctt_stats.flows_offloaded++;
332306
m_flow_table[flow_index]->is_in_ctt = true;
307+
m_flow_table[flow_index]->offload_mode = OffloadMode::TRIMMED_PACKET_WITH_METADATA_AND_EXPORT;
333308
}
334309
}
335310
#endif /* WITH_CTT */
@@ -399,13 +374,6 @@ bool NHTFlowCache::try_to_export(size_t flow_index, bool call_pre_export, const
399374
#ifdef WITH_CTT
400375
void NHTFlowCache::send_export_request_to_ctt(size_t ctt_flow_hash) noexcept
401376
{
402-
/*if (--m_hashes_in_ctt[ctt_flow_hash] < 0)
403-
{
404-
throw "missing hash in send_export_request_to_ctt!";
405-
}
406-
if (m_hashes_in_ctt[ctt_flow_hash] == 0) {
407-
m_hashes_in_ctt.erase(ctt_flow_hash);
408-
}*/
409377
m_ctt_controller->export_record(ctt_flow_hash);
410378
}
411379
#endif /* WITH_CTT */
@@ -416,7 +384,7 @@ bool NHTFlowCache::try_to_export(size_t flow_index, bool call_pre_export, const
416384
if (m_flow_table[flow_index]->is_in_ctt) {
417385
if (!m_flow_table[flow_index]->is_waiting_for_export) {
418386
m_flow_table[flow_index]->is_waiting_for_export = true;
419-
send_export_request_to_ctt(m_flow_table[flow_index]->m_flow.flow_hash_ctt);
387+
m_ctt_controller->export_record(m_flow_table[flow_index]->m_flow.flow_hash_ctt);
420388
m_flow_table[flow_index]->export_time = {now.tv_sec + 1, now.tv_usec};
421389
return false;
422390
}
@@ -483,29 +451,49 @@ void NHTFlowCache::update_ctt_export_stats(CttExportReason ctt_reason, Managemen
483451
}
484452
}
485453

454+
static bool is_counter_overflow(CttExportReason ctt_reason, ManagementUnitExportReason mu_reason) noexcept
455+
{
456+
return ctt_reason == CttExportReason::MANAGEMENT_UNIT && (mu_reason & ManagementUnitExportReason::COUNTER_OVERFLOW);
457+
}
458+
486459
void NHTFlowCache::export_external(const Packet& pkt) noexcept
487460
{
488461
m_ctt_stats.export_packets++;
489-
std::optional<CttExport> export_data = CttExport::parse(pkt.packet, pkt.packet_len);
462+
const std::optional<CttExport> export_data = CttExport::parse(pkt.packet, pkt.packet_len);
490463
if (!export_data.has_value()) {
491464
m_ctt_stats.export_packets_parsing_failed++;
492465
return;
493466
}
494467

495-
IP ip_version = export_data->state.ip_version == 0 ? IP::v4 : IP::v6;
468+
const IP ip_version = export_data->state.ip_version == 0 ? IP::v4 : IP::v6;
496469
std::variant<FlowKeyv4, FlowKeyv6> key = *FlowKeyFactory::create_direct_key(&export_data->state.src_ip, &export_data->state.dst_ip,
497470
export_data->state.src_port, export_data->state.dst_port, export_data->state.ip_proto, ip_version);
471+
//CTT keeps ip addresses in LE
498472
std::visit([](auto& key) {
499473
std::reverse(key.src_ip.data(), key.src_ip.data() + sizeof(key.src_ip));
500474
std::reverse(key.dst_ip.data(), key.dst_ip.data() + sizeof(key.dst_ip));
501475
}, key);
502476
const auto [row, flow_index, hash_value] = find_row(key);
503-
if (!flow_index.has_value()) {
477+
if (!flow_index.has_value()
478+
|| !m_flow_table[flow_index.value()]->is_in_ctt
479+
|| !m_flow_table[flow_index.value()]->offload_mode.has_value())
504480
m_ctt_stats.export_packets_for_missing_flow++;
505481
return;
506482
}
507483

508484
update_ctt_export_stats(export_data->reason, export_data->mu_reason);
485+
486+
if ((is_counter_overflow(export_data->reason, export_data->mu_reason)) {
487+
if (m_flow_table[flow_index.value()]->offload_mode == OffloadMode::TRIMMED_PACKET_WITH_METADATA_AND_EXPORT) {
488+
return;
489+
}
490+
}
491+
if (export_data->reason == CttExportReason::CTT_FULL) {
492+
m_flow_table[flow_index.value()]->is_in_ctt = false;
493+
m_flow_table[flow_index.value()]->is_waiting_for_export = false;
494+
m_flow_table[flow_index.value()]->offload_mode = std::nullopt;
495+
}
496+
509497
export_flow(flow_index.value(), convert_ctt_export_reason_to_ipfxiprobe(export_data->reason, export_data->mu_reason));
510498
m_ctt_stats.flows_removed++;
511499
}
@@ -581,7 +569,7 @@ int NHTFlowCache::put_pkt(Packet& packet)
581569
#ifdef WITH_CTT
582570
if (m_flow_table[flow_index]->is_in_ctt && !m_flow_table[flow_index]->is_waiting_for_export) {
583571
m_flow_table[flow_index]->is_waiting_for_export = true;
584-
send_export_request_to_ctt(m_flow_table[flow_index]->m_flow.flow_hash_ctt);
572+
remove_record_without_notification(m_flow_table[flow_index]->m_flow.flow_hash_ctt);
585573
m_flow_table[flow_index]->export_time = {packet.ts.tv_sec + 1, packet.ts.tv_usec};
586574
}
587575
#endif /* WITH_CTT */

storage/cache.hpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
namespace ipxp {
5252

53+
5354
class NHTFlowCache : TelemetryUtils, public StoragePlugin
5455
{
5556
public:
@@ -86,8 +87,6 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
8687
uint32_t m_inactive{0};
8788
bool m_split_biflow{false};
8889
bool m_enable_fragmentation_cache{true};
89-
//std::variant<FlowKeyv4, FlowKeyv6> m_key;
90-
//std::variant<FlowKeyv4, FlowKeyv6> m_key_reversed;
9190
std::vector<FlowRecord*> m_flow_table;
9291
std::vector<FlowRecord> m_flows;
9392
std::function<size_t(const uint8_t* data, size_t length)> m_hash_function;
@@ -98,13 +97,10 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
9897
FlowCacheStats m_cache_stats = {};
9998
#ifdef WITH_CTT
10099
CttStats m_ctt_stats = {};
101-
void set_ctt_config(const std::shared_ptr<CttController>& ctt_controller, uint8_t dma_channel) override;
102-
//std::string m_ctt_device;
103-
//unsigned m_ctt_comp_index;
104100
uint8_t m_dma_channel;
105101
std::shared_ptr<CttController> m_ctt_controller;
106-
//std::unordered_map<size_t, int> m_hashes_in_ctt;
107-
//size_t m_ctt_hash_collision{0};
102+
103+
void set_ctt_config(const std::shared_ptr<CttController>& ctt_controller, uint8_t dma_channel) override;
108104
void update_ctt_export_stats(CttExportReason ctt_reason, ManagementUnitExportReason mu_reason) noexcept;
109105
#endif /* WITH_CTT */
110106

@@ -122,7 +118,7 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
122118
void push_to_export_queue(size_t flow_index) noexcept;
123119
std::pair<CacheRowSpan, std::variant<std::pair<size_t, bool>, size_t>>
124120
find_flow_index(const std::variant<FlowKeyv4, FlowKeyv6>& key,
125-
const std::variant<FlowKeyv4, FlowKeyv6>& key_reversed, const std::optional<uint16_t>& vlan_id = std::nullopt) noexcept;
121+
const std::variant<FlowKeyv4, FlowKeyv6>& key_reversed, const std::optional<uint16_t>& vlan_id = std::nullopt) noexcept;
126122
std::tuple<CacheRowSpan, std::optional<size_t>, size_t>
127123
find_row(const std::variant<FlowKeyv4, FlowKeyv6>& key, const std::optional<uint16_t>& vlan_id = std::nullopt) noexcept;
128124
bool try_to_export_on_inactive_timeout(size_t flow_index, const timeval& now) noexcept;
@@ -140,6 +136,5 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
140136
void try_to_add_flow_to_ctt(size_t flow_index) noexcept;
141137
bool needs_to_be_offloaded(size_t flow_index) const noexcept;
142138
};
143-
144139
}
145140
#endif /* IPXP_STORAGE_CACHE_HPP */

storage/cttController.cpp

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,12 @@ CttController::CttController(const std::string& nfb_dev, unsigned ctt_comp_index
5050
}
5151
}
5252

53-
void CttController::create_record(const Flow& flow, uint8_t dma_channel)
53+
void CttController::create_record(const Flow& flow, uint8_t dma_channel, OffloadMode offload_mode)
5454
{
5555
try {
5656
std::vector<std::byte> key = assemble_key(flow.flow_hash_ctt);
5757
std::vector<std::byte> state = assemble_state(
58-
//OffloadMode::PACKET_OFFLOAD,
59-
OffloadMode::TRIMMED_PACKET_WITH_METADATA_AND_EXPORT,
58+
offload_mode,
6059
MetadataType::FULL_METADATA,
6160
flow, dma_channel);
6261
m_commander->write_record(std::move(key), std::move(state));
@@ -66,6 +65,28 @@ void CttController::create_record(const Flow& flow, uint8_t dma_channel)
6665
}
6766
}
6867

68+
void CttController::get_state(uint64_t flow_hash_ctt)
69+
{
70+
try {
71+
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
72+
m_commander->export_record(std::move(key));
73+
}
74+
catch (const std::exception& e) {
75+
throw;
76+
}
77+
}
78+
79+
void CttController::remove_record_without_notification(uint64_t flow_hash_ctt)
80+
{
81+
try {
82+
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
83+
m_commander->delete_record(std::move(key));
84+
}
85+
catch (const std::exception& e) {
86+
throw;
87+
}
88+
}
89+
6990
void CttController::export_record(uint64_t flow_hash_ctt)
7091
{
7192
try {
@@ -77,8 +98,18 @@ void CttController::export_record(uint64_t flow_hash_ctt)
7798
}
7899
}
79100

101+
std::pair<std::vector<std::byte>, std::vector<std::byte>>
102+
CttController::get_key_and_state(uint64_t flow_hash_ctt, const Flow& flow, uint8_t dma_channel)
103+
{
104+
return {assemble_key(flow_hash_ctt), assemble_state(
105+
OffloadMode::TRIMMED_PACKET_WITH_METADATA_AND_EXPORT,
106+
MetadataType::FULL_METADATA,
107+
flow, dma_channel)};
108+
}
109+
80110
std::vector<std::byte> CttController::assemble_key(uint64_t flow_hash_ctt)
81111
{
112+
return std::vector<std::byte>(&flow_hash_ctt, &flow_hash_ctt + m_key_size_bytes);
82113
std::vector<std::byte> key(m_key_size_bytes, std::byte(0));
83114
for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < m_key_size_bytes; ++i) {
84115
key[i] = static_cast<std::byte>((flow_hash_ctt >> (8 * i)) & 0xFF);

storage/cttController.hpp

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -41,54 +41,57 @@ namespace ipxp {
4141

4242
class CttController {
4343
public:
44-
/**
45-
* @brief init the CTT.
46-
*
47-
* @param nfb_dev The NFB device file (e.g., "/dev/nfb0").
48-
* @param ctt_comp_index The index of the CTT component.
49-
*/
50-
CttController(const std::string& nfb_dev, unsigned ctt_comp_index);
44+
/**
45+
* @brief init the CTT.
46+
*
47+
* @param nfb_dev The NFB device file (e.g., "/dev/nfb0").
48+
* @param ctt_comp_index The index of the CTT component.
49+
*/
50+
CttController(const std::string& nfb_dev, unsigned ctt_comp_index);
5151

52-
/**
53-
* @brief Command: mark a flow for offload.
54-
*
55-
* @param flow_hash_ctt The flow hash to be offloaded.
56-
*/
57-
void create_record(const Flow& flow, uint8_t dma_channel);
52+
/**
53+
* @brief Command: mark a flow for offload.
54+
*
55+
* @param flow_hash_ctt The flow hash to be offloaded.
56+
*/
57+
void create_record(const Flow& flow, uint8_t dma_channel, OffloadMode offload_mode = OffloadMode::TRIMMED_PACKET_WITH_METADATA_AND_EXPORT);
5858

59-
/**
60-
* @brief Command: export a flow from the CTT.
61-
*
62-
* @param flow_hash_ctt The flow hash to be exported.
63-
*/
64-
void export_record(uint64_t flow_hash_ctt);
59+
/**
60+
* @brief Command: export a flow from the CTT.
61+
*
62+
* @param flow_hash_ctt The flow hash to be exported.
63+
*/
64+
void export_record(uint64_t flow_hash_ctt);
6565

66-
~CttController() noexcept;
66+
~CttController() noexcept;
6767

6868
private:
69-
std::unique_ptr<ctt::AsyncCommander> m_commander;
70-
size_t m_key_size_bytes;
71-
size_t m_state_size_bytes;
72-
size_t m_state_mask_size_bytes;
69+
std::unique_ptr<ctt::AsyncCommander> m_commander;
70+
size_t m_key_size_bytes;
71+
size_t m_state_size_bytes;
72+
size_t m_state_mask_size_bytes;
7373

74-
/**
75-
* @brief Assembles the state vector from the given values.
76-
*
77-
* @param offload_mode The offload mode.
78-
* @param meta_type The metadata type.
79-
* @param timestamp_first The first timestamp of the flow.
80-
* @return A byte vector representing the assembled state vector.
81-
*/
82-
std::vector<std::byte>
83-
assemble_state(OffloadMode offload_mode, MetadataType meta_type, const Flow& flow, uint8_t dma_channel);
74+
/**
75+
* @brief Assembles the state vector from the given values.
76+
*
77+
* @param offload_mode The offload mode.
78+
* @param meta_type The metadata type.
79+
* @param timestamp_first The first timestamp of the flow.
80+
* @return A byte vector representing the assembled state vector.
81+
*/
82+
std::vector<std::byte>
83+
assemble_state(OffloadMode offload_mode, MetadataType meta_type, const Flow& flow, uint8_t dma_channel);
8484

85-
/**
86-
* @brief Assembles the key vector from the given flow hash.
87-
*
88-
* @param flow_hash_ctt The flow hash.
89-
* @return A byte vector representing the assembled key vector.
90-
*/
91-
std::vector<std::byte> assemble_key(uint64_t flow_hash_ctt);
85+
/**
86+
* @brief Assembles the key vector from the given flow hash.
87+
*
88+
* @param flow_hash_ctt The flow hash.
89+
* @return A byte vector representing the assembled key vector.
90+
*/
91+
std::vector<std::byte> assemble_key(uint64_t flow_hash_ctt);
92+
93+
std::pair<std::vector<std::byte>, std::vector<std::byte>>
94+
get_key_and_state(uint64_t flow_hash_ctt, const Flow& flow, uint8_t dma_channel)
9295
};
9396

9497
} // ipxp

0 commit comments

Comments
 (0)