Skip to content

Commit 33cfbd3

Browse files
authored
Merge pull request #89 from CESNET/merge_input_storage_workers
WIP: Merge input and storage workers thread.
2 parents 3aad9a4 + 54f75bd commit 33cfbd3

File tree

4 files changed

+44
-120
lines changed

4 files changed

+44
-120
lines changed

ipfixprobe.cpp

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ namespace ipxp {
7070
volatile sig_atomic_t stop = 0;
7171

7272
volatile sig_atomic_t terminate_export = 0;
73-
volatile sig_atomic_t terminate_storage = 0;
7473
volatile sig_atomic_t terminate_input = 0;
7574

7675
const uint32_t DEFAULT_IQUEUE_SIZE = 64;
@@ -350,34 +349,24 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
350349
storage_process_plugins.push_back(tmp);
351350
}
352351

353-
ipx_ring_t *input_queue = ipx_ring_init(conf.iqueue_size, 0);
354-
if (input_queue == nullptr) {
355-
throw IPXPError("unable to initialize ring buffer");
356-
}
357-
358352
std::promise<WorkerResult> *input_res = new std::promise<WorkerResult>();
359-
std::promise<WorkerResult> *storage_res = new std::promise<WorkerResult>();
360353
conf.input_fut.push_back(input_res->get_future());
361-
conf.storage_fut.push_back(storage_res->get_future());
362354

363355
auto input_stats = new std::atomic<InputStats>();
364356
conf.input_stats.push_back(input_stats);
365357

366358
WorkPipeline tmp = {
367359
{
368360
input_plugin,
369-
new std::thread(input_worker, input_plugin, &conf.blocks[pipeline_idx * (conf.iqueue_size + 1)],
370-
conf.iqueue_size + 1, conf.max_pkts, input_queue, input_res, input_stats),
361+
new std::thread(input_storage_worker, input_plugin, storage_plugin, &conf.blocks[pipeline_idx * (conf.iqueue_size + 1)],
362+
conf.iqueue_size + 1, conf.max_pkts, input_res, input_stats),
371363
input_res,
372364
input_stats
373365
},
374366
{
375367
storage_plugin,
376-
new std::thread(storage_worker, storage_plugin, input_queue, storage_res),
377-
storage_res,
378368
storage_process_plugins
379-
},
380-
input_queue
369+
}
381370
};
382371
conf.pipelines.push_back(tmp);
383372
pipeline_idx++;
@@ -398,9 +387,7 @@ void finish(ipxp_conf_t &conf)
398387
}
399388

400389
// Terminate all storages
401-
terminate_storage = 1;
402390
for (auto &it : conf.pipelines) {
403-
it.storage.thread->join();
404391
for (auto &itp : it.storage.plugins) {
405392
itp->close();
406393
}
@@ -444,29 +431,6 @@ void finish(ipxp_conf_t &conf)
444431
std::setw(6) << status << std::endl;
445432
}
446433

447-
std::ostringstream oss;
448-
oss << "Storage stats:" << std::endl <<
449-
std::setw(3) << "#" <<
450-
std::setw(7) << "status" << std::endl;
451-
452-
idx = 0;
453-
bool storage_ok = true;
454-
for (auto &it : conf.storage_fut) {
455-
WorkerResult res = it.get();
456-
std::string status = "ok";
457-
if (res.error) {
458-
ok = false;
459-
storage_ok = false;
460-
status = res.msg;
461-
}
462-
oss <<
463-
std::setw(3) << idx++ << " " <<
464-
std::setw(6) << status << std::endl;
465-
}
466-
if (!storage_ok) {
467-
std::cout << oss.str();
468-
}
469-
470434
std::cout << "Output stats:" << std::endl <<
471435
std::setw(3) << "#" <<
472436
std::setw(10) << "biflows" <<
@@ -583,13 +547,6 @@ void main_loop(ipxp_conf_t &conf)
583547
break;
584548
}
585549
}
586-
for (auto &it : conf.storage_fut) {
587-
std::future_status status = it.wait_for(std::chrono::seconds(0));
588-
if (status == std::future_status::ready) {
589-
stop = 1;
590-
break;
591-
}
592-
}
593550
for (auto &it : conf.output_fut) {
594551
std::future_status status = it.wait_for(std::chrono::seconds(0));
595552
if (status == std::future_status::ready) {

ipfixprobe.hpp

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ extern const uint32_t DEFAULT_FPS;
7272

7373
// global termination variable
7474
extern volatile sig_atomic_t terminate_export;
75-
extern volatile sig_atomic_t terminate_storage;
7675
extern volatile sig_atomic_t terminate_input;
7776

7877
class IpfixprobeOptParser;
@@ -212,7 +211,6 @@ struct ipxp_conf_t {
212211
std::vector<std::atomic<OutputStats> *> output_stats;
213212

214213
std::vector<std::shared_future<WorkerResult>> input_fut;
215-
std::vector<std::future<WorkerResult>> storage_fut;
216214
std::vector<std::future<WorkerResult>> output_fut;
217215

218216
size_t pkt_bufsize;
@@ -243,15 +241,8 @@ struct ipxp_conf_t {
243241
delete it.input.promise;
244242
}
245243

246-
terminate_storage = 1;
247244
for (auto &it : pipelines) {
248-
if (it.storage.thread->joinable()) {
249-
it.storage.thread->join();
250-
}
251245
delete it.storage.plugin;
252-
delete it.storage.thread;
253-
delete it.storage.promise;
254-
ipx_ring_destroy(it.queue);
255246
}
256247

257248
for (auto &it : pipelines) {

workers.cpp

Lines changed: 40 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,26 @@ namespace ipxp {
5151

5252
#define MICRO_SEC 1000000L
5353

54-
void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit, ipx_ring_t *queue,
54+
void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit,
5555
std::promise<WorkerResult> *out, std::atomic<InputStats> *out_stats)
5656
{
57-
struct timespec start;
58-
struct timespec end;
57+
struct timespec start_cache;
58+
struct timespec end_cache;
59+
struct timespec begin = {0, 0};
60+
struct timespec end = {0, 0};
61+
struct timeval ts = {0, 0};
62+
bool timeout = false;
5963
size_t i = 0;
6064
InputPlugin::Result ret;
6165
InputStats stats = {0, 0, 0, 0, 0};
6266
WorkerResult res = {false, ""};
67+
68+
#ifdef __linux__
69+
const clockid_t clk_id = CLOCK_MONOTONIC_COARSE;
70+
#else
71+
const clockid_t clk_id = CLOCK_MONOTONIC;
72+
#endif
73+
6374
while (!terminate_input) {
6475
PacketBlock *block = &pkts[i];
6576
block->cnt = 0;
@@ -79,24 +90,40 @@ void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint
7990
break;
8091
}
8192
if (ret == InputPlugin::Result::TIMEOUT) {
93+
clock_gettime(clk_id, &end);
94+
if (!timeout) {
95+
timeout = true;
96+
begin = end;
97+
}
98+
struct timespec diff = {end.tv_sec - begin.tv_sec, end.tv_nsec - begin.tv_nsec};
99+
if (diff.tv_nsec < 0) {
100+
diff.tv_nsec += 1000000000;
101+
diff.tv_sec--;
102+
}
103+
cache->export_expired(ts.tv_sec + diff.tv_sec);
82104
usleep(1);
83105
continue;
84106
} else if (ret == InputPlugin::Result::PARSED) {
85107
stats.packets = plugin->m_seen;
86108
stats.parsed = plugin->m_parsed;
87109
stats.dropped = plugin->m_dropped;
88110
stats.bytes += block->bytes;
89-
#ifdef __linux__
90-
const clockid_t clk_id = CLOCK_MONOTONIC_COARSE;
91-
#else
92-
const clockid_t clk_id = CLOCK_MONOTONIC;
93-
#endif
94-
clock_gettime(clk_id, &start);
95-
ipx_ring_push(queue, static_cast<void *>(block));
96-
clock_gettime(clk_id, &end);
111+
clock_gettime(clk_id, &start_cache);
112+
try {
113+
for (unsigned i = 0; i < block->cnt; i++) {
114+
cache->put_pkt(block->pkts[i]);
115+
}
116+
ts = block->pkts[block->cnt - 1].ts;
117+
} catch (PluginError &e) {
118+
res.error = true;
119+
res.msg = e.what();
120+
break;
121+
}
122+
timeout = false;
123+
clock_gettime(clk_id, &end_cache);
97124

98-
int64_t time = end.tv_nsec - start.tv_nsec;
99-
if (start.tv_sec != end.tv_sec) {
125+
int64_t time = end_cache.tv_nsec - start_cache.tv_nsec;
126+
if (start_cache.tv_sec != end_cache.tv_sec) {
100127
time += 1000000000;
101128
}
102129
stats.qtime += time;
@@ -116,53 +143,6 @@ void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint
116143
stats.parsed = plugin->m_parsed;
117144
stats.dropped = plugin->m_dropped;
118145
out_stats->store(stats);
119-
out->set_value(res);
120-
}
121-
122-
void storage_worker(StoragePlugin *cache, ipx_ring_t *queue, std::promise<WorkerResult> *out)
123-
{
124-
WorkerResult res = {false, ""};
125-
bool timeout = false;
126-
struct timeval ts = {0, 0};
127-
struct timespec begin = {0, 0};
128-
struct timespec end = {0, 0};
129-
#ifdef __linux__
130-
const clockid_t clk_id = CLOCK_MONOTONIC_COARSE;
131-
#else
132-
const clockid_t clk_id = CLOCK_MONOTONIC;
133-
#endif
134-
while (1) {
135-
PacketBlock *block = static_cast<PacketBlock *>(ipx_ring_pop(queue));
136-
if (block) {
137-
try {
138-
for (unsigned i = 0; i < block->cnt; i++) {
139-
cache->put_pkt(block->pkts[i]);
140-
}
141-
ts = block->pkts[block->cnt - 1].ts;
142-
} catch (PluginError &e) {
143-
res.error = true;
144-
res.msg = e.what();
145-
break;
146-
}
147-
timeout = false;
148-
} else if (terminate_storage && !ipx_ring_cnt(queue)) {
149-
break;
150-
} else {
151-
clock_gettime(clk_id, &end);
152-
if (!timeout) {
153-
timeout = true;
154-
begin = end;
155-
}
156-
struct timespec diff = {end.tv_sec - begin.tv_sec, end.tv_nsec - begin.tv_nsec};
157-
if (diff.tv_nsec < 0) {
158-
diff.tv_nsec += 1000000000;
159-
diff.tv_sec--;
160-
}
161-
cache->export_expired(ts.tv_sec + diff.tv_sec);
162-
usleep(1);
163-
}
164-
}
165-
166146
cache->finish();
167147
auto outq = cache->get_queue();
168148
while (ipx_ring_cnt(outq)) {

workers.hpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,8 @@ struct WorkPipeline {
7474
} input;
7575
struct {
7676
StoragePlugin *plugin;
77-
std::thread *thread;
78-
std::promise<WorkerResult> *promise;
7977
std::vector<ProcessPlugin *> plugins;
8078
} storage;
81-
ipx_ring_t *queue;
8279
};
8380

8481
struct OutputWorker {
@@ -89,9 +86,8 @@ struct OutputWorker {
8986
ipx_ring_t *queue;
9087
};
9188

92-
void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit, ipx_ring_t *queue,
89+
void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit,
9390
std::promise<WorkerResult> *out, std::atomic<InputStats> *out_stats);
94-
void storage_worker(StoragePlugin *cache, ipx_ring_t *queue, std::promise<WorkerResult> *out);
9591
void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promise<WorkerResult> *out, std::atomic<OutputStats> *out_stats,
9692
uint32_t fps);
9793

0 commit comments

Comments
 (0)