diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h
index d171ef08167..3ebcb5edbae 100644
--- a/include/fluent-bit/flb_config.h
+++ b/include/fluent-bit/flb_config.h
@@ -257,6 +257,7 @@ struct flb_config {
     /* DLQ for non-retriable output failures */
     int storage_keep_rejected; /* 0/1 */
     char *storage_rejected_path; /* relative to storage_path, default "rejected" */
+    char *storage_rejected_limit; /* max total bytes in DLQ stream, kept as a size string (parsed with flb_utils_size_to_bytes); NULL = no limit */
     void *storage_rejected_stream; /* NULL until first use */
 
     /* Embedded SQL Database support (SQLite3) */
@@ -423,6 +424,7 @@ enum conf_type {
 /* Storage DLQ */
 #define FLB_CONF_STORAGE_KEEP_REJECTED "storage.keep.rejected"
 #define FLB_CONF_STORAGE_REJECTED_PATH "storage.rejected.path"
+#define FLB_CONF_STORAGE_REJECTED_LIMIT "storage.rejected.limit"
 
 /* Coroutines */
 #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size"
diff --git a/src/flb_config.c b/src/flb_config.c
index 06c76ca78c3..f6993114fcd 100644
--- a/src/flb_config.c
+++ b/src/flb_config.c
@@ -171,6 +171,9 @@ struct flb_service_config service_configs[] = {
     {FLB_CONF_STORAGE_REJECTED_PATH,
      FLB_CONF_TYPE_STR,
      offsetof(struct flb_config, storage_rejected_path)},
+    {FLB_CONF_STORAGE_REJECTED_LIMIT,
+     FLB_CONF_TYPE_STR,
+     offsetof(struct flb_config, storage_rejected_limit)},
 
     /* Coroutines */
     {FLB_CONF_STR_CORO_STACK_SIZE,
@@ -359,6 +362,7 @@ struct flb_config *flb_config_init()
     config->storage_inherit = FLB_FALSE;
     config->storage_bl_flush_on_shutdown = FLB_FALSE;
     config->storage_rejected_path = NULL;
+    config->storage_rejected_limit = NULL;
     config->sched_cap = FLB_SCHED_CAP;
     config->sched_base = FLB_SCHED_BASE;
     config->json_escape_unicode = FLB_TRUE;
@@ -624,6 +628,9 @@ void flb_config_exit(struct flb_config *config)
     if (config->storage_rejected_path) {
         flb_free(config->storage_rejected_path);
     }
+    if (config->storage_rejected_limit) {
+        flb_free(config->storage_rejected_limit);
+    }
 
 #ifdef FLB_HAVE_STREAM_PROCESSOR
     if (config->stream_processor_file) {
diff --git a/src/flb_storage.c b/src/flb_storage.c
index 2554799b083..7f9b3f874de 100644
--- a/src/flb_storage.c
+++ b/src/flb_storage.c
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include
 
 static struct cmt *metrics_context_create(struct flb_storage_metrics *sm)
 {
@@ -843,6 +844,34 @@ static inline int flb_storage_chunk_restore_state(struct cio_chunk *src, int was
     return ret_val;
 }
 
+/*
+ * Sum the content sizes of the chunks linked in the DLQ stream 'st'.
+ * Chunks whose reported content size is <= 0 are not counted.
+ *
+ * NOTE(review): only chunks present in st->chunks are seen here; confirm
+ * that DLQ copies stay linked to the stream after being written, otherwise
+ * data already on disk would not count against the limit.
+ */
+static ssize_t dlq_stream_total_size(struct cio_stream *st)
+{
+    ssize_t chunk_size;
+    ssize_t total = 0;
+    struct cio_chunk *ch;
+    struct mk_list *head;
+
+    mk_list_foreach(head, &st->chunks) {
+        ch = mk_list_entry(head, struct cio_chunk, _head);
+
+        /* Total content bytes, to match write-path size checks */
+        chunk_size = cio_chunk_get_content_size(ch);
+        if (chunk_size > 0) {
+            total += chunk_size;
+        }
+    }
+
+    return total;
+}
+
 int flb_storage_quarantine_chunk(struct flb_config *ctx,
                                  struct cio_chunk *src,
                                  const char *tag,
@@ -859,6 +888,9 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
     struct cio_chunk *dst;
     char safe_tag[128];
     char safe_out[64];
+    ssize_t current_size = 0;
+    int64_t next_size;
+    int64_t max_size = 0;
 
     if (!ctx || !src) {
         return -1;
@@ -868,6 +900,22 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
         return -1;
     }
 
+    if (ctx->storage_rejected_limit) {
+        max_size = flb_utils_size_to_bytes(ctx->storage_rejected_limit);
+        if (max_size <= 0) {
+            flb_warn("[storage] invalid DLQ size limit '%s'",
+                     ctx->storage_rejected_limit);
+            return -1;
+        }
+
+        current_size = dlq_stream_total_size(dlq);
+        if (current_size >= max_size) {
+            flb_warn("[storage] DLQ size limit reached (%zd/%" PRId64 " bytes), "
+                     "rejected chunk copy skipped", current_size, max_size);
+            return -1;
+        }
+    }
+
     /* Remember original state and bring the chunk up if needed */
     was_up = (cio_chunk_is_up(src) == CIO_TRUE);
     if (!was_up) {
@@ -890,6 +938,30 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
         return flb_storage_chunk_restore_state(src, was_up, -1);
     }
 
+    /*
+     * 0 <= current_size < max_size holds here (checked earlier in this
+     * function), so the subtraction cannot overflow, unlike a signed
+     * current_size + size.
+     */
+    if (ctx->storage_rejected_limit &&
+        (uint64_t) size > (uint64_t) (max_size - current_size)) {
+        flb_warn("[storage] DLQ size limit exceeded (%zd + %zu > %" PRId64
+                 " bytes), rejected chunk copy skipped",
+                 current_size, size, max_size);
+        flb_free(buf);
+        return flb_storage_chunk_restore_state(src, was_up, -1);
+    }
+
+    if (ctx->storage_rejected_limit) {
+        /* Fits in int64_t: size <= max_size - current_size was just verified */
+        next_size = (int64_t) current_size + (int64_t) size;
+
+        /* floor(max_size * 9 / 10) without the overflow-prone multiplication */
+        if (next_size >= (max_size / 10) * 9 + ((max_size % 10) * 9) / 10) {
+            flb_warn("[storage] DLQ content size is at %" PRId64 "/%" PRId64
+                     " bytes (>=90%% of limit)", next_size, max_size);
+        }
+    }
+
     /* Create + write the DLQ copy */
     dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err);
     if (!dst) {
diff --git a/tests/internal/storage_dlq.c b/tests/internal/storage_dlq.c
index bca4d269079..26b028994a1 100644
--- a/tests/internal/storage_dlq.c
+++ b/tests/internal/storage_dlq.c
@@ -584,10 +584,51 @@ static void test_dlq_preserves_chunk_state_when_initially_up(void)
     test_cleanup_with_cio(ctx, root);
 }
 
+/*
+ * With storage_rejected_limit set to "16", quarantining a payload larger than
+ * 16 bytes must return non-zero, and find_latest_flb() on the rejected
+ * directory must return non-zero (presumably: no .flb file was created).
+ */
+static void test_dlq_respects_size_limit(void)
+{
+    char root[256], rejdir[256], latest[1024];
+    struct flb_config *ctx = NULL;
+    struct cio_chunk *src = NULL;
+    int rc;
+    const char *payload = "{\"msg\":\"this payload exceeds 16 bytes\"}\n";
+
+    tmpdir_for(root, sizeof(root), "size-limit");
+    snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected");
+    mkpath(rejdir);
+
+    ctx = make_ctx_fs(root, "rejected");
+    TEST_CHECK(ctx != NULL);
+    if (ctx == NULL) {
+        return;
+    }
+    ctx->storage_rejected_limit = flb_strdup("16");
+    TEST_CHECK(ctx->storage_rejected_limit != NULL);
+
+    src = make_src_chunk(ctx, FLB_STORAGE_FS,
+                         "limit_in",
+                         "limit-0-0000000000.000000000.flb",
+                         payload);
+    TEST_CHECK(src != NULL);
+
+    rc = flb_storage_quarantine_chunk(ctx, src,
+                                      "tag.limit", 500, "out_http");
+    TEST_CHECK(rc != 0);
+
+    TEST_CHECK(find_latest_flb(rejdir, latest, sizeof(latest)) != 0);
+
+    cio_chunk_close(src, CIO_FALSE);
+    test_cleanup_with_cio(ctx, root);
+}
+
 TEST_LIST = {
     { "dlq_copy_from_fs_chunk", test_dlq_copy_from_fs_chunk },
     { "dlq_disabled_no_copy", test_dlq_disabled_no_copy },
     { "dlq_restores_chunk_state_when_initially_down", test_dlq_restores_chunk_state_when_initially_down },
     { "dlq_preserves_chunk_state_when_initially_up", test_dlq_preserves_chunk_state_when_initially_up },
+    { "dlq_respects_size_limit", test_dlq_respects_size_limit },
     { NULL, NULL }
 };