Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ if(FLB_ALL)
# Global
set(FLB_DEBUG 1)
set(FLB_TLS 1)
set(FLB_UTF8_ENCODER 1)

# Input plugins
set(FLB_IN_CPU 1)
Expand Down Expand Up @@ -402,7 +403,8 @@ endif()

# tutf8e
if(FLB_UTF8_ENCODER)
add_subdirectory(${FLB_PATH_LIB_TUTF8E} EXCLUDE_FROM_ALL)
add_subdirectory(${FLB_PATH_LIB_TUTF8E})
FLB_DEFINITION(FLB_HAVE_UTF8_ENCODER)
endif()

# xxHash
Expand Down
44 changes: 44 additions & 0 deletions include/fluent-bit/flb_encoding.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2019 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_ENCODING_H
#define FLB_ENCODING_H

#include <tutf8e.h>


#define FLB_ENCODING_SUCCESS 0
#define FLB_ENCODING_FAILURE -1

struct flb_encoding {
TUTF8encoder encoder;
const char *invalid;
};


struct flb_encoding *flb_encoding_open(const char *encoding, const char *replacement);

int flb_encoding_decode(struct flb_encoding *ec,
char *str, size_t slen,
char **result, size_t *result_len);

void flb_encoding_close(struct flb_encoding *ic);

#endif
9 changes: 9 additions & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>

#ifdef FLB_HAVE_UTF8_ENCODER
#include <fluent-bit/flb_encoding.h>
#endif

/* Syslog modes */
#define FLB_SYSLOG_UNIX_TCP 1
#define FLB_SYSLOG_UNIX_UDP 2
Expand Down Expand Up @@ -63,6 +67,11 @@ struct flb_syslog {
struct mk_list connections;
struct mk_event_loop *evl;
struct flb_input_instance *ins;

#ifdef FLB_HAVE_UTF8_ENCODER
struct flb_encoding *encoding;
#endif

};

#endif
24 changes: 24 additions & 0 deletions plugins/in_syslog/syslog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ struct flb_syslog *syslog_conf_create(struct flb_input_instance *ins,
struct flb_config *config)
{
const char *tmp;
#ifdef FLB_HAVE_UTF8_ENCODER
const char *tmp2;
#endif
char port[16];
struct flb_syslog *ctx;

Expand Down Expand Up @@ -134,6 +137,20 @@ struct flb_syslog *syslog_conf_create(struct flb_input_instance *ins,
return NULL;
}

#ifdef FLB_HAVE_UTF8_ENCODER
/* utf8 encoder */
tmp = flb_input_get_property("encoding", ins);
if (tmp) {
tmp2 = flb_input_get_property("encoding_replacement", ins);
ctx->encoding = flb_encoding_open(tmp,tmp2);
if (!ctx->encoding) {
flb_error("[in_syslog] illegal encoding: %s", tmp);
syslog_conf_destroy(ctx);
return NULL;
}
}
#endif

return ctx;
}

Expand All @@ -143,6 +160,13 @@ int syslog_conf_destroy(struct flb_syslog *ctx)
flb_free(ctx->buffer_data);
ctx->buffer_data = NULL;
}

#ifdef FLB_HAVE_UTF8_ENCODER
if(ctx->encoding) {
flb_encoding_close(ctx->encoding);
}
#endif

syslog_server_destroy(ctx);
flb_free(ctx);

Expand Down
97 changes: 54 additions & 43 deletions plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,64 @@ static inline int pack_line(struct flb_syslog *ctx,
return 0;
}


int syslog_prot_process_msg(struct flb_syslog *ctx, char *p, size_t len)
{
int ret;
void *out_buf;
size_t out_size;
struct flb_time out_time;
#ifdef FLB_HAVE_UTF8_ENCODER
char *decoded = NULL;
size_t decoded_size;
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
if (ctx->encoding) {
ret = flb_encoding_decode(ctx->encoding, p, len, &decoded, &decoded_size);
if (ret != FLB_ENCODING_SUCCESS) {
flb_plg_error(ctx->ins, "decoding failed '%.*s'", p, len);
goto finish;
}
p = decoded;
len = decoded_size;
}
#endif

/* Process the string */
ret = flb_parser_do(ctx->parser, p, len,
&out_buf, &out_size, &out_time);
if (ret < 0) {
flb_plg_warn(ctx->ins, "error parsing log message with parser '%s'",
ctx->parser->name);
flb_plg_debug(ctx->ins, "unparsed log message: %.*s", len, p);
goto finish;
}

if (flb_time_to_double(&out_time) == 0.0) {
flb_time_get(&out_time);
}

pack_line(ctx, &out_time, out_buf, out_size);
ret = 0;

finish:

#ifdef FLB_HAVE_UTF8_ENCODER
if (decoded) {
flb_free(decoded);
}
#endif
return ret;

}

int syslog_prot_process(struct syslog_conn *conn)
{
int len;
int ret;
char *p;
char *eof;
char *end;
void *out_buf;
size_t out_size;
struct flb_time out_time;
struct flb_syslog *ctx = conn->ctx;

eof = conn->buf_data;
end = conn->buf_data + conn->buf_len;
Expand Down Expand Up @@ -96,21 +143,7 @@ int syslog_prot_process(struct syslog_conn *conn)
continue;
}

/* Process the string */
ret = flb_parser_do(ctx->parser, p, len,
&out_buf, &out_size, &out_time);
if (ret >= 0) {
if (flb_time_to_double(&out_time) == 0.0) {
flb_time_get(&out_time);
}
pack_line(ctx, &out_time, out_buf, out_size);
flb_free(out_buf);
}
else {
flb_plg_warn(ctx->ins, "error parsing log message with parser '%s'",
ctx->parser->name);
flb_plg_debug(ctx->ins, "unparsed log message: %.*s", len, p);
}
syslog_prot_process_msg(conn->ctx, p, len);

conn->buf_parsed += len + 1;
end = conn->buf_data + conn->buf_len;
Expand All @@ -129,27 +162,5 @@ int syslog_prot_process(struct syslog_conn *conn)

int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
{
int ret;
void *out_buf;
size_t out_size;
struct flb_time out_time = {0};

ret = flb_parser_do(ctx->parser, buf, size,
&out_buf, &out_size, &out_time);
if (ret >= 0) {
if (flb_time_to_double(&out_time) == 0) {
flb_time_get(&out_time);
}
pack_line(ctx, &out_time, out_buf, out_size);
flb_free(out_buf);
}
else {
flb_plg_warn(ctx->ins, "error parsing log message with parser '%s'",
ctx->parser->name);
flb_plg_debug(ctx->ins, "unparsed log message: %.*s",
(int) size, buf);
return -1;
}

return 0;
return syslog_prot_process_msg(ctx, buf, size);
}
13 changes: 13 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,19 @@ static struct flb_config_map config_map[] = {
},
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
{
FLB_CONFIG_MAP_STR, "encoding", NULL,
0, FLB_FALSE, 0,
"specify the input encoding for converting to UTF-8",
},
{
FLB_CONFIG_MAP_STR, "encoding_replacement", NULL,
0, FLB_FALSE, 0,
"Replacement in case of decoding error (default: unicode replacement char)",
},
#endif

/* Multiline Options */
#ifdef FLB_HAVE_PARSER
{
Expand Down
26 changes: 25 additions & 1 deletion plugins/in_tail/tail_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
int i;
long nsec;
const char *tmp;
#ifdef FLB_HAVE_UTF8_ENCODER
const char *tmp2;
#endif
struct flb_tail_config *ctx;

ctx = flb_calloc(1, sizeof(struct flb_tail_config));
if (!ctx) {
flb_errno();
Expand All @@ -96,6 +98,9 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
#ifdef FLB_HAVE_SQLDB
ctx->db_sync = 1; /* sqlite sync 'normal' */
#endif
#ifdef FLB_HAVE_UTF8_ENCODER
ctx->encoding = NULL;
#endif

/* Load the config map */
ret = flb_input_config_map_set(ins, (void *) ctx);
Expand Down Expand Up @@ -190,6 +195,19 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
}
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
tmp = flb_input_get_property("encoding", ins);
if (tmp) {
tmp2 = flb_input_get_property("encoding_relacement", ins);
ctx->encoding = flb_encoding_open(tmp,tmp2);
if (!ctx->encoding) {
flb_plg_error(ctx->ins,"illegal encoding: %s", tmp);
flb_tail_config_destroy(ctx);
return NULL;
}
}
#endif

/* Config: Docker mode */
if(ctx->docker_mode == FLB_TRUE) {
ret = flb_tail_dmode_create(ctx, ins, config);
Expand Down Expand Up @@ -445,6 +463,12 @@ int flb_tail_config_destroy(struct flb_tail_config *config)
}
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
if(config->encoding) {
flb_encoding_close(config->encoding);
}
#endif

flb_free(config);
return 0;
}
7 changes: 7 additions & 0 deletions plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#ifdef FLB_HAVE_PARSER
#include <fluent-bit/multiline/flb_ml.h>
#endif
#ifdef FLB_HAVE_UTF8_ENCODER
#include <fluent-bit/flb_encoding.h>
#endif


/* Metrics */
Expand Down Expand Up @@ -104,6 +107,10 @@ struct flb_tail_config {
sqlite3_stmt *stmt_offset;
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
struct flb_encoding *encoding;
#endif

/* Parser / Format */
struct flb_parser *parser;

Expand Down
Loading