Skip to content

Commit 9200f13

Browse files
committed
Merge branch 'periodic-message' into 'devel'
Periodic message See merge request monitoring/ipfixcol2!15
2 parents b0212b2 + 46402d2 commit 9200f13

21 files changed

+358
-17
lines changed

include/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ set(SUB_HEADERS
1414
ipfixcol2/message_garbage.h
1515
ipfixcol2/message_ipfix.h
1616
ipfixcol2/message_session.h
17+
ipfixcol2/message_periodic.h
1718
ipfixcol2/plugins.h
1819
ipfixcol2/session.h
1920
ipfixcol2/utils.h

include/ipfixcol2.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
#include <ipfixcol2/message_garbage.h>
6161
#include <ipfixcol2/message_session.h>
6262
#include <ipfixcol2/message_ipfix.h>
63+
#include <ipfixcol2/message_periodic.h>
6364

6465
#include <ipfixcol2/plugins.h>
6566
#include <ipfixcol2/session.h>

include/ipfixcol2/message.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ enum ipx_msg_type {
7777
IPX_MSG_TERMINATE = (1 << 3),
7878
// An internal configuration message (only for internal usage)
7979
//IPX_MSG_CONFIG = (1 << 4)
80+
/** A periodic message */
81+
IPX_MSG_PERIODIC = (1 << 5)
8082
};
8183

8284
/** The data type of the base message */
@@ -107,6 +109,7 @@ ipx_msg_destroy(ipx_msg_t *msg);
107109
#include <ipfixcol2/message_ipfix.h>
108110
#include <ipfixcol2/message_garbage.h>
109111
#include <ipfixcol2/message_session.h>
112+
#include <ipfixcol2/message_periodic.h>
110113

111114
/**
112115
* \brief Cast from a base message to a session message
@@ -147,6 +150,19 @@ ipx_msg_base2ipfix(ipx_msg_t *msg)
147150
return (ipx_msg_ipfix_t *) msg;
148151
}
149152

153+
/**
154+
* \brief Cast base message to periodic message
155+
*
156+
* \param[in] msg Pointer to the base message
157+
* \return A pointer to a periodic message
158+
*/
159+
static inline ipx_msg_periodic_t *
160+
ipx_msg_base2periodic(ipx_msg_t *msg)
161+
{
162+
assert(ipx_msg_get_type(msg) == IPX_MSG_PERIODIC);
163+
return (ipx_msg_periodic_t *) msg;
164+
}
165+
150166
/**@}*/
151167

152168
#ifdef __cplusplus

include/ipfixcol2/message_periodic.h

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* @file
3+
* @author Adrian Duriska <[email protected]>
4+
* @brief Periodic message
5+
*
6+
* Copyright: (C) 2024 CESNET, z.s.p.o.
7+
* SPDX-License-Identifier: BSD-3-Clause
8+
*/
9+
10+
#ifndef IPX_MESSAGE_PERIODIC_H
11+
#define IPX_MESSAGE_PERIODIC_H
12+
13+
#ifdef __cplusplus
14+
extern "C" {
15+
#endif
16+
17+
#include <ipfixcol2/message.h>
18+
19+
typedef struct ipx_msg_periodic ipx_msg_periodic_t;
20+
21+
/**
22+
* \brief Cast periodic message to base message
23+
*
24+
* \param[in] msg Pointer to the periodic message
25+
* \return A pointer to a base message
26+
*/
27+
static inline ipx_msg_t *
28+
ipx_msg_periodic2base(ipx_msg_periodic_t *msg)
29+
{
30+
return (ipx_msg_t *) msg;
31+
}
32+
33+
/**
34+
* \brief Get sequence number of periodic message
35+
*
36+
* \param[in] msg Pointer to the periodic message
37+
* \return Periodic messages sequence number
38+
*/
39+
IPX_API uint64_t
40+
ipx_msg_periodic_get_seq_num(ipx_msg_periodic_t *msg);
41+
42+
/**
43+
* \brief Get created time of periodic message
44+
*
45+
* \param[in] msg Pointer to the periodic message
46+
* \return Periodic messages created time
47+
*/
48+
IPX_API struct timespec
49+
ipx_msg_periodic_get_created(ipx_msg_periodic_t *msg);
50+
51+
/**
52+
* \brief Get last intermediate plugin processed time of periodic message
53+
*
54+
* \param[in] msg Pointer to the periodic message
55+
* \return Periodic messages last intermediate plugin processed time
56+
*/
57+
IPX_API struct timespec
58+
ipx_msg_periodic_get_last_processed(ipx_msg_periodic_t *msg);
59+
60+
#ifdef __cplusplus
61+
}
62+
#endif
63+
#endif

include/ipfixcol2/plugins.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ ipx_ctx_msg_pass(ipx_ctx_t *ctx, ipx_msg_t *msg);
381381
* types of messages:
382382
* - ::IPX_MSG_IPFIX (IPFIX Message)
383383
* - ::IPX_MSG_SESSION (Transport Session Message)
384+
* - ::IPX_MSG_PERIODIC (Periodic Message)
384385
*
385386
* If \p mask_new is non-NULL, the new subscription mask is installed from \p mask_new.
386387
* If \p mask_old is non-NULL, the previous mask is saved in \p mask_old.

src/core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ set(CORE_SOURCE
4141
message_garbage.c
4242
message_ipfix.c
4343
message_ipfix.h
44+
message_periodic.c
45+
message_periodic.h
4446
message_session.c
4547
message_terminate.c
4648
message_terminate.h

src/core/configurator/configurator.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@
4646
#include <cstdlib>
4747
#include <dlfcn.h>
4848
#include <signal.h>
49+
#include <sys/time.h>
4950

5051
#include "configurator.hpp"
5152
#include "extensions.hpp"
5253

5354
extern "C" {
5455
#include "../message_terminate.h"
56+
#include "../message_periodic.h"
5557
#include "../plugin_parser.h"
5658
#include "../plugin_output_mgr.h"
5759
#include "../verbose.h"
@@ -88,7 +90,7 @@ termination_handler(int sig)
8890
cnt++;
8991

9092
// Send a termination request to the configurator
91-
int rc = ipx_cpipe_send_term(NULL, request_type);
93+
int rc = ipx_cpipe_send(NULL, request_type);
9294
if (rc != IPX_OK) {
9395
static const char *msg = "ERROR: Signal handler: failed to send a termination request";
9496
write(STDOUT_FILENO, msg, strlen(msg));
@@ -568,6 +570,22 @@ ipx_configurator::termination_send_msg()
568570
m_term_sent = m_running_inputs.size();
569571
}
570572

573+
void
574+
ipx_configurator::periodic_send_msg(uint32_t *periodic_message_sequence)
575+
{
576+
for (auto &input : m_running_inputs) {
577+
ipx_msg_periodic_t *msg = ipx_msg_periodic_create(*periodic_message_sequence);
578+
if (!msg) {
579+
IPX_ERROR(comp_str, "Can't create periodic message!", '\0');
580+
termination_send_msg();
581+
return;
582+
}
583+
584+
ipx_fpipe_write(input->get_feedback(), ipx_msg_periodic2base(msg));
585+
}
586+
(*periodic_message_sequence)++;
587+
}
588+
571589
int
572590
ipx_configurator::run(ipx_controller *ctrl)
573591
{
@@ -595,6 +613,9 @@ ipx_configurator::run(ipx_controller *ctrl)
595613
// Collector is running -> process termination/reconfiguration requests
596614
m_state = STATUS::RUNNING;
597615
bool terminate = false;
616+
617+
uint32_t periodic_message_sequence = 0;
618+
598619
while (!terminate) {
599620
struct ipx_cpipe_req req;
600621
if (ipx_cpipe_receive(&req) != IPX_OK) {
@@ -609,6 +630,9 @@ ipx_configurator::run(ipx_controller *ctrl)
609630
case IPX_CPIPE_TYPE_TERM_DONE:
610631
terminate = termination_handle(req, ctrl);
611632
break;
633+
case IPX_CPIPE_TYPE_PERIODIC:
634+
periodic_send_msg(&periodic_message_sequence);
635+
break;
612636
default:
613637
IPX_ERROR(comp_str, "Ignoring unknown configuration request!", '\0');
614638
continue;

src/core/configurator/configurator.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ class ipx_configurator {
150150
void
151151
termination_send_msg();
152152

153+
void
154+
periodic_send_msg(uint32_t *periodic_message_sequence);
155+
153156
void
154157
termination_stop_all();
155158
void

src/core/configurator/cpipe.c

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,22 @@
1414
#include <fcntl.h>
1515
#include <errno.h>
1616
#include <limits.h> // PIPE_BUF
17+
#include <sys/timerfd.h>
18+
#include <poll.h>
1719

1820
#include <ipfixcol2.h>
1921

2022
#include "cpipe.h"
2123
#include "../verbose.h"
2224

25+
/// Periodic message delay in milliseconds
26+
#define IPX_PERIODIC_MESSAGE_DELAY 100
2327
/// Invalid file descriptor value
2428
#define INVALID_FD (-1)
2529
/// Configuration pipe - cpipe_fd[0] for read, cpipe_fd[1] for write
2630
static int cpipe_fd[2] = {INVALID_FD, INVALID_FD};
31+
/// Periodic timer file descriptor
32+
static int periodic_timer_fd = INVALID_FD;
2733
/// Identification of the module (just for log!)
2834
static const char *module = "Configuration pipe";
2935

@@ -61,6 +67,21 @@ ipx_cpipe_init()
6167
return IPX_ERR_DENIED;
6268
}
6369

70+
periodic_timer_fd = timerfd_create(CLOCK_MONOTONIC, 0);
71+
if (periodic_timer_fd == -1) {
72+
ipx_strerror(errno, err_str);
73+
IPX_ERROR(module, "timerfd_create() failed: %s", err_str);
74+
ipx_cpipe_destroy();
75+
return IPX_ERR_DENIED;
76+
}
77+
struct itimerspec timer_settings;
78+
struct timespec interval;
79+
interval.tv_nsec = (IPX_PERIODIC_MESSAGE_DELAY % 1000) * 1000000;
80+
interval.tv_sec = IPX_PERIODIC_MESSAGE_DELAY / 1000;
81+
timer_settings.it_interval = interval;
82+
timer_settings.it_value = interval;
83+
timerfd_settime(periodic_timer_fd, 0, &timer_settings, NULL);
84+
6485
return IPX_OK;
6586
}
6687

@@ -75,16 +96,31 @@ ipx_cpipe_destroy()
7596
close(cpipe_fd[i]);
7697
cpipe_fd[i] = INVALID_FD;
7798
}
99+
if (periodic_timer_fd != INVALID_FD) {
100+
close(periodic_timer_fd);
101+
periodic_timer_fd = INVALID_FD;
102+
}
78103
}
79104

80105
int
81106
ipx_cpipe_receive(struct ipx_cpipe_req *msg)
82107
{
83108
const size_t buffer_size = sizeof(*msg);
84109
size_t buffer_read = 0;
110+
uint64_t periodic_timer_buf;
111+
112+
struct pollfd fds;
113+
fds.fd = periodic_timer_fd;
114+
fds.events = POLLIN;
85115

86116
errno = 0;
87117
while (buffer_read < buffer_size) {
118+
if (poll(&fds, 1, 0) != -1) {
119+
// Reset timer expiration counter
120+
if (read(periodic_timer_fd, &periodic_timer_buf, sizeof(periodic_timer_buf)) != -1) {
121+
ipx_cpipe_send(NULL, IPX_CPIPE_TYPE_PERIODIC);
122+
}
123+
}
88124
uint8_t *ptr = ((uint8_t *) msg) + buffer_read;
89125
ssize_t rc = read(cpipe_fd[0], ptr, buffer_size - buffer_read);
90126
if (rc > 0) {
@@ -113,16 +149,15 @@ ipx_cpipe_receive(struct ipx_cpipe_req *msg)
113149
}
114150

115151
int
116-
ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type)
152+
ipx_cpipe_send(ipx_ctx_t *ctx, enum ipx_cpipe_type type)
117153
{
118154
// WARNING: Keep on mind that this function can be called from signal handler!
119155

120156
// In case we change 'errno' (e.g. write())
121157
int errno_backup = errno;
122158

123-
if (type != IPX_CPIPE_TYPE_TERM_SLOW
124-
&& type != IPX_CPIPE_TYPE_TERM_FAST
125-
&& type != IPX_CPIPE_TYPE_TERM_DONE) {
159+
if (type != IPX_CPIPE_TYPE_TERM_SLOW && type != IPX_CPIPE_TYPE_TERM_FAST
160+
&& type != IPX_CPIPE_TYPE_TERM_DONE && type != IPX_CPIPE_TYPE_PERIODIC) {
126161
return IPX_ERR_ARG;
127162
}
128163

@@ -140,4 +175,3 @@ ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type)
140175
errno = errno_backup;
141176
return (rc == -1) ? IPX_ERR_DENIED : IPX_OK;
142177
}
143-

src/core/configurator/cpipe.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ enum ipx_cpipe_type {
8383
*
8484
* Sending this request before termination of all plugin instances is considered as fatal.
8585
*/
86-
IPX_CPIPE_TYPE_TERM_DONE ///< Terminate request - complete
86+
IPX_CPIPE_TYPE_TERM_DONE, ///< Terminate request - complete
8787

8888
// Proposed types for the future runtime reconfiguration
8989
// IPX_CPIPE_TYPE_RECONF_START,
9090
// IPX_CPIPE_TYPE_RECONF_DONE
91+
92+
IPX_CPIPE_TYPE_PERIODIC
9193
};
9294

9395
/// Configuration request
@@ -152,7 +154,7 @@ ipx_cpipe_receive(struct ipx_cpipe_req *msg);
152154
* @return #IPX_ERR_DENIED if the request failed to be sent
153155
*/
154156
int
155-
ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type);
157+
ipx_cpipe_send(ipx_ctx_t *ctx, enum ipx_cpipe_type type);
156158

157159
#ifdef __cplusplus
158160
}

0 commit comments

Comments
 (0)