We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent fb63425 commit 0dd498bCopy full SHA for 0dd498b
plugins/out_kafka/kafka.c
@@ -478,6 +478,14 @@ static void cb_kafka_flush(struct flb_event_chunk *event_chunk,
478
while (msgpack_unpack_next(&result,
479
event_chunk->data,
480
event_chunk->size, &off) == MSGPACK_UNPACK_SUCCESS) {
481
+ if (result.data.type != MSGPACK_OBJECT_ARRAY) {
482
+ continue;
483
+ }
484
+
485
+ if (result.data.via.array.size != 2) {
486
487
488
489
flb_time_pop_from_msgpack(&tms, &result, &obj);
490
491
ret = produce_message(&tms, obj, ctx, config);
0 commit comments