Skip to content

Commit 0fcee1e

Browse files
authored
in_kafka: optimize poll timeout handling for threaded and main event loop modes (#10122)
* in_kafka: optimize poll timeout handling for threaded and main event loop modes --------- Signed-off-by: nareshku <[email protected]>
1 parent ac025e7 commit 0fcee1e

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

plugins/in_kafka/in_kafka.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,25 @@ static int in_kafka_collect(struct flb_input_instance *ins,
161161
ret = FLB_EVENT_ENCODER_SUCCESS;
162162

163163
while (ret == FLB_EVENT_ENCODER_SUCCESS) {
164-
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);
164+
/* Set the Kafka poll timeout based on execution mode:
165+
*
166+
* a) Running in the main event loop (non-threaded):
167+
* - Use a minimal timeout to avoid blocking other inputs.
168+
*
169+
* b) Running in a dedicated thread:
170+
* - Optimize for throughput by allowing Kafka's internal batching.
171+
* - Align with 'fetch.wait.max.ms' (default: 500ms) to maximize batch efficiency.
172+
* - Set timeout slightly higher than 'fetch.wait.max.ms' (e.g., 1.5x - 2x) to
173+
* ensure it does not interfere with Kafka’s fetch behavior, while still
174+
* keeping the consumer responsive.
175+
*/
176+
if (ctx->ins->flags & FLB_INPUT_THREADED) {
177+
/* Threaded mode: Optimize for batch processing and efficiency */
178+
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeout_ms);
179+
} else {
180+
/* Main event loop: Minimize delay for non-blocking execution */
181+
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);
182+
}
165183

166184
if (!rkm) {
167185
break;
@@ -428,6 +446,14 @@ static struct flb_config_map config_map[] = {
428446
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
429447
"Set the maximum size of chunk"
430448
},
449+
{
450+
FLB_CONFIG_MAP_INT, "poll_timeout_ms", "1",
451+
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, poll_timeout_ms),
452+
"Set the timeout in milliseconds for Kafka consumer poll operations. "
453+
"This option only takes effect when running in a dedicated thread (i.e., when 'threaded' is enabled). "
454+
"Using a higher timeout (e.g., 1.5x - 2x 'rdkafka.fetch.wait.max.ms') "
455+
"can improve efficiency by leveraging Kafka's batching mechanism."
456+
},
431457
/* EOF */
432458
{0}
433459
};

plugins/in_kafka/in_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ struct flb_in_kafka_config {
4848
int coll_fd;
4949
size_t buffer_max_size; /* Maximum size of chunk allocation */
5050
size_t polling_threshold;
51+
int poll_timeout_ms;
5152
};
5253

5354
#endif

0 commit comments

Comments
 (0)