Skip to content

Commit ee5035b

Browse files
committed
engine: Add a capability to handle dead letter queue for preserving invalid chunks for later verifications
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent e4be6a9 commit ee5035b

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

src/flb_engine.c

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,50 @@ static inline double calculate_chunk_capacity_percent(struct flb_output_instance
231231
((double)ins->total_limit_size));
232232
}
233233

234+
static void handle_dlq_if_available(struct flb_config *config,
235+
struct flb_task *task,
236+
struct flb_output_instance *ins,
237+
int status_code /* pass 0 if unknown */)
238+
{
239+
const char *tag_buf = NULL;
240+
int tag_len = 0;
241+
flb_sds_t tag_sds = NULL;
242+
const char *tag = NULL;
243+
const char *out = NULL;
244+
struct flb_input_chunk *ic;
245+
struct cio_chunk *cio_ch;
246+
247+
if (!config || !config->storage_keep_rejected || !task || !task->ic || !ins) {
248+
return;
249+
}
250+
251+
ic = (struct flb_input_chunk *) task->ic;
252+
253+
if (!ic || !ic->chunk) {
254+
return;
255+
}
256+
257+
/* Obtain tag from the input chunk API (no direct field available) */
258+
if (flb_input_chunk_get_tag(ic, &tag_buf, &tag_len) == 0 && tag_buf && tag_len > 0) {
259+
tag_sds = flb_sds_create_len(tag_buf, tag_len); /* make it NUL-terminated */
260+
tag = tag_sds;
261+
}
262+
else {
263+
/* Fallback: use input instance name */
264+
tag = flb_input_name(task->i_ins);
265+
}
266+
267+
out = flb_output_name(ins);
268+
cio_ch = (struct cio_chunk *) ic->chunk; /* ic->chunk is a cio_chunk* under the hood */
269+
270+
/* Copy bytes into DLQ stream (filesystem) */
271+
(void) flb_storage_quarantine_chunk(config, cio_ch, tag, status_code, out);
272+
273+
if (tag_sds) {
274+
flb_sds_destroy(tag_sds);
275+
}
276+
}
277+
234278
static inline int handle_output_event(uint64_t ts,
235279
struct flb_config *config,
236280
uint64_t val)
@@ -353,6 +397,8 @@ static inline int handle_output_event(uint64_t ts,
353397
}
354398
else if (ret == FLB_RETRY) {
355399
if (ins->retry_limit == FLB_OUT_RETRY_NONE) {
400+
handle_dlq_if_available(config, task, ins, 0);
401+
356402
/* cmetrics: output_dropped_records_total */
357403
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
358404
1, (char *[]) {name});
@@ -388,6 +434,8 @@ static inline int handle_output_event(uint64_t ts,
388434
* - It reached the maximum number of re-tries
389435
*/
390436

437+
handle_dlq_if_available(config, task, ins, 0);
438+
391439
/* cmetrics */
392440
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name});
393441
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
@@ -429,6 +477,8 @@ static inline int handle_output_event(uint64_t ts,
429477
* memory available or we ran out of file descriptors.
430478
*/
431479
if (retry_seconds == -1) {
480+
handle_dlq_if_available(config, task, ins, 0);
481+
432482
flb_warn("[engine] retry for chunk '%s' could not be scheduled: "
433483
"input=%s > output=%s",
434484
flb_input_chunk_get_name(task->ic),
@@ -465,6 +515,7 @@ static inline int handle_output_event(uint64_t ts,
465515
}
466516
}
467517
else if (ret == FLB_ERROR) {
518+
handle_dlq_if_available(config, task, ins, 0);
468519
/* cmetrics */
469520
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name});
470521
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,

0 commit comments

Comments
 (0)