Skip to content

Commit e4be6a9

Browse files
committed
config: storage: Implement dlq for filesystem chunks
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 9de8ce5 commit e4be6a9

File tree

3 files changed

+119
-0
lines changed

3 files changed

+119
-0
lines changed

include/fluent-bit/flb_config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ struct flb_config {
254254
/* DLQ for non-retriable output failures */
255255
int storage_keep_rejected; /* 0/1 */
256256
char *storage_rejected_path; /* relative to storage_path, default "rejected" */
257+
void *storage_rejected_stream; /* NULL until first use */
257258

258259
/* Embedded SQL Database support (SQLite3) */
259260
#ifdef FLB_HAVE_SQLDB

include/fluent-bit/flb_storage.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,11 @@ int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_met
8585

8686
void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks);
8787

88+
/* DLQ */
89+
int flb_storage_quarantine_chunk(struct flb_config *ctx,
90+
struct cio_chunk *ch,
91+
const char *tag,
92+
int status_code,
93+
const char *out_name);
94+
8895
#endif

src/flb_storage.c

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,117 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch
747747
*fs_chunks = storage_st.chunks_fs;
748748
}
749749

750+
751+
static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx)
752+
{
753+
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
754+
struct cio_stream *st;
755+
const char *name;
756+
757+
if (!ctx || !ctx->cio) {
758+
return NULL;
759+
}
760+
if (!ctx->storage_keep_rejected || !ctx->storage_path) {
761+
return NULL;
762+
}
763+
764+
if (ctx->storage_rejected_stream) {
765+
return ctx->storage_rejected_stream;
766+
}
767+
768+
name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected";
769+
770+
st = cio_stream_get(ctx->cio, name);
771+
if (!st) {
772+
st = cio_stream_create(ctx->cio, name, FLB_STORAGE_FS);
773+
}
774+
if (!st) {
775+
flb_warn("[storage] failed to create rejected stream '%s'", name);
776+
return NULL;
777+
}
778+
779+
ctx->storage_rejected_stream = st;
780+
return st;
781+
#else
782+
FLB_UNUSED(ctx);
783+
return NULL;
784+
#endif
785+
}
786+
787+
int flb_storage_quarantine_chunk(struct flb_config *ctx,
788+
struct cio_chunk *src,
789+
const char *tag,
790+
int status_code,
791+
const char *out_name)
792+
{
793+
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
794+
struct cio_stream *dlq;
795+
void *buf = NULL;
796+
size_t size = 0;
797+
int err = 0;
798+
char name[256];
799+
struct cio_chunk *dst;
800+
801+
if (!ctx || !src) {
802+
return -1;
803+
}
804+
dlq = get_or_create_rejected_stream(ctx);
805+
if (!dlq) {
806+
return -1;
807+
}
808+
809+
if (cio_chunk_is_up(src) != CIO_TRUE) {
810+
if (cio_chunk_up_force(src) != CIO_OK) {
811+
flb_warn("[storage] cannot bring chunk up to copy into DLQ");
812+
return -1;
813+
}
814+
}
815+
816+
if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) {
817+
flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size);
818+
return -1;
819+
}
820+
821+
/* Compose a simple, unique-ish file name */
822+
snprintf(name, sizeof(name),
823+
"%s_%d_%s_%p.flb",
824+
tag ? tag : "no-tag",
825+
status_code,
826+
out_name ? out_name : "out",
827+
(void *) src);
828+
829+
/* Create + write the DLQ copy */
830+
dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err);
831+
if (!dst) {
832+
flb_warn("[storage] DLQ open failed (err=%d)", err);
833+
flb_free(buf);
834+
return -1;
835+
}
836+
if (cio_chunk_write(dst, buf, size) != CIO_OK ||
837+
cio_chunk_sync(dst) != CIO_OK) {
838+
flb_warn("[storage] DLQ write/sync failed");
839+
cio_chunk_close(dst, CIO_TRUE);
840+
flb_free(buf);
841+
return -1;
842+
}
843+
844+
cio_chunk_close(dst, CIO_FALSE);
845+
flb_free(buf);
846+
847+
flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size);
848+
849+
return 0;
850+
#else
851+
FLB_UNUSED(ctx);
852+
FLB_UNUSED(src);
853+
FLB_UNUSED(tag);
854+
FLB_UNUSED(status_code);
855+
FLB_UNUSED(out_name);
856+
857+
return -1;
858+
#endif
859+
}
860+
750861
void flb_storage_destroy(struct flb_config *ctx)
751862
{
752863
struct cio_ctx *cio;

0 commit comments

Comments
 (0)