Skip to content

Commit 35d2160

Browse files
committed
JSON output: add support for GZIP compression of files
1 parent 0b6080a commit 35d2160

File tree

7 files changed

+95
-22
lines changed

7 files changed

+95
-22
lines changed

README.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ Second, install build dependencies of the collector
7171

7272
.. code-block::
7373
74-
yum install gcc gcc-c++ cmake make python-docutils
74+
yum install gcc gcc-c++ cmake make python-docutils zlib-devel
7575
# Optionally: doxygen pkg-config
7676
7777
* Note: latest systems (e.g. Fedora) use ``dnf`` instead of ``yum``.
@@ -82,7 +82,7 @@ Second, install build dependencies of the collector
8282

8383
.. code-block::
8484
85-
apt-get install gcc g++ cmake make python-docutils
85+
apt-get install gcc g++ cmake make python-docutils libz-dev
8686
# Optionally: doxygen pkg-config
8787
8888
Finally, build and install the collector:

src/plugins/output/json/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ add_library(json-output MODULE
1515
src/Server.hpp
1616
)
1717

18+
find_package(ZLIB REQUIRED)
19+
include_directories(${ZLIB_INCLUDE_DIRS})
20+
target_link_libraries(json-output ${ZLIB_LIBRARIES})
21+
1822
install(
1923
TARGETS json-output
2024
LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/"

src/plugins/output/json/README.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,12 @@ outputs can be used at the same time if the outputs are not in collision with ea
223223
Specifies the time interval in seconds to rotate files [minimum 60, default 300]
224224
:``timeAlignment``:
225225
Align file rotation with next N minute interval (yes/no).
226+
:``compression``:
227+
Data compression helps to significantly reduce size of output files.
228+
Following compression algorithms are available:
229+
230+
:``none``: Compression disabled [default]
231+
:``gzip``: GZIP compression
226232

227233
:``print``:
228234
Write data on standard output.

src/plugins/output/json/src/Config.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ enum params_xml_nodes {
8484
FILE_PATH, /**< Path specification format */
8585
FILE_PREFIX, /**< File prefix */
8686
FILE_WINDOW, /**< Window interval */
87-
FILE_ALIGN /**< Window alignment */
87+
FILE_ALIGN, /**< Window alignment */
88+
FILE_COMPRESS /**< Compression */
8889
};
8990

9091
/** Definition of the \<print\> node */
@@ -118,6 +119,7 @@ static const struct fds_xml_args args_file[] = {
118119
FDS_OPTS_ELEM(FILE_PREFIX, "prefix", FDS_OPTS_T_STRING, 0),
119120
FDS_OPTS_ELEM(FILE_WINDOW, "timeWindow", FDS_OPTS_T_UINT, 0),
120121
FDS_OPTS_ELEM(FILE_ALIGN, "timeAlignment", FDS_OPTS_T_BOOL, 0),
122+
FDS_OPTS_ELEM(FILE_COMPRESS, "compression", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
121123
FDS_OPTS_END
122124
};
123125

@@ -339,6 +341,7 @@ Config::parse_file(fds_xml_ctx_t *file)
339341
struct cfg_file output;
340342
output.window_align = true;
341343
output.window_size = 300;
344+
output.m_calg = calg::NONE;
342345

343346
const struct fds_xml_cont *content;
344347
while (fds_xml_next(file, &content) != FDS_EOC) {
@@ -368,6 +371,18 @@ Config::parse_file(fds_xml_ctx_t *file)
368371
assert(content->type == FDS_OPTS_T_BOOL);
369372
output.window_align = content->val_bool;
370373
break;
374+
case FILE_COMPRESS:
375+
// Compression method
376+
assert(content->type == FDS_OPTS_T_STRING);
377+
if (strcasecmp(content->ptr_string, "none") == 0) {
378+
output.m_calg = calg::NONE;
379+
} else if (strcasecmp(content->ptr_string, "gzip") == 0) {
380+
output.m_calg = calg::GZIP;
381+
} else {
382+
const std::string inv_str = content->ptr_string;
383+
throw std::invalid_argument("Unknown compression algorithm '" + inv_str + "'");
384+
}
385+
break;
371386
default:
372387
throw std::invalid_argument("Unexpected element within <file>!");
373388
}

src/plugins/output/json/src/Config.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ struct cfg_server : cfg_output {
104104
bool blocking;
105105
};
106106

107+
enum class calg {
108+
NONE, ///< Do not use compression
109+
GZIP ///< GZIP compression
110+
};
111+
107112
/** Configuration of file writer */
108113
struct cfg_file : cfg_output {
109114
/** Path pattern */
@@ -114,6 +119,8 @@ struct cfg_file : cfg_output {
114119
uint32_t window_size;
115120
/** Enable/disable window alignment */
116121
bool window_align;
122+
/** Compression algorithm */
123+
calg m_calg;
117124
};
118125

119126
/** Parsed configuration of an instance */

src/plugins/output/json/src/File.cpp

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include <sys/stat.h>
5353
#include <unistd.h>
5454
#include <climits>
55+
#include <zlib.h>
5556

5657
/**
5758
* \brief Class constructor
@@ -69,6 +70,7 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
6970
_thread->storage_path = cfg.path_pattern;
7071
_thread->file_prefix = cfg.prefix;
7172
_thread->window_size = cfg.window_size;
73+
_thread->m_calg = cfg.m_calg;
7274
time(&_thread->window_time);
7375

7476
if (cfg.window_size < _WINDOW_MIN_SIZE) {
@@ -88,8 +90,8 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
8890
}
8991

9092
// Create directory & first file
91-
FILE *new_file = file_create(ctx, _thread->storage_path, _thread->file_prefix,
92-
_thread->window_time);
93+
void *new_file = file_create(ctx, _thread->storage_path, _thread->file_prefix,
94+
_thread->window_time, _thread->m_calg);
9395
if (!new_file) {
9496
delete _thread;
9597
throw std::runtime_error("(File output) Failed to create a time window file.");
@@ -99,28 +101,44 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
99101

100102
pthread_rwlockattr_t attr;
101103
if (pthread_rwlockattr_init(&attr) != 0) {
102-
fclose(_thread->file);
104+
if (_thread->m_calg == calg::GZIP) {
105+
gzclose((gzFile)_thread->file);
106+
} else {
107+
fclose((FILE *)_thread->file);
108+
}
103109
delete _thread;
104110
throw std::runtime_error("(File output) Rwlockattr initialization failed!");
105111
}
106112

107113
if (pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) != 0) {
108-
fclose(_thread->file);
114+
if (_thread->m_calg == calg::GZIP) {
115+
gzclose((gzFile)_thread->file);
116+
} else {
117+
fclose((FILE *)_thread->file);
118+
}
109119
pthread_rwlockattr_destroy(&attr);
110120
delete _thread;
111121
throw std::runtime_error("(File output) Rwlockattr setkind failed!");
112122
}
113123

114124
if (pthread_rwlock_init(&_thread->rwlock, &attr) != 0) {
115-
fclose(_thread->file);
125+
if (_thread->m_calg == calg::GZIP) {
126+
gzclose((gzFile)_thread->file);
127+
} else {
128+
fclose((FILE *)_thread->file);
129+
}
116130
pthread_rwlockattr_destroy(&attr);
117131
delete _thread;
118132
throw std::runtime_error("(File output) Rwlock initialization failed!");
119133
}
120134

121135
pthread_rwlockattr_destroy(&attr);
122136
if (pthread_create(&_thread->thread, NULL, &File::thread_window, _thread) != 0) {
123-
fclose(_thread->file);
137+
if (_thread->m_calg == calg::GZIP) {
138+
gzclose((gzFile)_thread->file);
139+
} else {
140+
fclose((FILE *)_thread->file);
141+
}
124142
pthread_rwlock_destroy(&_thread->rwlock);
125143
delete _thread;
126144
throw std::runtime_error("(File output) Failed to start a thread for changing time "
@@ -141,7 +159,11 @@ File::~File()
141159
pthread_rwlock_destroy(&_thread->rwlock);
142160

143161
if (_thread->file) {
144-
fclose(_thread->file);
162+
if (_thread->m_calg == calg::GZIP) {
163+
gzclose((gzFile)_thread->file);
164+
} else {
165+
fclose((FILE *)_thread->file);
166+
}
145167
}
146168

147169
delete _thread;
@@ -177,12 +199,16 @@ File::thread_window(void *context)
177199
// New time window
178200
pthread_rwlock_wrlock(&data->rwlock);
179201
if (data->file) {
180-
fclose(data->file);
202+
if (data->m_calg == calg::GZIP) {
203+
gzclose((gzFile)data->file);
204+
} else {
205+
fclose((FILE *)data->file);
206+
}
181207
data->file = nullptr;
182208
}
183209

184210
data->window_time += data->window_size;
185-
FILE *file = file_create(data->ctx, data->storage_path, data->file_prefix, data->window_time);
211+
void *file = file_create(data->ctx, data->storage_path, data->file_prefix, data->window_time, data->m_calg);
186212
if (!file) {
187213
IPX_CTX_ERROR(data->ctx, "(File output) Failed to create a time window file.", '\0');
188214
}
@@ -209,7 +235,11 @@ File::process(const char *str, size_t len)
209235
pthread_rwlock_rdlock(&_thread->rwlock);
210236
if (_thread->file) {
211237
// Store the record
212-
fwrite(str, len, 1, _thread->file);
238+
if (_thread->m_calg == calg::GZIP) {
239+
gzfwrite(str, len, 1, (gzFile)_thread->file);
240+
} else {
241+
fwrite(str, len, 1, (FILE *)_thread->file);
242+
}
213243
}
214244
pthread_rwlock_unlock(&_thread->rwlock);
215245
return IPX_OK;
@@ -220,7 +250,11 @@ File::flush()
220250
{
221251
pthread_rwlock_rdlock(&_thread->rwlock);
222252
if (_thread->file) {
223-
fflush(_thread->file);
253+
if (_thread->m_calg == calg::GZIP) {
254+
gzflush((gzFile)_thread->file, Z_SYNC_FLUSH);
255+
} else {
256+
fflush((FILE *)_thread->file);
257+
}
224258
}
225259
pthread_rwlock_unlock(&_thread->rwlock);
226260
}
@@ -342,9 +376,9 @@ File::dir_create(ipx_ctx_t *ctx, const std::string &path)
342376
* \param[in] tm Timestamp
343377
* \return On success returns pointer to the file, Otherwise returns NULL.
344378
*/
345-
FILE *
379+
void *
346380
File::file_create(ipx_ctx_t *ctx, const std::string &tmplt, const std::string &prefix,
347-
const time_t &tm)
381+
const time_t &tm, calg m_calg)
348382
{
349383
char file_fmt[20];
350384

@@ -372,8 +406,15 @@ File::file_create(ipx_ctx_t *ctx, const std::string &tmplt, const std::string &p
372406
return NULL;
373407
}
374408

375-
std::string file_name = directory + prefix + file_fmt;
376-
FILE *file = fopen(file_name.c_str(), "a");
409+
std::string file_name;
410+
void *file;
411+
if (m_calg == calg::GZIP) {
412+
file_name = directory + prefix + file_fmt + ".gz";
413+
file = gzopen(file_name.c_str(), "a9");
414+
} else {
415+
file_name = directory + prefix + file_fmt;
416+
file = fopen(file_name.c_str(), "a");
417+
}
377418
if (!file) {
378419
// Failed to create a flow file
379420
char buffer[128];

src/plugins/output/json/src/File.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
#include <atomic>
4646
#include <string>
4747
#include <ctime>
48-
#include <cstdio>
4948

5049
#include <pthread.h>
5150
#include "Storage.hpp"
@@ -79,8 +78,9 @@ class File : public Output {
7978
time_t window_time; /**< Current time window */
8079
std::string storage_path; /**< Storage path (template) */
8180
std::string file_prefix; /**< File prefix */
81+
calg m_calg; /**< Compression */
8282

83-
FILE *file; /**< File descriptor */
83+
void *file; /**< File descriptor */
8484
} thread_ctx_t;
8585

8686
/** Thread for changing time windows */
@@ -92,8 +92,8 @@ class File : public Output {
9292
// Create a directory for a time window
9393
static int dir_create(ipx_ctx_t *ctx, const std::string &path);
9494
// Create a file for a time window
95-
static FILE *file_create(ipx_ctx_t *ctx, const std::string &tmplt, const std::string &prefix,
96-
const time_t &tm);
95+
static void *file_create(ipx_ctx_t *ctx, const std::string &tmplt, const std::string &prefix,
96+
const time_t &tm, calg m_calg);
9797
// Window changer
9898
static void *thread_window(void *context);
9999
};

0 commit comments

Comments
 (0)