Skip to content

Commit 8bff0a2

Browse files
committed
in_tail: add flb_encoding support to in_tail-plugin
* Adds new option: "encoding" to in_tail. * If encoding fails. message is skipped. Signed-off-by: Jukka Pihl <[email protected]>
1 parent c8ed626 commit 8bff0a2

File tree

4 files changed

+62
-1
lines changed

4 files changed

+62
-1
lines changed

plugins/in_tail/tail.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,14 @@ static struct flb_config_map config_map[] = {
576576
},
577577
#endif
578578

579+
#ifdef FLB_HAVE_UTF8_ENCODER
580+
{
581+
FLB_CONFIG_MAP_STR, "encoding", NULL,
582+
0, FLB_FALSE, 0,
583+
"specify the charset encoder to decode message",
584+
},
585+
#endif
586+
579587
/* Multiline Options */
580588
#ifdef FLB_HAVE_PARSER
581589
{

plugins/in_tail/tail_config.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
5656
#ifdef FLB_HAVE_SQLDB
5757
ctx->db_sync = -1;
5858
#endif
59+
#ifdef FLB_HAVE_UTF8_ENCODER
60+
ctx->encoding = NULL;
61+
#endif
5962

6063
/* Load the config map */
6164
ret = flb_input_config_map_set(ins, (void *) ctx);
@@ -151,6 +154,18 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
151154
}
152155
#endif
153156

157+
#ifdef FLB_HAVE_UTF8_ENCODER
158+
tmp = flb_input_get_property("encoding", ins);
159+
if (tmp) {
160+
ctx->encoding = flb_encoding_open(tmp);
161+
if (!ctx->encoding) {
162+
flb_plg_error(ctx->ins,"illegal encoding: %s", tmp);
163+
flb_tail_config_destroy(ctx);
164+
return NULL;
165+
}
166+
}
167+
#endif
168+
154169
/* Config: Docker mode */
155170
if(ctx->docker_mode == FLB_TRUE) {
156171
ret = flb_tail_dmode_create(ctx, ins, config);
@@ -344,6 +359,12 @@ int flb_tail_config_destroy(struct flb_tail_config *config)
344359
}
345360
#endif
346361

362+
#ifdef FLB_HAVE_UTF8_ENCODER
363+
if(config->encoding) {
364+
flb_encoding_close(config->encoding);
365+
}
366+
#endif
367+
347368
flb_free(config);
348369
return 0;
349370
}

plugins/in_tail/tail_config.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
#ifdef FLB_HAVE_REGEX
3030
#include <fluent-bit/flb_regex.h>
3131
#endif
32+
#ifdef FLB_HAVE_UTF8_ENCODER
33+
#include <fluent-bit/flb_encoding.h>
34+
#endif
3235

3336
/* Metrics */
3437
#ifdef FLB_HAVE_METRICS
@@ -94,6 +97,10 @@ struct flb_tail_config {
9497
sqlite3_stmt *stmt_offset;
9598
#endif
9699

100+
#ifdef FLB_HAVE_UTF8_ENCODER
101+
struct flb_encoding *encoding;
102+
#endif
103+
97104
/* Parser / Format */
98105
struct flb_parser *parser;
99106

plugins/in_tail/tail_file.c

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
196196
msgpack_sbuffer *out_sbuf;
197197
msgpack_packer *out_pck;
198198
struct flb_tail_config *ctx = file->config;
199+
#ifdef FLB_HAVE_UTF8_ENCODER
200+
char *decoded;
201+
size_t decoded_len;
202+
#endif
199203

200204
/* Create a temporary msgpack buffer */
201205
msgpack_sbuffer_init(&mp_sbuf);
@@ -238,6 +242,20 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
238242
line_len = len - crlf;
239243
repl_line = NULL;
240244

245+
246+
#ifdef FLB_HAVE_UTF8_ENCODER
247+
decoded = NULL;
248+
if(ctx->encoding) {
249+
ret = flb_encoding_decode(ctx->encoding, line, line_len, &decoded, &decoded_len);
250+
if (ret != FLB_ENCODING_SUCCESS) {
251+
flb_plg_error(ctx->ins, "encoding failed '%.*s'", line_len, line);
252+
goto go_next;
253+
}
254+
line = decoded;
255+
line_len = decoded_len;
256+
}
257+
#endif
258+
241259
if (ctx->docker_mode) {
242260
ret = flb_tail_dmode_process_content(now, line, line_len,
243261
&repl_line, &repl_line_len,
@@ -288,7 +306,7 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
288306
/* Parser failed, pack raw text */
289307
flb_time_get(&out_time);
290308
flb_tail_file_pack_line(out_sbuf, out_pck, &out_time,
291-
data, len, file);
309+
line, len, file);
292310
}
293311
}
294312
else if (ctx->multiline == FLB_TRUE) {
@@ -331,6 +349,12 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
331349
processed_bytes += len + 1;
332350
file->parsed = 0;
333351
lines++;
352+
#ifdef FLB_HAVE_UTF8_ENCODER
353+
if(decoded) {
354+
flb_free(decoded);
355+
decoded = NULL;
356+
}
357+
#endif
334358
}
335359
file->parsed = file->buf_len;
336360
*bytes = processed_bytes;
@@ -343,6 +367,7 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
343367
out_sbuf->size);
344368

345369
msgpack_sbuffer_destroy(out_sbuf);
370+
346371
return lines;
347372
}
348373

0 commit comments

Comments
 (0)