Skip to content

Commit 465b262

Browse files
committed
out_kafka: add hash option
this PR enhances plugin out_kafka with 'hash' option. See fluent/fluent-bit-docs#321 If option 'hash' is set, an uniqe hash is added to each message configuration example: ``` Hash On Hash_Key _myid ``` the commit was tested in docker/openshift environment messages in kafka with hash field '_myid': ``` {"@timestamp":1592327982.411502,"cpu_p":1.1,"user_p":0.3666666666666666,"system_p":0.7333333333333333,"cpu0.p_cpu":1.4,"cpu0.p_user":0.4,"cpu0.p_system":1,"cpu1.p_cpu":1.2,"cpu1.p_user":0.4,"cpu1.p_system":0.8,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":1,"cpu3.p_user":0.2,"cpu3.p_system":0.8,"cpu4.p_cpu":0.6,"cpu4.p_user":0,"cpu4.p_system":0.6,"cpu5.p_cpu":0.8,"cpu5.p_user":0.2,"cpu5.p_system":0.6,"_myid":"D839ED0D670D7000"} {"@timestamp":1592327987.411792,"cpu_p":0.9666666666666666,"user_p":0.2666666666666667,"system_p":0.7,"cpu0.p_cpu":1.2,"cpu0.p_user":0.4,"cpu0.p_system":0.8,"cpu1.p_cpu":0.8,"cpu1.p_user":0.2,"cpu1.p_system":0.6,"cpu2.p_cpu":1.2,"cpu2.p_user":0.4,"cpu2.p_system":0.8,"cpu3.p_cpu":0.8,"cpu3.p_user":0.2,"cpu3.p_system":0.6,"cpu4.p_cpu":0.8,"cpu4.p_user":0.2,"cpu4.p_system":0.6,"cpu5.p_cpu":1,"cpu5.p_user":0.2,"cpu5.p_system":0.8,"_myid":"788F98C1E1632000"} {"@timestamp":1592327992.377262,"cpu_p":0.9666666666666666,"user_p":0.3666666666666666,"system_p":0.6,"cpu0.p_cpu":1.2,"cpu0.p_user":0.4,"cpu0.p_system":0.8,"cpu1.p_cpu":0.6,"cpu1.p_user":0.2,"cpu1.p_system":0.4,"cpu2.p_cpu":1.2,"cpu2.p_user":0.6,"cpu2.p_system":0.6,"cpu3.p_cpu":1,"cpu3.p_user":0.4,"cpu3.p_system":0.6,"cpu4.p_cpu":0.4,"cpu4.p_user":0.2,"cpu4.p_system":0.2,"cpu5.p_cpu":1.2,"cpu5.p_user":0.4,"cpu5.p_system":0.8,"_myid":"4321FF0183AC4000"} {"@timestamp":1592327997.378218,"cpu_p":1.1,"user_p":0.3666666666666666,"system_p":0.7333333333333333,"cpu0.p_cpu":1.6,"cpu0.p_user":0.4,"cpu0.p_system":1.2,"cpu1.p_cpu":0.8,"cpu1.p_user":0.4,"cpu1.p_system":0.4,"cpu2.p_cpu":1,"cpu2.p_user":0.4,"cpu2.p_system":0.6,"cpu3.p_cpu":1.2,"cpu3.p_user":0.4,"cpu3.p_system":0.8,"cpu4.p_cpu":0.8,"cpu4.p_user":0.2,"cpu4.p_system":0.6,"cpu5.p_cpu":1.4,"cpu5.p_user":0.4,"cpu5.p_system":1,"_myid":"207661B77DBA7A00"} ``` Signed-off-by: Michael Voelker <[email protected]>
1 parent 46e479e commit 465b262

File tree

3 files changed

+75
-0
lines changed

3 files changed

+75
-0
lines changed

plugins/out_kafka/kafka.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,14 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
8989
struct flb_kafka *ctx, struct flb_config *config)
9090
{
9191
int i;
92+
int counter;
9293
int ret;
9394
int size;
9495
int queue_full_retries = 0;
9596
char *out_buf;
97+
unsigned long message_hash = 0;
98+
/* long int -> to hex-string conversion */
99+
char message_hash_buf[32];
96100
size_t out_size;
97101
struct mk_list *head;
98102
struct mk_list *topics;
@@ -114,6 +118,11 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
114118
if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) {
115119
/* Make room for the timestamp */
116120
size = map->via.map.size + 1;
121+
122+
/* Make room for the message_hash */
123+
if (ctx->hash) {
124+
size++;
125+
}
117126
msgpack_pack_map(&mp_pck, size);
118127

119128
/* Pack timestamp */
@@ -148,16 +157,46 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
148157
}
149158
else {
150159
size = map->via.map.size;
160+
/* Make room for the message_hash */
161+
if (ctx->hash) {
162+
size++;
163+
}
151164
msgpack_pack_map(&mp_pck, size);
152165
}
153166

167+
if (ctx->hash) {
168+
/* Init message_hash with timestamp */
169+
message_hash = (unsigned long) flb_time_to_double(tm);
170+
}
154171
for (i = 0; i < map->via.map.size; i++) {
155172
key = map->via.map.ptr[i].key;
156173
val = map->via.map.ptr[i].val;
157174

158175
msgpack_pack_object(&mp_pck, key);
159176
msgpack_pack_object(&mp_pck, val);
160177

178+
if (ctx->hash) {
179+
if (key.via.str.size) {
180+
for (counter = 0; counter < key.via.str.size; counter++) {
181+
message_hash = key.via.str.ptr[counter] + (message_hash << 5) + message_hash;
182+
}
183+
}
184+
if (val.type == MSGPACK_OBJECT_STR) {
185+
for (counter = 0; counter < val.via.str.size; counter++) {
186+
message_hash = val.via.str.ptr[counter] + (message_hash << 5) + message_hash;
187+
}
188+
}
189+
if (val.type == MSGPACK_OBJECT_FLOAT64) {
190+
message_hash = (unsigned long) (message_hash + (val.via.f64 * 1000));
191+
}
192+
if (val.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
193+
message_hash = (unsigned long) (message_hash + (val.via.u64 << 6));
194+
}
195+
if (val.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
196+
message_hash = (unsigned long) (message_hash + (val.via.i64 << 6));
197+
}
198+
}
199+
161200
/* Lookup message key */
162201
if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) {
163202
if (key.via.str.size == ctx->message_key_field_len &&
@@ -225,6 +264,17 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
225264
}
226265
}
227266

267+
if (ctx->hash) {
268+
/* Add message_hash as hex-string field */
269+
msgpack_pack_str(&mp_pck, ctx->hash_key_len);
270+
msgpack_pack_str_body(&mp_pck, ctx->hash_key, ctx->hash_key_len);
271+
/* (long int) message_hash -> (str) message_hash_buf */
272+
i = snprintf(NULL, 0, "%lX", message_hash);
273+
snprintf(message_hash_buf, i+1, "%lX", message_hash);
274+
msgpack_pack_str(&mp_pck, i);
275+
msgpack_pack_str_body(&mp_pck, message_hash_buf, i);
276+
}
277+
228278
if (ctx->format == FLB_KAFKA_FMT_JSON) {
229279
s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
230280
if (!s) {

plugins/out_kafka/kafka_config.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,26 @@ struct flb_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
127127
ctx->dynamic_topic = FLB_FALSE;
128128
}
129129

130+
/* Config: Hash */
131+
tmp = flb_output_get_property("hash", ins);
132+
if (tmp) {
133+
ctx->hash = flb_utils_bool(tmp);
134+
}
135+
else {
136+
ctx->hash = FLB_FALSE;
137+
}
138+
139+
/* Config: Hash_Key */
140+
tmp = flb_output_get_property("hash_key", ins);
141+
if (tmp) {
142+
ctx->hash_key = flb_strdup(tmp);
143+
ctx->hash_key_len = strlen(tmp);
144+
}
145+
else {
146+
ctx->hash_key = FLB_KAFKA_HASH_KEY;
147+
ctx->hash_key_len = strlen(FLB_KAFKA_HASH_KEY);
148+
}
149+
130150
/* Config: Format */
131151
tmp = flb_output_get_property("format", ins);
132152
if (tmp) {

plugins/out_kafka/kafka_config.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#define FLB_KAFKA_BROKERS "127.0.0.1"
3333
#define FLB_KAFKA_TOPIC "fluent-bit"
3434
#define FLB_KAFKA_TS_KEY "@timestamp"
35+
#define FLB_KAFKA_HASH_KEY "_id"
3536

3637
/* rdkafka log levels based on syslog(3) */
3738
#define FLB_KAFKA_LOG_EMERG 0
@@ -63,6 +64,10 @@ struct flb_kafka {
6364
int topic_key_len;
6465
char *topic_key;
6566

67+
int hash;
68+
int hash_key_len;
69+
char *hash_key;
70+
6671
int timestamp_key_len;
6772
char *timestamp_key;
6873
int timestamp_format;

0 commit comments

Comments
 (0)