Skip to content

Commit e47284b

Browse files
committed
input: make input threads ring_buffer capacity and window flush configurable
Input plugins running in threaded mode use a per-instance ring buffer to enqueue records into the pipeline, which are then consumed by the engine. By default, each ring buffer is sized for 1024 entries. In high-throughput scenarios, the default capacity may not be sufficient. This patch introduces two new configuration options to allow fine-tuning of the ring buffer behavior per input plugin: - thread.ring_buffer.capacity: Sets the maximum number of entries the ring buffer can hold (default: 1024). - thread.ring_buffer.window: Sets the flush window as a percentage of capacity, used to trigger a flush request when the threshold is reached (default: 5). Signed-off-by: Eduardo Silva <[email protected]>
1 parent 8bdcda8 commit e47284b

File tree

2 files changed

+52
-8
lines changed

2 files changed

+52
-8
lines changed

include/fluent-bit/flb_input.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,8 @@ struct flb_input_instance {
379379
* in the ring buffer.
380380
*/
381381
struct flb_ring_buffer *rb;
382+
size_t ring_buffer_size; /* ring buffer size */
383+
uint8_t ring_buffer_window; /* ring buffer window percentage */
382384

383385
/* List of upstreams */
384386
struct mk_list upstreams;

src/flb_input.c

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,20 @@ pthread_key_t libco_in_param_key;
5656
#define protcmp(a, b) strncasecmp(a, b, strlen(a))
5757

5858
/*
59-
* Ring buffer size: we make space for 512 entries that each input instance can
60-
* use to enqueue data. Note that this value is fixed and only affect input plugins
61-
* which runs in threaded mode (separate thread)
59+
* Ring buffer capacity: by default we make space for 1024 entries that each
60+
* input instance can use to enqueue data. The capacity can be customized per
61+
* input instance through the 'thread.ring_buffer.capacity' property. The value
62+
* represents the number of slots and is converted to bytes when the ring buffer
63+
* is created. This affects only input plugins running in threaded mode.
6264
*
6365
* Ring buffer window: the current window size is set to 5% which means that the
64-
* ring buffer will emit a flush request whenever there are 51 records or more
65-
* awaiting to be consumed.
66+
* ring buffer will emit a flush request whenever the window threshold is reached.
67+
* The window percentage can be tuned per input instance using the
68+
* 'thread.ring_buffer.window' property.
6669
*/
6770

68-
#define FLB_INPUT_RING_BUFFER_SIZE (sizeof(void *) * 1024)
71+
#define FLB_INPUT_RING_BUFFER_CAPACITY 1024
72+
#define FLB_INPUT_RING_BUFFER_SIZE (sizeof(void *) * FLB_INPUT_RING_BUFFER_CAPACITY)
6973
#define FLB_INPUT_RING_BUFFER_WINDOW (5)
7074

7175
/* config map to register options available for all input plugins */
@@ -123,6 +127,16 @@ struct flb_config_map input_global_properties[] = {
123127
0, FLB_FALSE, 0,
124128
"Enable threading on an input"
125129
},
130+
{
131+
FLB_CONFIG_MAP_INT, "thread.ring_buffer.capacity", "0",
132+
0, FLB_FALSE, 0,
133+
"Set custom ring buffer capacity when the input runs in threaded mode"
134+
},
135+
{
136+
FLB_CONFIG_MAP_INT, "thread.ring_buffer.window", "0",
137+
0, FLB_FALSE, 0,
138+
"Set custom ring buffer window percentage for threaded inputs"
139+
},
126140

127141
{0}
128142
};
@@ -404,8 +418,12 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
404418

405419
}
406420

421+
/* set default ring buffer size and window */
422+
instance->ring_buffer_size = FLB_INPUT_RING_BUFFER_SIZE;
423+
instance->ring_buffer_window = FLB_INPUT_RING_BUFFER_WINDOW;
424+
407425
/* allocate a ring buffer */
408-
instance->rb = flb_ring_buffer_create(FLB_INPUT_RING_BUFFER_SIZE);
426+
instance->rb = flb_ring_buffer_create(instance->ring_buffer_size);
409427
if (!instance->rb) {
410428
flb_error("instance %s could not initialize ring buffer",
411429
flb_input_name(instance));
@@ -704,6 +722,30 @@ int flb_input_set_property(struct flb_input_instance *ins,
704722

705723
ins->is_threaded = enabled;
706724
}
725+
else if (prop_key_check("thread.ring_buffer.capacity", k, len) == 0 && tmp) {
726+
ret = atoi(tmp);
727+
flb_sds_destroy(tmp);
728+
if (ret <= 0) {
729+
return -1;
730+
}
731+
ins->ring_buffer_size = (size_t) ret * sizeof(void *);
732+
if (ins->rb) {
733+
flb_ring_buffer_destroy(ins->rb);
734+
ins->rb = flb_ring_buffer_create(ins->ring_buffer_size);
735+
if (!ins->rb) {
736+
flb_error("instance %s could not initialize ring buffer", flb_input_name(ins));
737+
return -1;
738+
}
739+
}
740+
}
741+
else if (prop_key_check("thread.ring_buffer.window", k, len) == 0 && tmp) {
742+
ret = atoi(tmp);
743+
flb_sds_destroy(tmp);
744+
if (ret <= 0 || ret > 100) {
745+
return -1;
746+
}
747+
ins->ring_buffer_window = (uint8_t) ret;
748+
}
707749
else if (prop_key_check("storage.pause_on_chunks_overlimit", k, len) == 0 && tmp) {
708750
ret = flb_utils_bool(tmp);
709751
flb_sds_destroy(tmp);
@@ -1320,7 +1362,7 @@ int flb_input_instance_init(struct flb_input_instance *ins,
13201362
}
13211363

13221364
/* register the ring buffer */
1323-
ret = flb_ring_buffer_add_event_loop(ins->rb, config->evl, FLB_INPUT_RING_BUFFER_WINDOW);
1365+
ret = flb_ring_buffer_add_event_loop(ins->rb, config->evl, ins->ring_buffer_window);
13241366
if (ret) {
13251367
flb_error("failed while registering ring buffer events on input %s",
13261368
ins->name);

0 commit comments

Comments
 (0)