Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions plugins/in_storage_backlog/sb.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,25 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun
return 0;
}

static inline int sb_is_rejected_stream(struct flb_config *config,
struct cio_stream *stream)
{
const char *rp;

if (!config || !stream || !stream->name) {
return FLB_FALSE;
}

if (config->storage_keep_rejected != FLB_TRUE) {
return FLB_FALSE;
}

rp = config->storage_rejected_path ?
config->storage_rejected_path : "rejected";

return strcmp(stream->name, rp) == 0;
}

int sb_segregate_chunks(struct flb_config *config)
{
int ret;
Expand All @@ -357,6 +376,12 @@ int sb_segregate_chunks(struct flb_config *config)
mk_list_foreach(stream_iterator, &context->cio->streams) {
stream = mk_list_entry(stream_iterator, struct cio_stream, _head);

/* DLQ stream is not part of backlog. Just skip. */
if (sb_is_rejected_stream(config, stream)) {
flb_debug("[storage backlog] skipping DLQ stream '%s'", stream->name);
continue;
}

mk_list_foreach_safe(chunk_iterator, tmp, &stream->chunks) {
chunk = mk_list_entry(chunk_iterator, struct cio_chunk, _head);

Expand Down
Loading