Skip to content

Commit 0e15cc1

Browse files
committed
input: add thread.ring_buffer.retry_limit option
When a threaded input plugin's ring buffer is full, the input thread retries writing to the buffer before dropping data. Previously, this retry limit was hardcoded to 10 (1 second total with 100ms sleep between retries). This patch makes the retry limit configurable via the new 'thread.ring_buffer.retry_limit' option. The default value remains 10 for backward compatibility. Increasing this value allows the system to handle temporary backpressure situations without dropping data. Fixes #11393 Signed-off-by: jinyong.choi <[email protected]>
1 parent 1dcc0b7 commit 0e15cc1

File tree

3 files changed

+26
-4
lines changed

3 files changed

+26
-4
lines changed

include/fluent-bit/flb_input.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ struct flb_input_instance {
383383
struct flb_ring_buffer *rb;
384384
size_t ring_buffer_size; /* ring buffer size */
385385
uint8_t ring_buffer_window; /* ring buffer window percentage */
386+
int ring_buffer_retry_limit; /* ring buffer write retry limit */
386387

387388
/* List of upstreams */
388389
struct mk_list upstreams;

src/flb_input.c

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,17 @@ pthread_key_t libco_in_param_key;
6767
* ring buffer will emit a flush request whenever the window threshold is reached.
6868
* The window percentage can be tuned per input instance using the
6969
* 'thread.ring_buffer.window' property.
70+
*
71+
* Ring buffer retry limit: when the ring buffer is full, the input thread will
72+
* retry writing to the buffer up to 'retry_limit' times (with 100ms sleep between
73+
* retries) before dropping the data. The default is 10 retries (1 second total).
74+
* This can be tuned per input instance using 'thread.ring_buffer.retry_limit'.
7075
*/
7176

7277
#define FLB_INPUT_RING_BUFFER_CAPACITY 1024
7378
#define FLB_INPUT_RING_BUFFER_SIZE (sizeof(void *) * FLB_INPUT_RING_BUFFER_CAPACITY)
7479
#define FLB_INPUT_RING_BUFFER_WINDOW (5)
80+
#define FLB_INPUT_RING_BUFFER_RETRY_LIMIT (10)
7581

7682
/* config map to register options available for all input plugins */
7783
struct flb_config_map input_global_properties[] = {
@@ -138,6 +144,12 @@ struct flb_config_map input_global_properties[] = {
138144
0, FLB_FALSE, 0,
139145
"Set custom ring buffer window percentage for threaded inputs"
140146
},
147+
{
148+
FLB_CONFIG_MAP_INT, "thread.ring_buffer.retry_limit",
149+
STR(FLB_INPUT_RING_BUFFER_RETRY_LIMIT),
150+
0, FLB_FALSE, 0,
151+
"Set maximum retry attempts when ring buffer is full before dropping data"
152+
},
141153

142154
{0}
143155
};
@@ -420,9 +432,10 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
420432

421433
}
422434

423-
/* set default ring buffer size and window */
435+
/* set default ring buffer size, window, and retry limit */
424436
instance->ring_buffer_size = FLB_INPUT_RING_BUFFER_SIZE;
425437
instance->ring_buffer_window = FLB_INPUT_RING_BUFFER_WINDOW;
438+
instance->ring_buffer_retry_limit = FLB_INPUT_RING_BUFFER_RETRY_LIMIT;
426439

427440
/* allocate a ring buffer */
428441
instance->rb = flb_ring_buffer_create(instance->ring_buffer_size);
@@ -759,6 +772,15 @@ int flb_input_set_property(struct flb_input_instance *ins,
759772
}
760773
ins->ring_buffer_window = (uint8_t) ret;
761774
}
775+
else if (prop_key_check("thread.ring_buffer.retry_limit", k, len) == 0 && tmp) {
776+
ret = atoi(tmp);
777+
flb_sds_destroy(tmp);
778+
if (ret <= 0) {
779+
flb_error("[input] thread.ring_buffer.retry_limit must be greater than 0");
780+
return -1;
781+
}
782+
ins->ring_buffer_retry_limit = ret;
783+
}
762784
else if (prop_key_check("storage.pause_on_chunks_overlimit", k, len) == 0 && tmp) {
763785
ret = flb_utils_bool(tmp);
764786
flb_sds_destroy(tmp);

src/flb_input_chunk.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2945,7 +2945,6 @@ static int append_to_ring_buffer(struct flb_input_instance *ins,
29452945
{
29462946
int ret;
29472947
int retries = 0;
2948-
int retry_limit = 10;
29492948
struct input_chunk_raw *cr;
29502949

29512950
if (buf_size == 0) {
@@ -2995,9 +2994,9 @@ static int append_to_ring_buffer(struct flb_input_instance *ins,
29952994
/*
29962995
* There is a little chance that the ring buffer is full or due to saturation
29972996
* from the main thread the data is not being consumed. On this scenario we
2998-
* retry up to 'retry_limit' times with a little wait time.
2997+
* retry up to 'ring_buffer_retry_limit' times with a little wait time.
29992998
*/
3000-
if (retries >= retry_limit) {
2999+
if (retries >= ins->ring_buffer_retry_limit) {
30013000
flb_plg_error(ins, "could not enqueue records into the ring buffer");
30023001
destroy_chunk_raw(cr);
30033002

0 commit comments

Comments
 (0)