Skip to content

Commit 89ff32a

Browse files
committed
out_kafka: add hash option
This PR enhances the out_kafka plugin with a 'hash' option. See fluent/fluent-bit-docs#321. If the 'hash' option is set, a unique hash is added to each message. Configuration example: ``` Hash On Hash_Key _myid ``` The commit was tested in a docker/openshift environment. Messages in kafka with the hash field '_myid': ``` {"@timestamp":1648491317.038613,"cpu_p":1.266666666666667,"user_p":0.9,"system_p":0.3666666666666666,"cpu0.p_cpu":0.8,"cpu0.p_user":0.4,"cpu0.p_system":0.4,"cpu1.p_cpu":1.2,"cpu1.p_user":0.8,"cpu1.p_system":0.4,"cpu2.p_cpu":0.8,"cpu2.p_user":0.4,"cpu2.p_system":0.4,"cpu3.p_cpu":1.4,"cpu3.p_user":1.0,"cpu3.p_system":0.4,"cpu4.p_cpu":1.2,"cpu4.p_user":0.8,"cpu4.p_system":0.4,"cpu5.p_cpu":1.8,"cpu5.p_user":1.6,"cpu5.p_system":0.2,"_id":"8a726e3841ebe225017073cd79d6a34ece3c3f9b20ae155d48610f3f99d49ef6"} {"@timestamp":1648491313.039214,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"52e3e91e42d99eebd02cbfd863f9df979a8147615dbe979cbaf7c1fc8be8c107"} {"@timestamp":1648491314.041181,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"800535ad134c2dea06580233358665ca769698c95f7fb5ead34523de420df450"} {"@timestamp":1648491315.041067,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"e6cd998b20207c71d65e7037e998aeefe36029739cb5fd1879b10252deaf5295"} {"@timestamp":1648491316.038793,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"6cc2cb91766c333999842665c2467e03af6e502c9a75a8a22f6b0afe720bf31c"} {"@timestamp":1648491317.038671,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"149c5b304277efcf94a420425dfe074346c73f4b0ad809d6609aec5fda02386a"} {"@timestamp":1648491318.038539,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"19b05475c35bcad1880737ea4c060b31480927ee8066f238c9cf13db839d192b"} ``` Signed-off-by: Michael Voelker <[email protected]>
1 parent 67b1443 commit 89ff32a

File tree

3 files changed

+166
-3
lines changed

3 files changed

+166
-3
lines changed

plugins/out_kafka/kafka.c

Lines changed: 135 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
23

34
/* Fluent Bit
@@ -21,6 +22,7 @@
2122
#include <fluent-bit/flb_time.h>
2223
#include <fluent-bit/flb_pack.h>
2324
#include <fluent-bit/flb_utils.h>
25+
#include <mbedtls/sha256.h>
2426

2527
#include "kafka_config.h"
2628
#include "kafka_topic.h"
@@ -66,6 +68,39 @@ void cb_kafka_logger(const rd_kafka_t *rk, int level,
6668
}
6769
}
6870

71+
/* update mbedtls_sha256_context with a key/value object from the message */
72+
static int kafka_hash(struct flb_out_kafka *ctx, char* *hash_buf, int *hash_buf_size,
73+
mbedtls_sha256_context *sha256_ctx, msgpack_object *msg_obj)
74+
{
75+
int ret = 0;
76+
int max_increase = 14; // max hash_buffer memory: hash_buf_size(512) * 2^14 = 8MB
77+
int increase_count = 0;
78+
79+
while (increase_count < max_increase) {
80+
ret = msgpack_object_print_buffer(*hash_buf, *hash_buf_size, *msg_obj);
81+
if (ret != 0) {
82+
ret = mbedtls_sha256_update_ret(sha256_ctx, (const unsigned char *) *hash_buf, ret);
83+
if (ret != 0) {
84+
flb_plg_warn(ctx->ins, "hash: can't increase hash_buffer to %d", *hash_buf_size);
85+
break;
86+
}
87+
return 1;
88+
}
89+
increase_count++;
90+
flb_plg_debug(ctx->ins, "increasing hash_buf: %d * 2", *hash_buf_size);
91+
*hash_buf_size = *hash_buf_size * 2;
92+
*hash_buf = flb_realloc(*hash_buf, *hash_buf_size);
93+
if (!*hash_buf) {
94+
flb_plg_warn(ctx->ins, "hash: can't increase hash_buffer to %d", *hash_buf_size);
95+
break;
96+
}
97+
}
98+
flb_plg_warn(ctx->ins, "hash: max_increase reached - can't increase hash_buffer");
99+
flb_free(*hash_buf);
100+
*hash_buf = NULL;
101+
return 0;
102+
}
103+
69104
static int cb_kafka_init(struct flb_output_instance *ins,
70105
struct flb_config *config,
71106
void *data)
@@ -105,6 +140,15 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
105140
msgpack_object key;
106141
msgpack_object val;
107142
flb_sds_t s;
143+
uint8_t hash[32] = {0};
144+
char *hash_buf = NULL;
145+
char hash_formatted[64] = {'\0'};
146+
int *hash_buf_size = &(int){512};
147+
int hash_hex_length;
148+
mbedtls_sha256_context sha256_ctx;
149+
int sha256_ret = 0;
150+
int j;
151+
int size_for_hash = 0;
108152

109153
#ifdef FLB_HAVE_AVRO_ENCODER
110154
// used to flag when a buffer needs to be freed for avro
@@ -141,13 +185,50 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
141185
if (flb_log_check(FLB_LOG_DEBUG))
142186
msgpack_object_print(stderr, *map);
143187

188+
/* increase size for the timestamp */
189+
size = map->via.map.size;
190+
size++;
191+
192+
if (ctx->hash) {
193+
/* init mbedtls_sha256 and hash_buffer */
194+
mbedtls_sha256_init(&sha256_ctx);
195+
sha256_ret = mbedtls_sha256_starts_ret(&sha256_ctx, 0);
196+
if (sha256_ret != 0) {
197+
flb_plg_warn(ctx->ins, "can't init mbedtls_sha256, disable hash generation");
198+
}
199+
else {
200+
hash_buf = flb_malloc(*hash_buf_size);
201+
if (!hash_buf) {
202+
flb_errno();
203+
flb_plg_warn(ctx->ins, "can't init hash_buffer, disable hash generation");
204+
mbedtls_sha256_free(&sha256_ctx);
205+
}
206+
/* update hash with msg timestamp */
207+
else {
208+
snprintf(hash_formatted, sizeof(hash_formatted)-1, "%"PRIu64 , (uint64_t) tm->tm.tv_nsec);
209+
ret = mbedtls_sha256_update_ret(&sha256_ctx, (const unsigned char *) hash_formatted, sizeof(hash_formatted)-1);
210+
if (ret != 0) {
211+
flb_plg_warn(ctx->ins, "can't update mbedtls_sha256 with timestamp, disable hash generation");
212+
if (hash_buf) {
213+
flb_free(hash_buf);
214+
hash_buf = NULL;
215+
}
216+
mbedtls_sha256_free(&sha256_ctx);
217+
}
218+
/* hash init successful, increase msgpack size for additional hash */
219+
else {
220+
size++;
221+
size_for_hash = 1;
222+
}
223+
}
224+
}
225+
}
144226
/* Init temporal buffers */
145227
msgpack_sbuffer_init(&mp_sbuf);
146228
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
147229

148230
if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) {
149231
/* Make room for the timestamp */
150-
size = map->via.map.size + 1;
151232
msgpack_pack_map(&mp_pck, size);
152233

153234
/* Pack timestamp */
@@ -181,7 +262,6 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
181262
}
182263
}
183264
else {
184-
size = map->via.map.size;
185265
msgpack_pack_map(&mp_pck, size);
186266
}
187267

@@ -192,6 +272,19 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
192272
msgpack_pack_object(&mp_pck, key);
193273
msgpack_pack_object(&mp_pck, val);
194274

275+
if (ctx->hash && hash_buf) {
276+
ret = kafka_hash(ctx, &hash_buf, hash_buf_size, &sha256_ctx, &val);
277+
if (ret) {
278+
ret = kafka_hash(ctx, &hash_buf, hash_buf_size, &sha256_ctx, &key);
279+
}
280+
if (!ret) {
281+
if (hash_buf) {
282+
flb_free(hash_buf);
283+
}
284+
mbedtls_sha256_free(&sha256_ctx);
285+
}
286+
}
287+
195288
/* Lookup message key */
196289
if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) {
197290
if (key.via.str.size == ctx->message_key_field_len &&
@@ -259,6 +352,35 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
259352
}
260353
}
261354

355+
if (ctx->hash) {
356+
if (hash_buf) {
357+
hash_hex_length = 64;
358+
sha256_ret = mbedtls_sha256_finish_ret(&sha256_ctx, hash);
359+
if (sha256_ret != 0) {
360+
flb_plg_warn(ctx->ins, "hash generation error\n");
361+
}
362+
mbedtls_sha256_free(&sha256_ctx);
363+
i = 0;
364+
for(j = 0; j < 32; j++) {
365+
sprintf(hash_formatted+(j+i), "%02x", (int) hash[j]);
366+
i++;
367+
}
368+
flb_plg_debug(ctx->ins, "generated hash: <%s>\n", hash_formatted);
369+
flb_free(hash_buf);
370+
}
371+
else {
372+
flb_plg_warn(ctx->ins, "set hash to 'failed'\n");
373+
hash_hex_length = 6;
374+
strncpy(hash_formatted, "failed", hash_hex_length + 1);
375+
}
376+
if (size_for_hash) {
377+
msgpack_pack_str(&mp_pck, ctx->hash_key_len);
378+
msgpack_pack_str_body(&mp_pck, ctx->hash_key, ctx->hash_key_len);
379+
msgpack_pack_str(&mp_pck, hash_hex_length);
380+
msgpack_pack_str_body(&mp_pck, hash_formatted, hash_hex_length);
381+
}
382+
}
383+
262384
if (ctx->format == FLB_KAFKA_FMT_JSON) {
263385
s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
264386
if (!s) {
@@ -569,6 +691,16 @@ static struct flb_config_map config_map[] = {
569691
0, FLB_FALSE, 0,
570692
"Set the level key for gelf output."
571693
},
694+
{
695+
FLB_CONFIG_MAP_BOOL, "hash", "false",
696+
0, FLB_TRUE, offsetof(struct flb_out_kafka, hash),
697+
"Add sha256 hash to the message."
698+
},
699+
{
700+
FLB_CONFIG_MAP_STR, "hash_key", FLB_KAFKA_HASH_KEY,
701+
0, FLB_TRUE, offsetof(struct flb_out_kafka, hash_key),
702+
"Set the hash key for the message hash."
703+
},
572704
#ifdef FLB_HAVE_AVRO_ENCODER
573705
{
574706
FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL,
@@ -619,4 +751,4 @@ struct flb_output_plugin out_kafka_plugin = {
619751
.cb_exit = cb_kafka_exit,
620752
.config_map = config_map,
621753
.flags = 0
622-
};
754+
};

plugins/out_kafka/kafka_config.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,28 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
7676
ctx->topic_key_len = strlen(ctx->topic_key);
7777
}
7878

79+
/* Config: Hash */
80+
if (ctx->hash) {
81+
ctx->hash_key_len = strlen(ctx->hash_key);
82+
}
83+
if (ctx->hash_key) {
84+
ctx->hash_key_len = strlen(ctx->hash_key);
85+
}
86+
else {
87+
ctx->hash_key_len = 0;
88+
}
89+
90+
/* Config: Hash_Key */
91+
tmp = flb_output_get_property("hash_key", ins);
92+
if (tmp) {
93+
ctx->hash_key = flb_strdup(tmp);
94+
ctx->hash_key_len = strlen(tmp);
95+
}
96+
else {
97+
ctx->hash_key = FLB_KAFKA_HASH_KEY;
98+
ctx->hash_key_len = strlen(FLB_KAFKA_HASH_KEY);
99+
}
100+
79101
/* Config: Format */
80102
if (ctx->format_str) {
81103
if (strcasecmp(ctx->format_str, "json") == 0) {
@@ -232,6 +254,10 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
232254
flb_free(ctx->message_key_field);
233255
}
234256

257+
if (ctx->hash_key) {
258+
flb_free(ctx->hash_key);
259+
}
260+
235261
flb_sds_destroy(ctx->gelf_fields.timestamp_key);
236262
flb_sds_destroy(ctx->gelf_fields.host_key);
237263
flb_sds_destroy(ctx->gelf_fields.short_message_key);

plugins/out_kafka/kafka_config.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#define FLB_KAFKA_FMT_AVRO 3
3636
#endif
3737
#define FLB_KAFKA_TS_KEY "@timestamp"
38+
#define FLB_KAFKA_HASH_KEY "_id"
3839
#define FLB_KAFKA_QUEUE_FULL_RETRIES "10"
3940

4041
/* rdkafka log levels based on syslog(3) */
@@ -68,6 +69,10 @@ struct flb_out_kafka {
6869
int topic_key_len;
6970
char *topic_key;
7071

72+
int hash;
73+
int hash_key_len;
74+
char *hash_key;
75+
7176
int timestamp_key_len;
7277
char *timestamp_key;
7378
int timestamp_format;

0 commit comments

Comments
 (0)