diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e1e8f2f0783..7709d318387 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -251,6 +251,11 @@ struct flb_config { char *storage_type; /* global storage type */ int storage_inherit; /* apply storage type to inputs */ + /* DLQ for non-retriable output failures */ + int storage_keep_rejected; /* 0/1 */ + char *storage_rejected_path; /* relative to storage_path, default "rejected" */ + void *storage_rejected_stream; /* NULL until first use */ + /* Embedded SQL Database support (SQLite3) */ #ifdef FLB_HAVE_SQLDB struct mk_list sqldb_list; @@ -411,6 +416,9 @@ enum conf_type { #define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" #define FLB_CONF_STORAGE_TYPE "storage.type" #define FLB_CONF_STORAGE_INHERIT "storage.inherit" +/* Storage DLQ */ +#define FLB_CONF_STORAGE_KEEP_REJECTED "storage.keep.rejected" +#define FLB_CONF_STORAGE_REJECTED_PATH "storage.rejected.path" /* Coroutines */ #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size" diff --git a/include/fluent-bit/flb_storage.h b/include/fluent-bit/flb_storage.h index 57b50e19016..854216b1038 100644 --- a/include/fluent-bit/flb_storage.h +++ b/include/fluent-bit/flb_storage.h @@ -72,6 +72,8 @@ static inline char *flb_storage_get_type(int type) return NULL; } +struct flb_input_instance; + int flb_storage_create(struct flb_config *ctx); int flb_storage_input_create(struct cio_ctx *cio, struct flb_input_instance *in); @@ -85,4 +87,11 @@ int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_met void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks); +/* DLQ */ +int flb_storage_quarantine_chunk(struct flb_config *ctx, + struct cio_chunk *ch, + const char *tag, + int status_code, + const char *out_name); + #endif diff --git a/src/flb_config.c b/src/flb_config.c index ddfdd010a1c..e61ff331d0f 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -163,6 +163,13 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_INHERIT, FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, storage_inherit)}, + /* Storage / DLQ */ + {FLB_CONF_STORAGE_KEEP_REJECTED, + FLB_CONF_TYPE_BOOL, + offsetof(struct flb_config, storage_keep_rejected)}, + {FLB_CONF_STORAGE_REJECTED_PATH, + FLB_CONF_TYPE_STR, + offsetof(struct flb_config, storage_rejected_path)}, /* Coroutines */ {FLB_CONF_STR_CORO_STACK_SIZE, @@ -312,6 +319,7 @@ struct flb_config *flb_config_init() config->storage_type = NULL; config->storage_inherit = FLB_FALSE; config->storage_bl_flush_on_shutdown = FLB_FALSE; + config->storage_rejected_path = NULL; config->sched_cap = FLB_SCHED_CAP; config->sched_base = FLB_SCHED_BASE; config->json_escape_unicode = FLB_TRUE; @@ -573,6 +581,9 @@ void flb_config_exit(struct flb_config *config) if (config->storage_bl_mem_limit) { flb_free(config->storage_bl_mem_limit); } + if (config->storage_rejected_path) { + flb_free(config->storage_rejected_path); + } #ifdef FLB_HAVE_STREAM_PROCESSOR if (config->stream_processor_file) { diff --git a/src/flb_engine.c b/src/flb_engine.c index d7fcd7a6223..c2ac94b414a 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -231,6 +231,50 @@ static inline double calculate_chunk_capacity_percent(struct flb_output_instance ((double)ins->total_limit_size)); } +static void handle_dlq_if_available(struct flb_config *config, + struct flb_task *task, + struct flb_output_instance *ins, + int status_code /* pass 0 if unknown */) +{ + const char *tag_buf = NULL; + int tag_len = 0; + flb_sds_t tag_sds = NULL; + const char *tag = NULL; + const char *out = NULL; + struct flb_input_chunk *ic; + struct cio_chunk *cio_ch; + + if (!config || !config->storage_keep_rejected || !task || !task->ic || !ins) { + return; + } + + ic = (struct flb_input_chunk *) task->ic; + + if (!ic || !ic->chunk) { + return; + } + + /* Obtain tag from the input chunk API (no direct field available) */ + if (flb_input_chunk_get_tag(ic, &tag_buf, &tag_len) == 0 && tag_buf && tag_len > 0) { + tag_sds = flb_sds_create_len(tag_buf, tag_len); /* make it NUL-terminated */ + tag = tag_sds; + } + else { + /* Fallback: use input instance name */ + tag = flb_input_name(task->i_ins); + } + + out = flb_output_name(ins); + cio_ch = (struct cio_chunk *) ic->chunk; /* ic->chunk is a cio_chunk* under the hood */ + + /* Copy bytes into DLQ stream (filesystem) */ + (void) flb_storage_quarantine_chunk(config, cio_ch, tag, status_code, out); + + if (tag_sds) { + flb_sds_destroy(tag_sds); + } +} + static inline int handle_output_event(uint64_t ts, struct flb_config *config, uint64_t val) @@ -353,6 +397,8 @@ static inline int handle_output_event(uint64_t ts, } else if (ret == FLB_RETRY) { if (ins->retry_limit == FLB_OUT_RETRY_NONE) { + handle_dlq_if_available(config, task, ins, 0); + /* cmetrics: output_dropped_records_total */ cmt_counter_add(ins->cmt_dropped_records, ts, task->records, 1, (char *[]) {name}); @@ -388,6 +434,8 @@ static inline int handle_output_event(uint64_t ts, * - It reached the maximum number of re-tries */ + handle_dlq_if_available(config, task, ins, 0); + /* cmetrics */ cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, @@ -429,6 +477,8 @@ static inline int handle_output_event(uint64_t ts, * memory available or we ran out of file descriptors. */ if (retry_seconds == -1) { + handle_dlq_if_available(config, task, ins, 0); + flb_warn("[engine] retry for chunk '%s' could not be scheduled: " "input=%s > output=%s", flb_input_chunk_get_name(task->ic), @@ -465,6 +515,7 @@ static inline int handle_output_event(uint64_t ts, } } else if (ret == FLB_ERROR) { + handle_dlq_if_available(config, task, ins, 0); /* cmetrics */ cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, diff --git a/src/flb_storage.c b/src/flb_storage.c index 0f0ff93f1ed..ae619e57cc6 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -747,6 +747,151 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch *fs_chunks = storage_st.chunks_fs; } +/* Replace '/', '\\' and ':' with '_' to make filename components safe */ +static inline void sanitize_name_component(const char *in, char *out, size_t out_sz) +{ + size_t i; + + if (out_sz == 0) { + return; + } + + if (!in) { + in = "no-tag"; + } + + for (i = 0; i < out_sz - 1 && in[i] != '\0'; i++) { + out[i] = (in[i] == '/' || in[i] == '\\' || in[i] == ':') ? '_' : in[i]; + } + out[i] = '\0'; +} + +static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx) +{ +#ifdef CIO_HAVE_BACKEND_FILESYSTEM + struct cio_stream *st; + const char *name; + + if (!ctx || !ctx->cio) { + return NULL; + } + if (!ctx->storage_keep_rejected || !ctx->storage_path) { + return NULL; + } + + if (ctx->storage_rejected_stream) { + return ctx->storage_rejected_stream; + } + + name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected"; + + st = cio_stream_get(ctx->cio, name); + if (!st) { + st = cio_stream_create(ctx->cio, name, FLB_STORAGE_FS); + } + if (!st) { + flb_warn("[storage] failed to create rejected stream '%s'", name); + return NULL; + } + + ctx->storage_rejected_stream = st; + return st; +#else + FLB_UNUSED(ctx); + return NULL; +#endif +} + +static inline int flb_storage_chunk_restore_state(struct cio_chunk *src, int was_up, int ret_val) +{ + if (!was_up) { + if (cio_chunk_down(src) != CIO_OK) { + flb_debug("[storage] failed to bring chunk back down"); + } + } + + return ret_val; +} + +int flb_storage_quarantine_chunk(struct flb_config *ctx, + struct cio_chunk *src, + const char *tag, + int status_code, + const char *out_name) +{ +#ifdef CIO_HAVE_BACKEND_FILESYSTEM + struct cio_stream *dlq; + void *buf = NULL; + int was_up = 0; + size_t size = 0; + int err = 0; + char name[256]; + struct cio_chunk *dst; + char safe_tag[128]; + char safe_out[64]; + + if (!ctx || !src) { + return -1; + } + dlq = get_or_create_rejected_stream(ctx); + if (!dlq) { + return -1; + } + + /* Remember original state and bring the chunk up if needed */ + was_up = (cio_chunk_is_up(src) == CIO_TRUE); + if (!was_up) { + if (cio_chunk_up_force(src) != CIO_OK) { + flb_warn("[storage] cannot bring chunk up to copy into DLQ"); + return -1; + } + } + + sanitize_name_component(tag, safe_tag, sizeof(safe_tag)); + sanitize_name_component(out_name ? out_name : "out", safe_out, sizeof(safe_out)); + + /* Compose a simple, unique-ish file name with sanitized pieces */ + snprintf(name, sizeof(name), + "%s_%d_%s_%p.flb", + safe_tag, status_code, safe_out, (void *) src); + + if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) { + flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size); + return flb_storage_chunk_restore_state(src, was_up, -1); + } + + /* Create + write the DLQ copy */ + dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err); + if (!dst) { + flb_warn("[storage] DLQ open failed (err=%d)", err); + flb_free(buf); + return flb_storage_chunk_restore_state(src, was_up, -1); + } + if (cio_chunk_write(dst, buf, size) != CIO_OK || + cio_chunk_sync(dst) != CIO_OK) { + flb_warn("[storage] DLQ write/sync failed"); + cio_chunk_close(dst, CIO_TRUE); + flb_free(buf); + return flb_storage_chunk_restore_state(src, was_up, -1); + } + + cio_chunk_close(dst, CIO_FALSE); + flb_free(buf); + + flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size); + + return flb_storage_chunk_restore_state(src, was_up, 0); +#else + FLB_UNUSED(ctx); + FLB_UNUSED(src); + FLB_UNUSED(tag); + FLB_UNUSED(status_code); + FLB_UNUSED(out_name); + + return -1; +#endif +} + void flb_storage_destroy(struct flb_config *ctx) { struct cio_ctx *cio; diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index a270f1dc3c8..2ba9dc65f87 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -53,6 +53,7 @@ set(UNIT_TESTS_FILES storage_inherit.c unicode.c opentelemetry.c + storage_dlq.c ) # TLS helpers diff --git a/tests/internal/storage_dlq.c b/tests/internal/storage_dlq.c new file mode 100644 index 00000000000..bca4d269079 --- /dev/null +++ b/tests/internal/storage_dlq.c @@ -0,0 +1,593 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "flb_tests_internal.h" + +#ifdef _WIN32 +# define FLB_UNLINK _unlink +# define FLB_RMDIR _rmdir +#else +# define FLB_UNLINK unlink +# define FLB_RMDIR rmdir +#endif + +static int mkpath(const char *p) { +#if FLB_SYSTEM_WINDOWS + if (_mkdir(p) == 0) { + return 0; + } +#else + if (mkdir(p, 0777) == 0) { + return 0; + } +#endif + if (errno == EEXIST) { + return 0; + } + return -1; +} + +static void join_path(char *out, size_t cap, const char *a, const char *b) +{ +#ifdef _WIN32 + _snprintf(out, cap, "%s\\%s", a, b); +#else + snprintf(out, cap, "%s/%s", a, b); +#endif + out[cap - 1] = '\0'; +} + +static void tmpdir_for(char *out, size_t n, const char *name) +{ +#ifdef _WIN32 + DWORD pid = GetCurrentProcessId(); + _snprintf(out, n, "C:\\Windows\\Temp\\flb-dlq-%s-%lu", name, (unsigned long) pid); +#else + snprintf(out, n, "/tmp/flb-dlq-%s-%ld", name, (long) getpid()); +#endif + out[n-1] = '\0'; + mkpath(out); +} + +/* helper: open a DLQ chunk by basename and return its content copy */ +static int read_dlq_chunk_content(struct flb_config *ctx, + const char *rejected_stream_name, + const char *chunk_basename, + void **out_buf, size_t *out_size) +{ + int err = 0; + struct cio_stream *st; + struct cio_chunk *ch; + + *out_buf = NULL; + *out_size = 0; + + st = cio_stream_get(ctx->cio, rejected_stream_name); + if (!st) { + st = cio_stream_create(ctx->cio, rejected_stream_name, FLB_STORAGE_FS); + if (!st) { return -1; } + } + + /* Open existing DLQ file by name */ + ch = cio_chunk_open(ctx->cio, st, chunk_basename, CIO_OPEN, 0, &err); + if (!ch) { + return -1; + } + + /* ensure it's readable */ + if (cio_chunk_is_up(ch) != CIO_TRUE) { + if (cio_chunk_up_force(ch) != CIO_OK) { + cio_chunk_close(ch, CIO_FALSE); + return -1; + } + } + + if (cio_chunk_get_content_copy(ch, out_buf, out_size) != CIO_OK) { + cio_chunk_close(ch, CIO_FALSE); + return -1; + } + + cio_chunk_close(ch, CIO_FALSE); + return 0; +} + +/* tiny binary “contains” (since memmem is non-portable) */ +static int buf_contains(const void *hay, size_t hlen, + const void *needle, size_t nlen) +{ + size_t i; + if (nlen == 0 || hlen < nlen) return 0; + const unsigned char *h = (const unsigned char *) hay; + const unsigned char *n = (const unsigned char *) needle; + + for (i = 0; i + nlen <= hlen; i++) { + if (h[i] == n[0] && memcmp(h + i, n, nlen) == 0) { + return 1; + } + } + return 0; +} + +#if FLB_SYSTEM_WINDOWS +static int find_latest_flb_win32(const char *dir, char *out, size_t out_sz) +{ + WIN32_FIND_DATAA ffd; + HANDLE h = INVALID_HANDLE_VALUE; + char pattern[1024]; + ULONGLONG best_ts = 0ULL; + char best_name[MAX_PATH] = {0}; + + _snprintf(pattern, sizeof(pattern), "%s\\*.flb", dir); + pattern[sizeof(pattern)-1] = '\0'; + + h = FindFirstFileA(pattern, &ffd); + if (h == INVALID_HANDLE_VALUE) { + return -1; + } + + do { + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + ULONGLONG ts = (((ULONGLONG)ffd.ftLastWriteTime.dwHighDateTime) << 32) | + (ULONGLONG)ffd.ftLastWriteTime.dwLowDateTime; + if (ts >= best_ts) { + best_ts = ts; + strncpy(best_name, ffd.cFileName, sizeof(best_name)-1); + best_name[sizeof(best_name)-1] = '\0'; + } + } while (FindNextFileA(h, &ffd)); + + FindClose(h); + + if (best_name[0] == '\0') { + return -1; + } + + join_path(out, out_sz, dir, best_name); + return 0; +} +#else +static int find_latest_flb_unix(const char *dir, char *out, size_t out_sz) +{ + DIR *d = opendir(dir); + struct dirent *e; + time_t best_t = 0; + char best_path[1024] = {0}; + struct stat st; + char full[1024]; + + if (!d) return -1; + + while ((e = readdir(d)) != NULL) { + size_t len = strlen(e->d_name); + if (len < 5) { + continue; + } + if (strcmp(e->d_name + (len - 4), ".flb") != 0) { + continue; + } + + join_path(full, sizeof(full), dir, e->d_name); + if (stat(full, &st) == 0) { + if (st.st_mtime >= best_t) { + best_t = st.st_mtime; + strncpy(best_path, full, sizeof(best_path)-1); + } + } + } + closedir(d); + + if (best_path[0] == '\0') { + return -1; + } + strncpy(out, best_path, out_sz - 1); + out[out_sz-1] = '\0'; + return 0; +} +#endif + +/* find the most recent *.flb file in dir; write full path into out */ +static int find_latest_flb(const char *dir, char *out, size_t out_sz) +{ +#if FLB_SYSTEM_WINDOWS + return find_latest_flb_win32(dir, out, out_sz); +#else + return find_latest_flb_unix(dir, out, out_sz); +#endif +} + +static void free_ctx(struct flb_config *ctx) +{ + if (!ctx) { + return; + } + + if (ctx->cio) { + cio_destroy(ctx->cio); + ctx->cio = NULL; + } + + flb_config_exit(ctx); +} + +static const char *get_dlq_stream_name(struct flb_config *ctx) +{ + if (ctx->storage_rejected_stream) { + return ((struct cio_stream *)ctx->storage_rejected_stream)->name; + } + return ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected"; +} + +static void delete_all_chunks_in_stream(struct cio_ctx *cio, struct cio_stream *st) +{ + struct mk_list *head; + struct mk_list *tmp; + struct cio_chunk *ch; + + if (!cio || !st) { + return; + } + + mk_list_foreach_safe(head, tmp, &st->chunks) { + ch = mk_list_entry(head, struct cio_chunk, _head); + + char *name_copy = flb_strdup(ch->name); + if (!name_copy) { + continue; + } + + cio_chunk_close(ch, CIO_FALSE); + + (void) cio_chunk_delete(cio, st, name_copy); + + flb_free(name_copy); + } +} + +static void rmdir_stream_dir(const char *root, const char *stream_name) +{ + if (!root || !stream_name) { + return; + } + + char path[1024]; + snprintf(path, sizeof(path), "%s/%s", root, stream_name); + path[sizeof(path)-1] = '\0'; + + /* Best-effort: ignore errors */ + (void) rmdir(path); +} + +/* Minimal POSIX rm -rf for the whole temp tree after CIO is gone */ +#if FLB_SYSTEM_WINDOWS +static void rm_rf_best_effort_win32(const char *root) +{ + WIN32_FIND_DATAA ffd; + HANDLE h = INVALID_HANDLE_VALUE; + char pattern[1024], p[1024]; + + _snprintf(pattern, sizeof(pattern), "%s\\*", + root ? root : ""); + pattern[sizeof(pattern)-1] = '\0'; + + h = FindFirstFileA(pattern, &ffd); + if (h == INVALID_HANDLE_VALUE) { + /* try removing root itself */ + (void) FLB_RMDIR(root); + return; + } + + do { + const char *name = ffd.cFileName; + if (!strcmp(name, ".") || !strcmp(name, "..")) continue; + + join_path(p, sizeof(p), root, name); + + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + rm_rf_best_effort_win32(p); + } + else { + /* clear read-only if needed */ + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_READONLY) { + SetFileAttributesA(p, + ffd.dwFileAttributes & ~FILE_ATTRIBUTE_READONLY); + } + (void) DeleteFileA(p); + } + } while (FindNextFileA(h, &ffd)); + + FindClose(h); + (void) FLB_RMDIR(root); +} +#else +static void rm_rf_best_effort_unix(const char *root) +{ + DIR *d = opendir(root); + struct dirent *e; + char p[1024]; + struct stat st; + + if (!d) { + (void) FLB_RMDIR(root); + return; + } + while ((e = readdir(d)) != NULL) { + if (!strcmp(e->d_name, ".") || !strcmp(e->d_name, "..")) { + continue; + } + join_path(p, sizeof(p), root, e->d_name); + if (lstat(p, &st) != 0) { + continue; + } + if (S_ISDIR(st.st_mode)) { + rm_rf_best_effort_unix(p); + } + else { + (void) FLB_UNLINK(p); + } + } + closedir(d); + (void) FLB_RMDIR(root); +} +#endif + +static void rm_rf_best_effort(const char *root) +{ +#if FLB_SYSTEM_WINDOWS + rm_rf_best_effort_win32(root); +#else + rm_rf_best_effort_unix(root); +#endif +} + +static void test_cleanup_with_cio(struct flb_config *ctx, const char *root) +{ + if (ctx && ctx->cio) { + struct cio_stream *st_in = cio_stream_get(ctx->cio, "in_tail"); + struct cio_stream *st_dlq = cio_stream_get(ctx->cio, get_dlq_stream_name(ctx)); + + delete_all_chunks_in_stream(ctx->cio, st_in); + delete_all_chunks_in_stream(ctx->cio, st_dlq); + + rmdir_stream_dir(root, "in_tail"); + rmdir_stream_dir(root, get_dlq_stream_name(ctx)); + } + + free_ctx(ctx); + + rm_rf_best_effort(root); +} + +static struct flb_config *make_ctx_fs(const char *root, const char *rejected) +{ + struct cio_options opts; + struct flb_config *ctx = flb_config_init(); + TEST_CHECK(ctx != NULL); + + ctx->storage_path = flb_strdup(root); + ctx->storage_keep_rejected = FLB_TRUE; + ctx->storage_rejected_path = flb_strdup(rejected); + + cio_options_init(&opts); + opts.root_path = ctx->storage_path; + opts.flags = CIO_OPEN | CIO_CHECKSUM; + opts.log_cb = NULL; + + ctx->cio = cio_create(&opts); + TEST_CHECK(ctx->cio != NULL); + + /* mimic engine behavior: load + qsort */ + TEST_CHECK(cio_load(ctx->cio, NULL) == 0); + cio_qsort(ctx->cio, NULL); + + return ctx; +} + +static struct cio_chunk *make_src_chunk(struct flb_config *ctx, + int storage_type, /* FLB_STORAGE_FS */ + const char *stream_name, + const char *file_name, + const char *payload) +{ + int err = 0; + int cio_type = storage_type; + struct cio_stream *st = NULL; + struct cio_chunk *ch = NULL; + + st = cio_stream_get(ctx->cio, stream_name); + if (!st) { + st = cio_stream_create(ctx->cio, stream_name, cio_type); + } + TEST_CHECK(st != NULL); + + ch = cio_chunk_open(ctx->cio, st, file_name, CIO_OPEN, 0, &err); + TEST_CHECK(ch != NULL); + + TEST_CHECK(cio_chunk_write(ch, payload, strlen(payload)) == CIO_OK); + TEST_CHECK(cio_chunk_sync(ch) == CIO_OK); + + return ch; +} + +static void test_dlq_copy_from_fs_chunk(void) +{ + char root[256], rejdir[256], latest[1024]; + struct cio_chunk *src = NULL; + struct flb_config *ctx = NULL; + int rc; + const char *payload = + "{\"time\":\"2024-09-03 14:51:05.064735+00:00\",\"msg\":\"oops FS\"}\n"; + char latest_copy[1024]; + void *content = NULL; + size_t content_size = 0; + char *base = NULL; + + tmpdir_for(root, sizeof(root), "fs"); + snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected"); + mkpath(rejdir); + + ctx = make_ctx_fs(root, "rejected"); + + src = make_src_chunk(ctx, FLB_STORAGE_FS, + "in_tail", + "t0-0-0000000000.000000000.flb", + payload); + + rc = flb_storage_quarantine_chunk(ctx, src, + "kube.var.log.containers.test", + 400, "http"); + TEST_CHECK(rc == 0); + + TEST_CHECK(find_latest_flb(rejdir, latest, sizeof(latest)) == 0); + + /* get just the filename (basename) */ + strncpy(latest_copy, latest, sizeof(latest_copy)-1); + latest_copy[sizeof(latest_copy)-1] = '\0'; + base = basename(latest_copy); + + TEST_CHECK(read_dlq_chunk_content(ctx, "rejected", base, &content, &content_size) == 0); + TEST_CHECK(content != NULL); + TEST_CHECK(content_size > 0); + TEST_CHECK(buf_contains(content, content_size, payload, strlen(payload)) == 1); + + flb_free(content); + cio_chunk_close(src, CIO_FALSE); + test_cleanup_with_cio(ctx, root); +} + +static void test_dlq_disabled_no_copy(void) +{ + char root[256], rejdir[256], latest[1024]; + struct cio_chunk *src = NULL; + struct flb_config *ctx = NULL; + struct cio_options opts; + int rc; + const char *payload = "{\"msg\":\"should not be copied\"}\n"; + + tmpdir_for(root, sizeof(root), "disabled"); + snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected"); + mkpath(rejdir); + + /* DLQ disabled */ + ctx = flb_config_init(); + TEST_CHECK(ctx != NULL); + + ctx->storage_path = flb_strdup(root); + ctx->storage_keep_rejected = FLB_FALSE; + ctx->storage_rejected_path = flb_strdup("rejected"); + + cio_options_init(&opts); + opts.root_path = ctx->storage_path; + opts.flags = CIO_OPEN; + ctx->cio = cio_create(&opts); + TEST_CHECK(ctx->cio != NULL); + + src = make_src_chunk(ctx, FLB_STORAGE_FS, + "in_tail", + "t1-0.flb", + payload); + + /* Attempt to copy: should fail because DLQ is disabled */ + rc = flb_storage_quarantine_chunk(ctx, src, + "tag", 400, "out"); + TEST_CHECK(rc != 0); + + TEST_CHECK(find_latest_flb(rejdir, latest, sizeof(latest)) != 0); + + cio_chunk_close(src, CIO_FALSE); + test_cleanup_with_cio(ctx, root); +} + +static void test_dlq_restores_chunk_state_when_initially_down(void) +{ + char root[256], rejdir[256]; + struct flb_config *ctx = NULL; + struct cio_chunk *src = NULL; + int rc; + const char *payload = "{\"msg\":\"state-restore-down\"}\n"; + + tmpdir_for(root, sizeof(root), "restore-down"); + snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected"); + mkpath(rejdir); + + ctx = make_ctx_fs(root, "rejected"); + + /* Create the chunk */ + src = make_src_chunk(ctx, FLB_STORAGE_FS, + "in_tail", + "restore-down-0-0000000000.000000000.flb", + payload); + TEST_CHECK(src != NULL); + + if (cio_chunk_is_up(src) == CIO_TRUE) { + TEST_CHECK(cio_chunk_down(src) == CIO_OK); + } + TEST_CHECK(cio_chunk_is_up(src) != CIO_TRUE); + + rc = flb_storage_quarantine_chunk(ctx, src, + "tag.down", 500, "out_http"); + TEST_CHECK(rc == 0); + + TEST_CHECK(cio_chunk_is_up(src) != CIO_TRUE); + + cio_chunk_close(src, CIO_FALSE); + test_cleanup_with_cio(ctx, root); +} + +static void test_dlq_preserves_chunk_state_when_initially_up(void) +{ + char root[256], rejdir[256]; + struct flb_config *ctx = NULL; + struct cio_chunk *src = NULL; + int rc; + const char *payload = "{\"msg\":\"state-preserve-up\"}\n"; + + tmpdir_for(root, sizeof(root), "preserve-up"); + snprintf(rejdir, sizeof(rejdir), "%s/%s", root, "rejected"); + mkpath(rejdir); + + ctx = make_ctx_fs(root, "rejected"); + + src = make_src_chunk(ctx, FLB_STORAGE_FS, + "preserve_in", + "preserve-up-0-0000000000.000000000.flb", + payload); + TEST_CHECK(src != NULL); + + if (cio_chunk_is_up(src) != CIO_TRUE) { + TEST_CHECK(cio_chunk_up_force(src) == CIO_OK); + } + TEST_CHECK(cio_chunk_is_up(src) == CIO_TRUE); + + rc = flb_storage_quarantine_chunk(ctx, src, + "tag.up", 502, "out_es"); + TEST_CHECK(rc == 0); + + TEST_CHECK(cio_chunk_is_up(src) == CIO_TRUE); + + cio_chunk_close(src, CIO_FALSE); + test_cleanup_with_cio(ctx, root); +} + +TEST_LIST = { + { "dlq_copy_from_fs_chunk", test_dlq_copy_from_fs_chunk }, + { "dlq_disabled_no_copy", test_dlq_disabled_no_copy }, + { "dlq_restores_chunk_state_when_initially_down", test_dlq_restores_chunk_state_when_initially_down }, + { "dlq_preserves_chunk_state_when_initially_up", test_dlq_preserves_chunk_state_when_initially_up }, + { NULL, NULL } +};