Skip to content

Commit 6e8695f

Browse files
jinyongchoiedsiper
authored andcommitted
out_kafka: add support to force flush when shutdown. (fluent#5593)
When fluent-bit shutdown for out_kafka plugin with rdkafka.linger.ms and rdkafka.batch.size and then we show the warn message and lose the message. So, this patch fixes the issue of not flushing due to the rdkafka linger.ms option. The 'rd_kafka_flush' function is linger.ms time will be ignored for the duration of the call, queued messages will be sent to the broker as soon as possible. Signed-off-by: jinyong.choi <[email protected]>
1 parent b17e56f commit 6e8695f

File tree

1 file changed

+19
-0
lines changed

1 file changed

+19
-0
lines changed

plugins/out_kafka/kafka.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,10 +495,29 @@ static void cb_kafka_flush(struct flb_event_chunk *event_chunk,
495495
FLB_OUTPUT_RETURN(FLB_OK);
496496
}
497497

498+
static void kafka_flush_force(struct flb_out_kafka *ctx,
499+
struct flb_config *config)
500+
{
501+
int ret;
502+
503+
if (!ctx) {
504+
return;
505+
}
506+
507+
if (ctx->kafka.rk) {
508+
ret = rd_kafka_flush(ctx->kafka.rk, config->grace * 1000);
509+
if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {
510+
flb_plg_warn(ctx->ins, "Failed to force flush: %s",
511+
rd_kafka_err2str(ret));
512+
}
513+
}
514+
}
515+
498516
static int cb_kafka_exit(void *data, struct flb_config *config)
499517
{
500518
struct flb_out_kafka *ctx = data;
501519

520+
kafka_flush_force(ctx, config);
502521
flb_out_kafka_destroy(ctx);
503522
return 0;
504523
}

0 commit comments

Comments
 (0)