Skip to content

Commit b0b3a00

Browse files
authored
Merge pull request #32 from CESNET/hutak-ipfix-file
IPFIX input: add an input buffer to reduce small I/O operations
2 parents 4cd508a + b2ac300 commit b0b3a00

File tree

4 files changed

+139
-31
lines changed

4 files changed

+139
-31
lines changed

src/plugins/input/ipfix/README.rst

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,8 @@ Parameters
2727
Path to file(s) in IPFIX File format. It is possible to use asterisk instead of
2828
a filename/directory, tilde character (i.e. "~") instead of the home directory of
2929
the user, and brace expressions (i.e. "/tmp/{source1,source2}/file.ipfix").
30-
Directories and non-IPFIX Files that match the file pattern are skipped/ignored.
30+
Directories and non-IPFIX Files that match the file pattern are skipped/ignored.
31+
32+
:``bufferSize``:
33+
Optional size of the internal buffer to which the content of the file is partly
34+
preloaded. [default: 1048576, min: 131072]

src/plugins/input/ipfix/config.c

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,25 @@
4747
/*
4848
* <params>
4949
* <path>...</path> // required, exactly once
50+
* <bufferSize>...</bufferSize> // optional
5051
* </params>
5152
*/
5253

54+
/** Default buffer size */
55+
#define BSIZE_DEF (1048576U)
56+
#define BSIZE_MIN (131072U)
57+
5358
/** XML nodes */
5459
enum params_xml_nodes {
55-
NODE_PATH = 1
60+
NODE_PATH = 1,
61+
NODE_BSIZE
5662
};
5763

5864
/** Definition of the \<params\> node */
5965
static const struct fds_xml_args args_params[] = {
6066
FDS_OPTS_ROOT("params"),
6167
FDS_OPTS_ELEM(NODE_PATH, "path", FDS_OPTS_T_STRING, 0),
68+
FDS_OPTS_ELEM(NODE_BSIZE, "bufferSize", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
6269
FDS_OPTS_END
6370
};
6471

@@ -81,6 +88,10 @@ config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg
8188
assert(content->type == FDS_OPTS_T_STRING);
8289
cfg->path = strdup(content->ptr_string);
8390
break;
91+
case NODE_BSIZE:
92+
assert(content->type == FDS_OPTS_T_UINT);
93+
cfg->bsize = content->val_uint;
94+
break;
8495
default:
8596
// Internal error
8697
assert(false);
@@ -89,6 +100,12 @@ config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg
89100

90101
if (!cfg->path) {
91102
IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__);
103+
return IPX_ERR_FORMAT;
104+
}
105+
106+
if (cfg->bsize < BSIZE_MIN) {
107+
IPX_CTX_ERROR(ctx, "Buffer size must be at least %u bytes!" , (unsigned int) BSIZE_MIN);
108+
return IPX_ERR_FORMAT;
92109
}
93110

94111
return IPX_OK;
@@ -102,6 +119,7 @@ static void
102119
config_default_set(struct ipfix_config *cfg)
103120
{
104121
cfg->path = NULL;
122+
cfg->bsize = BSIZE_DEF;
105123
}
106124

107125
struct ipfix_config *

src/plugins/input/ipfix/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
struct ipfix_config {
5151
/** File pattern */
5252
char *path;
53+
/** Read buffer size */
54+
uint64_t bsize;
5355
};
5456

5557
/**

src/plugins/input/ipfix/ipfix.c

Lines changed: 113 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,17 @@ struct plugin_data {
7979
FILE *current_file;
8080
/// Name/path of the current file
8181
const char *current_name;
82-
/// Transport Session identification of the
83-
struct ipx_session *ts;
82+
/// Transport Session identification
83+
struct ipx_session *current_ts;
84+
85+
/// Buffer of preloaded data
86+
uint8_t *buffer_data;
87+
/// Size of the buffer
88+
size_t buffer_size;
89+
/// Valid size of the buffer
90+
size_t buffer_valid;
91+
/// Position of the reader in the buffer
92+
size_t buffer_offset;
8493
};
8594

8695
/**
@@ -175,7 +184,7 @@ files_list_get(ipx_ctx_t *ctx, const char *pattern, glob_t *list)
175184
* @param[in] filename New file which corresponds to the new Transport Session
176185
* @return New transport session
177186
*/
178-
struct ipx_session *
187+
static struct ipx_session *
179188
session_open(ipx_ctx_t *ctx, const char *filename)
180189
{
181190
struct ipx_session *res;
@@ -215,7 +224,7 @@ session_open(ipx_ctx_t *ctx, const char *filename)
215224
* @param[in] ctx Plugin context (for sending notification and log)
216225
* @param[in] session Transport Session to close
217226
*/
218-
void
227+
static void
219228
session_close(ipx_ctx_t *ctx, struct ipx_session *session)
220229
{
221230
ipx_msg_session_t *msg_session;
@@ -275,7 +284,7 @@ session_close(ipx_ctx_t *ctx, struct ipx_session *session)
275284
* @return #IPX_ERR_EOF if no more files are available
276285
* @return #iPX_ERR_NOMEM in case of a memory allocation error
277286
*/
278-
int
287+
static int
279288
next_file(struct plugin_data *data)
280289
{
281290
size_t idx_next;
@@ -284,8 +293,8 @@ next_file(struct plugin_data *data)
284293
const char *name_new = NULL;
285294

286295
// Signalize close of the current Transport Session
287-
session_close(data->ctx, data->ts);
288-
data->ts = NULL;
296+
session_close(data->ctx, data->current_ts);
297+
data->current_ts = NULL;
289298
if (data->current_file) {
290299
fclose(data->current_file);
291300
data->current_file = NULL;
@@ -328,15 +337,81 @@ next_file(struct plugin_data *data)
328337
}
329338

330339
// Signalize open of the new Transport Session
331-
data->ts = session_open(data->ctx, name_new);
332-
if (!data->ts) {
340+
data->current_ts = session_open(data->ctx, name_new);
341+
if (!data->current_ts) {
333342
fclose(file_new);
334343
return IPX_ERR_NOMEM;
335344
}
336345

337346
IPX_CTX_INFO(data->ctx, "Reading from file '%s'...", name_new);
338347
data->current_file = file_new;
339348
data->current_name = name_new;
349+
350+
data->buffer_valid = 0;
351+
data->buffer_offset = 0;
352+
return IPX_OK;
353+
}
354+
355+
/**
356+
* @brief Get the next chunk of data
357+
*
358+
* Reads the chunk from the internal buffer with preloaded content of the file.
359+
* If the buffer doesn't contain required amount of data, new content will be
360+
* loaded from the file. Nevertheless, if the end-of-file has been reached, return
361+
* codes #IPX_ERR_EOF or #IPX_ERR_FORMAT might be returned.
362+
*
363+
* @param[in] data Plugin data
364+
* @param[out] out Output buffer to fill
365+
* @param[in] out_size Size of the output buffer (i.e. required amount of data)
366+
*
367+
* @return #IPX_OK on success
368+
* @return #IPX_ERR_EOF if the end-of-file has been reached (no more data)
369+
* @return #IPX_ERR_FORMAT if the end-of-file has been reached but the internal
370+
* buffer doesn't contain required amount of data
371+
*/
372+
static int
373+
next_chunk(struct plugin_data *data, uint8_t *out, uint16_t out_size)
374+
{
375+
size_t buffer_avail = data->buffer_valid - data->buffer_offset;
376+
uint8_t *reader_ptr = &data->buffer_data[data->buffer_offset];
377+
378+
size_t new_size;
379+
uint8_t *new_ptr;
380+
size_t ret;
381+
382+
// Check if the chunk is fully in the buffer
383+
if (buffer_avail >= out_size) {
384+
memcpy(out, reader_ptr, out_size);
385+
data->buffer_offset += out_size;
386+
return IPX_OK;
387+
}
388+
389+
// We need to load new data to the buffer
390+
if (buffer_avail > 0) {
391+
// A fragment of an unprocessed IPFIX Message must be preserved
392+
memcpy(data->buffer_data, reader_ptr, buffer_avail);
393+
data->buffer_valid = buffer_avail;
394+
} else {
395+
data->buffer_valid = 0;
396+
}
397+
398+
new_size = data->buffer_size - data->buffer_valid;
399+
new_ptr = &data->buffer_data[data->buffer_valid];
400+
ret = fread(new_ptr, 1, new_size, data->current_file);
401+
data->buffer_valid += ret;
402+
data->buffer_offset = 0;
403+
404+
// Check whether the EOF has been reached
405+
if (data->buffer_valid == 0 && feof(data->current_file)) {
406+
return IPX_ERR_EOF;
407+
}
408+
409+
if (data->buffer_valid < out_size) {
410+
return IPX_ERR_FORMAT;
411+
}
412+
413+
memcpy(out, data->buffer_data, out_size);
414+
data->buffer_offset += out_size;
340415
return IPX_OK;
341416
}
342417

@@ -346,10 +421,11 @@ next_file(struct plugin_data *data)
346421
* @param[in] data Plugin data
347422
* @param[out] msg IPFIX Message extracted from the file
348423
* @return #IPX_OK on success
424+
* @return #IPX_ERR_EOF if the end-of-file has been reached
349425
* @return #IPX_ERR_FORMAT if the file is malformed
350426
* @return #IPX_ERR_NOMEM in case of a memory allocation error
351427
*/
352-
int
428+
static int
353429
next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
354430
{
355431
struct fds_ipfix_msg_hdr ipfix_hdr;
@@ -358,23 +434,21 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
358434

359435
struct ipx_msg_ctx ipfix_ctx;
360436
ipx_msg_ipfix_t *ipfix_msg;
361-
size_t rc;
437+
int ret;
362438

363439
if (!data->current_file) {
364440
return IPX_ERR_EOF;
365441
}
366442

367-
// Get the IPFIX header
368-
rc = fread(&ipfix_hdr, 1, FDS_IPFIX_MSG_HDR_LEN, data->current_file);
369-
if (rc != FDS_IPFIX_MSG_HDR_LEN) {
370-
// Check if the end of file has been reached
371-
if (rc == 0 && feof(data->current_file)) {
372-
return IPX_ERR_EOF;
443+
// Get the IPFIX Message header
444+
ret = next_chunk(data, (uint8_t *) &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN);
445+
if (ret != IPX_OK) {
446+
if (ret == IPX_ERR_FORMAT) {
447+
IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
448+
data->current_name);
373449
}
374450

375-
IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
376-
data->current_name);
377-
return IPX_ERR_FORMAT;
451+
return ret;
378452
}
379453

380454
ipfix_size = ntohs(ipfix_hdr.length);
@@ -389,15 +463,14 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
389463
IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
390464
return IPX_ERR_NOMEM;
391465
}
392-
393466
memcpy(ipfix_data, &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN);
394467

395-
// Get the rest of the message body
468+
// Get the rest of the IPFIX Message body
396469
if (ipfix_size > FDS_IPFIX_MSG_HDR_LEN) {
397470
uint8_t *data_ptr = ipfix_data + FDS_IPFIX_MSG_HDR_LEN;
398-
size_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN;
471+
uint16_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN;
399472

400-
if (fread(data_ptr, 1, size_remain, data->current_file) != size_remain) {
473+
if (next_chunk(data, data_ptr, size_remain) != IPX_OK) {
401474
IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
402475
data->current_name);
403476
free(ipfix_data);
@@ -407,7 +480,7 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
407480

408481
// Wrap the IPFIX Message
409482
memset(&ipfix_ctx, 0, sizeof(ipfix_ctx));
410-
ipfix_ctx.session = data->ts;
483+
ipfix_ctx.session = data->current_ts;
411484
ipfix_ctx.odid = ntohl(ipfix_hdr.odid);
412485
ipfix_ctx.stream = 0;
413486

@@ -441,8 +514,18 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *params)
441514
return IPX_ERR_DENIED;
442515
}
443516

517+
// Initialize reader buffer
518+
data->buffer_size = data->cfg->bsize;
519+
data->buffer_data = malloc(sizeof(uint8_t) * data->buffer_size);
520+
if (!data->buffer_data) {
521+
IPX_CTX_ERROR(ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
522+
free(data);
523+
return IPX_ERR_DENIED;
524+
}
525+
444526
// Prepare list of all files to read
445527
if (files_list_get(ctx, data->cfg->path, &data->file_list) != IPX_OK) {
528+
free(data->buffer_data);
446529
config_destroy(data->cfg);
447530
free(data);
448531
return IPX_ERR_DENIED;
@@ -458,14 +541,15 @@ ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg)
458541
struct plugin_data *data = (struct plugin_data *) cfg;
459542

460543
// Close the current session and file
461-
session_close(ctx, data->ts);
544+
session_close(ctx, data->current_ts);
462545
if (data->current_file) {
463546
fclose(data->current_file);
464547
}
465548

466549
// Final cleanup
467550
files_list_free(&data->file_list);
468551
config_destroy(data->cfg);
552+
free(data->buffer_data);
469553
free(data);
470554
}
471555

@@ -509,18 +593,18 @@ ipx_plugin_session_close(ipx_ctx_t *ctx, void *cfg, const struct ipx_session *se
509593
{
510594
struct plugin_data *data = (struct plugin_data *) cfg;
511595
// Do NOT dereference the session pointer because it can be already freed!
512-
if (session != data->ts) {
596+
if (session != data->current_ts) {
513597
// The session has been already closed
514598
return;
515599
}
516600

517601
// Close the current session and file
518-
session_close(ctx, data->ts);
602+
session_close(ctx, data->current_ts);
519603
if (data->current_file) {
520604
fclose(data->current_file);
521605
}
522606

523-
data->ts = NULL;
607+
data->current_ts = NULL;
524608
data->current_file = NULL;
525609
data->current_name = NULL;
526610
}

0 commit comments

Comments
 (0)