Skip to content

Conversation

@cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Jan 23, 2026

Previously, lacking of handling DLQ preservations on restarting.
We ought to preserve them on restarting.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
pipeline:
  inputs:
    - name: dummy
      storage.type: filesystem
      tag: test
      samples: 3
  outputs:
    - name: http
      host: localhostsss
      port: 8080
      match: '*'
    - name: stdout
      match: '*'
service:
  flush: 1
  storage.path: ./storage
  storage.keep.rejected: on
  storage.rejected.path: rejected
  • Debug log output from testing the change

1st attempt:

Fluent Bit v5.0.0
* Copyright (C) 2015-2025 The Fluent Bit Authors
* Fluent Bit is a CNCF graduated project under the Fluent organization
* https://fluentbit.io

______ _                  _    ______ _ _           _____  _____           _            
|  ___| |                | |   | ___ (_) |         |  ___||  _  |         | |           
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   _|___ \ | |/' |______ __| | _____   __
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \|  /| |______/ _` |/ _ \ \ / /
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V //\__/ /\ |_/ /     | (_| |  __/\ V / 
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)\___/       \__,_|\___| \_/


[2026/01/23 14:31:18.804846000] [ info] [fluent bit] version=5.0.0, commit=9cf4c9f648, pid=8856
[2026/01/23 14:31:18.805010000] [ info] [storage] created root path ./storage
[2026/01/23 14:31:18.805150000] [ info] [storage] ver=1.4.0, type=memory+filesystem, sync=normal, checksum=off, max_chunks_up=128
[2026/01/23 14:31:18.805158000] [ info] [storage] backlog input plugin: storage_backlog.1
[2026/01/23 14:31:18.805163000] [ info] [simd    ] NEON
[2026/01/23 14:31:18.805168000] [ info] [cmetrics] version=1.0.6
[2026/01/23 14:31:18.805185000] [ info] [ctraces ] version=0.6.6
[2026/01/23 14:31:18.805346000] [ info] [input:dummy:dummy.0] initializing
[2026/01/23 14:31:18.805352000] [ info] [input:dummy:dummy.0] storage_strategy='filesystem' (memory + filesystem)
[2026/01/23 14:31:18.805471000] [ info] [input:storage_backlog:storage_backlog.1] initializing
[2026/01/23 14:31:18.805478000] [ info] [input:storage_backlog:storage_backlog.1] storage_strategy='memory' (memory only)
[2026/01/23 14:31:18.805503000] [ info] [input:storage_backlog:storage_backlog.1] queue memory limit: 95.4M
[2026/01/23 14:31:18.805753000] [ info] [output:http:http.0] worker #0 started
[2026/01/23 14:31:18.805768000] [ info] [output:http:http.0] worker #1 started
[2026/01/23 14:31:18.805813000] [ info] [output:stdout:stdout.1] worker #0 started
[2026/01/23 14:31:18.805830000] [ info] [sp] stream processor started
[2026/01/23 14:31:18.805884000] [ info] [engine] Shutdown Grace Period=5, Shutdown Input Grace Period=2
[0] test: [[1769146279.806656000, {}], {"message"=>"dummy"}]
[2026/01/23 14:31:20.852548000] [ warn] [net] getaddrinfo(host='localhostsss', err=4): Domain name not found
[2026/01/23 14:31:20.852609000] [error] [output:http:http.0] no upstream connections available to localhostsss:8080
[2026/01/23 14:31:20.852727000] [ warn] [engine] failed to flush chunk '8856-1769146279.806978000.flb', retry in 7 seconds: task_id=0, input=dummy.0 > output=http.0 (out_id=0)
[0] test: [[1769146280.807698000, {}], {"message"=>"dummy"}]
[2026/01/23 14:31:21.841990000] [ warn] [net] getaddrinfo(host='localhostsss', err=4): Domain name not found
[2026/01/23 14:31:21.842047000] [error] [output:http:http.0] no upstream connections available to localhostsss:8080
[2026/01/23 14:31:21.842149000] [ warn] [engine] failed to flush chunk '8856-1769146280.807799000.flb', retry in 8 seconds: task_id=1, input=dummy.0 > output=http.0 (out_id=0)
[0] test: [[1769146281.811844000, {}], {"message"=>"dummy"}]
[2026/01/23 14:31:22.835107000] [ warn] [net] getaddrinfo(host='localhostsss', err=4): Domain name not found
[2026/01/23 14:31:22.835154000] [error] [output:http:http.0] no upstream connections available to localhostsss:8080
[2026/01/23 14:31:22.835241000] [ warn] [engine] failed to flush chunk '8856-1769146281.811927000.flb', retry in 9 seconds: task_id=2, input=dummy.0 > output=http.0 (out_id=0)
[2026/01/23 14:31:27.884364000] [ warn] [net] getaddrinfo(host='localhostsss', err=4): Domain name not found
[2026/01/23 14:31:27.884415000] [error] [output:http:http.0] no upstream connections available to localhostsss:8080
[2026/01/23 14:31:27.885053000] [ info] [storage] quarantined rejected chunk into DLQ stream (bytes=36)
[2026/01/23 14:31:27.885097000] [error] [engine] chunk '8856-1769146279.806978000.flb' cannot be retried: task_id=0, input=dummy.0 > output=http.0
[2026/01/23 14:31:29.879828000] [ warn] [net] getaddrinfo(host='localhostsss', err=4): Domain name not found
[2026/01/23 14:31:29.879897000] [error] [output:http:http.0] no upstream connections available to localhostsss:8080
[2026/01/23 14:31:29.880612000] [ info] [storage] quarantined rejected chunk into DLQ stream (bytes=36)
[2026/01/23 14:31:29.880648000] [error] [engine] chunk '8856-1769146280.807799000.flb' cannot be retried: task_id=1, input=dummy.0 > output=http.0
[2026/01/23 14:31:31.872571000] [ warn] [net] getaddrinfo(host='localhostsss', err=4): Domain name not found
[2026/01/23 14:31:31.872631000] [error] [output:http:http.0] no upstream connections available to localhostsss:8080
[2026/01/23 14:31:31.873253000] [ info] [storage] quarantined rejected chunk into DLQ stream (bytes=36)
[2026/01/23 14:31:31.873281000] [error] [engine] chunk '8856-1769146281.811927000.flb' cannot be retried: task_id=2, input=dummy.0 > output=http.0
^C[2026/01/23 14:31:32] [engine] caught signal (SIGINT)
[2026/01/23 14:31:32.705447000] [ info] [input] pausing dummy.0
[2026/01/23 14:31:32.705509000] [ info] [input] pausing storage_backlog.1
[2026/01/23 14:31:32.705569000] [ info] [output:http:http.0] thread worker #0 stopping...
[2026/01/23 14:31:32.705675000] [ info] [output:http:http.0] thread worker #0 stopped
[2026/01/23 14:31:32.705968000] [ info] [output:http:http.0] thread worker #1 stopping...
[2026/01/23 14:31:32.706080000] [ info] [output:http:http.0] thread worker #1 stopped
[2026/01/23 14:31:32.706401000] [ info] [output:stdout:stdout.1] thread worker #0 stopping...
[2026/01/23 14:31:32.706498000] [ info] [output:stdout:stdout.1] thread worker #0 stopped

2nd attempt:

Fluent Bit v5.0.0
* Copyright (C) 2015-2025 The Fluent Bit Authors
* Fluent Bit is a CNCF graduated project under the Fluent organization
* https://fluentbit.io

______ _                  _    ______ _ _           _____  _____           _            
|  ___| |                | |   | ___ (_) |         |  ___||  _  |         | |           
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   _|___ \ | |/' |______ __| | _____   __
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \|  /| |______/ _` |/ _ \ \ / /
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V //\__/ /\ |_/ /     | (_| |  __/\ V / 
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)\___/       \__,_|\___| \_/


[2026/01/23 14:32:16.796392000] [ info] Configuration:
[2026/01/23 14:32:16.796402000] [ info]  flush time     | 1.000000 seconds
[2026/01/23 14:32:16.796409000] [ info]  grace          | 5 seconds
[2026/01/23 14:32:16.796413000] [ info]  daemon         | 0
[2026/01/23 14:32:16.796417000] [ info] ___________
[2026/01/23 14:32:16.796421000] [ info]  inputs:
[2026/01/23 14:32:16.796424000] [ info]      dummy
[2026/01/23 14:32:16.796427000] [ info] ___________
[2026/01/23 14:32:16.796430000] [ info]  filters:
[2026/01/23 14:32:16.796433000] [ info] ___________
[2026/01/23 14:32:16.796436000] [ info]  outputs:
[2026/01/23 14:32:16.796439000] [ info]      http.0
[2026/01/23 14:32:16.796442000] [ info]      stdout.1
[2026/01/23 14:32:16.796444000] [ info] ___________
[2026/01/23 14:32:16.796447000] [ info]  collectors:
[2026/01/23 14:32:16.796614000] [ info] [fluent bit] version=5.0.0, commit=9cf4c9f648, pid=8960
[2026/01/23 14:32:16.796631000] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2026/01/23 14:32:16.796919000] [ info] [storage] ver=1.4.0, type=memory+filesystem, sync=normal, checksum=off, max_chunks_up=128
[2026/01/23 14:32:16.796927000] [ info] [storage] backlog input plugin: storage_backlog.1
[2026/01/23 14:32:16.796934000] [ info] [simd    ] NEON
[2026/01/23 14:32:16.796940000] [ info] [cmetrics] version=1.0.6
[2026/01/23 14:32:16.796948000] [ info] [ctraces ] version=0.6.6
[2026/01/23 14:32:16.797059000] [ info] [input:dummy:dummy.0] initializing
[2026/01/23 14:32:16.797065000] [ info] [input:dummy:dummy.0] storage_strategy='filesystem' (memory + filesystem)
[2026/01/23 14:32:16.797074000] [debug] [dummy:dummy.0] created event channels: read=30 write=31
[2026/01/23 14:32:16.797176000] [ info] [input:storage_backlog:storage_backlog.1] initializing
[2026/01/23 14:32:16.797182000] [ info] [input:storage_backlog:storage_backlog.1] storage_strategy='memory' (memory only)
[2026/01/23 14:32:16.797191000] [debug] [storage_backlog:storage_backlog.1] created event channels: read=32 write=33
[2026/01/23 14:32:16.797209000] [ info] [input:storage_backlog:storage_backlog.1] queue memory limit: 95.4M
[2026/01/23 14:32:16.797220000] [debug] [http:http.0] created event channels: read=34 write=35
[2026/01/23 14:32:16.797389000] [debug] [stdout:stdout.1] created event channels: read=48 write=49
[2026/01/23 14:32:16.797459000] [ info] [output:http:http.0] worker #0 started
[2026/01/23 14:32:16.797473000] [ info] [output:http:http.0] worker #1 started
[2026/01/23 14:32:16.797508000] [debug] [router] match rule dummy.0:http.0
[2026/01/23 14:32:16.797515000] [debug] [router] match rule dummy.0:stdout.1
[2026/01/23 14:32:16.797526000] [debug] [router] match rule storage_backlog.1:http.0
[2026/01/23 14:32:16.797534000] [debug] [router] match rule storage_backlog.1:stdout.1
[2026/01/23 14:32:16.797536000] [ info] [output:stdout:stdout.1] worker #0 started
[2026/01/23 14:32:16.797552000] [ info] [sp] stream processor started
[2026/01/23 14:32:16.797607000] [debug] [storage backlog] skipping DLQ stream 'rejected'
[2026/01/23 14:32:16.797625000] [ info] [input:storage_backlog:storage_backlog.1] register dummy.0/8897-1769146307.250503000.flb
[2026/01/23 14:32:16.797651000] [ info] [input:storage_backlog:storage_backlog.1] register dummy.0/8897-1769146308.251033000.flb
[2026/01/23 14:32:16.797666000] [ info] [engine] Shutdown Grace Period=5, Shutdown Input Grace Period=2
[2026/01/23 14:32:17.804440000] [ info] [input:storage_backlog:storage_backlog.1] queueing dummy.0:8897-1769146307.250503000.flb
[2026/01/23 14:32:17.804874000] [ info] [input:storage_backlog:storage_backlog.1] queueing dummy.0:8897-1769146308.251033000.flb

^C[2026/01/23 14:32:18] [engine] caught signal (SIGINT)
[2026/01/23 14:32:18.575121000] [ info] [input] pausing dummy.0
[2026/01/23 14:32:18.575181000] [ info] [input] pausing storage_backlog.1

Now quarantined DLQs are preserved:

% ls ./storage/rejected
test_0_http.0_0x60800000d620.flb test_0_http.0_0x60800000d720.flb test_0_http.0_0x60800000d7a0.flb
  • Attached Valgrind output that shows no leaks or memory corruption was found

With macOS's leaks, there's no leakages found:

Process 19936 is not debuggable. Due to security restrictions, leaks can only show or save contents of readonly memory of restricted processes.

Process:         fluent-bit [19936]
Path:            /Users/USER/*/fluent-bit
Load Address:    0x102160000
Identifier:      fluent-bit
Version:         0
Code Type:       ARM64
Platform:        macOS
Parent Process:  leaks [19935]
Target Type:     live task

Date/Time:       2026-01-23 14:38:49.659 +0900
Launch Time:     2026-01-23 14:38:45.458 +0900
OS Version:      macOS 26.2 (25C56)
Report Version:  7
Analysis Tool:   /usr/bin/leaks

Physical footprint:         8000K
Physical footprint (peak):  8112K
Idle exit:                  untracked
----

leaks Report Version: 4.0, multi-line stacks
Process 19936: 1298 nodes malloced for 200 KB
Process 19936: 0 leaks for 0 total leaked bytes.

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • Bug Fixes
    • Improved handling of rejected and dead-letter queue streams to ensure they are properly skipped during processing operations.

✏️ Tip: You can customize this high-level summary in your review settings.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
@coderabbitai
Copy link

coderabbitai bot commented Jan 23, 2026

📝 Walkthrough

Walkthrough

Introduces a new inline helper function sb_is_rejected_stream() to identify DLQ/rejected streams based on storage configuration, then adds a guard condition in sb_segregate_chunks() to skip processing those rejected streams.

Changes

Cohort / File(s) Summary
DLQ Stream Skip Guard
plugins/in_storage_backlog/sb.c
Added inline helper sb_is_rejected_stream(config, stream) that checks config.storage_keep_rejected and config.storage_rejected_path (defaults to "rejected"). Integrated guard in sb_segregate_chunks loop to skip DLQ/rejected streams with debug logging.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

Possibly related PRs

Suggested reviewers

  • edsiper
  • fujimotos
  • koleini

Poem

🐰 A rabbit hops through streams with care,
Rejected paths? We skip them there!
With guards in place and config bright,
The backlog flows, now clean and right! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'in_storage_backlog: Preserve DLQ on restarting' directly and clearly describes the main change: preserving dead-letter queue files when Fluent Bit restarts, which aligns with the changeset's core objective.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@lecaros
Copy link
Contributor

lecaros commented Jan 23, 2026

confirmed this is no longer adding rejected chunk files to the backlog queue. Thanks @cosmo0920!
cc @edsiper

@patrick-stephens
Copy link
Collaborator

patrick-stephens commented Jan 23, 2026

Do we have rejected chunks in the older series @cosmo0920 ?

EDIT: No, added in 4.2.0: 3d6f0bc

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants