Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,12 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines),
"Allows to skip empty lines."
},

{
FLB_CONFIG_MAP_BOOL, "truncate_long_lines", "false",
0, FLB_TRUE, offsetof(struct flb_tail_config, truncate_long_lines),
"Truncate overlong lines after input encoding to UTF-8"
},
#ifdef __linux__
{
FLB_CONFIG_MAP_BOOL, "file_cache_advise", "true",
Expand Down
28 changes: 23 additions & 5 deletions plugins/in_tail/tail_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
if (sec == 0 && nsec == 0) {
flb_plg_error(ctx->ins, "invalid 'refresh_interval' config "
"value (%s)", tmp);
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}

Expand All @@ -192,7 +192,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
/* Config: seconds interval to monitor file after rotation */
if (ctx->rotate_wait <= 0) {
flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value");
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}

Expand All @@ -215,7 +215,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
}
else {
flb_plg_error(ctx->ins, "invalid encoding 'unicode.encoding' value");
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}
}
Expand All @@ -230,11 +230,20 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
}
else {
flb_plg_error(ctx->ins, "invalid encoding 'generic.encoding' value %s", tmp);
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}
}

#ifdef FLB_HAVE_UNICODE_ENCODER
if (ctx->preferred_input_encoding != FLB_UNICODE_ENCODING_UNSPECIFIED &&
ctx->generic_input_encoding_type != FLB_GENERIC_UNSPECIFIED) {
flb_plg_error(ctx->ins,
"'unicode.encoding' and 'generic.encoding' cannot be specified at the same time");
flb_tail_config_destroy(ctx);
return NULL;
}
#endif
#ifdef FLB_HAVE_PARSER
/* Config: multi-line support */
if (ctx->multiline == FLB_TRUE) {
Expand All @@ -258,7 +267,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
/* Validate buffer limit */
if (ctx->buf_chunk_size > ctx->buf_max_size) {
flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk");
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}

Expand Down Expand Up @@ -485,6 +494,13 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
"multiline_truncated_total",
"Total number of truncated occurences for multilines",
1, (char *[]) {"name"});
ctx->cmt_long_line_truncated = \
cmt_counter_create(ins->cmt,
"fluentbit", "input",
"long_line_truncated_total",
"Total number of truncated occurences for long lines",
1, (char *[]) {"name"});

/* OLD metrics */
flb_metrics_add(FLB_TAIL_METRIC_F_OPENED,
"files_opened", ctx->ins->metrics);
Expand All @@ -494,6 +510,8 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
"files_rotated", ctx->ins->metrics);
flb_metrics_add(FLB_TAIL_METRIC_M_TRUNCATED,
"multiline_truncated", ctx->ins->metrics);
flb_metrics_add(FLB_TAIL_METRIC_L_TRUNCATED,
"long_line_truncated", ctx->ins->metrics);
#endif

return ctx;
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */
#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */
#define FLB_TAIL_METRIC_M_TRUNCATED 103 /* number of truncated occurrences of multiline */
#define FLB_TAIL_METRIC_L_TRUNCATED 104 /* number of truncated occurrences of long lines */
#endif

struct flb_tail_config {
Expand All @@ -54,6 +55,7 @@ struct flb_tail_config {
/* Buffer Config */
size_t buf_chunk_size; /* allocation chunks */
size_t buf_max_size; /* max size of a buffer */
int truncate_long_lines; /* truncate long lines after re-encode */

/* Static files processor */
size_t static_batch_size;
Expand Down Expand Up @@ -169,6 +171,7 @@ struct flb_tail_config {
struct cmt_counter *cmt_files_closed;
struct cmt_counter *cmt_files_rotated;
struct cmt_counter *cmt_multiline_truncated;
struct cmt_counter *cmt_long_line_truncated;

/* Hash: hash tables for quick access to registered files */
struct flb_hash_table *static_hash;
Expand Down
136 changes: 132 additions & 4 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,26 @@ static FLB_INLINE const char *flb_skip_leading_zeros_simd(const char *data, cons
return data;
}

/*
 * Return a UTF-8 safe cut position <= max.
 *
 * s   : buffer holding the (expected UTF-8) text
 * len : number of valid bytes in s
 * max : maximum number of bytes to keep
 *
 * Returns the number of leading bytes of s that can be kept without
 * splitting a multi-byte UTF-8 sequence. When len <= max nothing has to be
 * cut and len is returned unchanged. The result can be 0 when no character
 * boundary exists at or below max (e.g. max is 0, or the bytes before max
 * are all continuation bytes).
 */
static size_t utf8_safe_truncate_pos(const char *s, size_t len, size_t max)
{
    size_t cut;

    /* Everything fits: no truncation needed */
    if (len <= max) {
        return len;
    }

    cut = max;

    /*
     * Position 'cut' is a character boundary only if the first EXCLUDED
     * byte, s[cut], starts a new character, i.e. it is not a continuation
     * byte (10xxxxxx). Reading s[cut] is in bounds because cut < len here.
     *
     * Testing the last included byte (s[cut - 1]) instead would strip the
     * continuation bytes of a fully included character and leave its lead
     * byte dangling, and would not detect a cut placed right after a lead
     * byte.
     */
    while (cut > 0 && ((unsigned char) s[cut] & 0xC0) == 0x80) {
        cut--;
    }

    return cut;
}

static int process_content(struct flb_tail_file *file, size_t *bytes)
{
size_t len;
Expand All @@ -481,6 +501,14 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
#ifdef FLB_HAVE_UNICODE_ENCODER
size_t decoded_len;
#endif
size_t cut = 0;
size_t eff_max = 0;
size_t cur_cap = 0;
size_t dec_len = 0;
size_t window = 0;
int truncation_happened = FLB_FALSE;
size_t bytes_override = 0;
void *nl = NULL;
#ifdef FLB_HAVE_METRICS
uint64_t ts;
char *name;
Expand Down Expand Up @@ -530,7 +558,8 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
end - data);
if (ret > 0) {
data = decoded;
end = data + strlen(decoded);
/* Generic encoding conversion returns decoded length precisely with ret. */
end = data + (size_t) ret;
}
else {
flb_plg_error(ctx->ins, "encoding failed '%.*s' with status %d", end - data, data, ret);
Expand All @@ -542,6 +571,76 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
data = (char *)flb_skip_leading_zeros_simd(data, end, &processed_bytes);
}

if (ctx->truncate_long_lines == FLB_TRUE) {
/* Determine the effective truncation threshold (eff_max) */
/* Use the smaller of buf_max_size or the current buf_size */
if (ctx->buf_max_size > 0) {
eff_max = ctx->buf_max_size - 1;
}
else {
eff_max = 0;
}
if (file->buf_size > 0) {
cur_cap = file->buf_size - 1;
}
else {
cur_cap = 0;
}
if (cur_cap > 0 && (eff_max == 0 || cur_cap < eff_max)) {
eff_max = cur_cap;
}

/* Set the search window for memchr. Add 1 because memchr is (ptr, char, size) */
if (eff_max > 0) {
window = eff_max + 1;
}
else {
window = 0;
}
dec_len = (size_t)(end - data);
window = ctx->buf_max_size + 1;
if (window > dec_len) {
window = dec_len;
}

nl = memchr(data, '\n', window);
if (file->buf_size >= ctx->buf_max_size &&
nl == NULL && eff_max > 0 && dec_len >= eff_max) {
if (file->skip_next == FLB_TRUE) {
bytes_override = (original_len > 0) ? original_len : file->buf_len;
goto truncation_end;
}
cut = utf8_safe_truncate_pos(data, dec_len, eff_max);

if (cut > 0) {
if (ctx->multiline == FLB_TRUE) {
flb_tail_mult_flush(file, ctx);
}

flb_tail_file_pack_line(NULL, data, cut, file, processed_bytes);
file->skip_next = FLB_TRUE;

#ifdef FLB_HAVE_METRICS
cmt_counter_inc(ctx->cmt_long_line_truncated,
cfl_time_now(), 1,
(char*[]){ (char*) flb_input_name(ctx->ins) });
/* Old api */
flb_metrics_sum(FLB_TAIL_METRIC_L_TRUNCATED, 1, ctx->ins->metrics);
#endif
if (original_len > 0) {
bytes_override = original_len;
}
else {
bytes_override = file->buf_len;
}
truncation_happened = FLB_TRUE;

lines++;
goto truncation_end;
}
}
}

while (data < end && (p = memchr(data, '\n', end - data))) {
len = (p - data);
crlf = 0;
Expand Down Expand Up @@ -700,6 +799,7 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
file->last_processed_bytes = processed_bytes;
}

truncation_end:
if (decoded) {
flb_free(decoded);
decoded = NULL;
Expand All @@ -709,9 +809,13 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)

if (lines > 0) {
/* Append buffer content to a chunk */
if (original_len > 0) {
if (truncation_happened) {
*bytes = bytes_override;
}
else if (original_len > 0) {
*bytes = original_len;
} else {
}
else {
*bytes = processed_bytes;
}

Expand Down Expand Up @@ -1506,12 +1610,13 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
size_t file_buffer_capacity;
size_t stream_data_length;
ssize_t raw_data_length;
size_t processed_bytes;
size_t processed_bytes = 0;
uint8_t *read_buffer;
size_t read_size;
size_t size;
char *tmp;
int ret;
int lines;
struct flb_tail_config *ctx;

/* Check if the engine issued a pause */
Expand All @@ -1529,6 +1634,29 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
* If there is no more room for more data, try to increase the
* buffer under the limit of buffer_max_size.
*/
if (ctx->truncate_long_lines == FLB_TRUE) {
lines = process_content(file, &processed_bytes);
if (lines < 0) {
flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR",
file->inode, file->name);
return FLB_TAIL_ERROR;
}

if (lines > 0) {
file->stream_offset += processed_bytes;
file->last_processed_bytes = 0;
consume_bytes(file->buf_data, processed_bytes, file->buf_len);
file->buf_len -= processed_bytes;
file->buf_data[file->buf_len] = '\0';

#ifdef FLB_HAVE_SQLDB
if (file->config->db) {
flb_tail_db_file_offset(file, file->config);
}
#endif
return adjust_counters(ctx, file);
}
}
if (file->buf_size >= ctx->buf_max_size) {
if (ctx->skip_long_lines == FLB_FALSE) {
flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, "
Expand Down
Loading
Loading