diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 5d149d731e3..df1605af31c 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -161,7 +161,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, ret = FLB_EVENT_ENCODER_SUCCESS; while (ret == FLB_EVENT_ENCODER_SUCCESS) { - rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1); + rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeount_ms); if (!rkm) { break; @@ -180,8 +180,11 @@ static int in_kafka_collect(struct flb_input_instance *ins, rd_kafka_message_destroy(rkm); - /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); + + if(!ctx->enable_auto_commit) { + /* TO-DO: commit the record based on `ret` */ + rd_kafka_commit(ctx->kafka.rk, NULL, 0); + } /* Break from the loop when reaching the limit of polling if available */ if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED && @@ -243,6 +246,21 @@ static int in_kafka_init(struct flb_input_instance *ins, goto init_error; } + /* Set the kafka poll timeout dependend on wether we run in our own + * or in the main event thread. + * a) run in main event thread: + * -> minimize the delay we might create + * b) run in our own thread: + * -> optimize for throuput and relay on 'fetch.wait.max.ms' + * which is set to 500 by default default. lets set it to + * twice that so that increasing fetch.wait.max.ms still + * has an effect. + */ + ctx->poll_timeount_ms = 1; + if(ins->is_threaded) { + ctx->poll_timeount_ms = 1000; + } + if (ctx->buffer_max_size > 0) { ctx->polling_threshold = ctx->buffer_max_size; @@ -428,6 +446,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), "Set the maximum size of chunk" }, + { + FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit), + "Rely on kafka auto-commit and commit messages in batches" + }, /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index b56d9c66893..7eca5f5341f 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -32,6 +32,7 @@ #define FLB_IN_KAFKA_DEFAULT_FORMAT "none" #define FLB_IN_KAFKA_UNLIMITED (size_t)-1 #define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M" +#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false" enum { FLB_IN_KAFKA_FORMAT_NONE, @@ -48,6 +49,8 @@ struct flb_in_kafka_config { int coll_fd; size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; + bool enable_auto_commit; + int poll_timeount_ms; }; #endif