Skip to content

Commit fff078f

Browse files
committed
out_kafka: add hash option
this PR enhances plugin out_kafka with 'hash' option. See fluent/fluent-bit-docs#321 If option 'hash' is set, an uniqe hash is added to each message configuration example: ``` Hash On Hash_Key _myid ``` the commit was tested in docker/openshift environment messages in kafka with hash field '_myid': ``` {"@timestamp":1648491317.038613,"cpu_p":1.266666666666667,"user_p":0.9,"system_p":0.3666666666666666,"cpu0.p_cpu":0.8,"cpu0.p_user":0.4,"cpu0.p_system":0.4,"cpu1.p_cpu":1.2,"cpu1.p_user":0.8,"cpu1.p_system":0.4,"cpu2.p_cpu":0.8,"cpu2.p_user":0.4,"cpu2.p_system":0.4,"cpu3.p_cpu":1.4,"cpu3.p_user":1.0,"cpu3.p_system":0.4,"cpu4.p_cpu":1.2,"cpu4.p_user":0.8,"cpu4.p_system":0.4,"cpu5.p_cpu":1.8,"cpu5.p_user":1.6,"cpu5.p_system":0.2,"_id":"8a726e3841ebe225017073cd79d6a34ece3c3f9b20ae155d48610f3f99d49ef6"} {"@timestamp":1648491313.039214,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"52e3e91e42d99eebd02cbfd863f9df979a8147615dbe979cbaf7c1fc8be8c107"} {"@timestamp":1648491314.041181,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"800535ad134c2dea06580233358665ca769698c95f7fb5ead34523de420df450"} {"@timestamp":1648491315.041067,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"e6cd998b20207c71d65e7037e998aeefe36029739cb5fd1879b10252deaf5295"} {"@timestamp":1648491316.038793,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"6cc2cb91766c333999842665c2467e03af6e502c9a75a8a22f6b0afe720bf31c"} {"@timestamp":1648491317.038671,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"149c5b304277efcf94a420425dfe074346c73f4b0ad809d6609aec5fda02386a"} {"@timestamp":1648491318.038539,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"19b05475c35bcad1880737ea4c060b31480927ee8066f238c9cf13db839d192b"} ``` Signed-off-by: Michael Voelker <[email protected]>
1 parent 67b1443 commit fff078f

File tree

3 files changed

+165
-3
lines changed

3 files changed

+165
-3
lines changed

plugins/out_kafka/kafka.c

Lines changed: 134 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <fluent-bit/flb_time.h>
2222
#include <fluent-bit/flb_pack.h>
2323
#include <fluent-bit/flb_utils.h>
24+
#include <mbedtls/sha256.h>
2425

2526
#include "kafka_config.h"
2627
#include "kafka_topic.h"
@@ -66,6 +67,39 @@ void cb_kafka_logger(const rd_kafka_t *rk, int level,
6667
}
6768
}
6869

70+
/* update mbedtls_sha256_context with a key/value object from the message */
71+
static int kafka_hash(struct flb_out_kafka *ctx, char* *hash_buf, int *hash_buf_size,
72+
mbedtls_sha256_context *sha256_ctx, msgpack_object *msg_obj)
73+
{
74+
int ret = 0;
75+
int max_increase = 14; // max hash_buffer memory: hash_buf_size(512) * 2^14 = 8MB
76+
int increase_count = 0;
77+
78+
while (increase_count < max_increase) {
79+
ret = msgpack_object_print_buffer(*hash_buf, *hash_buf_size, *msg_obj);
80+
if (ret != 0) {
81+
ret = mbedtls_sha256_update_ret(sha256_ctx, (const unsigned char *) *hash_buf, ret);
82+
if (ret != 0) {
83+
flb_plg_warn(ctx->ins, "hash: can't increase hash_buffer to %d", *hash_buf_size);
84+
break;
85+
}
86+
return 1;
87+
}
88+
increase_count++;
89+
flb_plg_debug(ctx->ins, "increasing hash_buf: %d * 2", *hash_buf_size);
90+
*hash_buf_size = *hash_buf_size * 2;
91+
*hash_buf = flb_realloc(*hash_buf, *hash_buf_size);
92+
if (!*hash_buf) {
93+
flb_plg_warn(ctx->ins, "hash: can't increase hash_buffer to %d", *hash_buf_size);
94+
break;
95+
}
96+
}
97+
flb_plg_warn(ctx->ins, "hash: max_increase reached - can't increase hash_buffer");
98+
flb_free(*hash_buf);
99+
*hash_buf = NULL;
100+
return 0;
101+
}
102+
69103
static int cb_kafka_init(struct flb_output_instance *ins,
70104
struct flb_config *config,
71105
void *data)
@@ -105,6 +139,15 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
105139
msgpack_object key;
106140
msgpack_object val;
107141
flb_sds_t s;
142+
uint8_t hash[32] = {0};
143+
char *hash_buf = NULL;
144+
char hash_formatted[64] = {'\0'};
145+
int *hash_buf_size = &(int){512};
146+
int hash_hex_length;
147+
mbedtls_sha256_context sha256_ctx;
148+
int sha256_ret = 0;
149+
int j;
150+
int size_for_hash = 0;
108151

109152
#ifdef FLB_HAVE_AVRO_ENCODER
110153
// used to flag when a buffer needs to be freed for avro
@@ -141,13 +184,50 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
141184
if (flb_log_check(FLB_LOG_DEBUG))
142185
msgpack_object_print(stderr, *map);
143186

187+
/* increase size for the timestamp */
188+
size = map->via.map.size;
189+
size++;
190+
191+
if (ctx->hash) {
192+
/* init mbedtls_sha256 and hash_buffer */
193+
mbedtls_sha256_init(&sha256_ctx);
194+
sha256_ret = mbedtls_sha256_starts_ret(&sha256_ctx, 0);
195+
if (sha256_ret != 0) {
196+
flb_plg_warn(ctx->ins, "can't init mbedtls_sha256, disable hash generation");
197+
}
198+
else {
199+
hash_buf = flb_malloc(*hash_buf_size);
200+
if (!hash_buf) {
201+
flb_errno();
202+
flb_plg_warn(ctx->ins, "can't init hash_buffer, disable hash generation");
203+
mbedtls_sha256_free(&sha256_ctx);
204+
}
205+
/* update hash with msg timestamp */
206+
else {
207+
snprintf(hash_formatted, sizeof(hash_formatted)-1, "%"PRIu64 , (uint64_t) tm->tm.tv_nsec);
208+
ret = mbedtls_sha256_update_ret(&sha256_ctx, (const unsigned char *) hash_formatted, sizeof(hash_formatted)-1);
209+
if (ret != 0) {
210+
flb_plg_warn(ctx->ins, "can't update mbedtls_sha256 with timestamp, disable hash generation");
211+
if (hash_buf) {
212+
flb_free(hash_buf);
213+
hash_buf = NULL;
214+
}
215+
mbedtls_sha256_free(&sha256_ctx);
216+
}
217+
/* hash init successful, increase msgpack size for additional hash */
218+
else {
219+
size++;
220+
size_for_hash = 1;
221+
}
222+
}
223+
}
224+
}
144225
/* Init temporal buffers */
145226
msgpack_sbuffer_init(&mp_sbuf);
146227
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
147228

148229
if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) {
149230
/* Make room for the timestamp */
150-
size = map->via.map.size + 1;
151231
msgpack_pack_map(&mp_pck, size);
152232

153233
/* Pack timestamp */
@@ -181,7 +261,6 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
181261
}
182262
}
183263
else {
184-
size = map->via.map.size;
185264
msgpack_pack_map(&mp_pck, size);
186265
}
187266

@@ -192,6 +271,19 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
192271
msgpack_pack_object(&mp_pck, key);
193272
msgpack_pack_object(&mp_pck, val);
194273

274+
if (ctx->hash && hash_buf) {
275+
ret = kafka_hash(ctx, &hash_buf, hash_buf_size, &sha256_ctx, &val);
276+
if (ret) {
277+
ret = kafka_hash(ctx, &hash_buf, hash_buf_size, &sha256_ctx, &key);
278+
}
279+
if (!ret) {
280+
if (hash_buf) {
281+
flb_free(hash_buf);
282+
}
283+
mbedtls_sha256_free(&sha256_ctx);
284+
}
285+
}
286+
195287
/* Lookup message key */
196288
if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) {
197289
if (key.via.str.size == ctx->message_key_field_len &&
@@ -259,6 +351,35 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
259351
}
260352
}
261353

354+
if (ctx->hash) {
355+
if (hash_buf) {
356+
hash_hex_length = 64;
357+
sha256_ret = mbedtls_sha256_finish_ret(&sha256_ctx, hash);
358+
if (sha256_ret != 0) {
359+
flb_plg_warn(ctx->ins, "hash generation error\n");
360+
}
361+
mbedtls_sha256_free(&sha256_ctx);
362+
i = 0;
363+
for(j = 0; j < 32; j++) {
364+
sprintf(hash_formatted+(j+i), "%02x", (int) hash[j]);
365+
i++;
366+
}
367+
flb_plg_debug(ctx->ins, "generated hash: <%s>\n", hash_formatted);
368+
flb_free(hash_buf);
369+
}
370+
else {
371+
flb_plg_warn(ctx->ins, "set hash to 'failed'\n");
372+
hash_hex_length = 6;
373+
strncpy(hash_formatted, "failed", hash_hex_length + 1);
374+
}
375+
if (size_for_hash) {
376+
msgpack_pack_str(&mp_pck, ctx->hash_key_len);
377+
msgpack_pack_str_body(&mp_pck, ctx->hash_key, ctx->hash_key_len);
378+
msgpack_pack_str(&mp_pck, hash_hex_length);
379+
msgpack_pack_str_body(&mp_pck, hash_formatted, hash_hex_length);
380+
}
381+
}
382+
262383
if (ctx->format == FLB_KAFKA_FMT_JSON) {
263384
s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
264385
if (!s) {
@@ -569,6 +690,16 @@ static struct flb_config_map config_map[] = {
569690
0, FLB_FALSE, 0,
570691
"Set the level key for gelf output."
571692
},
693+
{
694+
FLB_CONFIG_MAP_BOOL, "hash", "false",
695+
0, FLB_TRUE, offsetof(struct flb_out_kafka, hash),
696+
"Add sha256 hash to the message."
697+
},
698+
{
699+
FLB_CONFIG_MAP_STR, "hash_key", FLB_KAFKA_HASH_KEY,
700+
0, FLB_TRUE, offsetof(struct flb_out_kafka, hash_key),
701+
"Set the hash key for the message hash."
702+
},
572703
#ifdef FLB_HAVE_AVRO_ENCODER
573704
{
574705
FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL,
@@ -619,4 +750,4 @@ struct flb_output_plugin out_kafka_plugin = {
619750
.cb_exit = cb_kafka_exit,
620751
.config_map = config_map,
621752
.flags = 0
622-
};
753+
};

plugins/out_kafka/kafka_config.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,28 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
7676
ctx->topic_key_len = strlen(ctx->topic_key);
7777
}
7878

79+
/* Config: Hash */
80+
if (ctx->hash) {
81+
ctx->hash_key_len = strlen(ctx->hash_key);
82+
}
83+
if (ctx->hash_key) {
84+
ctx->hash_key_len = strlen(ctx->hash_key);
85+
}
86+
else {
87+
ctx->hash_key_len = 0;
88+
}
89+
90+
/* Config: Hash_Key */
91+
tmp = flb_output_get_property("hash_key", ins);
92+
if (tmp) {
93+
ctx->hash_key = flb_strdup(tmp);
94+
ctx->hash_key_len = strlen(tmp);
95+
}
96+
else {
97+
ctx->hash_key = FLB_KAFKA_HASH_KEY;
98+
ctx->hash_key_len = strlen(FLB_KAFKA_HASH_KEY);
99+
}
100+
79101
/* Config: Format */
80102
if (ctx->format_str) {
81103
if (strcasecmp(ctx->format_str, "json") == 0) {
@@ -232,6 +254,10 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
232254
flb_free(ctx->message_key_field);
233255
}
234256

257+
if (ctx->hash_key) {
258+
flb_free(ctx->hash_key);
259+
}
260+
235261
flb_sds_destroy(ctx->gelf_fields.timestamp_key);
236262
flb_sds_destroy(ctx->gelf_fields.host_key);
237263
flb_sds_destroy(ctx->gelf_fields.short_message_key);

plugins/out_kafka/kafka_config.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#define FLB_KAFKA_FMT_AVRO 3
3636
#endif
3737
#define FLB_KAFKA_TS_KEY "@timestamp"
38+
#define FLB_KAFKA_HASH_KEY "_id"
3839
#define FLB_KAFKA_QUEUE_FULL_RETRIES "10"
3940

4041
/* rdkafka log levels based on syslog(3) */
@@ -68,6 +69,10 @@ struct flb_out_kafka {
6869
int topic_key_len;
6970
char *topic_key;
7071

72+
int hash;
73+
int hash_key_len;
74+
char *hash_key;
75+
7176
int timestamp_key_len;
7277
char *timestamp_key;
7378
int timestamp_format;

0 commit comments

Comments
 (0)