Merged
Changes from 10 commits
108 changes: 107 additions & 1 deletion CHUNKS.md
@@ -40,7 +40,7 @@ The following is the layout of a Chunk in the file system:
| 4 BYTES CRC32 + 16 BYTES +--> CRC32(Content) + Padding
+-------------------------------+
| Content |
| +-------------------------+ |
| +------------------------+ |
| | 2 BYTES +-----> Metadata Length
| +-------------------------+ |
| +-------------------------+ |
@@ -91,6 +91,112 @@ Content Data < | | records | |
Fluent Bit API provides backward compatibility with the previous metadata and content
format found on series v1.8.

Starting with the Fluent Bit release that introduces direct route persistence, the
fourth metadata byte carries feature flags. A zero value preserves the legacy
layout, while a non-zero value indicates that additional structures follow the tag.
When the ``FLB_CHUNK_FLAG_DIRECT_ROUTES`` bit is set, the tag is terminated with a
single ``\0`` byte and a routing payload is appended. Fluent Bit v4.2 and later
also set the ``FLB_CHUNK_FLAG_DIRECT_ROUTE_LABELS`` bit to store each destination's
alias (or generated name) alongside its numeric identifier, so routes can survive
configuration changes that renumber outputs. If any stored identifier exceeds
65535, the ``FLB_CHUNK_FLAG_DIRECT_ROUTE_WIDE_IDS`` bit is set and every ID is
encoded using four bytes instead of two, so large configurations remain routable
after a restart. When plugin names are stored, the
``FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS`` bit is set so that routes can also be
matched by plugin type. The resulting layout is:

```
+--------------------------+-------+
| 0xF1 | 0x77 | <- Magic Bytes
+--------------------------+-------+
| Type | Flags | <- Chunk type and flag bits
+--------------------------+--------------------------+
| Tag string (no size prefix) | <- Tag associated to records
+-----------------------------------------------------+
| 0x00 (Tag terminator) | <- Present when flags != 0
Routing Payload Start ----+-----------------------------------------------------+
| Routing Length (uint16_t big endian) | <- Total size of routing
| | payload (excluding this
| | 2-byte field)
+-----------------------------------------------------+
| Route Count (uint16_t big endian) | <- Number of output
| | destinations stored
+-----------------------------------------------------+
| Output IDs (route_count entries) | <- Each stored as uint16_t
| | (big endian) or uint32_t
| | when FLB_CHUNK_FLAG_
| | DIRECT_ROUTE_WIDE_IDS
+-----------------------------------------------------+
| Label Lengths (route_count entries) | <- Present when FLB_CHUNK_
| | FLAG_DIRECT_ROUTE_LABELS
| | Each uint16_t big endian
| | with bit 15 encoding alias
| | flag (0x8000) and bits
| | 0-14 the length (0x7FFF)
+-----------------------------------------------------+
| Label Strings (concatenated, no null) | <- Present when FLB_CHUNK_
| | FLAG_DIRECT_ROUTE_LABELS
| | Variable length
+-----------------------------------------------------+
| Plugin Name Lengths (route_count entries) | <- Present when FLB_CHUNK_
| | FLAG_DIRECT_ROUTE_PLUGIN_IDS
| | Each uint16_t big endian
+-----------------------------------------------------+
| Plugin Name Strings (concatenated, no null) | <- Present when FLB_CHUNK_
| | FLAG_DIRECT_ROUTE_PLUGIN_IDS
| | Variable length
| |
Routing Payload End -----+-----------------------------------------------------+
```
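
As a rough illustration of how a reader might interpret the flags byte, the
following C sketch copies the flag values from ``flb_input_chunk.h`` and derives
whether a routing payload is present and how wide each stored Output ID is. The
``demo_*`` helpers are hypothetical and not part of the Fluent Bit API. The sketch
assumes the flags live at index 3 of the 4-byte metadata header, and treating a
too-short header as legacy is also an assumption.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Flag bits, copied from include/fluent-bit/flb_input_chunk.h */
#define FLB_CHUNK_FLAG_DIRECT_ROUTES           (1 << 0)
#define FLB_CHUNK_FLAG_DIRECT_ROUTE_LABELS     (1 << 1)
#define FLB_CHUNK_FLAG_DIRECT_ROUTE_WIDE_IDS   (1 << 2)
#define FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS (1 << 3)

/* Hypothetical helper: return the flags byte (index 3) of the metadata header. */
static uint8_t demo_chunk_flags(const uint8_t *meta, size_t meta_len)
{
    assert(meta != NULL);
    if (meta_len < 4) {
        return 0; /* assumption: a truncated header is treated as legacy */
    }
    return meta[3];
}

/* A routing payload follows the tag only when DIRECT_ROUTES is set. */
static int demo_has_routing_payload(uint8_t flags)
{
    return (flags & FLB_CHUNK_FLAG_DIRECT_ROUTES) != 0;
}

/* Output IDs take four bytes with WIDE_IDS, two bytes otherwise. */
static size_t demo_route_id_width(uint8_t flags)
{
    return (flags & FLB_CHUNK_FLAG_DIRECT_ROUTE_WIDE_IDS) ? 4 : 2;
}
```

A header of ``0xF1 0x77 <type> 0x00`` would be handled as a legacy chunk, while a
flags byte of ``0x05`` would indicate a routing payload with four-byte IDs.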

The routing payload captures the direct route mapping so that filesystem chunks
loaded by the storage backlog re-use the same outputs after a restart. Chunks
without direct routes keep the legacy layout (flags byte set to zero) and remain
fully backwards compatible across Fluent Bit versions. When labels are stored, the
reader first reconstructs routes by matching aliases or generated names and only
falls back to numeric identifiers if the textual metadata cannot be matched. This
ensures that chunks continue to flow to the intended destinations even when the
output configuration is reordered.

**Routing Payload Structure**: The routing payload begins immediately after the tag
terminator. Its first field, Routing Length, stores the total size in bytes of the
routing data that follows it (the 2-byte Routing Length field itself is excluded):
the Route Count field, all Output IDs, and, when the corresponding flags are set,
the Label Lengths, Label Strings, Plugin Name Lengths, and Plugin Name Strings. The
Route Count field indicates how many output destinations are encoded. Logically,
each route consists of one Output ID, an optional label, and an optional plugin
name. On disk these fields are not interleaved per route: they are stored as
parallel arrays in the order shown in the diagram above, with each array of lengths
preceding its concatenated string data, and entry ``i`` of every array belongs to
route ``i``.
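
The fixed part of that layout can be sketched in C as follows. This is a minimal
reader for the Routing Length, Route Count, and Output ID fields only, written
from the description above rather than taken from the Fluent Bit sources. The
``demo_routing_view`` type and the ``demo_*`` functions are hypothetical names,
and the bounds checks reflect one reasonable interpretation of the length rules.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical view over a routing payload; points into the caller's buffer. */
struct demo_routing_view {
    uint16_t routing_length;  /* bytes following the 2-byte length field */
    uint16_t route_count;     /* number of output destinations */
    size_t id_width;          /* 2, or 4 when WIDE_IDS is set */
    const uint8_t *ids;       /* first Output ID */
};

static uint16_t demo_read_be16(const uint8_t *p)
{
    return (uint16_t) ((p[0] << 8) | p[1]);
}

static uint32_t demo_read_be32(const uint8_t *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* Returns 0 on success, -1 when the buffer is truncated or inconsistent. */
static int demo_routing_view_init(struct demo_routing_view *v,
                                  const uint8_t *buf, size_t len, int wide_ids)
{
    size_t ids_size;

    assert(v != NULL && buf != NULL);
    if (len < 4) {
        return -1; /* need Routing Length + Route Count */
    }
    v->routing_length = demo_read_be16(buf);
    if (v->routing_length < 2 || (size_t) v->routing_length + 2 > len) {
        return -1; /* length must cover Route Count and fit in the buffer */
    }
    v->route_count = demo_read_be16(buf + 2);
    v->id_width = wide_ids ? 4 : 2;
    ids_size = (size_t) v->route_count * v->id_width;
    if (ids_size > (size_t) v->routing_length - 2) {
        return -1; /* Output IDs would overrun the declared payload */
    }
    v->ids = buf + 4;
    return 0;
}

/* Read Output ID number i (caller guarantees i < route_count). */
static uint32_t demo_routing_view_id(const struct demo_routing_view *v, uint16_t i)
{
    const uint8_t *p = v->ids + (size_t) i * v->id_width;
    return v->id_width == 4 ? demo_read_be32(p) : demo_read_be16(p);
}
```

For a payload holding two narrow IDs, the Routing Length would be 6: two bytes for
the Route Count plus two bytes per ID.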

**Labels**: Labels are textual identifiers used to match output instances when
restoring routes from chunk metadata. They provide a stable way to identify
outputs that survives configuration changes, unlike numeric IDs which can be
reassigned when outputs are reordered. Labels come in two forms: aliases and
generated names. An alias is a user-provided identifier set via the ``Alias``
configuration property, explicitly chosen by the user to identify a specific
output instance. A generated name is automatically created when no alias is
provided, following the pattern ``{plugin_name}.{sequence_number}`` (e.g.,
``stdout.0``, ``stdout.1``, ``http.0``). The system stores the alias if one
exists; otherwise it stores the generated name. When restoring routes, the
reader first attempts to match stored labels against current output aliases,
then against current generated names, and only falls back to numeric ID matching
if no label was stored. Label-based matching keeps chunks routed to the correct
outputs even when output IDs change because the configuration was reordered.
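
The matching order described in this paragraph could look roughly like the sketch
below. It is illustrative only: ``demo_output`` and ``demo_match_route`` are
hypothetical and do not mirror the real ``flb_output_instance`` structure. The
sketch follows the wording of this paragraph, so a stored label that matches
nothing yields no match instead of an ID lookup; the actual reader may behave
differently.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for a configured output instance. */
struct demo_output {
    uint32_t id;
    const char *alias; /* NULL when the user set no Alias */
    const char *name;  /* generated name, e.g. "stdout.0" */
};

/* Stored labels carry no null terminator, so compare by explicit length. */
static int demo_label_equals(const char *candidate,
                             const char *label, size_t label_len)
{
    return candidate != NULL &&
           strlen(candidate) == label_len &&
           memcmp(candidate, label, label_len) == 0;
}

/* Returns the index of the matching output, or -1 when nothing matches. */
static int demo_match_route(const struct demo_output *outs, size_t count,
                            uint32_t id, const char *label, size_t label_len)
{
    size_t i;

    assert(outs != NULL || count == 0);
    if (label != NULL && label_len > 0) {
        /* 1. stored label against current aliases */
        for (i = 0; i < count; i++) {
            if (demo_label_equals(outs[i].alias, label, label_len)) {
                return (int) i;
            }
        }
        /* 2. stored label against current generated names */
        for (i = 0; i < count; i++) {
            if (demo_label_equals(outs[i].name, label, label_len)) {
                return (int) i;
            }
        }
        return -1; /* assumption: no ID fallback once a label was stored */
    }
    /* 3. no label stored: fall back to the numeric ID */
    for (i = 0; i < count; i++) {
        if (outs[i].id == id) {
            return (int) i;
        }
    }
    return -1;
}
```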

**Label Length Encoding**: When labels are present, each label length is stored as
a 16-bit big-endian value with the most significant bit (0x8000) encoding whether
the label represents an alias (1) or a generated name (0). The actual length is
encoded in the lower 15 bits (0x7FFF). This allows the reader to distinguish
between user-provided aliases and auto-generated names when matching routes.
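
A small sketch of this encoding, using the two mask constants from
``flb_input_chunk.h``. The ``demo_*`` function names are hypothetical, and the
functions operate on the host-order 16-bit value, that is, after the big-endian
bytes have been read from or before they are written to the chunk.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Constants copied from include/fluent-bit/flb_input_chunk.h */
#define FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG  0x8000
#define FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK 0x7FFF

/* Pack a label length (at most 0x7FFF) and the alias marker into 16 bits. */
static uint16_t demo_label_length_encode(uint16_t length, int is_alias)
{
    uint16_t raw = (uint16_t) (length & FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK);

    if (is_alias) {
        raw = (uint16_t) (raw | FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG);
    }
    return raw;
}

/* Split a stored value back into its length and alias marker. */
static uint16_t demo_label_length_decode(uint16_t raw, int *is_alias)
{
    assert(is_alias != NULL);
    *is_alias = (raw & FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG) != 0;
    return (uint16_t) (raw & FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK);
}
```

With this scheme an 8-byte alias is stored as ``0x8008``, while an 8-byte
generated name such as ``stdout.0`` is stored as ``0x0008``.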

**Plugin Names**: When plugin names are stored (``FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS``),
each route includes the plugin type name (e.g., "stdout", "http") to enable type-safe
matching. This prevents routing to outputs of different plugin types that might share
the same alias or name. Plugin name lengths are stored as 16-bit big-endian values
followed by the concatenated plugin name strings without null terminators.
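
To make the "lengths first, then concatenated strings" layout concrete, the
sketch below serializes a list of plugin names in that order. It is an
illustration under stated assumptions, not the writer used by Fluent Bit:
``demo_write_plugin_names`` is a hypothetical function, and returning 0 on
overflow is a convention chosen for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Hypothetical writer: emits one big-endian uint16_t length per name,
 * followed by all name bytes back to back without null terminators.
 * Returns the number of bytes written, or 0 if 'out' is too small or
 * a name does not fit in 16 bits.
 */
static size_t demo_write_plugin_names(uint8_t *out, size_t cap,
                                      const char *const *names, size_t count)
{
    size_t i;
    size_t len;
    size_t off = 0;

    assert(out != NULL && names != NULL);

    /* Pass 1: the array of lengths */
    for (i = 0; i < count; i++) {
        len = strlen(names[i]);
        if (len > UINT16_MAX || off + 2 > cap) {
            return 0;
        }
        out[off++] = (uint8_t) (len >> 8);
        out[off++] = (uint8_t) (len & 0xFF);
    }

    /* Pass 2: the concatenated string bytes */
    for (i = 0; i < count; i++) {
        len = strlen(names[i]);
        if (off + len > cap) {
            return 0;
        }
        memcpy(out + off, names[i], len);
        off += len;
    }
    return off;
}
```

For the names ``stdout`` and ``http`` this produces the bytes ``00 06 00 04``
followed by ``stdouthttp``, 14 bytes in total.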

### Fluent Bit <= v1.8

Up to Fluent Bit <= 1.8.x, the metadata and content data is simple, where metadata
37 changes: 37 additions & 0 deletions include/fluent-bit/flb_input_chunk.h
@@ -24,6 +24,9 @@
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_routes_mask.h>
#include <stdint.h>

struct cio_chunk;

#include <monkey/mk_core.h>
#include <msgpack.h>
@@ -42,6 +45,26 @@
/* Number of bytes reserved for Metadata Header on Chunks */
#define FLB_INPUT_CHUNK_META_HEADER 4

/* Chunk metadata flags */
#define FLB_CHUNK_FLAG_DIRECT_ROUTES (1 << 0)
#define FLB_CHUNK_FLAG_DIRECT_ROUTE_LABELS (1 << 1)
#define FLB_CHUNK_FLAG_DIRECT_ROUTE_WIDE_IDS (1 << 2)
#define FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS (1 << 3)

#define FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG 0x8000
#define FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK 0x7FFF

struct flb_output_instance;

struct flb_chunk_direct_route {
uint32_t id;
uint16_t label_length;
const char *label;
uint8_t label_is_alias;
uint16_t plugin_name_length;
const char *plugin_name;
};

/* Chunks magic bytes (starting from Fluent Bit v1.8.10) */
#define FLB_INPUT_CHUNK_MAGIC_BYTE_0 (unsigned char) 0xF1
#define FLB_INPUT_CHUNK_MAGIC_BYTE_1 (unsigned char) 0x77
@@ -110,6 +133,20 @@ int flb_input_chunk_get_event_type(struct flb_input_chunk *ic);
int flb_input_chunk_get_tag(struct flb_input_chunk *ic,
const char **tag_buf, int *tag_len);

int flb_input_chunk_write_header_v2(struct cio_chunk *chunk,
int event_type,
char *tag, int tag_len,
const struct flb_chunk_direct_route *routes,
int route_count);
int flb_chunk_route_plugin_matches(struct flb_output_instance *o_ins,
const struct flb_chunk_direct_route *route);
int flb_input_chunk_has_direct_routes(struct flb_input_chunk *ic);
int flb_input_chunk_get_direct_routes(struct flb_input_chunk *ic,
struct flb_chunk_direct_route **routes,
int *route_count);
void flb_input_chunk_destroy_direct_routes(struct flb_chunk_direct_route *routes,
int route_count);

void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins);
void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data);
ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic);
13 changes: 9 additions & 4 deletions lib/chunkio/.github/workflows/ci.yaml
@@ -15,15 +15,20 @@ jobs:
max-parallel: 48
fail-fast: false
matrix:
os: [windows-latest, windows-2019]
os: [windows-2025, windows-2022]
steps:
- uses: actions/checkout@v2
- name: Build on ${{ matrix.os }} with vs-2019
- name: Set up with Developer Command Prompt for Microsoft Visual C++
uses: ilammy/msvc-dev-cmd@v1
with:
arch: amd64
- name: Build on ${{ matrix.os }} with MSVC
run: |
.\scripts\win_build.bat
cmake -G "NMake Makefiles" -DCIO_TESTS=On .
cmake --build .
- name: Run unit tests.
run: |
ctest --rerun-failed --output-on-failure -C Debug --test-dir .
ctest . --rerun-failed --output-on-failure --test-dir .
build-unix:
name: Build sources on amd64 for ${{ matrix.os }} - ${{ matrix.compiler }}
runs-on: ${{ matrix.os }}
2 changes: 1 addition & 1 deletion lib/chunkio/CMakeLists.txt
@@ -3,7 +3,7 @@ project(chunk-io C)

set(CIO_VERSION_MAJOR 1)
set(CIO_VERSION_MINOR 5)
set(CIO_VERSION_PATCH 3)
set(CIO_VERSION_PATCH 4)
set(CIO_VERSION_STR "${CIO_VERSION_MAJOR}.${CIO_VERSION_MINOR}.${CIO_VERSION_PATCH}")

# CFLAGS
3 changes: 3 additions & 0 deletions lib/chunkio/include/chunkio/cio_file.h
@@ -40,6 +40,7 @@ struct cio_file {
size_t realloc_size; /* chunk size to increase alloc */
char *path; /* root path + stream */
char *map; /* map of data */
struct cio_ctx *ctx; /* owning context */
#ifdef _WIN32
HANDLE backing_file;
HANDLE backing_mapping;
@@ -49,6 +50,8 @@ struct cio_file {
char *st_content;
crc_t crc_cur; /* crc: current value calculated */
int crc_reset; /* crc: must recalculate from the beginning ? */
int auto_remap_warned; /* has sync auto-remap warning been emitted? */
int map_truncated_warned; /* has RO truncation warning been emitted? */
};

size_t cio_file_real_size(struct cio_file *cf);
61 changes: 48 additions & 13 deletions lib/chunkio/src/cio_file.c
@@ -324,7 +324,10 @@ static int munmap_file(struct cio_ctx *ctx, struct cio_chunk *ch)
}

/* Unmap file */
cio_file_native_unmap(cf);
ret = cio_file_native_unmap(cf);
if (ret != CIO_OK) {
return -1;
}

cf->data_size = 0;
cf->alloc_size = 0;
@@ -343,6 +346,7 @@ static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size)
{
ssize_t content_size;
size_t fs_size;
size_t requested_map_size;
int ret;
struct cio_file *cf;

@@ -413,6 +417,7 @@ static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size)
cf->alloc_size = size;

/* Map the file */
requested_map_size = cf->alloc_size;
ret = cio_file_native_map(cf, cf->alloc_size);

if (ret != CIO_OK) {
@@ -421,6 +426,21 @@ static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size)
return CIO_ERROR;
}

if ((cf->flags & CIO_OPEN_RD) && requested_map_size != cf->alloc_size) {
if (cf->map_truncated_warned == CIO_FALSE) {
cio_log_warn(ctx,
"[cio file] truncated read-only map from %zu to %zu bytes: %s/%s",
requested_map_size,
cf->alloc_size,
ch->st->name,
ch->name);
cf->map_truncated_warned = CIO_TRUE;
}
}
else {
cf->map_truncated_warned = CIO_FALSE;
}

/* check content data size */
if (fs_size > 0) {
content_size = cio_file_st_get_content_len(cf->map,
@@ -664,6 +684,9 @@ struct cio_file *cio_file_open(struct cio_ctx *ctx,
cf->crc_cur = cio_crc32_init();
cf->path = path;
cf->map = NULL;
cf->ctx = ctx;
cf->auto_remap_warned = CIO_FALSE;
cf->map_truncated_warned = CIO_FALSE;
ch->backend = cf;

#ifdef _WIN32
@@ -801,9 +824,9 @@ static int _cio_file_up(struct cio_chunk *ch, int enforced)
return CIO_ERROR;
}

if (cf->fd > 0) {
if (cio_file_native_is_open(cf)) {
cio_log_error(ch->ctx, "[cio file] file descriptor already exists: "
"[fd=%i] %s:%s", cf->fd, ch->st->name, ch->name);
"%s:%s", ch->st->name, ch->name);
return CIO_ERROR;
}

@@ -908,7 +931,11 @@ int cio_file_down(struct cio_chunk *ch)
}

/* unmap memory */
munmap_file(ch->ctx, ch);
ret = munmap_file(ch->ctx, ch);

if (ret != 0) {
return -1;
}

/* Allocated map size is zero */
cf->alloc_size = 0;
@@ -921,7 +948,12 @@ int cio_file_down(struct cio_chunk *ch)
}

/* Close file descriptor */
cio_file_native_close(cf);
ret = cio_file_native_close(cf);

if (ret != CIO_OK) {
cio_errno();
return -1;
}

return 0;
}
@@ -1047,7 +1079,6 @@ int cio_file_write_metadata(struct cio_chunk *ch, char *buf, size_t size)
char *cur_content_data;
char *new_content_data;
size_t new_size;
size_t content_av;
size_t meta_av;
struct cio_file *cf;

@@ -1082,13 +1113,11 @@ int cio_file_write_metadata(struct cio_chunk *ch, char *buf, size_t size)
* where we need to increase the memory map size, move the content area
* bytes to a different position and write the metadata.
*
* Calculate the available space in the content area.
* Check if resize is needed before calculating content_av to avoid
* unsigned underflow. We need: header + new_metadata + content_data <= alloc_size
*/
content_av = cf->alloc_size - cf->data_size;

/* If there is no enough space, increase the file size and it memory map */
if (content_av < size) {
new_size = (size - meta_av) + cf->data_size + CIO_FILE_HEADER_MIN;
if (cf->alloc_size < CIO_FILE_HEADER_MIN + size + cf->data_size) {
new_size = CIO_FILE_HEADER_MIN + size + cf->data_size;

ret = cio_file_resize(cf, new_size);

@@ -1106,7 +1135,7 @@ int cio_file_write_metadata(struct cio_chunk *ch, char *buf, size_t size)
/* set new position for the content data */
cur_content_data = cio_file_st_get_content(cf->map);
new_content_data = meta + size;
memmove(new_content_data, cur_content_data, size);
memmove(new_content_data, cur_content_data, cf->data_size);

/* copy new metadata */
memcpy(meta, buf, size);
@@ -1138,6 +1167,12 @@ int cio_file_sync(struct cio_chunk *ch)
return 0;
}

/* If chunk is down (unmapped), there's nothing to sync */
/* You can only write to a chunk when it's up, so if it's down, no pending changes exist */
if (!cio_file_native_is_mapped(cf)) {
return 0;
}

if (cf->synced == CIO_TRUE) {
return 0;
}
1 change: 1 addition & 0 deletions lib/chunkio/src/cio_file_unix.c
@@ -66,6 +66,7 @@ int cio_file_native_unmap(struct cio_file *cf)

cf->alloc_size = 0;
cf->map = NULL;
cf->map_truncated_warned = CIO_FALSE;

return CIO_OK;
}