Skip to content

Commit 9bdfb9f

Browse files
Adrian Duriskasedmicha
authored andcommitted
Core: Implementation of periodic message generation
1 parent 2f330ac commit 9bdfb9f

File tree

10 files changed

+104
-13
lines changed

10 files changed

+104
-13
lines changed

include/ipfixcol2/plugins.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ ipx_ctx_msg_pass(ipx_ctx_t *ctx, ipx_msg_t *msg);
381381
* types of messages:
382382
* - ::IPX_MSG_IPFIX (IPFIX Message)
383383
* - ::IPX_MSG_SESSION (Transport Session Message)
384+
* - ::IPX_MSG_PERIODIC (Periodic Message)
384385
*
385386
* If \p mask_new is non-NULL, the new subscription mask is installed from \p mask_new.
386387
* If \p mask_old is non-NULL, the previous mask is saved in \p mask_old.

src/core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ set(CORE_SOURCE
4242
message_ipfix.c
4343
message_ipfix.h
4444
message_periodic.c
45+
message_periodic.h
4546
message_session.c
4647
message_terminate.c
4748
message_terminate.h

src/core/configurator/configurator.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@
4646
#include <cstdlib>
4747
#include <dlfcn.h>
4848
#include <signal.h>
49+
#include <sys/time.h>
4950

5051
#include "configurator.hpp"
5152
#include "extensions.hpp"
5253

5354
extern "C" {
5455
#include "../message_terminate.h"
56+
#include "../message_periodic.h"
5557
#include "../plugin_parser.h"
5658
#include "../plugin_output_mgr.h"
5759
#include "../verbose.h"
@@ -88,7 +90,7 @@ termination_handler(int sig)
8890
cnt++;
8991

9092
// Send a termination request to the configurator
91-
int rc = ipx_cpipe_send_term(NULL, request_type);
93+
int rc = ipx_cpipe_send(NULL, request_type);
9294
if (rc != IPX_OK) {
9395
static const char *msg = "ERROR: Signal handler: failed to send a termination request";
9496
write(STDOUT_FILENO, msg, strlen(msg));
@@ -568,6 +570,22 @@ ipx_configurator::termination_send_msg()
568570
m_term_sent = m_running_inputs.size();
569571
}
570572

573+
void
574+
ipx_configurator::periodic_send_msg(uint32_t *periodic_message_sequence)
575+
{
576+
for (auto &input : m_running_inputs) {
577+
ipx_msg_periodic_t *msg = ipx_msg_periodic_create(*periodic_message_sequence);
578+
if (!msg) {
579+
IPX_ERROR(comp_str, "Can't create periodic message!", '\0');
580+
termination_send_msg();
581+
return;
582+
}
583+
584+
ipx_fpipe_write(input->get_feedback(), ipx_msg_periodic2base(msg));
585+
}
586+
(*periodic_message_sequence)++;
587+
}
588+
571589
int
572590
ipx_configurator::run(ipx_controller *ctrl)
573591
{
@@ -595,6 +613,9 @@ ipx_configurator::run(ipx_controller *ctrl)
595613
// Collector is running -> process termination/reconfiguration requests
596614
m_state = STATUS::RUNNING;
597615
bool terminate = false;
616+
617+
uint32_t periodic_message_sequence = 0;
618+
598619
while (!terminate) {
599620
struct ipx_cpipe_req req;
600621
if (ipx_cpipe_receive(&req) != IPX_OK) {
@@ -609,6 +630,9 @@ ipx_configurator::run(ipx_controller *ctrl)
609630
case IPX_CPIPE_TYPE_TERM_DONE:
610631
terminate = termination_handle(req, ctrl);
611632
break;
633+
case IPX_CPIPE_TYPE_PERIODIC:
634+
periodic_send_msg(&periodic_message_sequence);
635+
break;
612636
default:
613637
IPX_ERROR(comp_str, "Ignoring unknown configuration request!", '\0');
614638
continue;

src/core/configurator/configurator.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ class ipx_configurator {
150150
void
151151
termination_send_msg();
152152

153+
void
154+
periodic_send_msg(uint32_t *periodic_message_sequence);
155+
153156
void
154157
termination_stop_all();
155158
void

src/core/configurator/cpipe.c

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,22 @@
1414
#include <fcntl.h>
1515
#include <errno.h>
1616
#include <limits.h> // PIPE_BUF
17+
#include <sys/timerfd.h>
18+
#include <poll.h>
1719

1820
#include <ipfixcol2.h>
1921

2022
#include "cpipe.h"
2123
#include "../verbose.h"
2224

25+
/// Periodic message delay in milliseconds
26+
#define IPX_PERIODIC_MESSAGE_DELAY 100
2327
/// Invalid file descriptor value
2428
#define INVALID_FD (-1)
2529
/// Configuration pipe - cpipe_fd[0] for read, cpipe_fd[1] for write
2630
static int cpipe_fd[2] = {INVALID_FD, INVALID_FD};
31+
/// Periodic timer file descriptor
32+
static int periodic_timer_fd = INVALID_FD;
2733
/// Identification of the module (just for log!)
2834
static const char *module = "Configuration pipe";
2935

@@ -61,6 +67,21 @@ ipx_cpipe_init()
6167
return IPX_ERR_DENIED;
6268
}
6369

70+
periodic_timer_fd = timerfd_create(CLOCK_MONOTONIC, 0);
71+
if (periodic_timer_fd == -1) {
72+
ipx_strerror(errno, err_str);
73+
IPX_ERROR(module, "timerfd_create() failed: %s", err_str);
74+
ipx_cpipe_destroy();
75+
return IPX_ERR_DENIED;
76+
}
77+
struct itimerspec timer_settings;
78+
struct timespec interval;
79+
interval.tv_nsec = (IPX_PERIODIC_MESSAGE_DELAY % 1000) * 1000000;
80+
interval.tv_sec = IPX_PERIODIC_MESSAGE_DELAY / 1000;
81+
timer_settings.it_interval = interval;
82+
timer_settings.it_value = interval;
83+
timerfd_settime(periodic_timer_fd, 0, &timer_settings, NULL);
84+
6485
return IPX_OK;
6586
}
6687

@@ -75,16 +96,31 @@ ipx_cpipe_destroy()
7596
close(cpipe_fd[i]);
7697
cpipe_fd[i] = INVALID_FD;
7798
}
99+
if (periodic_timer_fd != INVALID_FD) {
100+
close(periodic_timer_fd);
101+
periodic_timer_fd = INVALID_FD;
102+
}
78103
}
79104

80105
int
81106
ipx_cpipe_receive(struct ipx_cpipe_req *msg)
82107
{
83108
const size_t buffer_size = sizeof(*msg);
84109
size_t buffer_read = 0;
110+
uint64_t periodic_timer_buf;
111+
112+
struct pollfd fds;
113+
fds.fd = periodic_timer_fd;
114+
fds.events = POLLIN;
85115

86116
errno = 0;
87117
while (buffer_read < buffer_size) {
118+
if (poll(&fds, 1, 0) != -1) {
119+
// Reset timer expiration counter
120+
if (read(periodic_timer_fd, &periodic_timer_buf, sizeof(periodic_timer_buf)) != -1) {
121+
ipx_cpipe_send(NULL, IPX_CPIPE_TYPE_PERIODIC);
122+
}
123+
}
88124
uint8_t *ptr = ((uint8_t *) msg) + buffer_read;
89125
ssize_t rc = read(cpipe_fd[0], ptr, buffer_size - buffer_read);
90126
if (rc > 0) {
@@ -113,16 +149,15 @@ ipx_cpipe_receive(struct ipx_cpipe_req *msg)
113149
}
114150

115151
int
116-
ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type)
152+
ipx_cpipe_send(ipx_ctx_t *ctx, enum ipx_cpipe_type type)
117153
{
118154
// WARNING: Keep on mind that this function can be called from signal handler!
119155

120156
// In case we change 'errno' (e.g. write())
121157
int errno_backup = errno;
122158

123-
if (type != IPX_CPIPE_TYPE_TERM_SLOW
124-
&& type != IPX_CPIPE_TYPE_TERM_FAST
125-
&& type != IPX_CPIPE_TYPE_TERM_DONE) {
159+
if (type != IPX_CPIPE_TYPE_TERM_SLOW && type != IPX_CPIPE_TYPE_TERM_FAST
160+
&& type != IPX_CPIPE_TYPE_TERM_DONE && type != IPX_CPIPE_TYPE_PERIODIC) {
126161
return IPX_ERR_ARG;
127162
}
128163

@@ -140,4 +175,3 @@ ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type)
140175
errno = errno_backup;
141176
return (rc == -1) ? IPX_ERR_DENIED : IPX_OK;
142177
}
143-

src/core/configurator/cpipe.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ enum ipx_cpipe_type {
8383
*
8484
* Sending this request before termination of all plugin instances is considered as fatal.
8585
*/
86-
IPX_CPIPE_TYPE_TERM_DONE ///< Terminate request - complete
86+
IPX_CPIPE_TYPE_TERM_DONE, ///< Terminate request - complete
8787

8888
// Proposed types for the future runtime reconfiguration
8989
// IPX_CPIPE_TYPE_RECONF_START,
9090
// IPX_CPIPE_TYPE_RECONF_DONE
91+
92+
IPX_CPIPE_TYPE_PERIODIC
9193
};
9294

9395
/// Configuration request
@@ -152,7 +154,7 @@ ipx_cpipe_receive(struct ipx_cpipe_req *msg);
152154
* @return #IPX_ERR_DENIED if the request failed to be sent
153155
*/
154156
int
155-
ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type);
157+
ipx_cpipe_send(ipx_ctx_t *ctx, enum ipx_cpipe_type type);
156158

157159
#ifdef __cplusplus
158160
}

src/core/configurator/instance_outmgr.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ static const struct ipx_ctx_callbacks output_mgr_callbacks = {
5858
nullptr // No feedback
5959
};
6060

61-
6261
ipx_instance_outmgr::ipx_instance_outmgr(uint32_t bsize)
6362
: ipx_instance_intermediate("Output manager", &output_mgr_callbacks, bsize)
6463
{

src/core/context.c

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include <pthread.h>
4646
#include <errno.h>
4747
#include <signal.h>
48+
#include <sys/time.h>
4849
#if defined(__OpenBSD__) || defined(__FreeBSD__)
4950
#include <pthread_np.h>
5051
#else
@@ -58,6 +59,7 @@
5859
#include "fpipe.h"
5960
#include "ring.h"
6061
#include "message_ipfix.h"
62+
#include "message_periodic.h"
6163
#include "configurator/cpipe.h"
6264

6365
/** Identification of this component (for log) */
@@ -191,7 +193,7 @@ ipx_ctx_create(const char *name, const struct ipx_ctx_callbacks *callbacks)
191193
ctx->cfg_system.vlevel = ipx_verb_level_get();
192194
ctx->cfg_system.rec_size = IPX_MSG_IPFIX_BASE_REC_SIZE;
193195
ctx->cfg_system.msg_mask_selected = 0; // No messages to process selected
194-
ctx->cfg_system.msg_mask_allowed = IPX_MSG_IPFIX | IPX_MSG_SESSION;
196+
ctx->cfg_system.msg_mask_allowed = IPX_MSG_IPFIX | IPX_MSG_SESSION | IPX_MSG_PERIODIC;
195197
ctx->cfg_system.term_msg_cnt = 1; // By default, wait for 1 termination message
196198

197199
ctx->cfg_extension.items = NULL;
@@ -816,14 +818,14 @@ thread_handle_rc(struct ipx_ctx *ctx, int rc)
816818
// No more data -> stop the collector
817819
IPX_CTX_DEBUG(ctx, "The instance has signalized end-of-file/stream.", '\0');
818820
ipx_ctx_processing_set(ctx, false);
819-
ipx_cpipe_send_term(ctx, IPX_CPIPE_TYPE_TERM_SLOW);
821+
ipx_cpipe_send(ctx, IPX_CPIPE_TYPE_TERM_SLOW);
820822
break;
821823
case IPX_ERR_DENIED:
822824
// Fatal error -> stop the collector as fast as possible
823825
IPX_CTX_ERROR(ctx, "ipx_plugin_get()/ipx_plugin_process() failed! The collector cannot "
824826
"work properly anymore!", '\0');
825827
ipx_ctx_processing_set(ctx, false);
826-
ipx_cpipe_send_term(ctx, IPX_CPIPE_TYPE_TERM_FAST);
828+
ipx_cpipe_send(ctx, IPX_CPIPE_TYPE_TERM_FAST);
827829
break;
828830
default:
829831
IPX_CTX_ERROR(ctx, "ipx_plugin_get()/ipx_plugin_process() returned unexpected return "
@@ -876,6 +878,13 @@ thread_input_process_pipe(struct ipx_ctx *ctx)
876878
return IPX_OK;
877879
}
878880

881+
if (msg_type == IPX_MSG_PERIODIC) {
882+
ipx_msg_periodic_t *periodic_message = ipx_msg_base2periodic(msg_ptr);
883+
ipx_msg_periodic_update_last_processed(periodic_message);
884+
ipx_ctx_msg_pass(ctx, msg_ptr);
885+
return IPX_OK;
886+
}
887+
879888
if (msg_type == IPX_MSG_TERMINATE) {
880889
// Destroy the instance (usually produce garbage messages, etc)
881890
const char *plugin_name = ctx->plugin_cbs->info->name;
@@ -954,6 +963,8 @@ thread_intermediate(void *arg)
954963
ipx_msg_t *msg_ptr;
955964
enum ipx_msg_type msg_type;
956965

966+
uint64_t waiting_for_seq = 0;
967+
957968
bool terminate = false;
958969
while (!terminate) {
959970
// Get a new message for the buffer
@@ -978,6 +989,16 @@ thread_intermediate(void *arg)
978989
}
979990
}
980991

992+
if (msg_type == IPX_MSG_PERIODIC) {
993+
ipx_msg_periodic_t *periodic_message = ipx_msg_base2periodic(msg_ptr);
994+
if (ipx_msg_periodic_get_seq_num(periodic_message) != waiting_for_seq) {
995+
ipx_msg_periodic_destroy(periodic_message);
996+
continue;
997+
}
998+
waiting_for_seq++;
999+
ipx_msg_periodic_update_last_processed(periodic_message);
1000+
}
1001+
9811002
if (!ipx_ctx_processing_get(ctx)
9821003
&& (msg_type == IPX_MSG_IPFIX || msg_type == IPX_MSG_SESSION)) {
9831004
// Data processing is disabled -> drop IPFIX and Session messages
@@ -1048,6 +1069,11 @@ thread_output(void *arg)
10481069
thread_handle_rc(ctx, rc);
10491070
}
10501071

1072+
if (msg_type == IPX_MSG_PERIODIC) {
1073+
ipx_msg_periodic_t *periodic_message = ipx_msg_base2periodic(msg_ptr);
1074+
ipx_msg_periodic_update_last_processed(periodic_message);
1075+
}
1076+
10511077
if (msg_type == IPX_MSG_TERMINATE) {
10521078
ipx_msg_terminate_t *terminate_msg = ipx_msg_base2terminate(msg_ptr);
10531079
enum ipx_msg_terminate_type type = ipx_msg_terminate_get_type(terminate_msg);

src/core/message_base.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include <ipfixcol2.h>
4343
#include "message_base.h"
4444
#include "message_terminate.h"
45+
#include "message_periodic.h"
4546

4647
// Get the type of a message for the collector pipeline
4748
enum ipx_msg_type

src/core/message_terminate.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ void
7777
ipx_msg_terminate_destroy(ipx_msg_terminate_t *msg)
7878
{
7979
ipx_msg_header_destroy((ipx_msg_t *) msg);
80-
ipx_cpipe_send_term(NULL, IPX_CPIPE_TYPE_TERM_DONE);
80+
ipx_cpipe_send(NULL, IPX_CPIPE_TYPE_TERM_DONE);
8181
free(msg);
8282
}
8383

0 commit comments

Comments
 (0)