diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index b1f07458884..9f3c4938f9f 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -49,6 +49,7 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, } ctx->ins = ins; ctx->blocked = FLB_FALSE; + mk_list_init(&ctx->topics); ret = flb_output_config_map_set(ins, (void*) ctx); if (ret == -1) { @@ -239,9 +240,13 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, if (!ctx->kafka.rk) { flb_plg_error(ctx->ins, "failed to create producer: %s", errstr); + rd_kafka_conf_destroy(ctx->conf); + ctx->conf = NULL; flb_out_kafka_destroy(ctx); return NULL; } + /* rd_kafka_new() succeeded, conf ownership transferred to rk */ + ctx->conf = NULL; #ifdef FLB_HAVE_AVRO_ENCODER /* Config AVRO */ @@ -256,7 +261,6 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, #endif /* Config: Topic */ - mk_list_init(&ctx->topics); tmp = flb_output_get_property("topics", ins); if (!tmp) { flb_kafka_topic_create(FLB_KAFKA_TOPIC, ctx); @@ -304,6 +308,10 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx) rd_kafka_destroy(ctx->kafka.rk); } + if (ctx->conf) { + rd_kafka_conf_destroy(ctx->conf); + } + if (ctx->opaque) { flb_kafka_opaque_destroy(ctx->opaque); }