5252#include < sys/stat.h>
5353#include < unistd.h>
5454#include < climits>
55+ #include < zlib.h>
5556
5657/* *
5758 * \brief Class constructor
@@ -69,6 +70,7 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
6970 _thread->storage_path = cfg.path_pattern ;
7071 _thread->file_prefix = cfg.prefix ;
7172 _thread->window_size = cfg.window_size ;
73+ _thread->m_calg = cfg.m_calg ;
7274 time (&_thread->window_time );
7375
7476 if (cfg.window_size < _WINDOW_MIN_SIZE) {
@@ -88,8 +90,8 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
8890 }
8991
9092 // Create directory & first file
91- FILE *new_file = file_create (ctx, _thread->storage_path , _thread->file_prefix ,
92- _thread->window_time );
93+ void *new_file = file_create (ctx, _thread->storage_path , _thread->file_prefix ,
94+ _thread->window_time , _thread-> m_calg );
9395 if (!new_file) {
9496 delete _thread;
9597 throw std::runtime_error (" (File output) Failed to create a time window file." );
@@ -99,28 +101,44 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
99101
100102 pthread_rwlockattr_t attr;
101103 if (pthread_rwlockattr_init (&attr) != 0 ) {
102- fclose (_thread->file );
104+ if (_thread->m_calg == calg::GZIP) {
105+ gzclose ((gzFile)_thread->file );
106+ } else {
107+ fclose ((FILE *)_thread->file );
108+ }
103109 delete _thread;
104110 throw std::runtime_error (" (File output) Rwlockattr initialization failed!" );
105111 }
106112
107113 if (pthread_rwlockattr_setkind_np (&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) != 0 ) {
108- fclose (_thread->file );
114+ if (_thread->m_calg == calg::GZIP) {
115+ gzclose ((gzFile)_thread->file );
116+ } else {
117+ fclose ((FILE *)_thread->file );
118+ }
109119 pthread_rwlockattr_destroy (&attr);
110120 delete _thread;
111121 throw std::runtime_error (" (File output) Rwlockattr setkind failed!" );
112122 }
113123
114124 if (pthread_rwlock_init (&_thread->rwlock , &attr) != 0 ) {
115- fclose (_thread->file );
125+ if (_thread->m_calg == calg::GZIP) {
126+ gzclose ((gzFile)_thread->file );
127+ } else {
128+ fclose ((FILE *)_thread->file );
129+ }
116130 pthread_rwlockattr_destroy (&attr);
117131 delete _thread;
118132 throw std::runtime_error (" (File output) Rwlock initialization failed!" );
119133 }
120134
121135 pthread_rwlockattr_destroy (&attr);
122136 if (pthread_create (&_thread->thread , NULL , &File::thread_window, _thread) != 0 ) {
123- fclose (_thread->file );
137+ if (_thread->m_calg == calg::GZIP) {
138+ gzclose ((gzFile)_thread->file );
139+ } else {
140+ fclose ((FILE *)_thread->file );
141+ }
124142 pthread_rwlock_destroy (&_thread->rwlock );
125143 delete _thread;
126144 throw std::runtime_error (" (File output) Failed to start a thread for changing time "
@@ -141,7 +159,11 @@ File::~File()
141159 pthread_rwlock_destroy (&_thread->rwlock );
142160
143161 if (_thread->file ) {
144- fclose (_thread->file );
162+ if (_thread->m_calg == calg::GZIP) {
163+ gzclose ((gzFile)_thread->file );
164+ } else {
165+ fclose ((FILE *)_thread->file );
166+ }
145167 }
146168
147169 delete _thread;
@@ -177,12 +199,16 @@ File::thread_window(void *context)
177199 // New time window
178200 pthread_rwlock_wrlock (&data->rwlock );
179201 if (data->file ) {
180- fclose (data->file );
202+ if (data->m_calg == calg::GZIP) {
203+ gzclose ((gzFile)data->file );
204+ } else {
205+ fclose ((FILE *)data->file );
206+ }
181207 data->file = nullptr ;
182208 }
183209
184210 data->window_time += data->window_size ;
185- FILE *file = file_create (data->ctx , data->storage_path , data->file_prefix , data->window_time );
211+ void *file = file_create (data->ctx , data->storage_path , data->file_prefix , data->window_time , data-> m_calg );
186212 if (!file) {
187213 IPX_CTX_ERROR (data->ctx , " (File output) Failed to create a time window file." , ' \0 ' );
188214 }
@@ -209,7 +235,11 @@ File::process(const char *str, size_t len)
209235 pthread_rwlock_rdlock (&_thread->rwlock );
210236 if (_thread->file ) {
211237 // Store the record
212- fwrite (str, len, 1 , _thread->file );
238+ if (_thread->m_calg == calg::GZIP) {
239+ gzfwrite (str, len, 1 , (gzFile)_thread->file );
240+ } else {
241+ fwrite (str, len, 1 , (FILE *)_thread->file );
242+ }
213243 }
214244 pthread_rwlock_unlock (&_thread->rwlock );
215245 return IPX_OK;
@@ -220,7 +250,11 @@ File::flush()
220250{
221251 pthread_rwlock_rdlock (&_thread->rwlock );
222252 if (_thread->file ) {
223- fflush (_thread->file );
253+ if (_thread->m_calg == calg::GZIP) {
254+ gzflush ((gzFile)_thread->file , Z_SYNC_FLUSH);
255+ } else {
256+ fflush ((FILE *)_thread->file );
257+ }
224258 }
225259 pthread_rwlock_unlock (&_thread->rwlock );
226260}
@@ -342,9 +376,9 @@ File::dir_create(ipx_ctx_t *ctx, const std::string &path)
342376 * \param[in] tm Timestamp
343377 * \return On success returns pointer to the file, Otherwise returns NULL.
344378 */
345- FILE *
379+ void *
346380File::file_create (ipx_ctx_t *ctx, const std::string &tmplt, const std::string &prefix,
347- const time_t &tm)
381+ const time_t &tm, calg m_calg )
348382{
349383 char file_fmt[20 ];
350384
@@ -372,8 +406,15 @@ File::file_create(ipx_ctx_t *ctx, const std::string &tmplt, const std::string &p
372406 return NULL ;
373407 }
374408
375- std::string file_name = directory + prefix + file_fmt;
376- FILE *file = fopen (file_name.c_str (), " a" );
409+ std::string file_name;
410+ void *file;
411+ if (m_calg == calg::GZIP) {
412+ file_name = directory + prefix + file_fmt + " .gz" ;
413+ file = gzopen (file_name.c_str (), " a9" );
414+ } else {
415+ file_name = directory + prefix + file_fmt;
416+ file = fopen (file_name.c_str (), " a" );
417+ }
377418 if (!file) {
378419 // Failed to create a flow file
379420 char buffer[128 ];
0 commit comments