2828void cb_kafka_msg (rd_kafka_t * rk , const rd_kafka_message_t * rkmessage ,
2929 void * opaque )
3030{
31- struct flb_kafka * ctx = (struct flb_kafka * ) opaque ;
31+ struct flb_out_kafka * ctx = (struct flb_out_kafka * ) opaque ;
3232
3333 if (rkmessage -> err ) {
3434 flb_plg_warn (ctx -> ins , "message delivery failed: %s" ,
@@ -44,9 +44,9 @@ void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
4444void cb_kafka_logger (const rd_kafka_t * rk , int level ,
4545 const char * fac , const char * buf )
4646{
47- struct flb_kafka * ctx ;
47+ struct flb_out_kafka * ctx ;
4848
49- ctx = (struct flb_kafka * ) rd_kafka_opaque (rk );
49+ ctx = (struct flb_out_kafka * ) rd_kafka_opaque (rk );
5050
5151 if (level <= FLB_KAFKA_LOG_ERR ) {
5252 flb_plg_error (ctx -> ins , "%s: %s" ,
@@ -70,7 +70,7 @@ static int cb_kafka_init(struct flb_output_instance *ins,
7070 struct flb_config * config ,
7171 void * data )
7272{
73- struct flb_kafka * ctx ;
73+ struct flb_out_kafka * ctx ;
7474
7575 /* Configuration */
7676 ctx = flb_kafka_conf_create (ins , config );
@@ -85,7 +85,7 @@ static int cb_kafka_init(struct flb_output_instance *ins,
8585}
8686
8787int produce_message (struct flb_time * tm , msgpack_object * map ,
88- struct flb_kafka * ctx , struct flb_config * config )
88+ struct flb_out_kafka * ctx , struct flb_config * config )
8989{
9090 int i ;
9191 int ret ;
@@ -459,7 +459,7 @@ static void cb_kafka_flush(struct flb_event_chunk *event_chunk,
459459
460460 int ret ;
461461 size_t off = 0 ;
462- struct flb_kafka * ctx = out_context ;
462+ struct flb_out_kafka * ctx = out_context ;
463463 struct flb_time tms ;
464464 msgpack_object * obj ;
465465 msgpack_unpacked result ;
@@ -497,7 +497,7 @@ static void cb_kafka_flush(struct flb_event_chunk *event_chunk,
497497
498498static int cb_kafka_exit (void * data , struct flb_config * config )
499499{
500- struct flb_kafka * ctx = data ;
500+ struct flb_out_kafka * ctx = data ;
501501
502502 flb_kafka_conf_destroy (ctx );
503503 return 0 ;
0 commit comments