Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@
#define FLB_INPUT_RUNNING 1
#define FLB_INPUT_PAUSED 0

/* Paused by */
#define FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT 0
#define FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW 1

struct flb_input_instance;

struct flb_input_plugin {
Expand Down Expand Up @@ -249,6 +253,9 @@ struct flb_input_instance {
*/
int storage_buf_status;


int storage_buf_paused_by;

/*
* Optional data passed to the plugin, this info is useful when
* running Fluent Bit in library mode and the target plugin needs
Expand Down
10 changes: 10 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_inst
#define FLB_OUTPUT_PRIVATE 1024
#define FLB_OUTPUT_SYNCHRONOUS 2048 /* run one task at a time, no flush cycle limit */

#define FLB_OUTPUT_STORAGE_OVERFLOW_DROP 0 /* storage.overflow_action: drop_oldest_chunk */
#define FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION 1 /* storage.overlow_action : pause_ingestion */

/*
* Event type handlers
Expand Down Expand Up @@ -406,6 +408,14 @@ struct flb_output_instance {
*/
size_t total_limit_size;

/* when the instance has been configured with a limit for storage.total_limit_size,
* if the limit is reached the action to take is defined by storage_overflow_action:
*
* - FLB_OUTPUT_STORAGE_OVERFLOW_DROP (default)
* - FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION
*/
int storage_overflow_action;

/* Queue for singleplexed tasks */
struct flb_task_queue *singleplex_queue;

Expand Down
85 changes: 71 additions & 14 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1203,11 +1203,15 @@ static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance
{
struct flb_storage_input *storage = (struct flb_storage_input *)i->storage;

if (storage->type == FLB_STORAGE_FS) {
if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) {
if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) {
return FLB_TRUE;
}
/* Not applicable for storage based on memory */
if (storage->type == FLB_STORAGE_MEM) {
return FLB_FALSE;
}

/* if input instance has enabled 'storage.pause_on_chunks_overlimit' */
if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) {
if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) {
return FLB_TRUE;
}
}

Expand Down Expand Up @@ -1260,10 +1264,12 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
in->name);
}
}

if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE &&
in->config->is_running == FLB_TRUE &&
in->config->is_ingestion_active == FLB_TRUE &&
in->storage_buf_status == FLB_INPUT_PAUSED) {
in->storage_buf_status == FLB_INPUT_PAUSED &&
in->storage_buf_paused_by == FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT) {
in->storage_buf_status = FLB_INPUT_RUNNING;
if (in->p->cb_resume) {
flb_input_resume(in);
Expand All @@ -1274,32 +1280,46 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
}
}

if (in->config->is_running == FLB_TRUE &&
in->config->is_ingestion_active == FLB_TRUE &&
in->storage_buf_status == FLB_INPUT_PAUSED &&
in->storage_buf_paused_by == FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW) {
in->storage_buf_status = FLB_INPUT_RUNNING;
if (in->p->cb_resume) {
flb_input_resume(in);
flb_info("[input] %s resume from storage.overflow_action", flb_input_name(in));
}
}

return total;
}

/*
* If the number of bytes in use by the chunks are over the imposed limit
* by configuration, pause the instance.
*/
static inline int flb_input_chunk_protect(struct flb_input_instance *i)
static inline int flb_input_chunk_protect(struct flb_input_instance *i, struct flb_input_chunk *last_input_chunk)
{
struct mk_list *head;
struct flb_config *config;
struct flb_output_instance *o_ins;
struct flb_storage_input *storage = i->storage;

config = i->config;
if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
i->name,
storage->cio->total_chunks_up,
storage->cio->max_chunks_up);
flb_input_pause(i);
i->storage_buf_status = FLB_INPUT_PAUSED;
i->storage_buf_paused_by = FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT;
return FLB_TRUE;
}

if (storage->type == FLB_STORAGE_FS) {
return FLB_FALSE;
}

if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) {
if ((storage->type == FLB_STORAGE_MEM || storage->type == FLB_STORAGE_MEM) &&
flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) {
/*
* if the plugin is already overlimit and the strategy is based on
* a memory-ring-buffer logic, do not pause the plugin, upon next
Expand All @@ -1313,13 +1333,50 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i)
* The plugin is using 'memory' buffering only and already reached
* it limit, just pause the ingestion.
*/
flb_warn("[input] %s paused (mem buf overlimit)",
i->name);
flb_warn("[input] %s paused (mem buf overlimit)", flb_input_name(i));
flb_input_pause(i);
i->mem_buf_status = FLB_INPUT_PAUSED;
return FLB_TRUE;
}

/*
* if the output plugin that is a route for this input instance has enabled
* `storage.total_limit_size` and it capacity is at 90% or less than 5MB, we
* will pause the input instance.
*/
if (storage->type == FLB_STORAGE_FS) {
mk_list_foreach(head, &config->outputs) {
o_ins = mk_list_entry(head, struct flb_output_instance, _head);

/* only count outputs with storage.total_limit_size set */
if (o_ins->total_limit_size <= 0) {
continue;
}

/* skip outputs with overflow action set to drop */
if (o_ins->storage_overflow_action == FLB_OUTPUT_STORAGE_OVERFLOW_DROP) {
continue;
}

/* check if this output plugin matches the route of the input */
if (flb_routes_mask_get_bit(last_input_chunk->routes_mask, o_ins->id) == 0) {
continue;
}

/* pause ingestion if capacity is at 90%, or less than 5MB available */
if ((o_ins->fs_chunks_size >= (o_ins->total_limit_size * 0.9)) ||
(o_ins->fs_chunks_size + (5 * 1024 * 1024)) >= o_ins->total_limit_size) {
flb_warn("[input] %s paused by storage.overflow_action - output %s usage is %zu/%zu)",
flb_input_name(i), flb_output_name(o_ins),
o_ins->fs_chunks_size, o_ins->total_limit_size);
flb_input_pause(i);
i->storage_buf_status = FLB_INPUT_PAUSED;
i->storage_buf_paused_by = FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW;
return FLB_TRUE;
}
}
}

return FLB_FALSE;
}

Expand Down Expand Up @@ -1750,7 +1807,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
}
#endif /* FLB_HAVE_CHUNK_TRACE */

flb_input_chunk_protect(in);
flb_input_chunk_protect(in, ic);
return 0;
}

Expand Down
18 changes: 16 additions & 2 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
if (is_empty) {
return flb_output_task_queue_flush_one(out_ins->singleplex_queue);
}

return 0;
}

Expand All @@ -300,7 +300,7 @@ int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue)
mk_list_del(&ended_task->_head);
flb_free(ended_task);
}

/* Flush if there is a pending task queued */
is_empty = mk_list_is_empty(&queue->pending) == 0;
if (!is_empty) {
Expand Down Expand Up @@ -923,6 +923,20 @@ int flb_output_set_property(struct flb_output_instance *ins,
flb_sds_destroy(tmp);
ins->total_limit_size = (size_t) limit;
}
else if (prop_key_check("storage.overflow_action", k, len) == 0 && tmp) {
if (strcasecmp(tmp, "drop_oldest_chunk") == 0) {
ins->storage_overflow_action = FLB_OUTPUT_STORAGE_OVERFLOW_DROP;
}
else if (strcasecmp(tmp, "pause_ingestion") == 0) {
ins->storage_overflow_action = FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION;
}
else {
flb_error("[config] invalid overflow_action '%s' for %s plugin",
tmp, (char *) flb_output_name(ins));
flb_sds_destroy(tmp);
return -1;
}
}
else if (prop_key_check("workers", k, len) == 0 && tmp) {
/* Set the number of workers */
ins->tp_workers = atoi(tmp);
Expand Down