Skip to content

Commit 777e6d9

Browse files
jaroslavpesekSiskaPavel
authored andcommitted
ctt - solving hazard during inconsistent state
1 parent 775bf8f commit 777e6d9

File tree

5 files changed

+79
-29
lines changed

5 files changed

+79
-29
lines changed

include/ipfixprobe/flowifc.hpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include <stdint.h>
3939
#include <stdlib.h>
4040
#include <sys/time.h>
41+
#include <chrono>
4142

4243
#ifdef WITH_NEMEA
4344
#include <unirec/unirec.h>
@@ -265,9 +266,10 @@ struct Flow : public Record {
265266
uint64_t flow_hash;
266267

267268
#ifdef WITH_CTT
268-
uint64_t flow_hash_ctt; /**< Flow hash for CTT. */
269-
bool ctt_valid; /**< CTT validity flag. */
270-
int ctt_state; /**< CTT - offload or not. */
269+
uint64_t flow_hash_ctt; /**< Flow hash for CTT. */
270+
bool record_in_ctt; /**< CTT - offload or not. */
271+
bool is_delayed; /**< Delayed export flag. */
272+
time_t delay_time; /**< Time until export of the flow is delayed. */
271273
#endif
272274

273275
PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check
@@ -297,4 +299,5 @@ struct Flow : public Record {
297299
};
298300

299301
}
302+
300303
#endif /* IPXP_FLOWIFC_HPP */

include/ipfixprobe/packet.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ namespace ipxp {
4646
* \brief Structure for storing parsed packet fields
4747
*/
4848
struct Packet : public Record {
49-
Metadata_CTT cttmeta; /**< Metadata from CTT */
50-
bool cttmeta_valid; /**< True if CTT metadata is valid */
49+
#ifdef WITH_CTT
50+
Metadata_CTT cttmeta; /**< Metadata from CTT */
51+
bool cttmeta_valid; /**< True if CTT metadata is valid */
52+
#endif /* WITH_CTT */
5153
struct timeval ts;
5254

5355
uint8_t dst_mac[6];

include/ipfixprobe/storage.hpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,7 @@ class StoragePlugin : public Plugin
191191
{
192192
// if metadata are valid, add flow hash ctt to the flow record
193193
if (pkt.cttmeta_valid) {
194-
rec.ctt_valid = true;
195194
rec.flow_hash_ctt = pkt.cttmeta.flow_hash;
196-
} else {
197-
rec.ctt_valid = false;
198195
}
199196
PluginStatusConverter plugin_status_converter(m_plugins_status);
200197
int ret = 0;

storage/cache.cpp

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,22 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash)
139139
m_flow.src_port = pkt.src_port;
140140
m_flow.dst_port = pkt.dst_port;
141141
}
142+
#ifdef WITH_CTT
143+
m_flow.is_delayed = false;
144+
m_delayed_flow_waiting = false;
145+
#endif /* WITH_CTT */
142146
}
143147

144148
void FlowRecord::update(const Packet &pkt, bool src)
145149
{
150+
if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow
151+
auto flow_hash = m_hash;
152+
m_delayed_flow = m_flow;
153+
m_delayed_flow_waiting = true;
154+
erase(); // erase the old flow, keeping the delayed flow
155+
create(pkt, flow_hash);
156+
return;
157+
}
146158
m_flow.time_last = pkt.ts;
147159
if (src) {
148160
m_flow.src_packets++;
@@ -260,6 +272,17 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue)
260272

261273
void NHTFlowCache::export_flow(size_t index)
262274
{
275+
if (m_flow_table[index]->m_flow.is_delayed) {
276+
return;
277+
}
278+
if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) {
279+
m_total_exported++;
280+
update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason);
281+
update_flow_record_stats(
282+
m_flow_table[index]->m_delayed_flow.src_packets
283+
+ m_flow_table[index]->m_delayed_flow.dst_packets);
284+
ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow);
285+
}
263286
m_total_exported++;
264287
update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason);
265288
update_flow_record_stats(
@@ -506,6 +529,16 @@ void NHTFlowCache::export_expired(time_t ts)
506529
m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow);
507530
plugins_pre_export(m_flow_table[i]->m_flow);
508531
export_flow(i);
532+
if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) {
533+
m_flow_table[i]->m_flow.is_delayed = false;
534+
plugins_pre_export(m_flow_table[i]->m_flow);
535+
export_flow(i);
536+
}
537+
if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) {
538+
m_flow_table[i]->m_delayed_flow_waiting = false;
539+
plugins_pre_export(m_flow_table[i]->m_delayed_flow);
540+
export_flow(i);
541+
}
509542
#ifdef FLOW_CACHE_STATS
510543
m_expired++;
511544
#endif /* FLOW_CACHE_STATS */
@@ -668,18 +701,12 @@ void NHTFlowCache::prefetch_export_expired() const
668701
void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts)
669702
{
670703
try {
671-
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
672-
std::vector<std::byte> state = assemble_state(
704+
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
705+
std::vector<std::byte> state = assemble_state(
673706
OffloadMode::PACKET_OFFLOAD,
674707
MetaType::FULL,
675708
ts);
676-
677-
std::cout << "Created record\n\tkey: " << flow_hash_ctt << "\n\tstate: ";
678-
for (auto& byte : state) {
679-
std::cout << std::hex << static_cast<int>(byte) << " ";
680-
}
681-
std::cout << std::endl;
682-
m_commander->write_record(std::move(key), std::move(state));
709+
m_commander->write_record(std::move(key), std::move(state));
683710
}
684711
catch (const std::exception& e) {
685712
throw;
@@ -691,7 +718,6 @@ void CttController::export_record(uint64_t flow_hash_ctt)
691718
try {
692719
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
693720
m_commander->export_and_delete_record(std::move(key));
694-
std::cout << "Exported record with key: " << flow_hash_ctt << std::endl;
695721
}
696722
catch (const std::exception& e) {
697723
throw;

storage/cache.hpp

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
#ifndef IPXP_STORAGE_CACHE_HPP
3333
#define IPXP_STORAGE_CACHE_HPP
3434

35+
#include <bits/types/struct_timeval.h>
36+
#include <chrono>
37+
#include <ctime>
3538
#include <string>
3639

3740
#include <ipfixprobe/storage.hpp>
@@ -49,6 +52,8 @@
4952
#include <ctt_exceptions.hpp>
5053
#include <ctt_modes.hpp>
5154
#include <ctt.hpp>
55+
#include <queue>
56+
#include <tuple>
5257
#endif /* WITH_CTT */
5358

5459
namespace ipxp {
@@ -269,6 +274,10 @@ class alignas(64) FlowRecord
269274

270275
public:
271276
Flow m_flow;
277+
#ifdef WITH_CTT
278+
Flow m_delayed_flow;
279+
bool m_delayed_flow_waiting;
280+
#endif /* WITH_CTT */
272281

273282
FlowRecord();
274283
~FlowRecord();
@@ -322,29 +331,42 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
322331

323332
int plugins_post_create(Flow &rec, Packet &pkt) {
324333
int ret = StoragePlugin::plugins_post_create(rec, pkt);
325-
rec.ctt_state = static_cast<int>(CttController::OffloadMode::NO_OFFLOAD);
326-
if (no_data_required(rec)) {
327-
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
328-
rec.ctt_state = static_cast<int>(CttController::OffloadMode::PACKET_OFFLOAD);
329-
}
334+
rec.record_in_ctt = false;
335+
//if (only_metadata_required(rec)) {
336+
if (only_metadata_required(rec)) {
337+
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
338+
rec.record_in_ctt = true;
339+
}
330340
return ret;
331341
}
332342

333343
// override post_update method
334344
int plugins_post_update(Flow &rec, Packet &pkt) {
335345
int ret = StoragePlugin::plugins_post_update(rec, pkt);
336-
if (no_data_required(rec) && (rec.ctt_state == static_cast<int>(CttController::OffloadMode::NO_OFFLOAD))) {
337-
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
338-
rec.ctt_state = static_cast<int>(CttController::OffloadMode::PACKET_OFFLOAD);
339-
}
346+
//if (only_metadata_required(rec) && !rec.ctt_state) {
347+
if (!rec.record_in_ctt) { // only for debug!!!!! line above is correct for production
348+
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
349+
rec.record_in_ctt = true;
350+
}
340351
return ret;
341352
}
342353

343354
// override pre_export method
344355
void plugins_pre_export(Flow &rec) {
345-
StoragePlugin::plugins_pre_export(rec);
346-
m_ctt_controller.export_record(rec.flow_hash_ctt);
356+
if (rec.record_in_ctt) {
357+
rec.is_delayed = true;
358+
rec.delay_time = time(nullptr) + 1;
359+
m_ctt_controller.export_record(rec.flow_hash_ctt);
360+
rec.record_in_ctt = false;
361+
return;
362+
}
363+
if (rec.is_delayed) {
364+
return;
365+
} else {
366+
StoragePlugin::plugins_pre_export(rec);
367+
}
347368
}
369+
348370
#endif /* WITH_CTT */
349371

350372
private:

0 commit comments

Comments
 (0)