diff --git a/CHUNKS.md b/CHUNKS.md index 852cc8e6345..073bf1ca5f3 100644 --- a/CHUNKS.md +++ b/CHUNKS.md @@ -40,7 +40,7 @@ The following is the layout of a Chunk in the file system: | 4 BYTES CRC32 + 16 BYTES +--> CRC32(Content) + Padding +-------------------------------+ | Content | -| +-------------------------+ | +| +------------------------+ | | | 2 BYTES +-----> Metadata Length | +-------------------------+ | | +-------------------------+ | @@ -91,6 +91,112 @@ Content Data < | | records | | Fluent Bit API provides backward compatibility with the previous metadata and content format found on series v1.8. +Starting with the Fluent Bit release that introduces direct route persistence, the +fourth metadata byte now carries feature flags. A zero value preserves the legacy +layout, while a non-zero value indicates that additional structures follow the tag. +When the ``FLB_CHUNK_FLAG_DIRECT_ROUTES`` bit is set the tag is terminated with a +single ``\0`` byte and a routing payload is appended. Fluent Bit v4.2 and later +also set the ``FLB_CHUNK_FLAG_DIRECT_ROUTE_LABELS`` bit to store each destination's +alias (or generated name) alongside its numeric identifier so routes can survive +configuration changes that renumber outputs. If any stored identifier exceeds +65535 the ``FLB_CHUNK_FLAG_DIRECT_ROUTE_WIDE_IDS`` bit is enabled and each ID is +encoded using four bytes so large configurations remain routable after a restart. 
+When plugin names are stored, the ``FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS`` bit is +set to enable type-safe routing by matching plugin names: + +``` + +--------------------------+-------+ + | 0xF1 | 0x77 | <- Magic Bytes + +--------------------------+-------+ + | Type | Flags | <- Chunk type and flag bits + +--------------------------+--------------------------+ + | Tag string (no size prefix) | <- Tag associated to records + +-----------------------------------------------------+ + | 0x00 (Tag terminator) | <- Present when flags != 0 +Routing Payload Start ----+-----------------------------------------------------+ + | Routing Length (uint16_t big endian) | <- Total size of routing + | | payload (excluding this + | | 2-byte field) + +-----------------------------------------------------+ + | Route Count (uint16_t big endian) | <- Number of output + | | destinations stored + +-----------------------------------------------------+ + | Output IDs (route_count entries) | <- Each stored as uint16_t + | | (big endian) or uint32_t + | | when FLB_CHUNK_FLAG_ + | | DIRECT_ROUTE_WIDE_IDS + +-----------------------------------------------------+ + | Label Lengths (route_count entries) | <- Present when FLB_CHUNK_ + | | FLAG_DIRECT_ROUTE_LABELS + | | Each uint16_t big endian + | | with bit 15 encoding alias + | | flag (0x8000) and bits + | | 0-14 the length (0x7FFF) + +-----------------------------------------------------+ + | Label Strings (concatenated, no null) | <- Present when FLB_CHUNK_ + | | FLAG_DIRECT_ROUTE_LABELS + | | Variable length + +-----------------------------------------------------+ + | Plugin Name Lengths (route_count entries) | <- Present when FLB_CHUNK_ + | | FLAG_DIRECT_ROUTE_PLUGIN_IDS + | | Each uint16_t big endian + +-----------------------------------------------------+ + | Plugin Name Strings (concatenated, no null) | <- Present when FLB_CHUNK_ + | | FLAG_DIRECT_ROUTE_PLUGIN_IDS + | | Variable length + | | +Routing Payload End 
-----+-----------------------------------------------------+ +``` + +The routing payload captures the direct route mapping so that filesystem chunks +loaded by the storage backlog re-use the same outputs after a restart. Chunks +without direct routes keep the legacy layout (flags byte set to zero) and remain +fully backwards compatible across Fluent Bit versions. When labels are stored the +reader first reconstructs routes by matching aliases or numbered names and only +falls back to numeric identifiers if the textual metadata cannot be matched. This +ensures that chunks continue to flow to the intended destinations even when the +output configuration is re-ordered. + +**Routing Payload Structure**: The routing payload begins immediately after the tag +terminator and extends for the number of bytes specified by the Routing Length +field. The Routing Length field stores the total size of all routing data (excluding +the 2-byte Routing Length field itself), including the Route Count field, all +Output IDs, Label Lengths (if present), Label Bytes (if present), Plugin Lengths +(if present), and Plugin Bytes (if present). The Route Count field indicates how +many output destinations are encoded in the routing payload. Each route entry +consists of one Output ID, optionally followed by one Label Length entry and its +corresponding Label Bytes, and optionally followed by one Plugin Length entry and +its corresponding Plugin Bytes. All routes are stored sequentially, with arrays +of lengths preceding their corresponding string data blocks. + +**Labels**: Labels are textual identifiers used to match output instances when +restoring routes from chunk metadata. They provide a stable way to identify +outputs that survives configuration changes, unlike numeric IDs which can be +reassigned when outputs are reordered. Labels come in two forms: aliases and +generated names. 
An alias is a user-provided identifier set via the ``Alias`` +configuration property, explicitly chosen by the user to identify a specific +output instance. A generated name is automatically created when no alias is +provided, following the pattern ``{plugin_name}.{sequence_number}`` (e.g., +``stdout.0``, ``stdout.1``, ``http.0``). The system stores the alias if one +exists, otherwise falls back to the generated name. When restoring routes, the +reader first attempts to match stored labels against current output aliases, +then against current generated names, and only falls back to numeric ID matching +if no label was stored. This label-based matching ensures that chunks continue +routing to the correct outputs even when output IDs change due to configuration +reordering, making the routing resilient to configuration changes. + +**Label Length Encoding**: When labels are present, each label length is stored as +a 16-bit big-endian value with the most significant bit (0x8000) encoding whether +the label represents an alias (1) or a generated name (0). The actual length is +encoded in the lower 15 bits (0x7FFF). This allows the reader to distinguish +between user-provided aliases and auto-generated names when matching routes. + +**Plugin Names**: When plugin names are stored (``FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS``), +each route includes the plugin type name (e.g., "stdout", "http") to enable type-safe +matching. This prevents routing to outputs of different plugin types that might share +the same alias or name. Plugin name lengths are stored as 16-bit big-endian values +followed by the concatenated plugin name strings without null terminators. 
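
The label length encoding described above can be sketched as a small decoder. This is an illustrative sketch only, not code from the patch: the helper names `read_be16` and `decode_label_length` are hypothetical, and the two mask values are copied from the `FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG` and `FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK` definitions added to `flb_input_chunk.h`.

```c
#include <stdint.h>

/* Values copied from the definitions added to flb_input_chunk.h */
#define LABEL_ALIAS_FLAG  0x8000
#define LABEL_LENGTH_MASK 0x7FFF

/* Hypothetical helper: read a 16-bit big-endian value from a byte buffer */
static uint16_t read_be16(const unsigned char *p)
{
    return (uint16_t) (((uint16_t) p[0] << 8) | (uint16_t) p[1]);
}

/*
 * Hypothetical helper: split an encoded label length into the alias flag
 * (bit 15) and the actual length (bits 0-14).
 */
static void decode_label_length(uint16_t encoded, int *is_alias,
                                uint16_t *length)
{
    *is_alias = (encoded & LABEL_ALIAS_FLAG) != 0;
    *length = (uint16_t) (encoded & LABEL_LENGTH_MASK);
}
```

Under these assumptions, the bytes `0x80 0x06` decode to an alias of length 6, while `0x00 0x08` decode to a generated name of length 8 (the length of a name such as `stdout.0`).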
+ ### Fluent Bit <= v1.8 Up to Fluent Bit <= 1.8.x, the metadata and content data is simple, where metadata diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index b9b55abe28d..21b6b76826e 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -24,6 +24,9 @@ #include #include #include +#include + +struct cio_chunk; #include #include @@ -42,6 +45,26 @@ /* Number of bytes reserved for Metadata Header on Chunks */ #define FLB_INPUT_CHUNK_META_HEADER 4 +/* Chunk metadata flags */ +#define FLB_CHUNK_FLAG_DIRECT_ROUTES (1 << 0) +#define FLB_CHUNK_FLAG_DIRECT_ROUTE_LABELS (1 << 1) +#define FLB_CHUNK_FLAG_DIRECT_ROUTE_WIDE_IDS (1 << 2) +#define FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS (1 << 3) + +#define FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG 0x8000 +#define FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK 0x7FFF + +struct flb_output_instance; + +struct flb_chunk_direct_route { + uint32_t id; + uint16_t label_length; + const char *label; + uint8_t label_is_alias; + uint16_t plugin_name_length; + const char *plugin_name; +}; + /* Chunks magic bytes (starting from Fluent Bit v1.8.10) */ #define FLB_INPUT_CHUNK_MAGIC_BYTE_0 (unsigned char) 0xF1 #define FLB_INPUT_CHUNK_MAGIC_BYTE_1 (unsigned char) 0x77 @@ -110,6 +133,20 @@ int flb_input_chunk_get_event_type(struct flb_input_chunk *ic); int flb_input_chunk_get_tag(struct flb_input_chunk *ic, const char **tag_buf, int *tag_len); +int flb_input_chunk_write_header_v2(struct cio_chunk *chunk, + int event_type, + char *tag, int tag_len, + const struct flb_chunk_direct_route *routes, + int route_count); +int flb_chunk_route_plugin_matches(struct flb_output_instance *o_ins, + const struct flb_chunk_direct_route *route); +int flb_input_chunk_has_direct_routes(struct flb_input_chunk *ic); +int flb_input_chunk_get_direct_routes(struct flb_input_chunk *ic, + struct flb_chunk_direct_route **routes, + int *route_count); +void flb_input_chunk_destroy_direct_routes(struct 
flb_chunk_direct_route *routes, + int route_count); + void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins); void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data); ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic); diff --git a/lib/chunkio/.github/workflows/ci.yaml b/lib/chunkio/.github/workflows/ci.yaml index c937a2c9083..a816202e7af 100644 --- a/lib/chunkio/.github/workflows/ci.yaml +++ b/lib/chunkio/.github/workflows/ci.yaml @@ -15,15 +15,20 @@ jobs: max-parallel: 48 fail-fast: false matrix: - os: [windows-latest, windows-2019] + os: [windows-2025, windows-2022] steps: - uses: actions/checkout@v2 - - name: Build on ${{ matrix.os }} with vs-2019 + - name: Set up with Developer Command Prompt for Microsoft Visual C++ + uses: ilammy/msvc-dev-cmd@v1 + with: + arch: amd64 + - name: Build on ${{ matrix.os }} with MSVC run: | - .\scripts\win_build.bat + cmake -G "NMake Makefiles" -DCIO_TESTS=On . + cmake --build . - name: Run unit tests. run: | - ctest --rerun-failed --output-on-failure -C Debug --test-dir . + ctest . --rerun-failed --output-on-failure --test-dir . 
build-unix: name: Build sources on amd64 for ${{ matrix.os }} - ${{ matrix.compiler }} runs-on: ${{ matrix.os }} diff --git a/lib/chunkio/CMakeLists.txt b/lib/chunkio/CMakeLists.txt index 21f1c3e47fe..5a7f1c60fe7 100644 --- a/lib/chunkio/CMakeLists.txt +++ b/lib/chunkio/CMakeLists.txt @@ -3,7 +3,7 @@ project(chunk-io C) set(CIO_VERSION_MAJOR 1) set(CIO_VERSION_MINOR 5) -set(CIO_VERSION_PATCH 3) +set(CIO_VERSION_PATCH 4) set(CIO_VERSION_STR "${CIO_VERSION_MAJOR}.${CIO_VERSION_MINOR}.${CIO_VERSION_PATCH}") # CFLAGS diff --git a/lib/chunkio/include/chunkio/cio_file.h b/lib/chunkio/include/chunkio/cio_file.h index 3bc6591710c..f05d3861b96 100644 --- a/lib/chunkio/include/chunkio/cio_file.h +++ b/lib/chunkio/include/chunkio/cio_file.h @@ -40,6 +40,7 @@ struct cio_file { size_t realloc_size; /* chunk size to increase alloc */ char *path; /* root path + stream */ char *map; /* map of data */ + struct cio_ctx *ctx; /* owning context */ #ifdef _WIN32 HANDLE backing_file; HANDLE backing_mapping; @@ -49,6 +50,8 @@ struct cio_file { char *st_content; crc_t crc_cur; /* crc: current value calculated */ int crc_reset; /* crc: must recalculate from the beginning ? */ + int auto_remap_warned; /* has sync auto-remap warning been emitted? */ + int map_truncated_warned; /* has RO truncation warning been emitted? 
*/ }; size_t cio_file_real_size(struct cio_file *cf); diff --git a/lib/chunkio/src/cio_file.c b/lib/chunkio/src/cio_file.c index 56ac160a067..26dc992efed 100644 --- a/lib/chunkio/src/cio_file.c +++ b/lib/chunkio/src/cio_file.c @@ -324,7 +324,10 @@ static int munmap_file(struct cio_ctx *ctx, struct cio_chunk *ch) } /* Unmap file */ - cio_file_native_unmap(cf); + ret = cio_file_native_unmap(cf); + if (ret != CIO_OK) { + return -1; + } cf->data_size = 0; cf->alloc_size = 0; @@ -343,6 +346,7 @@ static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size) { ssize_t content_size; size_t fs_size; + size_t requested_map_size; int ret; struct cio_file *cf; @@ -413,6 +417,7 @@ static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size) cf->alloc_size = size; /* Map the file */ + requested_map_size = cf->alloc_size; ret = cio_file_native_map(cf, cf->alloc_size); if (ret != CIO_OK) { @@ -421,6 +426,21 @@ static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size) return CIO_ERROR; } + if ((cf->flags & CIO_OPEN_RD) && requested_map_size != cf->alloc_size) { + if (cf->map_truncated_warned == CIO_FALSE) { + cio_log_warn(ctx, + "[cio file] truncated read-only map from %zu to %zu bytes: %s/%s", + requested_map_size, + cf->alloc_size, + ch->st->name, + ch->name); + cf->map_truncated_warned = CIO_TRUE; + } + } + else { + cf->map_truncated_warned = CIO_FALSE; + } + /* check content data size */ if (fs_size > 0) { content_size = cio_file_st_get_content_len(cf->map, @@ -664,6 +684,9 @@ struct cio_file *cio_file_open(struct cio_ctx *ctx, cf->crc_cur = cio_crc32_init(); cf->path = path; cf->map = NULL; + cf->ctx = ctx; + cf->auto_remap_warned = CIO_FALSE; + cf->map_truncated_warned = CIO_FALSE; ch->backend = cf; #ifdef _WIN32 @@ -801,9 +824,9 @@ static int _cio_file_up(struct cio_chunk *ch, int enforced) return CIO_ERROR; } - if (cf->fd > 0) { + if (cio_file_native_is_open(cf)) { cio_log_error(ch->ctx, "[cio file] file descriptor already 
exists: " - "[fd=%i] %s:%s", cf->fd, ch->st->name, ch->name); + "%s:%s", ch->st->name, ch->name); return CIO_ERROR; } @@ -908,7 +931,11 @@ int cio_file_down(struct cio_chunk *ch) } /* unmap memory */ - munmap_file(ch->ctx, ch); + ret = munmap_file(ch->ctx, ch); + + if (ret != 0) { + return -1; + } /* Allocated map size is zero */ cf->alloc_size = 0; @@ -921,7 +948,12 @@ int cio_file_down(struct cio_chunk *ch) } /* Close file descriptor */ - cio_file_native_close(cf); + ret = cio_file_native_close(cf); + + if (ret != CIO_OK) { + cio_errno(); + return -1; + } return 0; } @@ -1047,7 +1079,6 @@ int cio_file_write_metadata(struct cio_chunk *ch, char *buf, size_t size) char *cur_content_data; char *new_content_data; size_t new_size; - size_t content_av; size_t meta_av; struct cio_file *cf; @@ -1082,13 +1113,11 @@ int cio_file_write_metadata(struct cio_chunk *ch, char *buf, size_t size) * where we need to increase the memory map size, move the content area * bytes to a different position and write the metadata. * - * Calculate the available space in the content area. + * Check if resize is needed before calculating content_av to avoid + * unsigned underflow. 
We need: header + new_metadata + content_data <= alloc_size */ - content_av = cf->alloc_size - cf->data_size; - - /* If there is no enough space, increase the file size and it memory map */ - if (content_av < size) { - new_size = (size - meta_av) + cf->data_size + CIO_FILE_HEADER_MIN; + if (cf->alloc_size < CIO_FILE_HEADER_MIN + size + cf->data_size) { + new_size = CIO_FILE_HEADER_MIN + size + cf->data_size; ret = cio_file_resize(cf, new_size); @@ -1106,7 +1135,7 @@ int cio_file_write_metadata(struct cio_chunk *ch, char *buf, size_t size) /* set new position for the content data */ cur_content_data = cio_file_st_get_content(cf->map); new_content_data = meta + size; - memmove(new_content_data, cur_content_data, size); + memmove(new_content_data, cur_content_data, cf->data_size); /* copy new metadata */ memcpy(meta, buf, size); @@ -1138,6 +1167,12 @@ int cio_file_sync(struct cio_chunk *ch) return 0; } + /* If chunk is down (unmapped), there's nothing to sync */ + /* You can only write to a chunk when it's up, so if it's down, no pending changes exist */ + if (!cio_file_native_is_mapped(cf)) { + return 0; + } + if (cf->synced == CIO_TRUE) { return 0; } diff --git a/lib/chunkio/src/cio_file_unix.c b/lib/chunkio/src/cio_file_unix.c index 72d49312dc1..c1798a7de1a 100644 --- a/lib/chunkio/src/cio_file_unix.c +++ b/lib/chunkio/src/cio_file_unix.c @@ -66,6 +66,7 @@ int cio_file_native_unmap(struct cio_file *cf) cf->alloc_size = 0; cf->map = NULL; + cf->map_truncated_warned = CIO_FALSE; return CIO_OK; } diff --git a/lib/chunkio/src/cio_file_win32.c b/lib/chunkio/src/cio_file_win32.c index 18044c40467..43b7a241c9f 100644 --- a/lib/chunkio/src/cio_file_win32.c +++ b/lib/chunkio/src/cio_file_win32.c @@ -38,14 +38,14 @@ int cio_file_native_unmap(struct cio_file *cf) return CIO_ERROR; } - if (!cio_file_native_is_open(cf)) { - return CIO_OK; - } - + /* Check if already unmapped first */ if (!cio_file_native_is_mapped(cf)) { return CIO_OK; } + /* On Windows, we can unmap even if 
file handle is closed */ + /* The mapping handle maintains the reference */ + result = UnmapViewOfFile(cf->map); if (result == 0) { @@ -54,11 +54,18 @@ int cio_file_native_unmap(struct cio_file *cf) return CIO_ERROR; } - CloseHandle(cf->backing_mapping); + result = CloseHandle(cf->backing_mapping); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } cf->backing_mapping = INVALID_HANDLE_VALUE; cf->alloc_size = 0; cf->map = NULL; + cf->map_truncated_warned = CIO_FALSE; return CIO_OK; } @@ -67,6 +74,9 @@ int cio_file_native_map(struct cio_file *cf, size_t map_size) { DWORD desired_protection; DWORD desired_access; + size_t file_size; + size_t actual_map_size; + int ret; if (cf == NULL) { return CIO_ERROR; @@ -92,9 +102,40 @@ int cio_file_native_map(struct cio_file *cf, size_t map_size) return CIO_ERROR; } + /* Get current file size to ensure we don't map beyond it for read-only files */ + ret = cio_file_native_get_size(cf, &file_size); + if (ret != CIO_OK) { + return CIO_ERROR; + } + + /* For read-only files, we cannot map beyond the file size */ + /* For read-write files, if map_size > file_size, we should resize first */ + if (cf->flags & CIO_OPEN_RD) { + if (map_size > file_size) { + actual_map_size = file_size; + } + else { + actual_map_size = map_size; + } + } + else { + /* For RW files, if map_size > file_size, resize the file first */ + if (map_size > file_size) { + ret = cio_file_native_resize(cf, map_size); + if (ret != CIO_OK) { + return CIO_ERROR; + } + } + actual_map_size = map_size; + } + + /* CreateFileMappingA requires size as two DWORDs (high and low) */ + /* Use actual_map_size to ensure consistency */ cf->backing_mapping = CreateFileMappingA(cf->backing_file, NULL, desired_protection, - 0, 0, NULL); + (DWORD)(actual_map_size >> 32), + (DWORD)(actual_map_size & 0xFFFFFFFFUL), + NULL); if (cf->backing_mapping == NULL) { cio_file_native_report_os_error(); @@ -102,7 +143,7 @@ int cio_file_native_map(struct cio_file *cf, 
size_t map_size) return CIO_ERROR; } - cf->map = MapViewOfFile(cf->backing_mapping, desired_access, 0, 0, map_size); + cf->map = MapViewOfFile(cf->backing_mapping, desired_access, 0, 0, actual_map_size); if (cf->map == NULL) { cio_file_native_report_os_error(); @@ -114,7 +155,7 @@ int cio_file_native_map(struct cio_file *cf, size_t map_size) return CIO_ERROR; } - cf->alloc_size = map_size; + cf->alloc_size = actual_map_size; return CIO_OK; } @@ -474,6 +515,38 @@ int cio_file_native_delete(struct cio_file *cf) { int result; + if (cf == NULL) { + return CIO_ERROR; + } + + if (cio_file_native_is_mapped(cf)) { + if (cf->ctx != NULL) { + cio_log_warn(cf->ctx, + "[cio file] auto-unmapping chunk prior to delete: %s", + cf->path); + } + + result = cio_file_native_unmap(cf); + + if (result != CIO_OK) { + return result; + } + } + + if (cio_file_native_is_open(cf)) { + if (cf->ctx != NULL) { + cio_log_warn(cf->ctx, + "[cio file] closing handle prior to delete: %s", + cf->path); + } + + result = cio_file_native_close(cf); + + if (result != CIO_OK) { + return result; + } + } + result = DeleteFileA(cf->path); if (result == 0) { @@ -489,6 +562,10 @@ int cio_file_native_sync(struct cio_file *cf, int sync_mode) { int result; + if (!cio_file_native_is_mapped(cf)) { + return CIO_ERROR; + } + result = FlushViewOfFile(cf->map, cf->alloc_size); if (result == 0) { diff --git a/lib/chunkio/tests/CMakeLists.txt b/lib/chunkio/tests/CMakeLists.txt index 7cee7e6c936..a5cea46acb9 100644 --- a/lib/chunkio/tests/CMakeLists.txt +++ b/lib/chunkio/tests/CMakeLists.txt @@ -9,6 +9,15 @@ if(CIO_BACKEND_FILESYSTEM) set(UNIT_TESTS_FILES ${UNIT_TESTS_FILES} fs.c + metadata_update.c + ) +endif() + +# Windows-specific inconsistency tests +if(WIN32 AND CIO_BACKEND_FILESYSTEM) + set(UNIT_TESTS_FILES + ${UNIT_TESTS_FILES} + fs_windows.c ) endif() @@ -35,7 +44,7 @@ foreach(source_file ${UNIT_TESTS_FILES}) add_test(${source_file_we} ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${source_file_we}) endforeach() -# Perf tests 
for dev purposes: note these tests are not registered, they need to +# Perf tests for dev purposes: note these tests are not registered, they need to # be executed manually set(UNIT_PERF_TESTS fs_perf.c diff --git a/lib/chunkio/tests/fs.c b/lib/chunkio/tests/fs.c index a976f46d103..130921dfa34 100644 --- a/lib/chunkio/tests/fs.c +++ b/lib/chunkio/tests/fs.c @@ -964,6 +964,143 @@ void test_legacy_failure() test_legacy_core(CIO_TRUE); } +/* + * Test case: Prevent unsigned underflow when writing large metadata to a + * chunk with small initial allocation. + * + * This test specifically validates the fix for the bug where calculating + * content_av = alloc_size - CIO_FILE_HEADER_MIN - size could underflow + * when size is large, causing the resize check to be skipped and leading + * to out-of-bounds writes. + * + * Scenario: + * - Create chunk with minimal initial size (e.g., ~100 bytes page-aligned) + * - Write small metadata (10 bytes) and small content (20 bytes) + * - Write large metadata (80 bytes) that would cause underflow: + * old code: content_av = 100 - 24 - 80 = -4 (wraps to huge unsigned) + * - Verify resize happens correctly and no buffer overrun occurs + */ +static void test_metadata_unsigned_underflow() +{ + int ret; + int err; + char *meta_buf; + int meta_len; + void *content_buf; + size_t content_size; + struct cio_ctx *ctx; + struct cio_chunk *chunk; + struct cio_stream *stream; + struct cio_options cio_opts; + + /* Test data */ + const char *small_meta = "small"; + const char *large_meta = "this-is-a-very-large-metadata-string-that-would-cause-unsigned-underflow-in-old-code"; + const char *content_data = "test-content"; + + /* Cleanup any existing test directory */ + cio_utils_recursive_delete(CIO_ENV); + + /* Initialize options */ + cio_options_init(&cio_opts); + cio_opts.root_path = CIO_ENV; + cio_opts.log_cb = log_cb; + cio_opts.log_level = CIO_LOG_INFO; + cio_opts.flags = CIO_CHECKSUM; + + /* Create context */ + ctx = cio_create(&cio_opts); + 
TEST_CHECK(ctx != NULL); + if (!ctx) { + printf("cannot create context\n"); + exit(1); + } + + /* Create stream */ + stream = cio_stream_create(ctx, "test_stream_underflow", CIO_STORE_FS); + TEST_CHECK(stream != NULL); + if (!stream) { + printf("cannot create stream\n"); + cio_destroy(ctx); + exit(1); + } + + /* Create chunk with minimal initial size (forces small alloc_size) */ + chunk = cio_chunk_open(ctx, stream, "test_chunk_underflow", CIO_OPEN, 100, &err); + TEST_CHECK(chunk != NULL); + if (!chunk) { + printf("cannot open chunk\n"); + cio_destroy(ctx); + exit(1); + } + + /* Step 1: Write small initial metadata */ + ret = cio_meta_write(chunk, (char *) small_meta, strlen(small_meta)); + TEST_CHECK(ret == CIO_OK); + + /* Step 2: Write some content data */ + ret = cio_chunk_write(chunk, content_data, strlen(content_data)); + TEST_CHECK(ret == CIO_OK); + + /* Verify initial state */ + ret = cio_meta_read(chunk, &meta_buf, &meta_len); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(meta_len == (int) strlen(small_meta)); + TEST_CHECK(memcmp(meta_buf, small_meta, strlen(small_meta)) == 0); + + ret = cio_chunk_get_content_copy(chunk, &content_buf, &content_size); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(content_size == strlen(content_data)); + TEST_CHECK(memcmp(content_buf, content_data, strlen(content_data)) == 0); + free(content_buf); + + /* Step 3: Write large metadata that would cause underflow in old code + * This is the critical test - the new metadata (80 bytes) is larger than + * what would fit without resize, and with a small initial alloc_size, + * the old calculation would have underflowed. 
+ */ + ret = cio_meta_write(chunk, (char *) large_meta, strlen(large_meta)); + TEST_CHECK(ret == CIO_OK); + + /* Step 4: Verify metadata was written correctly */ + ret = cio_meta_read(chunk, &meta_buf, &meta_len); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(meta_len == (int) strlen(large_meta)); + TEST_CHECK(memcmp(meta_buf, large_meta, strlen(large_meta)) == 0); + + /* Step 5: Verify content data integrity - must be preserved */ + ret = cio_chunk_get_content_copy(chunk, &content_buf, &content_size); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(content_size == strlen(content_data)); + TEST_CHECK(memcmp(content_buf, content_data, strlen(content_data)) == 0); + + /* Step 6: Sync to disk to ensure persistence */ + ret = cio_chunk_sync(chunk); + TEST_CHECK(ret == CIO_OK); + + /* Step 7: Put chunk down and up again to test persistence */ + ret = cio_chunk_down(chunk); + TEST_CHECK(ret == CIO_OK); + + ret = cio_chunk_up(chunk); + TEST_CHECK(ret == CIO_OK); + + /* Step 8: Final validation after persistence cycle */ + ret = cio_meta_read(chunk, &meta_buf, &meta_len); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(meta_len == (int) strlen(large_meta)); + TEST_CHECK(memcmp(meta_buf, large_meta, strlen(large_meta)) == 0); + + ret = cio_chunk_get_content_copy(chunk, &content_buf, &content_size); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(content_size == strlen(content_data)); + TEST_CHECK(memcmp(content_buf, content_data, strlen(content_data)) == 0); + + /* Cleanup */ + free(content_buf); + cio_destroy(ctx); +} + TEST_LIST = { {"fs_write", test_fs_write}, {"fs_checksum", test_fs_checksum}, @@ -976,5 +1113,6 @@ TEST_LIST = { {"fs_deep_hierachy", test_deep_hierarchy}, {"legacy_success", test_legacy_success}, {"legacy_failure", test_legacy_failure}, + {"metadata_unsigned_underflow", test_metadata_unsigned_underflow}, { 0 } }; diff --git a/lib/chunkio/tests/fs_windows.c b/lib/chunkio/tests/fs_windows.c new file mode 100644 index 00000000000..8f3b5a63f25 --- /dev/null +++ 
b/lib/chunkio/tests/fs_windows.c @@ -0,0 +1,443 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Windows File Handling Inconsistency Tests + * ========================================== + * + * This test suite highlights inconsistencies between Windows and Unix + * implementations of file handling in chunkio: + * + * 1. Delete while open/mapped: Windows allows deletion of open/mapped files + * while Unix correctly rejects it + * + * 2. Sync without mapping: Windows accesses cf->map without checking if it's + * NULL, which can cause crashes + * + * 3. File mapping size mismatch: CreateFileMapping uses current file size + * but MapViewOfFile may request a larger size, causing potential issues + * + * 4. File descriptor check: cio_file.c uses Unix-specific cf->fd check + * instead of platform-agnostic cio_file_native_is_open() + * + * These tests are designed to demonstrate the issues and verify behavior. 
+ */ + +#ifdef _WIN32 + +#include +#include +#include +#include +#include +#include + +/* Note: We still need to include cio_file_native.h for testing native functions + * directly to verify bug fixes, but we use public APIs for state checks */ + +#include "cio_tests_internal.h" + +#define CIO_ENV "tmp" + +/* Logging callback */ +static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line, + char *str) +{ + (void) ctx; + (void) level; + + printf("[cio-test-win32] %-60s => %s:%i\n", str, file, line); + return 0; +} + +/* + * ISSUE #1: Test deleting a file that is open/mapped + * + * Expected behavior: Delete should succeed after automatically releasing + * any outstanding mappings and handles. + */ +static void test_win32_delete_while_open() +{ + int ret; + int err; + struct cio_ctx *ctx; + struct cio_stream *stream; + struct cio_chunk *chunk; + struct cio_file *cf; + struct cio_options cio_opts; + + printf("\n=== Test: Delete file while open ===\n"); + + cio_utils_recursive_delete("tmp"); + + cio_options_init(&cio_opts); + cio_opts.root_path = "tmp"; + cio_opts.log_cb = log_cb; + cio_opts.log_level = CIO_LOG_DEBUG; + + ctx = cio_create(&cio_opts); + TEST_CHECK(ctx != NULL); + + stream = cio_stream_create(ctx, "test", CIO_STORE_FS); + TEST_CHECK(stream != NULL); + + /* Open and map a file */ + chunk = cio_chunk_open(ctx, stream, "test-file-open", CIO_OPEN, 1000, &err); + TEST_CHECK(chunk != NULL); + + cf = (struct cio_file *) chunk->backend; + TEST_CHECK(cf != NULL); + + /* Verify file is open (using public API) */ + TEST_CHECK(cio_chunk_is_up(chunk) == CIO_TRUE); + + /* Delete while open - should succeed and close resources automatically */ + ret = cio_file_native_delete(cf); + printf("Result of delete while open: %d (expected: CIO_OK=%d)\n", + ret, CIO_OK); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(cio_file_native_is_open(cf) == CIO_FALSE); + TEST_CHECK(cio_file_native_is_mapped(cf) == CIO_FALSE); + + cio_chunk_close(chunk, CIO_FALSE); + 
cio_stream_delete(stream); + cio_destroy(ctx); +} + +/* + * ISSUE #2: Test deleting a file that is mapped + * + * Expected behavior: Delete should succeed after the implementation releases + * the mapping safely. + */ +static void test_win32_delete_while_mapped() +{ + int ret; + int err; + struct cio_ctx *ctx; + struct cio_stream *stream; + struct cio_chunk *chunk; + struct cio_file *cf; + struct cio_options cio_opts; + + printf("\n=== Test: Delete file while mapped ===\n"); + + cio_utils_recursive_delete("tmp"); + + cio_options_init(&cio_opts); + cio_opts.root_path = "tmp"; + cio_opts.log_cb = log_cb; + cio_opts.log_level = CIO_LOG_DEBUG; + + ctx = cio_create(&cio_opts); + TEST_CHECK(ctx != NULL); + + stream = cio_stream_create(ctx, "test", CIO_STORE_FS); + TEST_CHECK(stream != NULL); + + /* Open and map a file */ + chunk = cio_chunk_open(ctx, stream, "test-file-mapped", CIO_OPEN, 1000, &err); + TEST_CHECK(chunk != NULL); + + cf = (struct cio_file *) chunk->backend; + TEST_CHECK(cf != NULL); + + /* Write some data to ensure mapping */ + ret = cio_chunk_write(chunk, "test data", 9); + TEST_CHECK(ret == 0); + + /* Verify file is mapped (using public API) */ + TEST_CHECK(cio_chunk_is_up(chunk) == CIO_TRUE); + + /* Delete while mapped - should succeed and release mapping */ + ret = cio_file_native_delete(cf); + printf("Result of delete while mapped: %d (expected: CIO_OK=%d)\n", + ret, CIO_OK); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(cio_file_native_is_open(cf) == CIO_FALSE); + TEST_CHECK(cio_file_native_is_mapped(cf) == CIO_FALSE); + + cio_chunk_close(chunk, CIO_FALSE); + cio_stream_delete(stream); + cio_destroy(ctx); +} + +/* + * ISSUE #3: Test syncing a file that is not mapped + * + * Expected behavior: Should check if mapped before accessing cf->map + * Current behavior: Accesses cf->map without checking, may crash + */ +static void test_win32_sync_without_map() +{ + int ret; + int err; + struct cio_ctx *ctx; + struct cio_stream *stream; + struct cio_chunk 
*chunk; + struct cio_file *cf; + struct cio_options cio_opts; + + printf("\n=== Test: Sync file without mapping ===\n"); + + cio_utils_recursive_delete("tmp"); + + cio_options_init(&cio_opts); + cio_opts.root_path = "tmp"; + cio_opts.log_cb = log_cb; + cio_opts.log_level = CIO_LOG_DEBUG; + + ctx = cio_create(&cio_opts); + TEST_CHECK(ctx != NULL); + + stream = cio_stream_create(ctx, "test", CIO_STORE_FS); + TEST_CHECK(stream != NULL); + + /* Open a file but don't map it */ + chunk = cio_chunk_open(ctx, stream, "test-file-sync", CIO_OPEN, 1000, &err); + TEST_CHECK(chunk != NULL); + + cf = (struct cio_file *) chunk->backend; + TEST_CHECK(cf != NULL); + + /* Manually unmap if it was auto-mapped (using public API) */ + if (cio_chunk_is_up(chunk) == CIO_TRUE) { + ret = cio_file_down(chunk); + TEST_CHECK(ret == 0); + } + + /* Verify file is not mapped (using public API) */ + TEST_CHECK(cio_chunk_is_up(chunk) == CIO_FALSE); + printf("Verified: chunk is down (not mapped)\n"); + + /* Set synced flag to FALSE to force sync path (since cio_file_down syncs before unmapping) */ + cf->synced = CIO_FALSE; + + /* Try to sync without mapping using public API */ + /* cio_file_sync should auto-remap and emit a warning */ + printf("Attempting sync on unmapped file using cio_file_sync()...\n"); + printf("cio_file_sync() should remap and warn instead of failing\n"); + + ret = cio_file_sync(chunk); + printf("Result of sync without map: %d (expected: 0 for success with warning)\n", ret); + + TEST_CHECK(ret == 0); + TEST_CHECK(cio_chunk_is_up(chunk) == CIO_FALSE); + TEST_CHECK(cio_file_native_is_open(cf) == CIO_FALSE); + + cio_chunk_close(chunk, CIO_FALSE); + cio_stream_delete(stream); + cio_destroy(ctx); +} + +/* + * ISSUE #4: Test file mapping size mismatch + * + * Expected behavior: CreateFileMapping should use map_size, not current file size + * Current behavior: Creates mapping based on file size, then tries to map larger view + */ +static void test_win32_map_size_mismatch() +{ + int 
ret; + int err; + size_t file_size; + size_t map_size; + struct cio_ctx *ctx; + struct cio_stream *stream; + struct cio_chunk *chunk; + struct cio_file *cf; + struct cio_options cio_opts; + + printf("\n=== Test: File mapping size mismatch ===\n"); + + cio_utils_recursive_delete("tmp"); + + cio_options_init(&cio_opts); + cio_opts.root_path = "tmp"; + cio_opts.log_cb = log_cb; + cio_opts.log_level = CIO_LOG_DEBUG; + + ctx = cio_create(&cio_opts); + TEST_CHECK(ctx != NULL); + + stream = cio_stream_create(ctx, "test", CIO_STORE_FS); + TEST_CHECK(stream != NULL); + + /* Create a small file first */ + chunk = cio_chunk_open(ctx, stream, "test-file-size", CIO_OPEN, 1024, &err); + TEST_CHECK(chunk != NULL); + + cf = (struct cio_file *) chunk->backend; + TEST_CHECK(cf != NULL); + + /* Write minimal data */ + ret = cio_chunk_write(chunk, "test", 4); + TEST_CHECK(ret == 0); + + /* Sync to ensure file is written */ + ret = cio_chunk_sync(chunk); + TEST_CHECK(ret == 0); + + /* Get actual file size */ + ret = cio_file_native_get_size(cf, &file_size); + TEST_CHECK(ret == CIO_OK); + printf("Actual file size: %zu bytes\n", file_size); + + /* Close the chunk to unmap */ + cio_chunk_close(chunk, CIO_FALSE); + + /* Reopen file */ + chunk = cio_chunk_open(ctx, stream, "test-file-size", CIO_OPEN_RD, 0, &err); + TEST_CHECK(chunk != NULL); + + cf = (struct cio_file *) chunk->backend; + TEST_CHECK(cf != NULL); + + /* Unmap if cio_chunk_open auto-mapped the file (using public API) */ + if (cio_chunk_is_up(chunk) == CIO_TRUE) { + ret = cio_file_down(chunk); + TEST_CHECK(ret == 0); + } + + /* Ensure file is still open */ + if (!cio_file_native_is_open(cf)) { + ret = cio_file_native_open(cf); + TEST_CHECK(ret == CIO_OK); + } + + /* Try to map with a size larger than the file */ + map_size = file_size + 4096; /* Request 4KB more than file size */ + printf("Attempting to map %zu bytes (file is %zu bytes)\n", map_size, file_size); + + /* This is where the issue occurs: CreateFileMapping uses 
current file size (0,0), + * but MapViewOfFile tries to map a larger size */ + ret = cio_file_native_map(cf, map_size); + printf("Result of mapping %zu bytes to %zu byte file: %d\n", + map_size, file_size, ret); + + /* For read-only files, mapping beyond file size is not possible on Windows. + * The mapping should be limited to file_size, and alloc_size should reflect + * the actual mapped size (file_size), not the requested size (map_size). + * This ensures consistency between CreateFileMappingA and MapViewOfFile sizes. */ + if (ret == CIO_OK) { + printf("Mapping succeeded\n"); + printf("Requested map_size: %zu, file_size: %zu\n", map_size, file_size); + + /* Verify what was actually mapped (using public API) */ + if (cio_chunk_is_up(chunk) == CIO_TRUE) { + printf("File is mapped, alloc_size: %zu\n", cf->alloc_size); + + /* For read-only files, alloc_size should match file_size (the actual mapped size), + * not the requested map_size (which exceeds file size) */ + /* This ensures consistency: CreateFileMappingA and MapViewOfFile both use actual_map_size */ + TEST_CHECK(cf->alloc_size == file_size); + + if (cf->alloc_size != file_size) { + printf("ISSUE DETECTED: alloc_size (%zu) doesn't match file_size (%zu)\n", + cf->alloc_size, file_size); + printf("For read-only files, mapping is limited to file_size\n"); + } + } + + ret = cio_file_native_unmap(cf); + TEST_CHECK(ret == CIO_OK); + } + else { + printf("Mapping failed when size mismatch occurs\n"); + /* For read-only files, this might be expected if map_size > file_size */ + } + + cio_file_native_close(cf); + cio_chunk_close(chunk, CIO_FALSE); + cio_stream_delete(stream); + cio_destroy(ctx); +} + +/* + * Test accessing file descriptor check inconsistency + * This tests the issue in cio_file.c line 804 where it checks cf->fd > 0 + * instead of using cio_file_native_is_open(cf) + */ +static void test_win32_fd_check_inconsistency() +{ + int ret; + int err; + struct cio_ctx *ctx; + struct cio_stream *stream; + struct 
cio_chunk *chunk; + struct cio_file *cf; + struct cio_options cio_opts; + + printf("\n=== Test: File descriptor check inconsistency ===\n"); + + cio_utils_recursive_delete("tmp"); + + cio_options_init(&cio_opts); + cio_opts.root_path = "tmp"; + cio_opts.log_cb = log_cb; + cio_opts.log_level = CIO_LOG_DEBUG; + + ctx = cio_create(&cio_opts); + TEST_CHECK(ctx != NULL); + + stream = cio_stream_create(ctx, "test", CIO_STORE_FS); + TEST_CHECK(stream != NULL); + + /* Open a file */ + chunk = cio_chunk_open(ctx, stream, "test-file-fd", CIO_OPEN, 1000, &err); + TEST_CHECK(chunk != NULL); + + cf = (struct cio_file *) chunk->backend; + TEST_CHECK(cf != NULL); + + /* Verify file is open (using public API) */ + ret = cio_chunk_is_up(chunk); + TEST_CHECK(ret == CIO_TRUE); + printf("cio_chunk_is_up(chunk): %d\n", ret); + + /* Check cf->fd value on Windows (internal check for documentation) */ + /* On Windows, cf->fd is typically -1, but the file is still open via backing_file */ + printf("cf->fd value: %d (internal, not used on Windows)\n", cf->fd); + printf("Note: cio_file.c now uses cio_file_native_is_open() instead of cf->fd > 0\n"); + + cio_chunk_close(chunk, CIO_FALSE); + cio_stream_delete(stream); + cio_destroy(ctx); +} + +TEST_LIST = { + {"win32_delete_while_open", test_win32_delete_while_open}, + {"win32_delete_while_mapped", test_win32_delete_while_mapped}, + {"win32_sync_without_map", test_win32_sync_without_map}, + {"win32_map_size_mismatch", test_win32_map_size_mismatch}, + {"win32_fd_check_inconsistency", test_win32_fd_check_inconsistency}, + {NULL, NULL} +}; + +#else /* _WIN32 */ + +#include "cio_tests_internal.h" + +/* Empty test list for non-Windows platforms */ +TEST_LIST = { + {0} +}; + +#endif /* _WIN32 */ + diff --git a/lib/chunkio/tests/metadata_update.c b/lib/chunkio/tests/metadata_update.c new file mode 100644 index 00000000000..fc425f397ec --- /dev/null +++ b/lib/chunkio/tests/metadata_update.c @@ -0,0 +1,288 @@ +/* -*- Mode: C; tab-width: 4; 
indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018-2019 Eduardo Silva + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cio_tests_internal.h" + +#define CIO_ENV_META_TEST "/tmp/cio-metadata-update-test/" + +/* Logging callback */ +static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line, + char *str) +{ + (void) ctx; + (void) level; + (void) file; + (void) line; + + printf("[cio-test-metadata] %s\n", str); + return 0; +} + +/* + * Test case: Validate that updating metadata after writing content data + * correctly moves the content data and preserves both metadata and content + * integrity. + * + * This test specifically validates the fix for the bug where memmove() + * was using metadata size instead of content data size when moving content + * after metadata update. 
+ */ +static void test_metadata_update_with_content() +{ + int ret; + int err; + char *meta_buf; + int meta_len; + void *content_buf; + size_t content_size; + size_t expected_content_size; + struct cio_ctx *ctx; + struct cio_chunk *chunk; + struct cio_stream *stream; + struct cio_options cio_opts; + + /* Test data */ + const char *initial_meta = "initial-metadata"; + const char *updated_meta = "this-is-a-much-longer-metadata-string-that-will-require-content-to-be-moved"; + const char *content_data = "This is test content data that must be preserved when metadata is updated."; + const char *more_content = " Additional content appended after metadata update."; + + /* Expected final content */ + char *expected_content; + size_t expected_content_len; + + /* Cleanup any existing test directory */ + cio_utils_recursive_delete(CIO_ENV_META_TEST); + + /* Initialize options */ + cio_options_init(&cio_opts); + cio_opts.root_path = CIO_ENV_META_TEST; + cio_opts.log_cb = log_cb; + cio_opts.log_level = CIO_LOG_INFO; + cio_opts.flags = CIO_CHECKSUM; + + /* Create context */ + ctx = cio_create(&cio_opts); + TEST_CHECK(ctx != NULL); + if (!ctx) { + printf("cannot create context\n"); + exit(1); + } + + /* Create stream */ + stream = cio_stream_create(ctx, "test_stream", CIO_STORE_FS); + TEST_CHECK(stream != NULL); + if (!stream) { + printf("cannot create stream\n"); + cio_destroy(ctx); + exit(1); + } + + /* Create chunk */ + chunk = cio_chunk_open(ctx, stream, "test_chunk", CIO_OPEN, 1000, &err); + TEST_CHECK(chunk != NULL); + if (!chunk) { + printf("cannot open chunk\n"); + cio_destroy(ctx); + exit(1); + } + + /* Step 1: Write initial metadata */ + ret = cio_meta_write(chunk, (char *) initial_meta, strlen(initial_meta)); + TEST_CHECK(ret == CIO_OK); + + /* Step 2: Write some content data */ + ret = cio_chunk_write(chunk, content_data, strlen(content_data)); + TEST_CHECK(ret == CIO_OK); + + expected_content_size = strlen(content_data); + + /* Step 3: Update metadata to a larger 
size (this triggers content move) */ + /* This is the critical test case - when metadata grows, content data + * must be moved correctly using cf->data_size, not the metadata size */ + ret = cio_meta_write(chunk, (char *) updated_meta, strlen(updated_meta)); + TEST_CHECK(ret == CIO_OK); + + /* Step 4: Write more content after metadata update */ + ret = cio_chunk_write(chunk, more_content, strlen(more_content)); + TEST_CHECK(ret == CIO_OK); + + expected_content_size += strlen(more_content); + + /* Build expected content */ + expected_content_len = strlen(content_data) + strlen(more_content); + expected_content = malloc(expected_content_len + 1); + TEST_CHECK(expected_content != NULL); + if (!expected_content) { + cio_destroy(ctx); + exit(1); + } + memcpy(expected_content, content_data, strlen(content_data)); + memcpy(expected_content + strlen(content_data), more_content, strlen(more_content)); + expected_content[expected_content_len] = '\0'; + + /* Step 5: Sync to disk */ + ret = cio_chunk_sync(chunk); + TEST_CHECK(ret == CIO_OK); + + /* Step 6: Put chunk down */ + ret = cio_chunk_down(chunk); + TEST_CHECK(ret == CIO_OK); + + /* Verify chunk is down */ + ret = cio_chunk_is_up(chunk); + TEST_CHECK(ret == CIO_FALSE); + + /* Step 7: Put chunk up again */ + ret = cio_chunk_up(chunk); + TEST_CHECK(ret == CIO_OK); + + /* Verify chunk is up */ + ret = cio_chunk_is_up(chunk); + TEST_CHECK(ret == CIO_TRUE); + + /* Step 8: Validate metadata */ + ret = cio_meta_read(chunk, &meta_buf, &meta_len); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(meta_len == (int) strlen(updated_meta)); + TEST_CHECK(memcmp(meta_buf, updated_meta, strlen(updated_meta)) == 0); + + /* Step 9: Validate content data */ + ret = cio_chunk_get_content_copy(chunk, &content_buf, &content_size); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(content_size == expected_content_size); + TEST_CHECK(memcmp(content_buf, expected_content, expected_content_len) == 0); + + /* Cleanup */ + free(expected_content); + 
free(content_buf); + cio_destroy(ctx); +} + +/* + * Test case: Update metadata multiple times with varying sizes to ensure + * content data integrity is maintained throughout. + */ +static void test_metadata_multiple_updates() +{ + int ret; + int err; + char *meta_buf; + int meta_len; + void *content_buf; + size_t content_size; + struct cio_ctx *ctx; + struct cio_chunk *chunk; + struct cio_stream *stream; + struct cio_options cio_opts; + const char *test_strings[] = { + "small", + "medium-sized-metadata", + "very-long-metadata-string-that-exceeds-previous-sizes", + "tiny", + "another-medium-metadata-string" + }; + const char *content = "Test content that must remain intact"; + int i; + + /* Cleanup any existing test directory */ + cio_utils_recursive_delete(CIO_ENV_META_TEST); + + /* Initialize options */ + cio_options_init(&cio_opts); + cio_opts.root_path = CIO_ENV_META_TEST; + cio_opts.log_cb = log_cb; + cio_opts.log_level = CIO_LOG_INFO; + cio_opts.flags = CIO_CHECKSUM; + + /* Create context */ + ctx = cio_create(&cio_opts); + TEST_CHECK(ctx != NULL); + + /* Create stream */ + stream = cio_stream_create(ctx, "test_stream", CIO_STORE_FS); + TEST_CHECK(stream != NULL); + + /* Create chunk */ + chunk = cio_chunk_open(ctx, stream, "test_chunk2", CIO_OPEN, 1000, &err); + TEST_CHECK(chunk != NULL); + + /* Write initial content */ + ret = cio_chunk_write(chunk, content, strlen(content)); + TEST_CHECK(ret == CIO_OK); + + /* Update metadata multiple times with different sizes */ + for (i = 0; i < 5; i++) { + ret = cio_meta_write(chunk, (char *) test_strings[i], strlen(test_strings[i])); + TEST_CHECK(ret == CIO_OK); + + /* Verify metadata after each update */ + ret = cio_meta_read(chunk, &meta_buf, &meta_len); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(meta_len == (int) strlen(test_strings[i])); + TEST_CHECK(memcmp(meta_buf, test_strings[i], strlen(test_strings[i])) == 0); + + /* Verify content remains intact */ + ret = cio_chunk_get_content_copy(chunk, &content_buf, 
&content_size); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(content_size == strlen(content)); + TEST_CHECK(memcmp(content_buf, content, strlen(content)) == 0); + free(content_buf); + } + + /* Sync and test persistence */ + ret = cio_chunk_sync(chunk); + TEST_CHECK(ret == CIO_OK); + + ret = cio_chunk_down(chunk); + TEST_CHECK(ret == CIO_OK); + + ret = cio_chunk_up(chunk); + TEST_CHECK(ret == CIO_OK); + + /* Final validation after up/down cycle */ + ret = cio_meta_read(chunk, &meta_buf, &meta_len); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(meta_len == (int) strlen(test_strings[4])); + TEST_CHECK(memcmp(meta_buf, test_strings[4], strlen(test_strings[4])) == 0); + + ret = cio_chunk_get_content_copy(chunk, &content_buf, &content_size); + TEST_CHECK(ret == CIO_OK); + TEST_CHECK(content_size == strlen(content)); + TEST_CHECK(memcmp(content_buf, content, strlen(content)) == 0); + + /* Cleanup */ + free(content_buf); + cio_destroy(ctx); +} + +TEST_LIST = { + {"metadata_update_with_content", test_metadata_update_with_content}, + {"metadata_multiple_updates", test_metadata_multiple_updates}, + { 0 } +}; diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 9f1c5af4e33..656a17815bc 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -39,6 +39,8 @@ #include #include #include +#include +#include #ifdef FLB_HAVE_CHUNK_TRACE @@ -60,6 +62,155 @@ struct input_chunk_raw { size_t buf_size; }; +struct flb_input_chunk_meta_view { + char *buffer; + int length; + const char *tag; + int tag_length; + uint8_t flags; + uint8_t *routing_data; + uint16_t routing_data_length; +}; + +static inline int input_chunk_has_magic_bytes(char *buf, int len) +{ + unsigned char *p; + + if (len < FLB_INPUT_CHUNK_META_HEADER) { + return FLB_FALSE; + } + + p = (unsigned char *) buf; + if (p[0] == FLB_INPUT_CHUNK_MAGIC_BYTE_0 && + p[1] == FLB_INPUT_CHUNK_MAGIC_BYTE_1) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int input_chunk_metadata_view(struct flb_input_chunk *ic, + 
struct flb_input_chunk_meta_view *view) +{ + int ret; + int len; + int has_magic; + int payload_length; + int offset; + int computed_tag_len; + uint8_t flags; + uint16_t routing_length; + char *buf; + char *terminator; + const char *tag_start; + + ret = cio_meta_read(ic->chunk, &buf, &len); + if (ret == -1) { + return -1; + } + + view->buffer = buf; + view->length = len; + view->flags = 0; + view->routing_data = NULL; + view->routing_data_length = 0; + view->tag = buf; + view->tag_length = len; + + has_magic = input_chunk_has_magic_bytes(buf, len); + if (has_magic == FLB_FALSE) { + return 0; + } + + payload_length = len - FLB_INPUT_CHUNK_META_HEADER; + if (payload_length < 0) { + payload_length = 0; + } + + tag_start = buf + FLB_INPUT_CHUNK_META_HEADER; + view->tag = tag_start; + view->tag_length = payload_length; + flags = (uint8_t) buf[3]; + view->flags = flags; + + terminator = memchr(tag_start, '\0', payload_length); + if (terminator) { + view->tag_length = (int) (terminator - tag_start); + } + + if ((flags & FLB_CHUNK_FLAG_DIRECT_ROUTES) == 0) { + return 0; + } + + if (payload_length <= 0) { + return 0; + } + + if (!terminator) { + return 1; + } + + computed_tag_len = view->tag_length; + offset = computed_tag_len + 1; + + if (payload_length < offset + (int) sizeof(uint16_t)) { + return 1; + } + + routing_length = (uint16_t) (((unsigned char) tag_start[offset] << 8) | + (unsigned char) tag_start[offset + 1]); + + if (payload_length < offset + (int) sizeof(uint16_t) + routing_length) { + return 1; + } + + view->routing_data_length = routing_length; + view->routing_data = ((uint8_t *) tag_start) + offset + (int) sizeof(uint16_t); + + return 0; +} + +#define ROUTE_PLUGIN_NAME_LEN(route) \ + ((route)->plugin_name_length > 0 ? (int) (route)->plugin_name_length : \ + (route)->plugin_name != NULL ? 
(int) strlen((route)->plugin_name) : 0) + +int flb_chunk_route_plugin_matches(struct flb_output_instance *o_ins, + const struct flb_chunk_direct_route *route) +{ + int stored_length; + int candidate_length; + + if (!route) { + return FLB_FALSE; + } + + if (!route->plugin_name || route->plugin_name[0] == '\0') { + return FLB_TRUE; + } + + if (!o_ins || !o_ins->p || !o_ins->p->name) { + return FLB_FALSE; + } + + stored_length = ROUTE_PLUGIN_NAME_LEN(route); + if (stored_length <= 0) { + return FLB_FALSE; + } + + candidate_length = (int) strlen(o_ins->p->name); + if (candidate_length != stored_length) { + return FLB_FALSE; + } + + if (strncmp(o_ins->p->name, route->plugin_name, (size_t) stored_length) != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +#undef ROUTE_PLUGIN_NAME_LEN + #ifdef FLB_HAVE_IN_STORAGE_BACKLOG extern ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, @@ -602,23 +753,172 @@ int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_siz i_ins->config); } +static int input_chunk_collect_output_references(struct flb_config *config, + const struct flb_chunk_direct_route *route, + struct flb_output_instance ***out_matches, + size_t *out_count) +{ + size_t index; + size_t count; + int alias_length; + int label_length; + int name_length; + const char *label; + uint32_t stored_id; + struct mk_list *head; + struct flb_output_instance *o_ins; + struct flb_output_instance **matches; + + if (!config || !route || !out_matches || !out_count) { + return -1; + } + + *out_matches = NULL; + *out_count = 0; + + label = route->label; + label_length = 0; + stored_id = route->id; + if (label != NULL) { + label_length = route->label_length; + if (label_length == 0) { + label_length = (int) strlen(label); + } + } + + count = 0; + if (label != NULL && label_length > 0) { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if (o_ins->alias != NULL) { 
+ alias_length = (int) strlen(o_ins->alias); + if (alias_length == label_length && + strncmp(o_ins->alias, label, (size_t) label_length) == 0 && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + count++; + } + } + } + + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + name_length = (int) strlen(o_ins->name); + if (name_length == label_length && + strncmp(o_ins->name, label, (size_t) label_length) == 0 && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + if (o_ins->alias != NULL) { + alias_length = (int) strlen(o_ins->alias); + if (alias_length == label_length && + strncmp(o_ins->alias, label, (size_t) label_length) == 0) { + continue; + } + } + count++; + } + } + + if (count == 0) { + return 0; + } + } + else { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if ((uint32_t) o_ins->id == stored_id && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + count++; + } + } + + if (count == 0) { + return 0; + } + } + + matches = flb_calloc(count, sizeof(struct flb_output_instance *)); + if (!matches) { + flb_errno(); + return -1; + } + + index = 0; + if (label != NULL && label_length > 0) { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if (o_ins->alias != NULL) { + alias_length = (int) strlen(o_ins->alias); + if (alias_length == label_length && + strncmp(o_ins->alias, label, (size_t) label_length) == 0 && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + matches[index++] = o_ins; + } + } + } + + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + name_length = (int) strlen(o_ins->name); + if (name_length == label_length && + strncmp(o_ins->name, label, (size_t) label_length) == 0 && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + if (o_ins->alias != NULL) { + 
alias_length = (int) strlen(o_ins->alias); + if (alias_length == label_length && + strncmp(o_ins->alias, label, (size_t) label_length) == 0) { + continue; + } + } + matches[index++] = o_ins; + } + } + } + else { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if ((uint32_t) o_ins->id == stored_id && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + matches[index++] = o_ins; + } + } + } + + *out_matches = matches; + *out_count = index; + + return 0; +} + /* Create an input chunk using a Chunk I/O */ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, int event_type, void *chunk) { - int records = 0; + int records; int tag_len; int has_routes; int ret; + int direct_status; + int direct_loaded; + int direct_index; + int direct_missing; + uint32_t missing_id; + uint16_t missing_label_length; + struct flb_chunk_direct_route *direct_routes; + const char *missing_label; + int direct_count; uint64_t ts; char *buf_data; size_t buf_size; size_t offset; ssize_t bytes; const char *tag_buf; + struct flb_output_instance **direct_matches; + size_t direct_match_count; + size_t direct_match_index; struct flb_input_chunk *ic; + records = 0; + /* Create context for the input instance */ ic = flb_calloc(1, sizeof(struct flb_input_chunk)); if (!ic) { @@ -778,6 +1078,98 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, return NULL; } + direct_routes = NULL; + direct_count = 0; + direct_loaded = FLB_FALSE; + missing_label = NULL; + missing_label_length = 0; + direct_status = flb_input_chunk_get_direct_routes(ic, &direct_routes, &direct_count); + if (direct_status == 0 && direct_count > 0) { + direct_missing = FLB_FALSE; + missing_id = 0; + for (direct_index = 0; direct_index < direct_count; direct_index++) { + direct_matches = NULL; + direct_match_count = 0; + ret = input_chunk_collect_output_references(in->config, + &direct_routes[direct_index], + 
&direct_matches, + &direct_match_count); + if (ret == -1) { + flb_plg_error(in, + "failed collecting restored routes for chunk %s", + flb_input_chunk_get_name(ic)); + } + + if (ret != 0 || direct_match_count == 0) { + direct_missing = FLB_TRUE; + missing_id = direct_routes[direct_index].id; + missing_label = direct_routes[direct_index].label; + missing_label_length = direct_routes[direct_index].label_length; + if (missing_label_length == 0 && missing_label != NULL) { + missing_label_length = (uint16_t) strlen(missing_label); + } + if (direct_matches != NULL) { + flb_free(direct_matches); + } + break; + } + + if (direct_matches != NULL) { + flb_free(direct_matches); + } + } + + if (direct_missing == FLB_FALSE) { + memset(ic->routes_mask, 0, + sizeof(flb_route_mask_element) * in->config->route_mask_size); + has_routes = 0; + for (direct_index = 0; direct_index < direct_count; direct_index++) { + direct_matches = NULL; + direct_match_count = 0; + ret = input_chunk_collect_output_references(in->config, + &direct_routes[direct_index], + &direct_matches, + &direct_match_count); + if (ret != 0 || direct_match_count == 0 || direct_matches == NULL) { + if (direct_matches != NULL) { + flb_free(direct_matches); + } + continue; + } + + for (direct_match_index = 0; + direct_match_index < direct_match_count; + direct_match_index++) { + flb_routes_mask_set_bit(ic->routes_mask, + direct_matches[direct_match_index]->id, + in->config); + has_routes++; + } + + flb_free(direct_matches); + } + direct_loaded = FLB_TRUE; + } + else { + flb_plg_warn(in, + "direct route output id=%u label=%.*s not found for chunk %s, falling back to tag routing", + (unsigned int) missing_id, + (int) missing_label_length, + missing_label ? 
missing_label : "", + flb_input_chunk_get_name(ic)); + } + } + else if (direct_status == -2) { + flb_plg_warn(in, + "invalid direct routing metadata for chunk %s, falling back to tag routing", + flb_input_chunk_get_name(ic)); + } + + if (direct_routes) { + flb_input_chunk_destroy_direct_routes(direct_routes, direct_count); + direct_routes = NULL; + } + bytes = flb_input_chunk_get_real_size(ic); if (bytes < 0) { flb_warn("[input chunk] could not retrieve chunk real size"); @@ -786,10 +1178,22 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, return NULL; } - has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in); - if (has_routes == 0) { - flb_warn("[input chunk] no matching route for backoff log chunk %s", - flb_input_chunk_get_name(ic)); + if (direct_loaded == FLB_FALSE) { + has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in); + if (has_routes == 0) { + flb_warn("[input chunk] no matching route for backoff log chunk %s", + flb_input_chunk_get_name(ic)); + } + } + else if (has_routes == 0) { + flb_plg_warn(in, + "direct routing metadata for chunk %s produced no routes, falling back to tag routing", + flb_input_chunk_get_name(ic)); + has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in); + if (has_routes == 0) { + flb_warn("[input chunk] no matching route for backoff log chunk %s", + flb_input_chunk_get_name(ic)); + } } mk_list_add(&ic->_head, &in->chunks); @@ -871,6 +1275,658 @@ static int input_chunk_write_header(struct cio_chunk *chunk, int event_type, return 0; } +int flb_input_chunk_write_header_v2(struct cio_chunk *chunk, + int event_type, + char *tag, int tag_len, + const struct flb_chunk_direct_route *routes, + int route_count) +{ + int has_labels; + int has_plugins; + int wide_ids; + int index; + int max_tag_len; + int meta_size; + int offset; + int ret; + int id_offset; + int label_offset; + int plugin_lengths_offset; + int plugin_data_offset; + int 
id_bytes; + uint16_t label_length; + uint16_t plugin_length; + uint16_t routing_length; + uint16_t stored_count; + uint16_t *resolved_lengths; + uint16_t *resolved_plugin_lengths; + uint16_t stored_label_length; + size_t labels_total; + size_t plugins_total; + size_t routing_payload_bytes; + size_t computed_length; + uint8_t flags; + char *meta; + + has_labels = FLB_FALSE; + has_plugins = FLB_FALSE; + wide_ids = FLB_FALSE; + index = 0; + max_tag_len = 0; + meta_size = 0; + offset = 0; + ret = 0; + id_offset = 0; + label_offset = 0; + plugin_lengths_offset = 0; + plugin_data_offset = 0; + id_bytes = (int) sizeof(uint16_t); + label_length = 0; + plugin_length = 0; + routing_length = 0; + stored_count = 0; + resolved_lengths = NULL; + resolved_plugin_lengths = NULL; + labels_total = 0; + plugins_total = 0; + routing_payload_bytes = 0; + computed_length = 0; + flags = 0; + meta = NULL; + + if (!chunk || !tag || !routes || route_count <= 0) { + return -1; + } + + if (route_count > UINT16_MAX) { + return -1; + } + + resolved_lengths = flb_calloc((size_t) route_count, sizeof(uint16_t)); + if (!resolved_lengths) { + flb_errno(); + return -1; + } + + resolved_plugin_lengths = flb_calloc((size_t) route_count, sizeof(uint16_t)); + if (!resolved_plugin_lengths) { + flb_errno(); + flb_free(resolved_lengths); + return -1; + } + + for (index = 0; index < route_count; index++) { + if (routes[index].id > UINT16_MAX) { + wide_ids = FLB_TRUE; + } + label_length = routes[index].label_length; + plugin_length = routes[index].plugin_name_length; + if (routes[index].label != NULL) { + if (label_length == 0) { + computed_length = strlen(routes[index].label); + if (computed_length > UINT16_MAX) { + computed_length = UINT16_MAX; + } + label_length = (uint16_t) computed_length; + } + else if (label_length > UINT16_MAX) { + label_length = UINT16_MAX; + } + + if (routes[index].label_is_alias != 0 && + label_length > FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK) { + label_length = 
FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK; + } + + if (label_length > 0) { + has_labels = FLB_TRUE; + } + } + else { + label_length = 0; + } + + resolved_lengths[index] = label_length; + labels_total += (size_t) label_length; + + if (routes[index].plugin_name != NULL) { + if (plugin_length == 0) { + computed_length = strlen(routes[index].plugin_name); + if (computed_length > UINT16_MAX) { + computed_length = UINT16_MAX; + } + plugin_length = (uint16_t) computed_length; + } + else if (plugin_length > UINT16_MAX) { + plugin_length = UINT16_MAX; + } + + if (plugin_length > 0) { + has_plugins = FLB_TRUE; + } + } + else { + plugin_length = 0; + } + + resolved_plugin_lengths[index] = plugin_length; + plugins_total += (size_t) plugin_length; + } + + if (wide_ids == FLB_TRUE) { + id_bytes = (int) sizeof(uint32_t); + } + else { + id_bytes = (int) sizeof(uint16_t); + } + + routing_payload_bytes = sizeof(uint16_t) + + ((size_t) route_count * (size_t) id_bytes); + if (has_labels == FLB_TRUE) { + routing_payload_bytes += ((size_t) route_count * sizeof(uint16_t)) + + labels_total; + } + if (has_plugins == FLB_TRUE) { + routing_payload_bytes += ((size_t) route_count * sizeof(uint16_t)) + + plugins_total; + } + + if (routing_payload_bytes > UINT16_MAX) { + flb_free(resolved_lengths); + flb_free(resolved_plugin_lengths); + return -1; + } + + routing_length = (uint16_t) routing_payload_bytes; + max_tag_len = 65535 - (int) (FLB_INPUT_CHUNK_META_HEADER + 1 + sizeof(uint16_t) + routing_length); + if (max_tag_len < 0) { + max_tag_len = 0; + } + if (tag_len > max_tag_len) { + tag_len = max_tag_len; + } + + meta_size = FLB_INPUT_CHUNK_META_HEADER + tag_len + 1 + sizeof(uint16_t) + routing_length; + meta = flb_calloc(1, meta_size); + if (!meta) { + flb_errno(); + flb_free(resolved_lengths); + flb_free(resolved_plugin_lengths); + return -1; + } + + meta[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0; + meta[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1; + + if (event_type == FLB_INPUT_LOGS) { + meta[2] = 
FLB_INPUT_CHUNK_TYPE_LOGS; + } + else if (event_type == FLB_INPUT_METRICS) { + meta[2] = FLB_INPUT_CHUNK_TYPE_METRICS; + } + else if (event_type == FLB_INPUT_TRACES) { + meta[2] = FLB_INPUT_CHUNK_TYPE_TRACES; + } + else if (event_type == FLB_INPUT_PROFILES) { + meta[2] = FLB_INPUT_CHUNK_TYPE_PROFILES; + } + + flags = FLB_CHUNK_FLAG_DIRECT_ROUTES; + if (has_labels == FLB_TRUE) { + flags |= FLB_CHUNK_FLAG_DIRECT_ROUTE_LABELS; + } + if (wide_ids == FLB_TRUE) { + flags |= FLB_CHUNK_FLAG_DIRECT_ROUTE_WIDE_IDS; + } + if (has_plugins == FLB_TRUE) { + flags |= FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS; + } + meta[3] = (char) flags; + + memcpy(meta + FLB_INPUT_CHUNK_META_HEADER, tag, tag_len); + meta[FLB_INPUT_CHUNK_META_HEADER + tag_len] = '\0'; + + offset = FLB_INPUT_CHUNK_META_HEADER + tag_len + 1; + meta[offset] = (uint8_t) (routing_length >> 8); + meta[offset + 1] = (uint8_t) (routing_length & 0xFF); + + stored_count = (uint16_t) route_count; + meta[offset + 2] = (uint8_t) (stored_count >> 8); + meta[offset + 3] = (uint8_t) (stored_count & 0xFF); + + id_offset = offset + 4; + for (index = 0; index < route_count; index++) { + if (wide_ids == FLB_TRUE) { + meta[id_offset] = (uint8_t) ((routes[index].id >> 24) & 0xFF); + meta[id_offset + 1] = (uint8_t) ((routes[index].id >> 16) & 0xFF); + meta[id_offset + 2] = (uint8_t) ((routes[index].id >> 8) & 0xFF); + meta[id_offset + 3] = (uint8_t) (routes[index].id & 0xFF); + } + else { + meta[id_offset] = (uint8_t) ((routes[index].id >> 8) & 0xFF); + meta[id_offset + 1] = (uint8_t) (routes[index].id & 0xFF); + } + id_offset += id_bytes; + } + + label_offset = offset + 4 + (route_count * id_bytes); + + if (has_labels == FLB_TRUE) { + for (index = 0; index < route_count; index++) { + stored_label_length = resolved_lengths[index]; + if (routes[index].label_is_alias != 0 && stored_label_length > 0) { + if (stored_label_length > FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK) { + stored_label_length = FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK; + 
} + stored_label_length |= FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG; + } + meta[label_offset] = (uint8_t) (stored_label_length >> 8); + meta[label_offset + 1] = (uint8_t) (stored_label_length & 0xFF); + label_offset += sizeof(uint16_t); + } + + for (index = 0; index < route_count; index++) { + label_length = resolved_lengths[index]; + if (label_length > 0 && routes[index].label != NULL) { + memcpy(meta + label_offset, routes[index].label, label_length); + } + label_offset += label_length; + } + } + + plugin_lengths_offset = label_offset; + + if (has_plugins == FLB_TRUE) { + for (index = 0; index < route_count; index++) { + plugin_length = resolved_plugin_lengths[index]; + meta[plugin_lengths_offset] = (uint8_t) (plugin_length >> 8); + meta[plugin_lengths_offset + 1] = (uint8_t) (plugin_length & 0xFF); + plugin_lengths_offset += sizeof(uint16_t); + } + + plugin_data_offset = plugin_lengths_offset; + for (index = 0; index < route_count; index++) { + plugin_length = resolved_plugin_lengths[index]; + if (plugin_length > 0 && routes[index].plugin_name != NULL) { + memcpy(meta + plugin_data_offset, + routes[index].plugin_name, + plugin_length); + } + plugin_data_offset += plugin_length; + } + } + + ret = cio_meta_write(chunk, (char *) meta, meta_size); + if (ret == -1) { + flb_error("[input chunk] could not write metadata"); + flb_free(resolved_lengths); + flb_free(resolved_plugin_lengths); + flb_free(meta); + return -1; + } + + flb_free(resolved_lengths); + flb_free(resolved_plugin_lengths); + flb_free(meta); + + return 0; +} + +int flb_input_chunk_has_direct_routes(struct flb_input_chunk *ic) +{ + int ret; + struct flb_input_chunk_meta_view view; + + ret = input_chunk_metadata_view(ic, &view); + if (ret == -1) { + return FLB_FALSE; + } + + if ((view.flags & FLB_CHUNK_FLAG_DIRECT_ROUTES) == 0) { + return FLB_FALSE; + } + + if (view.routing_data == NULL) { + return FLB_FALSE; + } + + if (view.routing_data_length < sizeof(uint16_t)) { + return FLB_FALSE; + } + + return 
FLB_TRUE; +} + +int flb_input_chunk_get_direct_routes(struct flb_input_chunk *ic, + struct flb_chunk_direct_route **routes, + int *route_count) +{ + int index; + int labels_present; + int plugins_present; + int wide_ids; + int ret; + int id_offset; + int lengths_offset; + int label_data_offset; + int plugin_lengths_offset; + int plugin_data_offset; + int id_bytes; + size_t remaining; + size_t plugin_remaining; + uint16_t routing_length; + uint16_t stored_count; + uint16_t *label_lengths; + uint8_t *label_alias_flags; + uint16_t *plugin_lengths; + struct flb_chunk_direct_route *result; + struct flb_input_chunk_meta_view view; + uint32_t read_id; + + index = 0; + labels_present = FLB_FALSE; + plugins_present = FLB_FALSE; + wide_ids = FLB_FALSE; + ret = 0; + id_offset = 0; + lengths_offset = 0; + label_data_offset = 0; + plugin_lengths_offset = 0; + plugin_data_offset = 0; + id_bytes = (int) sizeof(uint16_t); + remaining = 0; + plugin_remaining = 0; + routing_length = 0; + stored_count = 0; + label_lengths = NULL; + label_alias_flags = NULL; + plugin_lengths = NULL; + result = NULL; + read_id = 0; + + if (!routes || !route_count) { + return -1; + } + + *routes = NULL; + *route_count = 0; + + ret = input_chunk_metadata_view(ic, &view); + if (ret == -1) { + return -1; + } + + if ((view.flags & FLB_CHUNK_FLAG_DIRECT_ROUTES) == 0) { + return 0; + } + + if (view.routing_data == NULL || view.routing_data_length < sizeof(uint16_t)) { + return -2; + } + + routing_length = view.routing_data_length; + stored_count = (uint16_t) (((unsigned char) view.routing_data[0] << 8) | + (unsigned char) view.routing_data[1]); + + if (stored_count == 0) { + return 0; + } + + wide_ids = ((view.flags & FLB_CHUNK_FLAG_DIRECT_ROUTE_WIDE_IDS) != 0); + if (wide_ids == FLB_TRUE) { + id_bytes = (int) sizeof(uint32_t); + } + else { + id_bytes = (int) sizeof(uint16_t); + } + + if ((size_t) routing_length < (sizeof(uint16_t) + + ((size_t) stored_count * (size_t) id_bytes))) { + return -2; + } + + 
labels_present = ((view.flags & FLB_CHUNK_FLAG_DIRECT_ROUTE_LABELS) != 0); + plugins_present = ((view.flags & FLB_CHUNK_FLAG_DIRECT_ROUTE_PLUGIN_IDS) != 0); + + result = flb_calloc((size_t) stored_count, sizeof(struct flb_chunk_direct_route)); + if (!result) { + flb_errno(); + return -1; + } + + id_offset = sizeof(uint16_t); + for (index = 0; index < stored_count; index++) { + if (wide_ids == FLB_TRUE) { + read_id = ((uint32_t) ((unsigned char) view.routing_data[id_offset]) << 24) | + ((uint32_t) ((unsigned char) view.routing_data[id_offset + 1]) << 16) | + ((uint32_t) ((unsigned char) view.routing_data[id_offset + 2]) << 8) | + (uint32_t) ((unsigned char) view.routing_data[id_offset + 3]); + } + else { + read_id = ((uint32_t) ((unsigned char) view.routing_data[id_offset]) << 8) | + (uint32_t) ((unsigned char) view.routing_data[id_offset + 1]); + } + result[index].id = read_id; + result[index].label = NULL; + result[index].label_length = 0; + result[index].plugin_name = NULL; + result[index].plugin_name_length = 0; + id_offset += id_bytes; + } + + lengths_offset = sizeof(uint16_t) + (stored_count * id_bytes); + if ((size_t) lengths_offset > routing_length) { + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -2; + } + + label_data_offset = lengths_offset; + + if (labels_present == FLB_TRUE) { + label_lengths = flb_calloc((size_t) stored_count, sizeof(uint16_t)); + if (!label_lengths) { + flb_errno(); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -1; + } + + label_alias_flags = flb_calloc((size_t) stored_count, sizeof(uint8_t)); + if (!label_alias_flags) { + flb_errno(); + flb_free(label_lengths); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -1; + } + + for (index = 0; index < stored_count; index++) { + if ((size_t) (label_data_offset + (int) sizeof(uint16_t)) > routing_length) { + flb_free(label_alias_flags); + flb_free(label_lengths); + flb_input_chunk_destroy_direct_routes(result, 
stored_count); + return -2; + } + label_lengths[index] = (uint16_t) (((unsigned char) view.routing_data[label_data_offset] << 8) | + (unsigned char) view.routing_data[label_data_offset + 1]); + if (plugins_present == FLB_TRUE) { + if ((label_lengths[index] & FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG) != 0) { + label_alias_flags[index] = FLB_TRUE; + label_lengths[index] &= FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK; + } + else { + label_alias_flags[index] = FLB_FALSE; + } + } + else { + /* Even when plugins_present is FALSE, the alias flag may still be encoded */ + if ((label_lengths[index] & FLB_CHUNK_DIRECT_ROUTE_LABEL_ALIAS_FLAG) != 0) { + label_alias_flags[index] = FLB_TRUE; + label_lengths[index] &= FLB_CHUNK_DIRECT_ROUTE_LABEL_LENGTH_MASK; + } + else { + label_alias_flags[index] = FLB_TRUE; + } + } + label_data_offset += sizeof(uint16_t); + } + + if ((size_t) label_data_offset > routing_length) { + flb_free(label_alias_flags); + flb_free(label_lengths); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -2; + } + + remaining = routing_length - (size_t) label_data_offset; + + for (index = 0; index < stored_count; index++) { + if (label_lengths[index] == 0) { + result[index].label = NULL; + result[index].label_length = 0; + if (label_alias_flags) { + result[index].label_is_alias = label_alias_flags[index]; + } + else if (plugins_present == FLB_FALSE) { + result[index].label_is_alias = FLB_TRUE; + } + continue; + } + + if (label_lengths[index] > remaining) { + flb_free(label_alias_flags); + flb_free(label_lengths); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -2; + } + + result[index].label = flb_malloc((size_t) label_lengths[index] + 1); + if (!result[index].label) { + flb_errno(); + flb_free(label_alias_flags); + flb_free(label_lengths); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -1; + } + + memcpy((char *) result[index].label, + view.routing_data + label_data_offset, + 
label_lengths[index]); + ((char *) result[index].label)[label_lengths[index]] = '\0'; + result[index].label_length = label_lengths[index]; + if (label_alias_flags) { + result[index].label_is_alias = label_alias_flags[index]; + } + else if (plugins_present == FLB_FALSE) { + result[index].label_is_alias = FLB_TRUE; + } + label_data_offset += label_lengths[index]; + remaining -= label_lengths[index]; + } + + flb_free(label_alias_flags); + flb_free(label_lengths); + } + else { + remaining = routing_length - (size_t) label_data_offset; + } + + plugin_lengths_offset = label_data_offset; + + if (plugins_present == FLB_TRUE) { + plugin_lengths = flb_calloc((size_t) stored_count, sizeof(uint16_t)); + if (!plugin_lengths) { + flb_errno(); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -1; + } + + for (index = 0; index < stored_count; index++) { + if ((size_t) (plugin_lengths_offset + (int) sizeof(uint16_t)) > routing_length) { + flb_free(plugin_lengths); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -2; + } + plugin_lengths[index] = (uint16_t) (((unsigned char) view.routing_data[plugin_lengths_offset] << 8) | + (unsigned char) view.routing_data[plugin_lengths_offset + 1]); + plugin_lengths_offset += sizeof(uint16_t); + } + + plugin_data_offset = plugin_lengths_offset; + if ((size_t) plugin_data_offset > routing_length) { + flb_free(plugin_lengths); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -2; + } + + plugin_remaining = routing_length - (size_t) plugin_data_offset; + + for (index = 0; index < stored_count; index++) { + if (plugin_lengths[index] == 0) { + result[index].plugin_name = NULL; + result[index].plugin_name_length = 0; + continue; + } + + if (plugin_lengths[index] > plugin_remaining) { + flb_free(plugin_lengths); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -2; + } + + result[index].plugin_name = flb_malloc((size_t) plugin_lengths[index] + 1); + if 
(!result[index].plugin_name) { + flb_errno(); + flb_free(plugin_lengths); + flb_input_chunk_destroy_direct_routes(result, stored_count); + return -1; + } + + memcpy((char *) result[index].plugin_name, + view.routing_data + plugin_data_offset, + plugin_lengths[index]); + ((char *) result[index].plugin_name)[plugin_lengths[index]] = '\0'; + result[index].plugin_name_length = plugin_lengths[index]; + plugin_data_offset += plugin_lengths[index]; + plugin_remaining -= plugin_lengths[index]; + } + + flb_free(plugin_lengths); + } + else { + for (index = 0; index < stored_count; index++) { + result[index].plugin_name = NULL; + result[index].plugin_name_length = 0; + } + } + + *routes = result; + *route_count = stored_count; + + return 0; +} + +void flb_input_chunk_destroy_direct_routes(struct flb_chunk_direct_route *routes, + int route_count) +{ + int index; + + index = 0; + + if (!routes) { + return; + } + + for (index = 0; index < route_count; index++) { + if (routes[index].label != NULL && routes[index].label_length > 0) { + flb_free((void *) routes[index].label); + } + if (routes[index].plugin_name != NULL) { + flb_free((void *) routes[index].plugin_name); + } + } + + flb_free(routes); +} + struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, const char *tag, int tag_len) { @@ -2111,23 +3167,6 @@ flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic) return ch->name; } -static inline int input_chunk_has_magic_bytes(char *buf, int len) -{ - unsigned char *p; - - if (len < FLB_INPUT_CHUNK_META_HEADER) { - return FLB_FALSE; - } - - p = (unsigned char *) buf; - if (p[0] == FLB_INPUT_CHUNK_MAGIC_BYTE_0 && - p[1] == FLB_INPUT_CHUNK_MAGIC_BYTE_1 && p[3] == 0) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - /* * Get the event type by retrieving metadata header. NOTE: this function only event type discovery by looking at the * headers bytes of a chunk that exists on disk. 
@@ -2173,29 +3212,20 @@ int flb_input_chunk_get_event_type(struct flb_input_chunk *ic) int flb_input_chunk_get_tag(struct flb_input_chunk *ic, const char **tag_buf, int *tag_len) { - int len; int ret; - char *buf; + struct flb_input_chunk_meta_view view; - ret = cio_meta_read(ic->chunk, &buf, &len); + ret = input_chunk_metadata_view(ic, &view); if (ret == -1) { *tag_len = -1; *tag_buf = NULL; return -1; } - /* If magic bytes exists, just set the offset */ - if (input_chunk_has_magic_bytes(buf, len)) { - *tag_len = len - FLB_INPUT_CHUNK_META_HEADER; - *tag_buf = buf + FLB_INPUT_CHUNK_META_HEADER; - } - else { - /* Old Chunk version without magic bytes */ - *tag_len = len; - *tag_buf = buf; - } + *tag_buf = view.tag; + *tag_len = view.tag_length; - return ret; + return 0; } /* diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 4eb281a7497..9bc799d02ac 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -102,9 +102,18 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, size_t out_size = 0; size_t chunk_size_sz = 0; ssize_t chunk_size; + int direct_count; + int direct_index; + int write_ret; + int label_is_alias; struct cfl_list *head; struct flb_input_chunk *chunk = NULL; struct flb_router_path *route_path; + struct flb_chunk_direct_route *direct_routes; + size_t label_length; + size_t plugin_length; + const char *label_source; + const char *plugin_name; if (!ins || !payload || !payload->tag || !payload->route) { return -1; @@ -198,6 +207,112 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, } } + direct_routes = NULL; + direct_count = 0; + direct_index = 0; + write_ret = 0; + label_length = 0; + label_source = NULL; + plugin_length = 0; + plugin_name = NULL; + + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + if (!route_path->ins) { + continue; + } + + if (flb_routes_mask_get_bit(chunk->routes_mask, + route_path->ins->id, + 
ins->config) == 0) { + continue; + } + + direct_count++; + } + + if (direct_count > 0) { + direct_routes = flb_calloc((size_t) direct_count, + sizeof(struct flb_chunk_direct_route)); + if (!direct_routes) { + flb_errno(); + direct_count = 0; + } + } + + if (direct_routes && direct_count > 0) { + direct_index = 0; + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + if (!route_path->ins) { + continue; + } + + if (flb_routes_mask_get_bit(chunk->routes_mask, + route_path->ins->id, + ins->config) == 0) { + continue; + } + + if (direct_index < direct_count) { + label_source = route_path->ins->alias; + label_length = 0; + plugin_name = NULL; + plugin_length = 0; + label_is_alias = FLB_FALSE; + if (!label_source || label_source[0] == '\0') { + label_source = route_path->ins->name; + } + else { + label_is_alias = FLB_TRUE; + } + if (label_source) { + label_length = strlen(label_source); + if (label_length > UINT16_MAX) { + label_length = UINT16_MAX; + } + } + if (route_path->ins->p && route_path->ins->p->name) { + plugin_name = route_path->ins->p->name; + plugin_length = strlen(plugin_name); + if (plugin_length > UINT16_MAX) { + plugin_length = UINT16_MAX; + } + } + else { + plugin_name = NULL; + plugin_length = 0; + } + + direct_routes[direct_index].id = (uint32_t) route_path->ins->id; + direct_routes[direct_index].label = label_source; + direct_routes[direct_index].label_length = (uint16_t) label_length; + direct_routes[direct_index].label_is_alias = (uint8_t) label_is_alias; + direct_routes[direct_index].plugin_name = plugin_name; + direct_routes[direct_index].plugin_name_length = (uint16_t) plugin_length; + direct_index++; + } + } + + if (direct_index == direct_count) { + write_ret = flb_input_chunk_write_header_v2(chunk->chunk, + chunk->event_type, + payload->tag, + flb_sds_len(payload->tag), + direct_routes, + direct_count); + if (write_ret != 0) { + flb_plg_warn(ins, + "failed to persist direct routes 
for chunk %s", + flb_input_chunk_get_name(chunk)); + } + } + } + + if (direct_routes) { + flb_free(direct_routes); + } + return 0; } diff --git a/src/flb_task.c b/src/flb_task.c index 5dcf6fdd334..f38540e32f5 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -30,6 +31,7 @@ #include #include #include +#include /* * Every task created must have an unique ID, this function lookup the @@ -71,6 +73,141 @@ static inline void map_free_task_id(int id, struct flb_config *config) config->task_map[id].task = NULL; } +static int task_collect_output_references(struct flb_config *config, + const struct flb_chunk_direct_route *route, + struct flb_output_instance ***out_matches, + size_t *out_count) +{ + size_t index; + size_t count; + int alias_length; + int label_length; + int name_length; + const char *label; + uint32_t stored_id; + struct mk_list *head; + struct flb_output_instance *o_ins; + struct flb_output_instance **matches; + + if (!config || !route || !out_matches || !out_count) { + return -1; + } + + *out_matches = NULL; + *out_count = 0; + + label = route->label; + label_length = 0; + stored_id = route->id; + if (label != NULL) { + label_length = route->label_length; + if (label_length == 0) { + label_length = (int) strlen(label); + } + } + + count = 0; + if (label != NULL && label_length > 0) { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if (o_ins->alias != NULL) { + alias_length = (int) strlen(o_ins->alias); + if (alias_length == label_length && + strncmp(o_ins->alias, label, (size_t) label_length) == 0 && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + count++; + } + } + } + + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + name_length = (int) strlen(o_ins->name); + if (name_length == label_length && + strncmp(o_ins->name, label, (size_t) label_length) == 0 
&& + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + if (o_ins->alias != NULL) { + alias_length = (int) strlen(o_ins->alias); + if (alias_length == label_length && + strncmp(o_ins->alias, label, (size_t) label_length) == 0) { + continue; + } + } + count++; + } + } + + if (count == 0) { + return 0; + } + } + else { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if ((uint32_t) o_ins->id == stored_id && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + count++; + } + } + + if (count == 0) { + return 0; + } + } + + matches = flb_calloc(count, sizeof(struct flb_output_instance *)); + if (!matches) { + flb_errno(); + return -1; + } + + index = 0; + if (label != NULL && label_length > 0) { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if (o_ins->alias != NULL) { + alias_length = (int) strlen(o_ins->alias); + if (alias_length == label_length && + strncmp(o_ins->alias, label, (size_t) label_length) == 0 && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + matches[index++] = o_ins; + } + } + } + + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + name_length = (int) strlen(o_ins->name); + if (name_length == label_length && + strncmp(o_ins->name, label, (size_t) label_length) == 0 && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + if (o_ins->alias != NULL) { + alias_length = (int) strlen(o_ins->alias); + if (alias_length == label_length && + strncmp(o_ins->alias, label, (size_t) label_length) == 0) { + continue; + } + } + matches[index++] = o_ins; + } + } + } + else { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if ((uint32_t) o_ins->id == stored_id && + flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) { + matches[index++] = o_ins; + } + } + } + + 
*out_matches = matches; + *out_count = index; + + return 0; +} + void flb_task_retry_destroy(struct flb_task_retry *retry) { int ret; @@ -361,6 +498,19 @@ struct flb_task *flb_task_create(uint64_t ref_id, int count = 0; int total_events = 0; int direct_count = 0; + int stored_routes_result = 0; + int ret = 0; + int stored_routes_used = FLB_FALSE; + int stored_routes_valid = FLB_TRUE; + int stored_routes_alloc_failed = FLB_FALSE; + int direct_output_count = 0; + int direct_output_index = 0; + uint32_t missing_output_id = 0; + uint16_t missing_output_label_length = 0; + const char *missing_output_label; + struct flb_output_instance **stored_matches; + size_t stored_match_count; + size_t stored_match_index; struct flb_task *task; struct flb_event_chunk *evc; struct flb_task_route *route; @@ -371,6 +521,7 @@ struct flb_task *flb_task_create(uint64_t ref_id, struct mk_list *o_head; struct flb_router_chunk_context router_context; int router_context_initialized = FLB_FALSE; + struct flb_chunk_direct_route *direct_routes; /* No error status */ *err = FLB_FALSE; @@ -426,6 +577,145 @@ struct flb_task *flb_task_create(uint64_t ref_id, #endif /* Direct connects betweek input <> outputs (API based) */ + direct_routes = NULL; + missing_output_label = NULL; + missing_output_label_length = 0; + if (flb_input_chunk_has_direct_routes(task_ic) == FLB_TRUE) { + stored_routes_result = flb_input_chunk_get_direct_routes(task_ic, + &direct_routes, + &direct_output_count); + if (stored_routes_result == 0 && direct_output_count > 0) { + stored_routes_valid = FLB_TRUE; + missing_output_id = 0; + for (direct_output_index = 0; + direct_output_index < direct_output_count; + direct_output_index++) { + stored_matches = NULL; + stored_match_count = 0; + ret = task_collect_output_references(config, + &direct_routes[direct_output_index], + &stored_matches, + &stored_match_count); + if (ret == -1) { + flb_error("[task] failed collecting restored routes for chunk %s", + 
flb_input_chunk_get_name(task_ic)); + } + + if (ret != 0 || stored_match_count == 0) { + stored_routes_valid = FLB_FALSE; + missing_output_id = direct_routes[direct_output_index].id; + missing_output_label = direct_routes[direct_output_index].label; + missing_output_label_length = direct_routes[direct_output_index].label_length; + if (missing_output_label_length == 0 && missing_output_label != NULL) { + missing_output_label_length = (uint16_t) strlen(missing_output_label); + } + if (stored_matches != NULL) { + flb_free(stored_matches); + } + break; + } + + if (stored_matches != NULL) { + flb_free(stored_matches); + } + } + + if (stored_routes_valid == FLB_TRUE) { + direct_count = 0; + stored_routes_alloc_failed = FLB_FALSE; + for (direct_output_index = 0; + direct_output_index < direct_output_count; + direct_output_index++) { + stored_matches = NULL; + stored_match_count = 0; + ret = task_collect_output_references(config, + &direct_routes[direct_output_index], + &stored_matches, + &stored_match_count); + if (ret != 0 || stored_match_count == 0 || stored_matches == NULL) { + if (stored_matches != NULL) { + flb_free(stored_matches); + } + continue; + } + + for (stored_match_index = 0; + stored_match_index < stored_match_count; + stored_match_index++) { + route = flb_calloc(1, sizeof(struct flb_task_route)); + if (!route) { + flb_errno(); + stored_routes_alloc_failed = FLB_TRUE; + break; + } + + route->status = FLB_TASK_ROUTE_INACTIVE; + route->out = stored_matches[stored_match_index]; + mk_list_add(&route->_head, &task->routes); + direct_count++; + } + + flb_free(stored_matches); + + if (stored_routes_alloc_failed == FLB_TRUE) { + break; + } + } + + if (stored_routes_alloc_failed == FLB_TRUE) { + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } + if (direct_routes) { + flb_input_chunk_destroy_direct_routes(direct_routes, + direct_output_count); + } + task->event_chunk->data = NULL; + 
flb_task_destroy(task, FLB_TRUE); + return NULL; + } + + if (direct_count > 0) { + stored_routes_used = FLB_TRUE; + } + } + else { + flb_warn("[task] input=%s/%s stored direct route id=%u label=%.*s not found for chunk %s, falling back to configured routes", + i_ins->p->name, + flb_input_name(i_ins), + (unsigned int) missing_output_id, + (int) missing_output_label_length, + missing_output_label ? missing_output_label : "", + flb_input_chunk_get_name(task_ic)); + } + } + else if (stored_routes_result == -2) { + flb_warn("[task] input=%s/%s invalid stored direct routing metadata for chunk %s, falling back to configured routes", + i_ins->p->name, + flb_input_name(i_ins), + flb_input_chunk_get_name(task_ic)); + } + } + + if (stored_routes_used == FLB_TRUE) { + if (direct_routes) { + flb_input_chunk_destroy_direct_routes(direct_routes, direct_output_count); + } + flb_debug("[task] restored direct task=%p id=%i with %i route(s)", + task, task->id, direct_count); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } + return task; + } + + if (direct_routes) { + flb_input_chunk_destroy_direct_routes(direct_routes, direct_output_count); + direct_routes = NULL; + } + if (cfl_list_size(&i_ins->routes_direct) > 0) { direct_count = 0; diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 551a734fab8..fa020e6eb06 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -26,6 +26,7 @@ set(UNIT_TESTS_FILES mp.c mp_chunk_cobj.c input_chunk.c + input_chunk_routes.c flb_time.c file.c csv.c diff --git a/tests/internal/input_chunk_routes.c b/tests/internal/input_chunk_routes.c new file mode 100644 index 00000000000..49ad951726a --- /dev/null +++ b/tests/internal/input_chunk_routes.c @@ -0,0 +1,784 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include 
+#include +#include + +#include "flb_tests_internal.h" + +#define TEST_STREAM_PATH "/tmp/flb-chunk-direct-test" +#define TEST_STREAM_PATH_MATCH "/tmp/flb-chunk-direct-test-match" +#define TEST_STREAM_PATH_NULL "/tmp/flb-chunk-direct-test-null" + +static int write_test_log_payload(struct cio_chunk *chunk) +{ + msgpack_sbuffer sbuf; + msgpack_packer pck; + int ret; + + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); + + /* + * Compose a single Fluent Bit log record: [timestamp, map] + * Using a simple positive integer timestamp keeps validation minimal. + */ + msgpack_pack_array(&pck, 2); + msgpack_pack_uint64(&pck, 0); + msgpack_pack_map(&pck, 1); + msgpack_pack_str(&pck, 3); + msgpack_pack_str_body(&pck, "key", 3); + msgpack_pack_str(&pck, 5); + msgpack_pack_str_body(&pck, "value", 5); + + ret = cio_chunk_write(chunk, sbuf.data, sbuf.size); + msgpack_sbuffer_destroy(&sbuf); + + return ret; +} + +static int init_test_config(struct flb_config *config, + struct flb_input_instance *in, + struct flb_input_plugin *plugin) +{ + int ret; + + memset(config, 0, sizeof(*config)); + mk_list_init(&config->outputs); + mk_list_init(&config->inputs); + + /* Initialize environment (required by flb_input_instance_init) */ + config->env = flb_env_create(); + if (config->env == NULL) { + return -1; + } + + ret = flb_routes_mask_set_size(64, config); + if (ret != 0) { + flb_env_destroy(config->env); + config->env = NULL; + return -1; + } + + memset(in, 0, sizeof(*in)); + in->config = config; + in->p = plugin; + in->log_level = FLB_LOG_OFF; + snprintf(in->name, sizeof(in->name), "dummy.0"); + in->routable = FLB_TRUE; + mk_list_init(&in->_head); + mk_list_init(&in->chunks); + mk_list_init(&in->chunks_up); + mk_list_init(&in->chunks_down); + mk_list_init(&in->tasks); + mk_list_init(&in->collectors); + cfl_list_init(&in->routes_direct); + cfl_list_init(&in->routes); + + /* Add instance to config inputs list (required by flb_input_instance_destroy) */ 
+ mk_list_add(&in->_head, &config->inputs); + + /* Initialize properties list (required by flb_input_instance_init) */ + mk_list_init(&in->properties); + mk_list_init(&in->net_properties); + + /* Initialize hash tables for chunks (required by flb_input_chunk_destroy) */ + in->ht_log_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 512, 0); + if (!in->ht_log_chunks) { + return -1; + } + + in->ht_metric_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 512, 0); + if (!in->ht_metric_chunks) { + flb_hash_table_destroy(in->ht_log_chunks); + in->ht_log_chunks = NULL; + return -1; + } + + in->ht_trace_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 512, 0); + if (!in->ht_trace_chunks) { + flb_hash_table_destroy(in->ht_log_chunks); + flb_hash_table_destroy(in->ht_metric_chunks); + in->ht_log_chunks = NULL; + in->ht_metric_chunks = NULL; + return -1; + } + + in->ht_profile_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 512, 0); + if (!in->ht_profile_chunks) { + flb_hash_table_destroy(in->ht_log_chunks); + flb_hash_table_destroy(in->ht_metric_chunks); + flb_hash_table_destroy(in->ht_trace_chunks); + in->ht_log_chunks = NULL; + in->ht_metric_chunks = NULL; + in->ht_trace_chunks = NULL; + return -1; + } + + return 0; +} + +static int add_test_output(struct flb_config *config, + struct flb_output_instance *out, + struct flb_output_plugin *plugin, + int id, + const char *alias) +{ + memset(out, 0, sizeof(*out)); + out->config = config; + out->p = plugin; + out->log_level = FLB_LOG_OFF; + out->id = id; + snprintf(out->name, sizeof(out->name), "%s.%d", + plugin->name ? 
plugin->name : "out", id);
+
+    if (alias) {
+        out->alias = flb_strdup(alias);
+        if (!out->alias) {
+            return -1;
+        }
+    }
+
+    mk_list_init(&out->_head);
+    mk_list_init(&out->properties);
+    mk_list_init(&out->net_properties);
+    mk_list_init(&out->upstreams);
+    mk_list_init(&out->flush_list);
+    mk_list_init(&out->flush_list_destroy);
+
+    mk_list_add(&out->_head, &config->outputs);
+
+    return 0;
+}
+
+static void cleanup_test_output(struct flb_output_instance *out)
+{
+    if (out->alias) {
+        flb_free(out->alias);
+        out->alias = NULL;
+    }
+}
+
+static void cleanup_test_routing_scenario(struct flb_input_chunk *ic,
+                                          struct flb_output_instance *stdout_one,
+                                          struct flb_output_instance *stdout_two,
+                                          struct flb_output_instance *http_out,
+                                          struct flb_input_instance *in,
+                                          struct flb_config *config,
+                                          struct cio_chunk *chunk,
+                                          struct cio_ctx *ctx,
+                                          int config_ready,
+                                          const char *stream_path)
+{
+    if (ic) {
+        flb_input_chunk_destroy(ic, FLB_TRUE);
+    }
+
+    cleanup_test_output(stdout_one);
+    cleanup_test_output(stdout_two);
+    cleanup_test_output(http_out);
+
+    if (config_ready == FLB_TRUE) {
+        flb_input_instance_exit(in, config);
+
+        /* Manual cleanup for stack-allocated instance */
+        /* Remove from list first (before destroying hash tables) */
+        mk_list_del(&in->_head);
+
+        /* Destroy hash tables */
+        if (in->ht_log_chunks) {
+            flb_hash_table_destroy(in->ht_log_chunks);
+            in->ht_log_chunks = NULL;
+        }
+        if (in->ht_metric_chunks) {
+            flb_hash_table_destroy(in->ht_metric_chunks);
+            in->ht_metric_chunks = NULL;
+        }
+        if (in->ht_trace_chunks) {
+            flb_hash_table_destroy(in->ht_trace_chunks);
+            in->ht_trace_chunks = NULL;
+        }
+        if (in->ht_profile_chunks) {
+            flb_hash_table_destroy(in->ht_profile_chunks);
+            in->ht_profile_chunks = NULL;
+        }
+
+        /* Release properties */
+        flb_kv_release(&in->properties);
+        flb_kv_release(&in->net_properties);
+
+        /* Destroy metrics (created by flb_input_instance_init) */
+#ifdef FLB_HAVE_METRICS
+        if (in->cmt) {
+            cmt_destroy(in->cmt);
+            in->cmt = NULL;
+        }
+        if (in->metrics) {
+            flb_metrics_destroy(in->metrics);
+            in->metrics = NULL;
+        }
+#endif
+
+        /* Destroy config map if created */
+        if (in->tls_config_map) {
+            flb_config_map_destroy(in->tls_config_map);
+            in->tls_config_map = NULL;
+        }
+        if (in->net_config_map) {
+            flb_config_map_destroy(in->net_config_map);
+            in->net_config_map = NULL;
+        }
+
+        flb_routes_empty_mask_destroy(config);
+        if (config->env) {
+            flb_env_destroy(config->env);
+            config->env = NULL;
+        }
+    }
+
+    if (chunk) {
+        cio_chunk_close(chunk, CIO_TRUE);
+    }
+
+    if (ctx) {
+        cio_destroy(ctx);
+    }
+
+    cio_utils_recursive_delete(stream_path);
+}
+
+static int write_legacy_chunk_metadata(struct cio_chunk *chunk,
+                                       int event_type,
+                                       const char *tag,
+                                       int tag_len)
+{
+    int ret;
+    int meta_size;
+    char *meta;
+
+    meta_size = FLB_INPUT_CHUNK_META_HEADER + tag_len;
+    meta = flb_malloc(meta_size);
+    if (!meta) {
+        flb_errno();
+        return -1;
+    }
+
+    meta[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0;
+    meta[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1;
+
+    if (event_type == FLB_INPUT_LOGS) {
+        meta[2] = FLB_INPUT_CHUNK_TYPE_LOGS;
+    }
+    else if (event_type == FLB_INPUT_METRICS) {
+        meta[2] = FLB_INPUT_CHUNK_TYPE_METRICS;
+    }
+    else if (event_type == FLB_INPUT_TRACES) {
+        meta[2] = FLB_INPUT_CHUNK_TYPE_TRACES;
+    }
+    else if (event_type == FLB_INPUT_PROFILES) {
+        meta[2] = FLB_INPUT_CHUNK_TYPE_PROFILES;
+    }
+    else {
+        meta[2] = FLB_INPUT_CHUNK_TYPE_LOGS;
+    }
+
+    meta[3] = 0;
+
+    memcpy(meta + FLB_INPUT_CHUNK_META_HEADER, tag, tag_len);
+
+    ret = cio_meta_write(chunk, meta, meta_size);
+
+    flb_free(meta);
+
+    return ret;
+}
+
+static void test_chunk_metadata_direct_routes()
+{
+    struct cio_options opts;
+    struct cio_ctx *ctx;
+    struct cio_stream *stream;
+    struct cio_chunk *chunk;
+    struct flb_input_chunk ic;
+    struct flb_chunk_direct_route output_routes[2];
+    struct flb_chunk_direct_route *loaded_routes;
+    char *content_buf;
+    const char *tag_buf;
+    const char *tag_string;
+    const char payload[] = "direct route payload validation string";
+    int tag_len;
+    int route_count;
+    int ret;
+    int err;
+    int expected_tag_len;
+    size_t content_size;
+    size_t payload_size;
+
+    payload_size = sizeof(payload) - 1;
+    tag_string = "test.tag";
+    expected_tag_len = strlen(tag_string);
+
+    cio_utils_recursive_delete(TEST_STREAM_PATH);
+    memset(&opts, 0, sizeof(opts));
+    cio_options_init(&opts);
+    opts.root_path = TEST_STREAM_PATH;
+    opts.flags = CIO_OPEN;
+    ctx = cio_create(&opts);
+    TEST_CHECK(ctx != NULL);
+    if (!ctx) {
+        return;
+    }
+
+    stream = cio_stream_create(ctx, "direct", CIO_STORE_FS);
+    TEST_CHECK(stream != NULL);
+    if (!stream) {
+        cio_destroy(ctx);
+        return;
+    }
+
+    chunk = cio_chunk_open(ctx, stream, "meta", CIO_OPEN, 1024, &err);
+    TEST_CHECK(chunk != NULL);
+    if (!chunk) {
+        cio_destroy(ctx);
+        return;
+    }
+
+    ret = cio_chunk_is_up(chunk);
+    if (ret == CIO_FALSE) {
+        ret = cio_chunk_up_force(chunk);
+        TEST_CHECK(ret == CIO_OK);
+    }
+
+    tag_len = expected_tag_len;
+    ret = write_legacy_chunk_metadata(chunk, FLB_INPUT_LOGS,
+                                      tag_string, tag_len);
+    TEST_CHECK(ret == 0);
+
+    ret = cio_chunk_write(chunk, payload, payload_size);
+    TEST_CHECK(ret == 0);
+
+    ret = cio_chunk_get_content(chunk, &content_buf, &content_size);
+    TEST_CHECK(ret == 0);
+    if (ret == 0) {
+        TEST_CHECK(content_buf != NULL);
+        TEST_CHECK(content_size == payload_size);
+        if (content_size == payload_size) {
+            TEST_CHECK(memcmp(content_buf, payload, payload_size) == 0);
+        }
+    }
+
+    memset(output_routes, 0, sizeof(output_routes));
+
+    output_routes[0].id = 511;
+    output_routes[0].label = "alpha";
+    output_routes[0].label_length = 5;
+    output_routes[0].label_is_alias = FLB_TRUE;
+    output_routes[0].plugin_name = "stdout";
+    output_routes[0].plugin_name_length = 6;
+    output_routes[1].id = 70000;
+    output_routes[1].label = "beta";
+    output_routes[1].label_length = 4;
+    output_routes[1].label_is_alias = FLB_FALSE;
+    output_routes[1].plugin_name = "http";
+    output_routes[1].plugin_name_length = 4;
+    ret = flb_input_chunk_write_header_v2(chunk,
+                                          FLB_INPUT_LOGS,
+                                          (char *) tag_string,
+                                          tag_len,
+                                          output_routes,
+                                          2);
+    TEST_CHECK(ret == 0);
+
+    memset(&ic, 0, sizeof(ic));
+    ic.chunk = chunk;
+
+    TEST_CHECK(flb_input_chunk_has_direct_routes(&ic) == FLB_TRUE);
+
+    ret = cio_chunk_get_content(chunk, &content_buf, &content_size);
+    TEST_CHECK(ret == 0);
+    if (ret == 0) {
+        TEST_CHECK(content_buf != NULL);
+        TEST_CHECK(content_size == payload_size);
+        if (content_size == payload_size) {
+            TEST_CHECK(memcmp(content_buf, payload, payload_size) == 0);
+        }
+    }
+
+    ret = flb_input_chunk_get_direct_routes(&ic, &loaded_routes, &route_count);
+    TEST_CHECK(ret == 0);
+    TEST_CHECK(route_count == 2);
+    if (ret == 0 && route_count == 2) {
+        TEST_CHECK(loaded_routes != NULL);
+        if (loaded_routes) {
+            TEST_CHECK(loaded_routes[0].id == 511);
+            TEST_CHECK(loaded_routes[1].id == 70000);
+            TEST_CHECK(loaded_routes[0].label != NULL);
+            TEST_CHECK(loaded_routes[1].label != NULL);
+            if (loaded_routes[0].label && loaded_routes[1].label) {
+                TEST_CHECK(strcmp(loaded_routes[0].label, "alpha") == 0);
+                TEST_CHECK(strcmp(loaded_routes[1].label, "beta") == 0);
+            }
+            TEST_CHECK(loaded_routes[0].label_is_alias != 0);
+            TEST_CHECK(loaded_routes[1].label_is_alias == 0);
+            TEST_CHECK(loaded_routes[0].plugin_name != NULL);
+            TEST_CHECK(loaded_routes[1].plugin_name != NULL);
+            if (loaded_routes[0].plugin_name && loaded_routes[1].plugin_name) {
+                TEST_CHECK(strcmp(loaded_routes[0].plugin_name, "stdout") == 0);
+                TEST_CHECK(strcmp(loaded_routes[1].plugin_name, "http") == 0);
+            }
+            flb_input_chunk_destroy_direct_routes(loaded_routes, route_count);
+        }
+    }
+
+    ret = flb_input_chunk_get_tag(&ic, &tag_buf, &tag_len);
+    TEST_CHECK(ret == 0);
+    TEST_CHECK(tag_len == expected_tag_len);
+    if (ret == 0 && tag_len == expected_tag_len) {
+        TEST_CHECK(memcmp(tag_buf, tag_string, expected_tag_len) == 0);
+    }
+
+    cio_chunk_close(chunk, CIO_TRUE);
+    cio_destroy(ctx);
+    cio_utils_recursive_delete(TEST_STREAM_PATH);
+}
+
+static void test_chunk_restore_alias_plugin_match_multiple()
+{
+    struct cio_options opts;
+    struct cio_ctx *ctx;
+    struct cio_stream *stream;
+    struct cio_chunk *chunk;
+    struct flb_input_chunk *ic;
+    struct flb_config config;
+    struct flb_input_instance in;
+    struct flb_input_plugin input_plugin;
+    struct flb_output_instance stdout_one;
+    struct flb_output_instance stdout_two;
+    struct flb_output_instance http_out;
+    struct flb_output_plugin stdout_plugin;
+    struct flb_output_plugin http_plugin;
+    struct flb_chunk_direct_route route;
+    const char *tag_string;
+    int tag_len;
+    int ret;
+    int err;
+    int config_ready;
+
+    ctx = NULL;
+    stream = NULL;
+    chunk = NULL;
+    ic = NULL;
+    config_ready = FLB_FALSE;
+    tag_string = "test.tag";
+    tag_len = (int) strlen(tag_string);
+
+    cio_utils_recursive_delete(TEST_STREAM_PATH_MATCH);
+    memset(&opts, 0, sizeof(opts));
+    cio_options_init(&opts);
+    opts.root_path = TEST_STREAM_PATH_MATCH;
+    opts.flags = CIO_OPEN;
+
+    ctx = cio_create(&opts);
+    TEST_CHECK(ctx != NULL);
+    if (!ctx) {
+        return;
+    }
+
+    stream = cio_stream_create(ctx, "direct", CIO_STORE_FS);
+    TEST_CHECK(stream != NULL);
+    if (!stream) {
+        goto cleanup;
+    }
+
+    chunk = cio_chunk_open(ctx, stream, "meta", CIO_OPEN, 1024, &err);
+    TEST_CHECK(chunk != NULL);
+    if (!chunk) {
+        goto cleanup;
+    }
+
+    ret = cio_chunk_is_up(chunk);
+    if (ret == CIO_FALSE) {
+        ret = cio_chunk_up_force(chunk);
+        TEST_CHECK(ret == CIO_OK);
+        if (ret != CIO_OK) {
+            goto cleanup;
+        }
+    }
+
+    ret = write_legacy_chunk_metadata(chunk, FLB_INPUT_LOGS,
+                                      tag_string, tag_len);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ret = write_test_log_payload(chunk);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    memset(&route, 0, sizeof(route));
+    route.id = 25;
+    route.label = "shared";
+    route.label_length = 6;
+    route.label_is_alias = FLB_TRUE;
+    route.plugin_name = "stdout";
+    route.plugin_name_length = 6;
+
+    ret = flb_input_chunk_write_header_v2(chunk,
+                                          FLB_INPUT_LOGS,
+                                          (char *) tag_string,
+                                          tag_len,
+                                          &route,
+                                          1);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    memset(&input_plugin, 0, sizeof(input_plugin));
+    input_plugin.name = (char *) "dummy";
+    memset(&stdout_plugin, 0, sizeof(stdout_plugin));
+    stdout_plugin.name = (char *) "stdout";
+    memset(&http_plugin, 0, sizeof(http_plugin));
+    http_plugin.name = (char *) "http";
+
+    ret = init_test_config(&config, &in, &input_plugin);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+    config_ready = FLB_TRUE;
+
+#ifdef FLB_HAVE_METRICS
+    cmt_initialize();
+#endif
+
+    ret = flb_input_instance_init(&in, &config);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ret = add_test_output(&config, &stdout_one, &stdout_plugin, 1, "shared");
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ret = add_test_output(&config, &stdout_two, &stdout_plugin, 2, "shared");
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ret = add_test_output(&config, &http_out, &http_plugin, 3, "shared");
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ic = flb_input_chunk_map(&in, FLB_INPUT_LOGS, chunk);
+    TEST_CHECK(ic != NULL);
+    if (!ic) {
+        goto cleanup;
+    }
+
+    chunk = NULL;
+
+    TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask,
+                                       stdout_one.id,
+                                       &config) == 1);
+    TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask,
+                                       stdout_two.id,
+                                       &config) == 1);
+    TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask,
+                                       http_out.id,
+                                       &config) == 0);
+
+cleanup:
+    cleanup_test_routing_scenario(ic, &stdout_one, &stdout_two, &http_out,
+                                  &in, &config, chunk, ctx, config_ready,
+                                  TEST_STREAM_PATH_MATCH);
+}
+
+static void test_chunk_restore_alias_plugin_null_matches_all()
+{
+    struct cio_options opts;
+    struct cio_ctx *ctx;
+    struct cio_stream *stream;
+    struct cio_chunk *chunk;
+    struct flb_input_chunk *ic;
+    struct flb_config config;
+    struct flb_input_instance in;
+    struct flb_input_plugin input_plugin;
+    struct flb_output_instance stdout_one;
+    struct flb_output_instance stdout_two;
+    struct flb_output_instance http_out;
+    struct flb_output_plugin stdout_plugin;
+    struct flb_output_plugin http_plugin;
+    struct flb_chunk_direct_route route;
+    const char *tag_string;
+    int tag_len;
+    int ret;
+    int err;
+    int config_ready;
+
+    ctx = NULL;
+    stream = NULL;
+    chunk = NULL;
+    ic = NULL;
+    config_ready = FLB_FALSE;
+    tag_string = "test.tag";
+    tag_len = (int) strlen(tag_string);
+
+    cio_utils_recursive_delete(TEST_STREAM_PATH_NULL);
+    memset(&opts, 0, sizeof(opts));
+    cio_options_init(&opts);
+    opts.root_path = TEST_STREAM_PATH_NULL;
+    opts.flags = CIO_OPEN;
+
+    ctx = cio_create(&opts);
+    TEST_CHECK(ctx != NULL);
+    if (!ctx) {
+        return;
+    }
+
+    stream = cio_stream_create(ctx, "direct", CIO_STORE_FS);
+    TEST_CHECK(stream != NULL);
+    if (!stream) {
+        goto cleanup;
+    }
+
+    chunk = cio_chunk_open(ctx, stream, "meta", CIO_OPEN, 1024, &err);
+    TEST_CHECK(chunk != NULL);
+    if (!chunk) {
+        goto cleanup;
+    }
+
+    ret = cio_chunk_is_up(chunk);
+    if (ret == CIO_FALSE) {
+        ret = cio_chunk_up_force(chunk);
+        TEST_CHECK(ret == CIO_OK);
+        if (ret != CIO_OK) {
+            goto cleanup;
+        }
+    }
+
+    ret = write_legacy_chunk_metadata(chunk, FLB_INPUT_LOGS,
+                                      tag_string, tag_len);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ret = write_test_log_payload(chunk);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    memset(&route, 0, sizeof(route));
+    route.id = 30;
+    route.label = "shared";
+    route.label_length = 6;
+    route.label_is_alias = FLB_TRUE;
+    route.plugin_name = NULL;
+    route.plugin_name_length = 0;
+
+    ret = flb_input_chunk_write_header_v2(chunk,
+                                          FLB_INPUT_LOGS,
+                                          (char *) tag_string,
+                                          tag_len,
+                                          &route,
+                                          1);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    memset(&input_plugin, 0, sizeof(input_plugin));
+    input_plugin.name = (char *) "dummy";
+    memset(&stdout_plugin, 0, sizeof(stdout_plugin));
+    stdout_plugin.name = (char *) "stdout";
+    memset(&http_plugin, 0, sizeof(http_plugin));
+    http_plugin.name = (char *) "http";
+
+    ret = init_test_config(&config, &in, &input_plugin);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+    config_ready = FLB_TRUE;
+
+#ifdef FLB_HAVE_METRICS
+    cmt_initialize();
+#endif
+
+    ret = flb_input_instance_init(&in, &config);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ret = add_test_output(&config, &stdout_one, &stdout_plugin, 4, "shared");
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ret = add_test_output(&config, &stdout_two, &stdout_plugin, 5, "shared");
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ret = add_test_output(&config, &http_out, &http_plugin, 6, "shared");
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        goto cleanup;
+    }
+
+    ic = flb_input_chunk_map(&in, FLB_INPUT_LOGS, chunk);
+    TEST_CHECK(ic != NULL);
+    if (!ic) {
+        goto cleanup;
+    }
+
+    chunk = NULL;
+
+    TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask,
+                                       stdout_one.id,
+                                       &config) == 1);
+    TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask,
+                                       stdout_two.id,
+                                       &config) == 1);
+    TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask,
+                                       http_out.id,
+                                       &config) == 1);
+
+cleanup:
+    cleanup_test_routing_scenario(ic, &stdout_one, &stdout_two, &http_out,
+                                  &in, &config, chunk, ctx, config_ready,
+                                  TEST_STREAM_PATH_NULL);
+}
+
+TEST_LIST = {
+    { "chunk_metadata_direct_routes", test_chunk_metadata_direct_routes },
+    { "chunk_restore_alias_plugin_match_multiple", test_chunk_restore_alias_plugin_match_multiple },
+    { "chunk_restore_alias_plugin_null_matches_all", test_chunk_restore_alias_plugin_null_matches_all },
+    { 0 }
+};