Skip to content

Commit 047120c

Browse files
HaChanhokakao-champ-ion
authored andcommitted
in_kafka: add async commit mode
Signed-off-by: HaChanHo <[email protected]>
1 parent dffcd42 commit 047120c

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

plugins/in_kafka/in_kafka.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ static int in_kafka_collect(struct flb_input_instance *ins,
181181
rd_kafka_message_destroy(rkm);
182182

183183
/* TO-DO: commit the record based on `ret` */
184-
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
184+
rd_kafka_commit(ctx->kafka.rk, NULL, ctx->async_commit);
185+
186+
flb_plg_debug(ins, "offset committed(%s)", ctx->async_commit ? "async" : "sync");
185187

186188
/* Break from the loop when reaching the limit of polling if available */
187189
if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
@@ -428,6 +430,11 @@ static struct flb_config_map config_map[] = {
428430
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
429431
"Set the maximum size of chunk"
430432
},
433+
{
434+
FLB_CONFIG_MAP_BOOL, "async_commit", "false",
435+
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, async_commit),
436+
"When enabled the offset commit operation is asynchronous"
437+
},
431438
/* EOF */
432439
{0}
433440
};

plugins/in_kafka/in_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ struct flb_in_kafka_config {
4848
int coll_fd;
4949
size_t buffer_max_size; /* Maximum size of chunk allocation */
5050
size_t polling_threshold;
51+
int async_commit;
5152
};
5253

5354
#endif

0 commit comments

Comments
 (0)