1414#include <fcntl.h>
1515#include <errno.h>
1616#include <limits.h> // PIPE_BUF
17+ #include <sys/timerfd.h>
18+ #include <poll.h>
1719
1820#include <ipfixcol2.h>
1921
2022#include "cpipe.h"
2123#include "../verbose.h"
2224
25+ /// Periodic message delay in milliseconds
26+ #define IPX_PERIODIC_MESSAGE_DELAY 100
2327/// Invalid file descriptor value
2428#define INVALID_FD (-1)
2529/// Configuration pipe - cpipe_fd[0] for read, cpipe_fd[1] for write
2630static int cpipe_fd [2 ] = {INVALID_FD , INVALID_FD };
31+ /// Periodic timer file descriptor
32+ static int periodic_timer_fd = INVALID_FD ;
2733/// Identification of the module (just for log!)
2834static const char * module = "Configuration pipe" ;
2935
@@ -33,7 +39,8 @@ static_assert(sizeof(struct ipx_cpipe_req) <= PIPE_BUF, "non-atomic write!");
3339int
3440ipx_cpipe_init ()
3541{
36- assert (cpipe_fd [0 ] == INVALID_FD && cpipe_fd [1 ] == INVALID_FD && "Already initialized!" );
42+ assert (cpipe_fd [0 ] == INVALID_FD && cpipe_fd [1 ] == INVALID_FD && cpipe_fd [2 ] == INVALID_FD
43+ && "Already initialized!" );
3744 int rc ;
3845 const char * err_str ;
3946
@@ -61,6 +68,21 @@ ipx_cpipe_init()
6168 return IPX_ERR_DENIED ;
6269 }
6370
71+ periodic_timer_fd = timerfd_create (CLOCK_MONOTONIC , 0 );
72+ if (periodic_timer_fd == -1 ) {
73+ ipx_strerror (errno , err_str );
74+ IPX_ERROR (module , "timerfd_create() failed: %s" , err_str );
75+ ipx_cpipe_destroy ();
76+ return IPX_ERR_DENIED ;
77+ }
78+ struct itimerspec timer_settings ;
79+ struct timespec interval ;
80+ interval .tv_nsec = (IPX_PERIODIC_MESSAGE_DELAY % 1000 ) * 1000000 ;
81+ interval .tv_sec = IPX_PERIODIC_MESSAGE_DELAY / 1000 ;
82+ timer_settings .it_interval = interval ;
83+ timer_settings .it_value = interval ;
84+ timerfd_settime (periodic_timer_fd , 0 , & timer_settings , NULL );
85+
6486 return IPX_OK ;
6587}
6688
@@ -75,16 +97,31 @@ ipx_cpipe_destroy()
7597 close (cpipe_fd [i ]);
7698 cpipe_fd [i ] = INVALID_FD ;
7799 }
100+ if (periodic_timer_fd != INVALID_FD ) {
101+ close (periodic_timer_fd );
102+ periodic_timer_fd = INVALID_FD ;
103+ }
78104}
79105
80106int
81107ipx_cpipe_receive (struct ipx_cpipe_req * msg )
82108{
83109 const size_t buffer_size = sizeof (* msg );
84110 size_t buffer_read = 0 ;
111+ uint64_t periodic_timer_buf ;
112+
113+ struct pollfd fds ;
114+ fds .fd = periodic_timer_fd ;
115+ fds .events = POLLIN ;
85116
86117 errno = 0 ;
87118 while (buffer_read < buffer_size ) {
119+ if (poll (& fds , 1 , 0 ) != -1 ) {
120+ // Reset timer expiration counter
121+ if (read (periodic_timer_fd , & periodic_timer_buf , sizeof (periodic_timer_buf )) != -1 ) {
122+ ipx_cpipe_send (NULL , IPX_CPIPE_TYPE_PERIODIC );
123+ }
124+ }
88125 uint8_t * ptr = ((uint8_t * ) msg ) + buffer_read ;
89126 ssize_t rc = read (cpipe_fd [0 ], ptr , buffer_size - buffer_read );
90127 if (rc > 0 ) {
@@ -113,16 +150,15 @@ ipx_cpipe_receive(struct ipx_cpipe_req *msg)
113150}
114151
115152int
116- ipx_cpipe_send_term (ipx_ctx_t * ctx , enum ipx_cpipe_type type )
153+ ipx_cpipe_send (ipx_ctx_t * ctx , enum ipx_cpipe_type type )
117154{
118155 // WARNING: Keep on mind that this function can be called from signal handler!
119156
120157 // In case we change 'errno' (e.g. write())
121158 int errno_backup = errno ;
122159
123- if (type != IPX_CPIPE_TYPE_TERM_SLOW
124- && type != IPX_CPIPE_TYPE_TERM_FAST
125- && type != IPX_CPIPE_TYPE_TERM_DONE ) {
160+ if (type != IPX_CPIPE_TYPE_TERM_SLOW && type != IPX_CPIPE_TYPE_TERM_FAST
161+ && type != IPX_CPIPE_TYPE_TERM_DONE && type != IPX_CPIPE_TYPE_PERIODIC ) {
126162 return IPX_ERR_ARG ;
127163 }
128164
@@ -140,4 +176,3 @@ ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type)
140176 errno = errno_backup ;
141177 return (rc == -1 ) ? IPX_ERR_DENIED : IPX_OK ;
142178}
143-
0 commit comments