plugins/out_kafka/kafka.c: 134 additions, 3 deletions
@@ -21,6 +21,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_utils.h>
#include <mbedtls/sha256.h>

#include "kafka_config.h"
#include "kafka_topic.h"
@@ -66,6 +67,39 @@ void cb_kafka_logger(const rd_kafka_t *rk, int level,
}
}

/* update mbedtls_sha256_context with a key/value object from the message */
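/*
 * Contract, as implemented below: returns 1 once the rendered object has
 * been fed into the SHA-256 context; returns 0 on any failure, after
 * freeing *hash_buf and setting it to NULL, so the caller must stop
 * hashing the current record.
 */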
static int kafka_hash(struct flb_out_kafka *ctx, char **hash_buf, int *hash_buf_size,
                      mbedtls_sha256_context *sha256_ctx, msgpack_object *msg_obj)
{
    int ret = 0;
    char *tmp_buf;
    int max_increase = 14; /* max hash_buf memory: hash_buf_size(512) * 2^14 = 8MB */
    int increase_count = 0;

    while (increase_count < max_increase) {
        ret = msgpack_object_print_buffer(*hash_buf, *hash_buf_size, *msg_obj);
        if (ret != 0) {
            /* the object fit into hash_buf; feed the rendered bytes to the hash */
            ret = mbedtls_sha256_update_ret(sha256_ctx, (const unsigned char *) *hash_buf, ret);
            if (ret != 0) {
                flb_plg_warn(ctx->ins, "hash: mbedtls_sha256_update failed (%d)", ret);
                break;
            }
            return 1;
        }
        /* buffer too small: double it and retry */
        increase_count++;
        flb_plg_debug(ctx->ins, "increasing hash_buf: %d * 2", *hash_buf_size);
        *hash_buf_size = *hash_buf_size * 2;
        tmp_buf = flb_realloc(*hash_buf, *hash_buf_size);
        if (!tmp_buf) {
            flb_errno();
            flb_plg_warn(ctx->ins, "hash: can't increase hash_buf to %d", *hash_buf_size);
            break;
        }
        *hash_buf = tmp_buf;
    }

    if (increase_count >= max_increase) {
        flb_plg_warn(ctx->ins, "hash: max_increase reached - can't increase hash_buf");
    }
    flb_free(*hash_buf);
    *hash_buf = NULL;
    return 0;
}
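/*
 * Sizing note: the buffer starts at 512 bytes and doubles at most 14 times,
 * i.e. 512 * 2^14 = 8,388,608 bytes (8 MiB). A key or value that renders
 * larger than that aborts hashing for the record, whose hash is then packed
 * as the literal string "failed" further below.
 */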

static int cb_kafka_init(struct flb_output_instance *ins,
struct flb_config *config,
void *data)
@@ -105,6 +139,15 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
msgpack_object key;
msgpack_object val;
flb_sds_t s;
uint8_t hash[32] = {0};
char *hash_buf = NULL;
char hash_formatted[65] = {'\0'};   /* 64 hex chars + NUL terminator */
int hash_buf_size = 512;
int hash_hex_length;
mbedtls_sha256_context sha256_ctx;
int sha256_ret = 0;
int j;
int size_for_hash = 0;

#ifdef FLB_HAVE_AVRO_ENCODER
// used to flag when a buffer needs to be freed for avro
@@ -141,13 +184,50 @@
if (flb_log_check(FLB_LOG_DEBUG))
msgpack_object_print(stderr, *map);

/* make room for the timestamp */
size = map->via.map.size + 1;

    if (ctx->hash) {
        /* init mbedtls_sha256 and the hash buffer */
        mbedtls_sha256_init(&sha256_ctx);
        sha256_ret = mbedtls_sha256_starts_ret(&sha256_ctx, 0);
        if (sha256_ret != 0) {
            flb_plg_warn(ctx->ins, "can't init mbedtls_sha256, disabling hash generation");
            mbedtls_sha256_free(&sha256_ctx);
        }
        else {
            hash_buf = flb_malloc(hash_buf_size);
            if (!hash_buf) {
                flb_errno();
                flb_plg_warn(ctx->ins, "can't allocate hash buffer, disabling hash generation");
                mbedtls_sha256_free(&sha256_ctx);
            }
            else {
                /* seed the hash with the nanoseconds part of the record timestamp */
                snprintf(hash_formatted, sizeof(hash_formatted), "%" PRIu64, (uint64_t) tm->tm.tv_nsec);
                ret = mbedtls_sha256_update_ret(&sha256_ctx,
                                                (const unsigned char *) hash_formatted,
                                                strlen(hash_formatted));
                if (ret != 0) {
                    flb_plg_warn(ctx->ins, "can't update mbedtls_sha256 with timestamp, "
                                 "disabling hash generation");
                    flb_free(hash_buf);
                    hash_buf = NULL;
                    mbedtls_sha256_free(&sha256_ctx);
                }
                else {
                    /* hash init successful; reserve one more map slot for the hash entry */
                    size++;
                    size_for_hash = 1;
                }
            }
        }
    }
/* Init temporary buffers */
msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) {
msgpack_pack_map(&mp_pck, size);

/* Pack timestamp */
Expand Down Expand Up @@ -181,7 +261,6 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
}
}
else {
msgpack_pack_map(&mp_pck, size);
}

@@ -192,6 +271,19 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
msgpack_pack_object(&mp_pck, key);
msgpack_pack_object(&mp_pck, val);

    /* feed this pair into the running hash; note the order used here:
     * value first, then key. kafka_hash() frees hash_buf and sets it to
     * NULL on failure, so hashing stops for the rest of the record. */
    if (ctx->hash && hash_buf) {
        ret = kafka_hash(ctx, &hash_buf, &hash_buf_size, &sha256_ctx, &val);
        if (ret) {
            ret = kafka_hash(ctx, &hash_buf, &hash_buf_size, &sha256_ctx, &key);
        }
        if (!ret) {
            /* kafka_hash already released hash_buf */
            mbedtls_sha256_free(&sha256_ctx);
        }
    }

/* Lookup message key */
if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) {
if (key.via.str.size == ctx->message_key_field_len &&
Expand Down Expand Up @@ -259,6 +351,35 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
}
}

    if (ctx->hash) {
        if (hash_buf) {
            hash_hex_length = 64;
            sha256_ret = mbedtls_sha256_finish_ret(&sha256_ctx, hash);
            if (sha256_ret != 0) {
                flb_plg_warn(ctx->ins, "hash generation error");
            }
            mbedtls_sha256_free(&sha256_ctx);
            /* render the 32-byte digest as 64 lowercase hex characters */
            for (j = 0; j < 32; j++) {
                sprintf(hash_formatted + j * 2, "%02x", (int) hash[j]);
            }
            flb_plg_debug(ctx->ins, "generated hash: <%s>", hash_formatted);
            flb_free(hash_buf);
        }
        else {
            flb_plg_warn(ctx->ins, "set hash to 'failed'");
            hash_hex_length = 6;
            strncpy(hash_formatted, "failed", hash_hex_length + 1);
        }
        if (size_for_hash) {
            msgpack_pack_str(&mp_pck, ctx->hash_key_len);
            msgpack_pack_str_body(&mp_pck, ctx->hash_key, ctx->hash_key_len);
            msgpack_pack_str(&mp_pck, hash_hex_length);
            msgpack_pack_str_body(&mp_pck, hash_formatted, hash_hex_length);
        }
    }

if (ctx->format == FLB_KAFKA_FMT_JSON) {
s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
if (!s) {
Expand Down Expand Up @@ -569,6 +690,16 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set the level key for gelf output."
},
{
FLB_CONFIG_MAP_BOOL, "hash", "false",
0, FLB_TRUE, offsetof(struct flb_out_kafka, hash),
"Append a SHA-256 hash of each record to the message."
},
{
FLB_CONFIG_MAP_STR, "hash_key", FLB_KAFKA_HASH_KEY,
0, FLB_TRUE, offsetof(struct flb_out_kafka, hash_key),
"Set the record key under which the message hash is stored."
},
#ifdef FLB_HAVE_AVRO_ENCODER
{
FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL,
Expand Down Expand Up @@ -619,4 +750,4 @@ struct flb_output_plugin out_kafka_plugin = {
.cb_exit = cb_kafka_exit,
.config_map = config_map,
.flags = 0
};
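With the two new map entries above, enabling the feature is a configuration
change only. A minimal usage sketch (Brokers/Topics/Match values are
placeholders; hash_key defaults to FLB_KAFKA_HASH_KEY, i.e. "_id", so the
last line is optional):

    [OUTPUT]
        Name      kafka
        Match     *
        Brokers   127.0.0.1:9092
        Topics    test
        hash      true
        hash_key  _id

Each produced record then carries one extra string field, e.g.
"_id": "<64 hex characters>", or the literal string "failed" when hash
generation could not be initialized for that record.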
plugins/out_kafka/kafka_config.c: 26 additions, 0 deletions
@@ -76,6 +76,28 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
ctx->topic_key_len = strlen(ctx->topic_key);
}

    /* Config: Hash_Key (the record key that will carry the hash) */
    tmp = flb_output_get_property("hash_key", ins);
    if (tmp) {
        ctx->hash_key = flb_strdup(tmp);
        ctx->hash_key_len = strlen(tmp);
    }
    else {
        ctx->hash_key = flb_strdup(FLB_KAFKA_HASH_KEY);
        ctx->hash_key_len = strlen(FLB_KAFKA_HASH_KEY);
    }

/* Config: Format */
if (ctx->format_str) {
if (strcasecmp(ctx->format_str, "json") == 0) {
@@ -232,6 +254,10 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
flb_free(ctx->message_key_field);
}

if (ctx->hash_key) {
flb_free(ctx->hash_key);
}

flb_sds_destroy(ctx->gelf_fields.timestamp_key);
flb_sds_destroy(ctx->gelf_fields.host_key);
flb_sds_destroy(ctx->gelf_fields.short_message_key);
plugins/out_kafka/kafka_config.h: 5 additions, 0 deletions
@@ -35,6 +35,7 @@
#define FLB_KAFKA_FMT_AVRO 3
#endif
#define FLB_KAFKA_TS_KEY "@timestamp"
#define FLB_KAFKA_HASH_KEY "_id"
#define FLB_KAFKA_QUEUE_FULL_RETRIES "10"

/* rdkafka log levels based on syslog(3) */
@@ -68,6 +69,10 @@ struct flb_out_kafka {
int topic_key_len;
char *topic_key;

int hash;          /* boolean: append a hash entry to each record */
int hash_key_len;
char *hash_key;    /* record key that carries the hex digest */

int timestamp_key_len;
char *timestamp_key;
int timestamp_format;
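For reviewers who want to check a produced hash offline, here is a minimal,
self-contained sketch that recomputes it with the same mbedtls 2.x calls the
plugin uses. It assumes the behavior of the code above: the decimal string of
the record's tv_nsec is hashed first, then every key/value pair as rendered
by msgpack_object_print_buffer, value before key. The record contents below
are hypothetical.

#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <mbedtls/sha256.h>

int main(void)
{
    mbedtls_sha256_context ctx;
    unsigned char digest[32];
    char hex[65];                    /* 64 hex chars + NUL */
    char ts[32];
    int j;

    /* hypothetical record: one "log" => "hello" pair, tv_nsec = 123456789 */
    uint64_t tv_nsec = 123456789;
    const char *val = "\"hello\"";   /* value, as msgpack would print it */
    const char *key = "\"log\"";     /* key, as msgpack would print it */

    mbedtls_sha256_init(&ctx);
    mbedtls_sha256_starts_ret(&ctx, 0);   /* 0 selects SHA-256, not SHA-224 */

    /* same update order as produce_message: timestamp, then value, then key */
    snprintf(ts, sizeof(ts), "%" PRIu64, tv_nsec);
    mbedtls_sha256_update_ret(&ctx, (const unsigned char *) ts, strlen(ts));
    mbedtls_sha256_update_ret(&ctx, (const unsigned char *) val, strlen(val));
    mbedtls_sha256_update_ret(&ctx, (const unsigned char *) key, strlen(key));

    mbedtls_sha256_finish_ret(&ctx, digest);
    mbedtls_sha256_free(&ctx);

    /* render the digest exactly like the plugin: 64 lowercase hex chars */
    for (j = 0; j < 32; j++) {
        sprintf(hex + j * 2, "%02x", (unsigned int) digest[j]);
    }
    printf("%s\n", hex);
    return 0;
}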