@@ -161,7 +161,25 @@ static int in_kafka_collect(struct flb_input_instance *ins,
161161 ret = FLB_EVENT_ENCODER_SUCCESS ;
162162
163163 while (ret == FLB_EVENT_ENCODER_SUCCESS ) {
164- rkm = rd_kafka_consumer_poll (ctx -> kafka .rk , 1 );
164+ /* Set the Kafka poll timeout based on execution mode:
165+ *
166+ * a) Running in the main event loop (non-threaded):
167+ * - Use a minimal timeout (1ms - 10ms) to avoid blocking other inputs.
168+ *
169+ * b) Running in a dedicated thread:
170+ * - Optimize for throughput by allowing Kafka's internal batching.
171+ * - Align with 'fetch.wait.max.ms' (default: 500ms) to maximize batch efficiency.
172+ * - Set timeout slightly higher than 'fetch.wait.max.ms' (e.g., 1.5x - 2x) to
173+ * ensure it does not interfere with Kafka’s fetch behavior, while still
174+ * keeping the consumer responsive.
175+ */
176+ if (ctx -> ins -> flags & FLB_INPUT_THREADED ) {
177+ /* Threaded mode: Optimize for batch processing and efficiency */
178+ rkm = rd_kafka_consumer_poll (ctx -> kafka .rk , ctx -> poll_timeout_ms );
179+ } else {
180+ /* Main event loop: Minimize delay for non-blocking execution */
181+ rkm = rd_kafka_consumer_poll (ctx -> kafka .rk , 1 );
182+ }
165183
166184 if (!rkm ) {
167185 break ;
@@ -428,6 +446,14 @@ static struct flb_config_map config_map[] = {
428446 0 , FLB_TRUE , offsetof(struct flb_in_kafka_config , buffer_max_size ),
429447 "Set the maximum size of chunk"
430448 },
449+ {
450+ FLB_CONFIG_MAP_INT , "poll_timeout_ms" , "1" ,
451+ 0 , FLB_TRUE , offsetof(struct flb_in_kafka_config , poll_timeout_ms ),
452+ "Set the timeout in milliseconds for Kafka consumer poll operations. "
453+ "When running in the main event loop, use a low value (e.g., 1-10ms) to minimize blocking. "
454+ "When running in a dedicated thread, a higher value (e.g., 1.5x - 2x 'librdkafka.fetch.wait.max.ms') "
455+ "can improve efficiency by leveraging Kafka's batching."
456+ },
431457 /* EOF */
432458 {0 }
433459};
0 commit comments