Skip to content

Conversation

@edsiper
Copy link
Member

@edsiper edsiper commented Oct 31, 2025


Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Per-record routing now supports group metadata/attributes and OpenTelemetry resource/scope contexts; condition evaluation gains an extensible variant-provider path.
    • Configuration loading improved to resolve paths and accept additional formats.
    • New log-event group markers and macros for group boundaries.
  • Bug Fixes / Improvements

    • Safer routing and payload generation with stricter filtering, better cleanup, preserved group ordering, and recursion-safety in the decoder.
  • Tests

    • Added unit and runtime tests plus OTLP test data covering context-aware and OpenTelemetry routing.

@coderabbitai
Copy link

coderabbitai bot commented Oct 31, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds per-record, context-aware condition evaluation with a pluggable variant getter; extends record context enums for group and OpenTelemetry contexts; wires context-aware rules through router/config parsing; refactors input payload building to two-pass per-route matching with bitmaps and route masking; improves decoder group handling; and adds unit/runtime tests for metadata/group/OTEL routing.

Changes

Cohort / File(s) Summary
Headers / Public API
include/fluent-bit/flb_conditionals.h, include/fluent-bit/flb_router.h, include/fluent-bit/flb_log_event.h, include/fluent-bit/flb_log_event_decoder.h
Forward declarations and typedef (flb_condition_get_variant_fn) added; enum record_context_type expanded with GROUP/OTEL contexts; new prototype flb_condition_evaluate_ex() and flb_router_condition_evaluate_record(); flb_route_condition_rule gains context; flb_route_output gains ins; log event macros and decoder recursion_depth field added.
Condition evaluation core
src/flb_conditionals.c, src/flb_router_condition.c, src/flb_router_config.c
Added pluggable variant-provider API (flb_condition_evaluate_ex) with default_get_record_variant; route-specific variant extractors and route_logs_get_variant; updated compilation/evaluation to use rule->context; parsing honors optional context field.
Log input routing / payload building
src/flb_input_log.c, src/flb_task.c
build_payload_for_route / build_payload_for_default_route signatures updated to accept ins; implemented two-pass per-route matching with matched bitmaps, group-aware encode ordering; stricter route/ins guards; task routing honors routes_mask gating.
Log event decoder
src/flb_log_event_decoder.c
Added recursion depth guard; transfer msgpack zone ownership for group start (no memcpy); extract group metadata/attributes with fallbacks; handle invalid markers by skipping while preserving state.
Library / config loader
src/flb_lib.c
Switched config loading to flb_cf APIs with realpath resolution and cf_main lifecycle management.
Tests & runtime fixtures
tests/internal/*, tests/runtime/*, tests/runtime/data/*, tests/internal/data/config_format/yaml/*
New builders for metadata/group/OTEL chunks and OTLP JSON→msgpack helpers; many new unit tests for contexts and conditional routing; runtime OTLP routing test, test data JSON, YAML test config and CMake entries added; test registration updates.

Sequence Diagram(s)

sequenceDiagram
    participant Router as Router
    participant CondEval as ConditionEvaluator
    participant Variant as VariantProvider
    participant Input as LogInput
    participant Payload as PayloadBuilder

    Note over Router,CondEval: Pluggable variant-based evaluation
    Router->>CondEval: flb_condition_evaluate_ex(cond, ctx, get_variant_fn)
    loop per rule
        CondEval->>Variant: get_variant_fn(rule, ctx)
        Variant-->>CondEval: variant (BODY / METADATA / GROUP_* / OTEL_*)
        CondEval->>CondEval: evaluate rule (short-circuit AND/OR)
    end
    CondEval-->>Router: result

    Note over Input,Payload: Two-pass per-route payload build
    Input->>Payload: build_payload_for_route(ins, payload, records...)
    Payload->>CondEval: pass 1 — evaluate records, set matched bitmap
    Payload->>Payload: pass 2 — encode GROUP_START, matched records, GROUP_END
    Payload-->>Router: encoded payload (size, total_records)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Areas to focus on:

  • src/flb_input_log.c — two-pass matching, bitmap lifecycle, GROUP_START/END ordering and error paths.
  • src/flb_router_condition.c & src/flb_conditionals.c — variant-provider mapping, null/short-circuit semantics, AND/OR behavior when variants are missing.
  • src/flb_log_event_decoder.c — msgpack zone ownership transfer and pointer lifetimes.
  • src/flb_lib.c — config realpath handling and flb_cf lifecycle/cleanup.
  • New tests and runtime assets — test assumptions, registrations and data paths.

Possibly related PRs

Suggested reviewers

  • koleini
  • fujimotos

Poem

🐰 I hop through chunks with variant in paw,

metadata, groups, OTEL — I scope them all.
Rules get checked per record, bitmap taps the beat,
two-pass I dance and encode each matched treat.
A carrot cheer — routing now stands tall! 🎉

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "router: add contexts to conditional support for direct route paths" directly and accurately describes the primary change in this pull request. The core objective involves extending the conditional evaluation system to support multiple record contexts (metadata, group attributes, OTEL resource/scope attributes, etc.) beyond just the record body. This context-aware conditional support is implemented throughout the codebase across header files, implementation files, and test files. The title clearly identifies the subsystem (router), the type of change (adding contexts), what component is affected (conditional support), and the use case (direct route paths). The phrasing is specific and meaningful enough that a developer reviewing the git history would immediately understand this enhances conditional routing with multi-context evaluation capabilities.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch router-conditionals-contexts

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@edsiper edsiper marked this pull request as draft October 31, 2025 16:41
@edsiper edsiper changed the title router: add conditional support for direct route paths router: add contexts to conditional support for direct route paths Oct 31, 2025
Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 345 to 457
return -1;
}

/* Track which records match THIS specific route */
matched_by_route = flb_calloc(record_count, sizeof(uint8_t));
if (!matched_by_route) {
flb_errno();
flb_log_event_encoder_destroy(encoder);
return -1;
}

matched = 0;

/* First pass: evaluate conditions and mark matching records */
for (i = 0; i < record_count; i++) {
condition_result = flb_condition_evaluate(compiled, records[i]);
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) == 0) {
if (record_type == FLB_LOG_EVENT_GROUP_START) {
continue;
}
else if (record_type == FLB_LOG_EVENT_GROUP_END) {
group_end = records[i];
continue;
}
}

condition_result = flb_router_condition_evaluate_record(payload->route, records[i]);
if (condition_result != FLB_TRUE) {
continue;
}

ret = encode_chunk_record(encoder, records[i]);
matched_by_route[i] = 1;
matched_non_default[i] = 1;
matched++;
}

/* If no matches, return early */
if (matched == 0) {
flb_free(matched_by_route);
flb_log_event_encoder_destroy(encoder);
return 0;
}

/* Second pass: find GROUP_START record */
for (i = 0; i < record_count; i++) {
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) == 0 &&
record_type == FLB_LOG_EVENT_GROUP_START) {
group_start_record = records[i];
break;
}
}

if (group_start_record != NULL) {
ret = encode_chunk_record(encoder, group_start_record);
if (ret != 0) {
flb_free(matched_by_route);
flb_log_event_encoder_destroy(encoder);
return -1;
}
}

matched_non_default[i] = 1;
matched++;
/* Encode matching records */
for (i = 0; i < record_count; i++) {
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) == 0 &&
record_type == FLB_LOG_EVENT_NORMAL) {
if (matched_by_route[i]) {
ret = encode_chunk_record(encoder, records[i]);
if (ret != 0) {
flb_free(matched_by_route);
flb_log_event_encoder_destroy(encoder);
return -1;
}
}
}
}

if (matched == 0) {
if (group_end != NULL && group_start_record != NULL) {
ret = encode_chunk_record(encoder, group_end);
if (ret != 0) {
flb_free(matched_by_route);
flb_log_event_encoder_destroy(encoder);
return -1;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve group metadata per matched record set

When building conditional payloads, the new logic records only a single group_start_record and group_end for the entire chunk and always emits that pair around every matched record. If the original chunk contains multiple FLB_LOG_EVENT_GROUP_START/END sections, records matched from later groups will be wrapped with the first group’s metadata and the last group’s end marker. This produces incorrect OTEL resource/scope metadata on the encoded payload and can even form an invalid group pair. The payload builder needs to track group boundaries per record (or skip grouping entirely) rather than reusing the first/last group for all matches.

Useful? React with 👍 / 👎.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
tests/internal/router_config.c (1)

347-368: Initialize the encoder so the success path can clean up.

Here we hand-roll the msgpack sbuffer, but the callers finish by calling flb_log_event_encoder_destroy(). Because initialized never flips to FLB_TRUE, the destroy helper becomes a no-op and the buffer leaks on every invocation. Please use the normal encoder init path (and destroy it on failure) so the tests leave no allocations behind.

Apply this diff:

@@
-    memset(encoder, 0, sizeof(*encoder));
-    msgpack_sbuffer_init(&encoder->buffer);
+    memset(encoder, 0, sizeof(*encoder));
+    ret = flb_log_event_encoder_init(encoder, FLB_LOG_EVENT_FORMAT_DEFAULT);
+    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+        flb_free(otlp_json);
+        return -1;
+    }
@@
-    ret = flb_opentelemetry_logs_json_to_msgpack(encoder, otlp_json, json_len, NULL, &error_status);
+    ret = flb_opentelemetry_logs_json_to_msgpack(encoder, otlp_json, json_len, NULL, &error_status);
@@
-        msgpack_sbuffer_destroy(&encoder->buffer);
+        flb_log_event_encoder_destroy(encoder);
         return -1;
     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f879a93 and 6b64eaa.

📒 Files selected for processing (10)
  • include/fluent-bit/flb_conditionals.h (2 hunks)
  • include/fluent-bit/flb_router.h (4 hunks)
  • plugins/processor_content_modifier/cm_logs.c (1 hunks)
  • src/flb_conditionals.c (6 hunks)
  • src/flb_input_log.c (11 hunks)
  • src/flb_log_event_decoder.c (1 hunks)
  • src/flb_router_condition.c (6 hunks)
  • src/flb_router_config.c (4 hunks)
  • src/flb_task.c (1 hunks)
  • tests/internal/router_config.c (6 hunks)
🧰 Additional context used
🧠 Learnings (7)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • tests/internal/router_config.c
🧬 Code graph analysis (7)
src/flb_task.c (1)
src/flb_routes_mask.c (1)
  • flb_routes_mask_get_bit (125-140)
include/fluent-bit/flb_router.h (1)
src/flb_router_condition.c (1)
  • flb_router_condition_evaluate_record (413-436)
src/flb_router_condition.c (2)
lib/cfl/src/cfl_kvlist.c (1)
  • cfl_kvlist_fetch (418-421)
src/flb_conditionals.c (1)
  • flb_condition_evaluate_ex (363-438)
src/flb_router_config.c (3)
lib/cfl/src/cfl_kvlist.c (1)
  • cfl_kvlist_fetch (418-421)
src/flb_sds.c (1)
  • flb_sds_destroy (389-399)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/flb_input_log.c (4)
include/fluent-bit/flb_mem.h (2)
  • flb_calloc (84-96)
  • flb_free (126-128)
src/flb_log_event_encoder.c (1)
  • flb_log_event_encoder_destroy (99-116)
src/flb_log_event_decoder.c (1)
  • flb_log_event_decoder_get_record_type (423-443)
src/flb_router_condition.c (1)
  • flb_router_condition_evaluate_record (413-436)
tests/internal/router_config.c (7)
src/flb_log_event_encoder.c (8)
  • flb_log_event_encoder_init (42-74)
  • flb_log_event_encoder_begin_record (246-254)
  • flb_log_event_encoder_destroy (99-116)
  • flb_log_event_encoder_set_current_timestamp (289-292)
  • flb_log_event_encoder_commit_record (256-274)
  • flb_log_event_encoder_group_init (390-407)
  • flb_log_event_encoder_group_header_end (409-419)
  • flb_log_event_encoder_group_end (421-443)
src/opentelemetry/flb_opentelemetry_logs.c (1)
  • flb_opentelemetry_logs_json_to_msgpack (812-889)
include/fluent-bit/flb_mem.h (2)
  • flb_free (126-128)
  • flb_calloc (84-96)
src/flb_mp.c (1)
  • flb_mp_count (43-46)
src/flb_router_config.c (2)
  • flb_router_config_parse (1185-1223)
  • flb_router_routes_destroy (394-409)
src/flb_sds.c (2)
  • flb_sds_create (78-90)
  • flb_sds_destroy (389-399)
src/flb_router_condition.c (4)
  • flb_router_chunk_context_init (152-163)
  • flb_condition_eval_logs (275-314)
  • flb_router_chunk_context_reset (165-185)
  • flb_router_chunk_context_destroy (187-190)
include/fluent-bit/flb_conditionals.h (1)
src/flb_conditionals.c (1)
  • flb_condition_evaluate_ex (363-438)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (30)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-centos-7
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: pr-compile-without-cxx (3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
🔇 Additional comments (6)
include/fluent-bit/flb_router.h (1)

86-93: Route rules now carry explicit context. This keeps per-record evaluation self-contained with the selected record context, eliminating the implicit defaulting we previously had. Good call.

src/flb_task.c (1)

443-450: Direct-route mask check keeps parity with regular routing. The extra guard means conditional per-record decisions apply equally to direct connections, avoiding stale routes sneaking through. Nice consistency.

plugins/processor_content_modifier/cm_logs.c (1)

319-323: Skipping group-start events for body/metadata contexts saves us. This short-circuit avoids touching empty record objects and matches the new context-aware flow.

src/flb_router_config.c (1)

518-574: Context parser covers the expanded surface cleanly. The helper gives us case-insensitive aliases and a sane default, while the failure path tears everything back down. This makes the config UX much clearer.

include/fluent-bit/flb_conditionals.h (1)

31-104: Variant-provider hook and context expansion look solid. This keeps the classic API intact while letting router/other callers supply richer record views via the new enum values.

src/flb_log_event_decoder.c (1)

371-402: Ownership transfer resolves double-free risk

Great call switching to msgpack_unpacked_release_zone and re-pointing metadata/attribute pointers off the group record’s storage — this keeps the zone lifetime coherent while avoiding the prior double-free hazard.

Comment on lines +345 to 349
/* Check if route has a condition (flb_router_condition_evaluate_record handles NULL conditions) */
if (!payload->route->condition) {
return 0;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't bail out when the route lacks a condition.

Per-record direct routes frequently set per_record_routing without attaching a condition. This early exit means we never emit a payload for them, yet we still mark the route as handled and strip it from the base chunk—dropping data entirely. Let flb_router_condition_evaluate_record() handle the NULL case (it already returns FLB_TRUE) so direct routes keep working.

Apply this diff:

-    /* Check if route has a condition (flb_router_condition_evaluate_record handles NULL conditions) */
-    if (!payload->route->condition) {
-        return 0;
-    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/* Check if route has a condition (flb_router_condition_evaluate_record handles NULL conditions) */
if (!payload->route->condition) {
return 0;
}
🤖 Prompt for AI Agents
In src/flb_input_log.c around lines 345 to 348, remove the early bail-out that
returns 0 when payload->route->condition is NULL so that we do not skip emitting
per-record direct routes; instead call flb_router_condition_evaluate_record()
unconditionally (it already handles NULL and returns FLB_TRUE) and let its
result drive route handling. Ensure the check and the return statement are
deleted so direct routes with NULL conditions continue to be evaluated and
emitted.

Comment on lines +444 to 479
payload->size = encoder->buffer.size;
payload->data = flb_malloc(payload->size);
if (!payload->data) {
flb_log_event_encoder_destroy(encoder);
flb_errno();
return -1;
}

memcpy(payload->data, encoder->output_buffer, encoder->output_length);
payload->size = encoder->output_length;
/* Copy the buffer data - msgpack_sbuffer uses flat memory, no zones */
memcpy(payload->data, encoder->buffer.data, payload->size);
payload->total_records = matched;

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Include group markers in total_records.

We encode GROUP_START/GROUP_END records into the payload, but payload->total_records only counts normal events. That under-reports what we hand to flb_input_chunk_append_raw(), skewing chunk accounting (acks, limits, metrics). Please add the markers to the count in both builders.

Apply this diff:

@@
-    payload->total_records = matched;
+    size_t total_records = matched;
+
+    if (group_start_record != NULL) {
+        total_records++;
+
+        if (group_end != NULL) {
+            total_records++;
+        }
+    }
+
+    payload->total_records = total_records;
@@
-    payload->total_records = matched;
+    size_t total_records = matched;
+
+    if (group_start_record != NULL) {
+        total_records++;
+
+        if (group_end != NULL) {
+            total_records++;
+        }
+    }
+
+    payload->total_records = total_records;

Also applies to: 594-605

🤖 Prompt for AI Agents
In src/flb_input_log.c around lines 444-455 (and the other builder at 594-605),
the payload currently sets payload->total_records = matched but omits the
GROUP_START/GROUP_END marker records encoded into the payload; update both
builders to add the number of marker records appended to the payload record
count (e.g., payload->total_records = matched + marker_count), where
marker_count represents how many GROUP_START/GROUP_END entries were written into
the encoder, so chunk accounting (acks/limits/metrics) includes those markers as
well.

@edsiper
Copy link
Member Author

edsiper commented Oct 31, 2025

@codex review

@chatgpt-codex-connector
Copy link

Codex Review: Didn't find any major issues. 🚀

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@edsiper
Copy link
Member Author

edsiper commented Oct 31, 2025

@coderabbitai review

@coderabbitai
Copy link

coderabbitai bot commented Oct 31, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai
Copy link

coderabbitai bot commented Oct 31, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

♻️ Duplicate comments (2)
src/flb_input_log.c (2)

345-348: Do not skip per‑record routes with NULL conditions.

Per-record direct routes often omit a condition; bailing out here returns success without producing a payload, so those routes never receive data. We already noted this regression earlier—flb_router_condition_evaluate_record() handles NULL by returning FLB_TRUE. Drop this early return so the route still emits all records when no condition is defined.

-    /* Check if route has a condition (flb_router_condition_evaluate_record handles NULL conditions) */
-    if (!payload->route->condition) {
-        return 0;
-    }

467-478: Count the group markers in total_records.

We still report only the number of normal records (matched) even though we encode the surrounding GROUP_START/GROUP_END entries. That under-reports what we append to flb_input_chunk_append_raw(), breaking ack/limit accounting. Please track how many marker records you encode in each builder and include them in payload->total_records. (Same fix needed in both payload builders.)

-    payload->total_records = matched;
+    payload->total_records = matched + marker_count;

(Ensure marker_count is accumulated whenever you encode a group start/end record in both builders.)

Also applies to: 648-651

🧹 Nitpick comments (1)
tests/internal/router_config.c (1)

275-344: Consider optimizing the double JSON formatting.

The JSON string is formatted twice: once to calculate the length and again to populate the buffer. While functional, this could be optimized by using a single pass with a pre-sized buffer or dynamic allocation.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6b64eaa and db85748.

📒 Files selected for processing (6)
  • src/flb_input_log.c (11 hunks)
  • src/flb_lib.c (5 hunks)
  • tests/internal/router_config.c (6 hunks)
  • tests/runtime/CMakeLists.txt (1 hunks)
  • tests/runtime/data/opentelemetry/routing_logs.json (1 hunks)
  • tests/runtime/in_opentelemetry_routing.c (1 hunks)
🧰 Additional context used
🧠 Learnings (9)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • src/flb_lib.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-09-14T09:46:09.531Z
Learnt from: aminvakil
Repo: fluent/fluent-bit PR: 10844
File: conf/fluent-bit:13-15
Timestamp: 2025-09-14T09:46:09.531Z
Learning: For fluent-bit Debian packaging, /opt/fluent-bit/bin/ is the appropriate installation path since the package may be installed from non-official Debian sources, making /opt compliant with FHS for optional software packages.

Applied to files:

  • src/flb_lib.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
🪛 GitHub Actions: PR - Windows checks
src/flb_lib.c

[error] 1-1: Cannot open include file: 'unistd.h' (POSIX header) on Windows. Compilation failed during fluent-bit static build.

🪛 GitHub Actions: Run unit tests
tests/runtime/in_opentelemetry_routing.c

[error] 200-200: Check ret == 0... failed. OTLP comprehensive routing test encountered configuration load issues (otlp_comprehensive_routing_test.yaml missing or unreadable).


[error] 351-351: Check ctx != NULL... failed. OTLP routing test context was NULL during assertion.

tests/internal/router_config.c

[error] 1060-1060: Check cf != NULL... failed. Router config parsing encountered a NULL config context.

🔇 Additional comments (8)
tests/internal/router_config.c (8)

12-13: LGTM!

The new includes are appropriate for the added OTEL and msgpack functionality.

Also applies to: 21-21


91-156: LGTM!

The function properly handles metadata and body values with appropriate error handling and cleanup.


158-248: LGTM!

The function correctly constructs grouped log events with proper group initialization, metadata/attribute handling, and cleanup.


346-366: Note: Different encoder initialization pattern requires special cleanup.

This function uses msgpack_sbuffer_init directly instead of flb_log_event_encoder_init like the other helpers. This means callers must call both msgpack_sbuffer_destroy and flb_log_event_encoder_destroy for proper cleanup (as seen in the test functions). Consider documenting this requirement or making the cleanup pattern consistent across all helpers.


1289-1370: LGTM!

The test correctly validates metadata context evaluation with proper setup, matching/non-matching cases, and cleanup.


1372-1526: LGTM!

The test thoroughly validates both group metadata and group attributes contexts with appropriate test cases and cleanup.


1528-1771: LGTM!

The test comprehensively validates all OTEL context types (resource attributes, scope metadata, scope attributes) with proper cleanup including the required msgpack_sbuffer_destroy calls.


2048-2048: LGTM!

All new test functions are properly registered in TEST_LIST.

Also applies to: 2052-2054

Comment on lines +1022 to +1029
if (handled) {
if (appended > 0) {
return appended;
}
return 1;
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t strip conditional routes when nothing matched.

When no route produced a payload, appended stays 0, yet we still return 1. input_log_append() treats that as “handled” and calls input_chunk_remove_conditional_routes(), removing those routes from the original chunk—dropping data. Only signal “handled” when at least one payload was appended.

-    if (handled) {
-        if (appended > 0) {
-            return appended;
-        }
-        return 1;
-    }
-
-    return 0;
+    if (handled) {
+        return appended;
+    }
+
+    return 0;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (handled) {
if (appended > 0) {
return appended;
}
return 1;
}
if (handled) {
return appended;
}
return 0;
🤖 Prompt for AI Agents
In src/flb_input_log.c around lines 1022 to 1028, the current branch returns 1
(treated as “handled”) even when appended == 0 which causes
input_chunk_remove_conditional_routes() to drop routes; change the logic so that
the function only signals handled (return 1) when appended > 0 and otherwise
return 0 (not handled) so conditional routes are preserved; i.e., keep the check
for handled but ensure you return 0 if appended == 0 instead of 1.

Comment on lines 41 to 44
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix Windows build: guard POSIX-only header and _access usage

PR - Windows checks currently fails with Cannot open include file: 'unistd.h' on MSVC. Bringing in <unistd.h> (and the naked access() that depends on it) breaks Windows builds. Please gate the POSIX path with #ifndef FLB_SYSTEM_WINDOWS, include <io.h> on Windows, and map to _access/_R_OK so both platforms compile again.

-#include <sys/stat.h>
-#include <unistd.h>
-#include <errno.h>
-#include <stdlib.h>
+#include <sys/stat.h>
+#include <errno.h>
+#include <stdlib.h>
+#ifdef FLB_SYSTEM_WINDOWS
+#include <io.h>
+#ifndef R_OK
+#define R_OK 4
+#endif
+#define access _access
+#else
+#include <unistd.h>
+#endif
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <errno.h>
#include <stdlib.h>
#ifdef FLB_SYSTEM_WINDOWS
#include <io.h>
#ifndef R_OK
#define R_OK 4
#endif
#define access _access
#else
#include <unistd.h>
#endif
🤖 Prompt for AI Agents
In src/flb_lib.c around lines 41 to 44, the POSIX-only include and access usage
causes MSVC failures; wrap the POSIX includes (unistd.h and any access use) in
#ifndef FLB_SYSTEM_WINDOWS and for Windows include <io.h> and map to
_access/_R_OK: so add a conditional that includes <unistd.h> for non-Windows,
and for FLB_SYSTEM_WINDOWS include <io.h> and define access as _access and R_OK
as _R_OK (or use an inline wrapper) so the code compiles on both platforms.

Comment on lines +725 to +739
/* Set configuration root path */
if (cfg) {
real_path = realpath(cfg, NULL);
if (real_path) {
end = strrchr(real_path, FLB_DIRCHAR);
if (end) {
end++;
*end = '\0';
if (ctx->config->conf_path) {
flb_free(ctx->config->conf_path);
}
ctx->config->conf_path = flb_strdup(real_path);
}
free(real_path);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Provide a Windows-safe real-path fallback

realpath() is unavailable on Windows/MSVC, so once the include issue is addressed this block will still fail to build on that platform. Please wrap this section with #ifndef FLB_SYSTEM_WINDOWS and use a Windows alternative such as _fullpath/GetFullPathNameA (making sure to duplicate the result before freeing) under the FLB_SYSTEM_WINDOWS branch, so both toolchains keep working.

🤖 Prompt for AI Agents
In src/flb_lib.c around lines 725 to 739, the current use of realpath(cfg, NULL)
is not portable to Windows; wrap the existing realpath block in #ifndef
FLB_SYSTEM_WINDOWS and add an #else branch for FLB_SYSTEM_WINDOWS that uses a
Windows-safe alternative (e.g. call _fullpath or GetFullPathNameA to resolve
cfg), ensure you allocate/duplicate the resulting path into a heap buffer
compatible with the rest of the code, perform the same strrchr/trim logic on
that duplicated path, free the Windows-specific temporary buffer appropriately,
and preserve the existing behavior of updating ctx->config->conf_path (freeing
the old value first).

Comment on lines +250 to +368
static int build_log_chunk_with_otel(const char *service_name,
const char *scope_name,
const char *scope_version,
const char *scope_attribute_key,
const char *scope_attribute_value,
struct flb_log_event_encoder *encoder,
struct flb_event_chunk *chunk)
{
char *otlp_json = NULL;
int ret;
int error_status = 0;
size_t json_len;
const char *attr_key = scope_attribute_key;

if (!encoder || !chunk) {
return -1;
}

/* Extract attribute key from "scope.attr" format if needed */
if (strncmp(scope_attribute_key, "scope.", 6) == 0) {
attr_key = scope_attribute_key + 6; /* Skip "scope." prefix */
}

/* Build OTLP JSON format with nested structure for service.name */
/* Note: We create a nested service object to match $service['name'] accessor */
json_len = snprintf(NULL, 0,
"{"
"\"resourceLogs\":["
"{"
"\"resource\":{"
"\"attributes\":["
"{\"key\":\"service\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"name\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"scopeLogs\":["
"{"
"\"scope\":{"
"\"name\":\"%s\","
"\"version\":\"%s\","
"\"attributes\":["
"{\"key\":\"scope\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"%s\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"logRecords\":["
"{"
"\"timeUnixNano\":\"1728172800000000000\","
"\"severityNumber\":9,"
"\"severityText\":\"INFO\","
"\"body\":{\"stringValue\":\"test log\"}"
"}"
"]"
"}"
"]"
"}"
"]"
"}",
service_name, scope_name, scope_version, attr_key, scope_attribute_value);

otlp_json = flb_malloc(json_len + 1);
if (!otlp_json) {
return -1;
}

snprintf(otlp_json, json_len + 1,
"{"
"\"resourceLogs\":["
"{"
"\"resource\":{"
"\"attributes\":["
"{\"key\":\"service\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"name\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"scopeLogs\":["
"{"
"\"scope\":{"
"\"name\":\"%s\","
"\"version\":\"%s\","
"\"attributes\":["
"{\"key\":\"scope\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"%s\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"logRecords\":["
"{"
"\"timeUnixNano\":\"1728172800000000000\","
"\"severityNumber\":9,"
"\"severityText\":\"INFO\","
"\"body\":{\"stringValue\":\"test log\"}"
"}"
"]"
"}"
"]"
"}"
"]"
"}",
service_name, scope_name, scope_version, attr_key, scope_attribute_value);

/* Initialize encoder buffer (needed for msgpack_sbuffer_write) */
memset(encoder, 0, sizeof(*encoder));
msgpack_sbuffer_init(&encoder->buffer);

/* Convert OTLP JSON to msgpack using the actual converter */
ret = flb_opentelemetry_logs_json_to_msgpack(encoder, otlp_json, json_len, NULL, &error_status);
flb_free(otlp_json);

if (ret != 0) {
msgpack_sbuffer_destroy(&encoder->buffer);
return -1;
}

/* Set up the chunk from the encoder output */
memset(chunk, 0, sizeof(*chunk));
chunk->type = FLB_EVENT_TYPE_LOGS;
chunk->data = encoder->output_buffer;
chunk->size = encoder->output_length;
/* Count actual events in the buffer */
chunk->total_events = flb_mp_count(encoder->output_buffer, encoder->output_length);

return 0;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add NULL checks for string parameters.

The function uses service_name, scope_name, scope_version, and scope_attribute_value in snprintf without checking if they're NULL, which could cause undefined behavior.

Apply this diff to add NULL checks:

 static int build_log_chunk_with_otel(const char *service_name,
                                      const char *scope_name,
                                      const char *scope_version,
                                      const char *scope_attribute_key,
                                      const char *scope_attribute_value,
                                      struct flb_log_event_encoder *encoder,
                                      struct flb_event_chunk *chunk)
 {
     char *otlp_json = NULL;
     int ret;
     int error_status = 0;
     size_t json_len;
     const char *attr_key = scope_attribute_key;
 
-    if (!encoder || !chunk) {
+    if (!encoder || !chunk || !service_name || !scope_name || 
+        !scope_version || !scope_attribute_key || !scope_attribute_value) {
         return -1;
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
static int build_log_chunk_with_otel(const char *service_name,
const char *scope_name,
const char *scope_version,
const char *scope_attribute_key,
const char *scope_attribute_value,
struct flb_log_event_encoder *encoder,
struct flb_event_chunk *chunk)
{
char *otlp_json = NULL;
int ret;
int error_status = 0;
size_t json_len;
const char *attr_key = scope_attribute_key;
if (!encoder || !chunk) {
return -1;
}
/* Extract attribute key from "scope.attr" format if needed */
if (strncmp(scope_attribute_key, "scope.", 6) == 0) {
attr_key = scope_attribute_key + 6; /* Skip "scope." prefix */
}
/* Build OTLP JSON format with nested structure for service.name */
/* Note: We create a nested service object to match $service['name'] accessor */
json_len = snprintf(NULL, 0,
"{"
"\"resourceLogs\":["
"{"
"\"resource\":{"
"\"attributes\":["
"{\"key\":\"service\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"name\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"scopeLogs\":["
"{"
"\"scope\":{"
"\"name\":\"%s\","
"\"version\":\"%s\","
"\"attributes\":["
"{\"key\":\"scope\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"%s\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"logRecords\":["
"{"
"\"timeUnixNano\":\"1728172800000000000\","
"\"severityNumber\":9,"
"\"severityText\":\"INFO\","
"\"body\":{\"stringValue\":\"test log\"}"
"}"
"]"
"}"
"]"
"}"
"]"
"}",
service_name, scope_name, scope_version, attr_key, scope_attribute_value);
otlp_json = flb_malloc(json_len + 1);
if (!otlp_json) {
return -1;
}
snprintf(otlp_json, json_len + 1,
"{"
"\"resourceLogs\":["
"{"
"\"resource\":{"
"\"attributes\":["
"{\"key\":\"service\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"name\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"scopeLogs\":["
"{"
"\"scope\":{"
"\"name\":\"%s\","
"\"version\":\"%s\","
"\"attributes\":["
"{\"key\":\"scope\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"%s\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"logRecords\":["
"{"
"\"timeUnixNano\":\"1728172800000000000\","
"\"severityNumber\":9,"
"\"severityText\":\"INFO\","
"\"body\":{\"stringValue\":\"test log\"}"
"}"
"]"
"}"
"]"
"}"
"]"
"}",
service_name, scope_name, scope_version, attr_key, scope_attribute_value);
/* Initialize encoder buffer (needed for msgpack_sbuffer_write) */
memset(encoder, 0, sizeof(*encoder));
msgpack_sbuffer_init(&encoder->buffer);
/* Convert OTLP JSON to msgpack using the actual converter */
ret = flb_opentelemetry_logs_json_to_msgpack(encoder, otlp_json, json_len, NULL, &error_status);
flb_free(otlp_json);
if (ret != 0) {
msgpack_sbuffer_destroy(&encoder->buffer);
return -1;
}
/* Set up the chunk from the encoder output */
memset(chunk, 0, sizeof(*chunk));
chunk->type = FLB_EVENT_TYPE_LOGS;
chunk->data = encoder->output_buffer;
chunk->size = encoder->output_length;
/* Count actual events in the buffer */
chunk->total_events = flb_mp_count(encoder->output_buffer, encoder->output_length);
return 0;
}
static int build_log_chunk_with_otel(const char *service_name,
const char *scope_name,
const char *scope_version,
const char *scope_attribute_key,
const char *scope_attribute_value,
struct flb_log_event_encoder *encoder,
struct flb_event_chunk *chunk)
{
char *otlp_json = NULL;
int ret;
int error_status = 0;
size_t json_len;
const char *attr_key = scope_attribute_key;
if (!encoder || !chunk || !service_name || !scope_name ||
!scope_version || !scope_attribute_key || !scope_attribute_value) {
return -1;
}
/* Extract attribute key from "scope.attr" format if needed */
if (strncmp(scope_attribute_key, "scope.", 6) == 0) {
attr_key = scope_attribute_key + 6; /* Skip "scope." prefix */
}
/* Build OTLP JSON format with nested structure for service.name */
/* Note: We create a nested service object to match $service['name'] accessor */
json_len = snprintf(NULL, 0,
"{"
"\"resourceLogs\":["
"{"
"\"resource\":{"
"\"attributes\":["
"{\"key\":\"service\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"name\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"scopeLogs\":["
"{"
"\"scope\":{"
"\"name\":\"%s\","
"\"version\":\"%s\","
"\"attributes\":["
"{\"key\":\"scope\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"%s\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"logRecords\":["
"{"
"\"timeUnixNano\":\"1728172800000000000\","
"\"severityNumber\":9,"
"\"severityText\":\"INFO\","
"\"body\":{\"stringValue\":\"test log\"}"
"}"
"]"
"}"
"]"
"}"
"]"
"}",
service_name, scope_name, scope_version, attr_key, scope_attribute_value);
otlp_json = flb_malloc(json_len + 1);
if (!otlp_json) {
return -1;
}
snprintf(otlp_json, json_len + 1,
"{"
"\"resourceLogs\":["
"{"
"\"resource\":{"
"\"attributes\":["
"{\"key\":\"service\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"name\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"scopeLogs\":["
"{"
"\"scope\":{"
"\"name\":\"%s\","
"\"version\":\"%s\","
"\"attributes\":["
"{\"key\":\"scope\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"%s\",\"value\":{\"stringValue\":\"%s\"}}]}}}"
"]"
"},"
"\"logRecords\":["
"{"
"\"timeUnixNano\":\"1728172800000000000\","
"\"severityNumber\":9,"
"\"severityText\":\"INFO\","
"\"body\":{\"stringValue\":\"test log\"}"
"}"
"]"
"}"
"]"
"}"
"]"
"}",
service_name, scope_name, scope_version, attr_key, scope_attribute_value);
/* Initialize encoder buffer (needed for msgpack_sbuffer_write) */
memset(encoder, 0, sizeof(*encoder));
msgpack_sbuffer_init(&encoder->buffer);
/* Convert OTLP JSON to msgpack using the actual converter */
ret = flb_opentelemetry_logs_json_to_msgpack(encoder, otlp_json, json_len, NULL, &error_status);
flb_free(otlp_json);
if (ret != 0) {
msgpack_sbuffer_destroy(&encoder->buffer);
return -1;
}
/* Set up the chunk from the encoder output */
memset(chunk, 0, sizeof(*chunk));
chunk->type = FLB_EVENT_TYPE_LOGS;
chunk->data = encoder->output_buffer;
chunk->size = encoder->output_length;
/* Count actual events in the buffer */
chunk->total_events = flb_mp_count(encoder->output_buffer, encoder->output_length);
return 0;
}

Comment on lines +397 to +476
flb_test_otlp_routing(
"otlp_comprehensive_routing_test.yaml",
"routing_logs.json",
expectations,
sizeof(expectations) / sizeof(expectations[0])
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Provide the OTLP routing config used by this test.

The runtime test currently fails at flb_lib_config_file() because otlp_comprehensive_routing_test.yaml cannot be found/read (see pipeline errors: “Check ret == 0... failed”). Without that config, the test cannot start Fluent Bit and the entire suite aborts. Please add the YAML file to the tree (or update the path you pass here so it resolves to an existing file) before merging.

This patch improves context extraction from log records and enhances
the routing logic with better context handling.

Signed-off-by: Eduardo Silva <[email protected]>
This patch adds library-level API functions to support context
variables in routing configurations.

Signed-off-by: Eduardo Silva <[email protected]>
This patch adds runtime tests for OpenTelemetry input with context-based
conditional routing.

Signed-off-by: Eduardo Silva <[email protected]>
This patch fixes edge cases and improves robustness in context
extraction and handling.

Signed-off-by: Eduardo Silva <[email protected]>
This patch fixes test assertions and improves test reliability.

Signed-off-by: Eduardo Silva <[email protected]>
This patch adds comprehensive unit tests for conditional routing with
context support.

Signed-off-by: Eduardo Silva <[email protected]>
This patch adds YAML configuration examples for context-based
conditional routing.

Signed-off-by: Eduardo Silva <[email protected]>
This patch enhances runtime tests with additional context routing
scenarios.

Signed-off-by: Eduardo Silva <[email protected]>
Return FLB_FALSE when record is NULL instead of FLB_TRUE. NULL cond
still returns FLB_TRUE (no condition means pass), but NULL record
should fail the condition.

Signed-off-by: Eduardo Silva <[email protected]>
… data

Add comprehensive test coverage for group marker skipping when read_groups
is disabled, including:

- Normal group marker skipping (GROUP_START/END properly matched)
- Unmatched GROUP_START markers
- Unmatched GROUP_END markers
- Multiple consecutive group markers
- Mixed invalid group sequences
- Buffers containing only group markers

These tests verify that the decoder:
1. Correctly skips group markers when read_groups=false
2. Returns all normal log records in order
3. Never returns zeroed events
4. Handles corrupted/malformed group data gracefully
5. Doesn't crash on invalid group sequences

Also improve code documentation explaining that msgpack_unpack_next
handles reinitialization internally, so explicit cleanup isn't needed
in the recursive call path.

Signed-off-by: Eduardo Silva <[email protected]>
… marker handling

Add new test cases to verify robust handling of group markers and
corrupted data in the log event decoder:

- decoder_invalid_marker_preserves_group_state: Verifies that group
  state (metadata/attributes) is preserved when invalid group markers
  appear mid-group, preventing data loss from corruption

- Enhanced decoder_corrupted_group_timestamps: Updated to verify
  that invalid markers (negative timestamps not -1/-2) are properly
  skipped while continuing to process valid records

- decoder_group_end_start_sequence: Tests edge cases with GROUP_END
  followed by GROUP_START and verifies correct group state transitions

These tests ensure the decoder maintains data integrity and group
associations even when encountering corrupted or invalid group markers,
validating the improved error handling and state preservation logic.

Signed-off-by: Eduardo Silva <[email protected]>
Enhance the log event decoder to handle corrupted group markers more
robustly while preserving data integrity:

1. **Preserve group state on invalid markers**: Invalid group markers
   (negative timestamps not -1 or -2) no longer clear the active group
   state. This prevents data loss when corruption occurs mid-group,
   ensuring subsequent normal logs retain their group metadata.

2. **Add recursion depth guard**: Implement a safety limit (1000) to
   prevent potential stack overflow from excessive consecutive group
   markers or corruption. The decoder returns an error if the limit
   is reached.

3. **Add debug logging**: Log invalid group marker timestamps at debug
   level to aid in troubleshooting data corruption issues without
   cluttering normal operation logs.

4. **Improve documentation**: Enhanced comments explaining the negative
   timestamp contract and the improved error handling behavior.

These changes make the decoder more resilient to data corruption while
maintaining backward compatibility and ensuring data integrity is
preserved even when encountering invalid group markers.

Signed-off-by: Eduardo Silva <[email protected]>
@edsiper
Copy link
Member Author

edsiper commented Nov 1, 2025

@codex review

@chatgpt-codex-connector
Copy link

Codex Review: Didn't find any major issues. 👍

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
src/flb_input_log.c (3)

346-349: Don't skip routes without conditions.

We still return early when the route has no compiled condition, so per-record direct routes without a filter never emit a payload and the records get discarded. Let flb_router_condition_evaluate_record() make the decision (it already treats a NULL condition as FLB_TRUE) so we stop dropping data.

-    /* Check if route has a condition (flb_router_condition_evaluate_record handles NULL conditions) */
-    if (!payload->route->condition) {
-        return 0;
-    }

337-339: Keep chunk accounting in sync with the payload content.

payload->total_records still reports only the matched log events, even though we emit extra GROUP_START/GROUP_END records into the msgpack buffer. This undercounts what we append to the chunk and breaks ACK/limits/metrics. Please track how many markers you encode (in both builders) and add that to matched before assigning total_records.

@@
-    int matched;
+    int matched;
+    size_t marker_count = 0;
@@
                     ret = encode_chunk_record(encoder, group_start_record);
                     if (ret != 0) {
                         flb_free(matched_by_route);
                         flb_log_event_encoder_destroy(encoder);
                         return -1;
                     }
+                    marker_count++;
@@
                 ret = encode_chunk_record(encoder, group_end);
                 if (ret != 0) {
                     flb_free(matched_by_route);
                     flb_log_event_encoder_destroy(encoder);
                     return -1;
                 }
                 group_end = NULL;
+                marker_count++;
@@
     if (group_end != NULL) {
         ret = encode_chunk_record(encoder, group_end);
         if (ret != 0) {
             flb_free(matched_by_route);
             flb_log_event_encoder_destroy(encoder);
             return -1;
         }
+        marker_count++;
     }
@@
-    payload->total_records = matched;
+    payload->total_records = matched + marker_count;

Apply the same marker counting to build_payload_for_default_route() so default routes report correctly as well.

Also applies to: 468-479, 500-501, 641-652


1023-1028: Return handled only when data was appended.

We still propagate 1 even when appended is zero, so the caller removes those routes from the original chunk and loses data. Only report appended (which is zero when nothing was added) so route removal is skipped in that case.

-    if (handled) {
-        if (appended > 0) {
-            return appended;
-        }
-        return 1;
-    }
-
-    return 0;
+    if (handled) {
+        return appended;
+    }
+
+    return 0;
🧹 Nitpick comments (1)
tests/internal/log_event_decoder.c (1)

1431-1437: Consider adding a recursion depth limit test.

The tests comprehensively cover group marker handling but don't explicitly test the recursion depth limit (FLB_LOG_EVENT_DECODER_MAX_RECURSION_DEPTH = 1000). Consider adding a test that creates 1000+ consecutive group markers to verify the depth guard triggers correctly and returns FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1ba5db3 and e8d2444.

📒 Files selected for processing (16)
  • include/fluent-bit/flb_log_event.h (1 hunks)
  • include/fluent-bit/flb_log_event_decoder.h (1 hunks)
  • plugins/processor_content_modifier/cm_logs.c (1 hunks)
  • src/flb_conditionals.c (6 hunks)
  • src/flb_input_log.c (10 hunks)
  • src/flb_lib.c (6 hunks)
  • src/flb_log_event_decoder.c (4 hunks)
  • tests/internal/conditional_routing.c (1 hunks)
  • tests/internal/conditionals.c (3 hunks)
  • tests/internal/data/config_format/yaml/routing/context.yaml (1 hunks)
  • tests/internal/log_event_decoder.c (3 hunks)
  • tests/internal/router_config.c (6 hunks)
  • tests/runtime/CMakeLists.txt (1 hunks)
  • tests/runtime/data/opentelemetry/routing_logs.json (1 hunks)
  • tests/runtime/data/routing/otlp_comprehensive_routing_test.yaml (1 hunks)
  • tests/runtime/in_opentelemetry_routing.c (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • tests/runtime/data/routing/otlp_comprehensive_routing_test.yaml
  • tests/runtime/data/opentelemetry/routing_logs.json
  • plugins/processor_content_modifier/cm_logs.c
  • tests/runtime/in_opentelemetry_routing.c
🧰 Additional context used
🧠 Learnings (10)
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • tests/internal/log_event_decoder.c
  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • tests/internal/log_event_decoder.c
  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-09-08T11:21:33.975Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 10851
File: include/fluent-bit/flb_simd.h:60-66
Timestamp: 2025-09-08T11:21:33.975Z
Learning: Fluent Bit currently only supports MSVC compiler on Windows, so additional compiler compatibility guards may be unnecessary for Windows-specific code paths.

Applied to files:

  • src/flb_lib.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • src/flb_lib.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • src/flb_lib.c
  • tests/internal/router_config.c
📚 Learning: 2025-09-14T09:46:09.531Z
Learnt from: aminvakil
Repo: fluent/fluent-bit PR: 10844
File: conf/fluent-bit:13-15
Timestamp: 2025-09-14T09:46:09.531Z
Learning: For fluent-bit Debian packaging, /opt/fluent-bit/bin/ is the appropriate installation path since the package may be installed from non-official Debian sources, making /opt compliant with FHS for optional software packages.

Applied to files:

  • src/flb_lib.c
🧬 Code graph analysis (6)
tests/internal/log_event_decoder.c (4)
include/fluent-bit/flb_time.h (2)
  • flb_time_set (75-79)
  • flb_time_equal (99-101)
src/flb_log_event_decoder.c (6)
  • flb_log_event_decoder_init (103-120)
  • flb_log_event_decoder_get_error_description (480-541)
  • flb_log_event_decoder_read_groups (89-101)
  • flb_log_event_decoder_destroy (151-178)
  • flb_log_event_decoder_next (309-456)
  • flb_log_event_decoder_get_record_type (458-478)
src/flb_pack.c (1)
  • flb_msgpack_to_json_str (1459-1500)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
tests/internal/conditional_routing.c (14)
src/flb_log_event_encoder.c (5)
  • flb_log_event_encoder_init (42-74)
  • flb_log_event_encoder_begin_record (246-254)
  • flb_log_event_encoder_destroy (99-116)
  • flb_log_event_encoder_set_current_timestamp (289-292)
  • flb_log_event_encoder_commit_record (256-274)
lib/cfl/src/cfl_variant.c (5)
  • cfl_variant_destroy (265-285)
  • cfl_variant_create_from_kvlist (225-236)
  • cfl_variant_create_from_array (212-223)
  • cfl_variant_create_from_string_s (90-115)
  • cfl_variant_create_from_bool (148-159)
src/flb_router_config.c (3)
  • flb_router_config_parse (1185-1223)
  • flb_router_routes_destroy (394-409)
  • flb_router_apply_config (1337-1428)
src/config_format/flb_config_format.c (3)
  • flb_cf_destroy (151-157)
  • flb_cf_create (104-149)
  • flb_cf_section_create (640-722)
include/fluent-bit/flb_mem.h (2)
  • flb_calloc (84-96)
  • flb_free (126-128)
src/flb_sds.c (2)
  • flb_sds_create (78-90)
  • flb_sds_destroy (389-399)
src/flb_router_condition.c (5)
  • flb_router_chunk_context_init (152-163)
  • flb_condition_eval_logs (275-314)
  • flb_router_chunk_context_reset (165-185)
  • flb_router_chunk_context_destroy (187-190)
  • flb_router_path_should_route (379-402)
src/flb_router.c (1)
  • flb_router_exit (273-300)
src/flb_input_chunk.c (2)
  • flb_input_chunk_create (874-985)
  • flb_input_chunk_destroy (1066-1168)
src/flb_routes_mask.c (2)
  • flb_routes_mask_set_bit (76-91)
  • flb_routes_mask_get_bit (125-140)
lib/cfl/src/cfl_array.c (4)
  • cfl_array_create (24-49)
  • cfl_array_destroy (51-69)
  • cfl_array_append (117-165)
  • cfl_array_append_string (167-185)
lib/cfl/src/cfl_kvlist.c (4)
  • cfl_kvlist_create (26-38)
  • cfl_kvlist_insert_string (341-355)
  • cfl_kvlist_destroy (40-61)
  • cfl_kvlist_insert_s (291-318)
lib/cfl/include/cfl/cfl_array.h (2)
  • cfl_array_size (46-49)
  • cfl_array_fetch_by_index (36-44)
lib/cfl/src/cfl_sds.c (1)
  • cfl_sds_len (88-91)
src/flb_lib.c (4)
include/fluent-bit/flb_compat.h (1)
  • realpath (120-126)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/flb_config.c (1)
  • flb_config_load_config_format (968-1081)
src/config_format/flb_config_format.c (1)
  • flb_cf_destroy (151-157)
tests/internal/router_config.c (6)
src/flb_log_event_encoder.c (8)
  • flb_log_event_encoder_init (42-74)
  • flb_log_event_encoder_begin_record (246-254)
  • flb_log_event_encoder_destroy (99-116)
  • flb_log_event_encoder_set_current_timestamp (289-292)
  • flb_log_event_encoder_commit_record (256-274)
  • flb_log_event_encoder_group_init (390-407)
  • flb_log_event_encoder_group_header_end (409-419)
  • flb_log_event_encoder_group_end (421-443)
src/opentelemetry/flb_opentelemetry_logs.c (1)
  • flb_opentelemetry_logs_json_to_msgpack (812-889)
include/fluent-bit/flb_mem.h (2)
  • flb_free (126-128)
  • flb_calloc (84-96)
src/flb_mp.c (1)
  • flb_mp_count (43-46)
src/flb_router_config.c (2)
  • flb_router_config_parse (1185-1223)
  • flb_router_routes_destroy (394-409)
src/flb_router_condition.c (3)
  • flb_router_chunk_context_init (152-163)
  • flb_condition_eval_logs (275-314)
  • flb_router_chunk_context_reset (165-185)
src/flb_input_log.c (5)
plugins/filter_geoip2/libmaxminddb-1.12.2/src/maxminddb.c (1)
  • record_type (1055-1080)
include/fluent-bit/flb_mem.h (2)
  • flb_calloc (84-96)
  • flb_free (126-128)
src/flb_log_event_encoder.c (1)
  • flb_log_event_encoder_destroy (99-116)
src/flb_log_event_decoder.c (1)
  • flb_log_event_decoder_get_record_type (458-478)
src/flb_router_condition.c (1)
  • flb_router_condition_evaluate_record (413-436)
tests/internal/conditionals.c (1)
src/flb_conditionals.c (4)
  • flb_condition_add_rule (239-259)
  • flb_condition_create (223-237)
  • flb_condition_evaluate (440-452)
  • flb_condition_destroy (261-278)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
🔇 Additional comments (17)
tests/internal/data/config_format/yaml/routing/context.yaml (1)

1-27: LGTM!

This test configuration properly exercises the new context-aware conditional routing feature. The YAML structure is valid, with well-formed condition rules across three context types (metadata, group_attributes, otel_resource_attributes). The JPath field expressions and equality operators are syntactically correct.

tests/runtime/CMakeLists.txt (1)

64-64: Clean test registration for OpenTelemetry routing.

The new runtime test for OpenTelemetry routing is properly registered within the CMake build configuration. The placement is logical, using the existing FLB_IN_OPENTELEMETRY guard flag alongside the base OpenTelemetry input test—both tests relate to the same component but exercise different functionality (base input vs. routing-specific behavior). The test target name will be uniquely derived as flb-rt-in_opentelemetry_routing based on the source filename.

include/fluent-bit/flb_log_event_decoder.h (1)

65-65: LGTM - Good safety measure.

The recursion depth guard is a sensible addition to prevent stack overflow when processing deeply nested or corrupted group markers.

include/fluent-bit/flb_log_event.h (1)

36-49: LGTM - Clear documentation of the group marker contract.

The documentation clearly defines the semantics for normal records vs. group markers using timestamp values, and explicitly reserves negative timestamps for markers with a clear contract that only -1 and -2 are valid.

tests/internal/log_event_decoder.c (5)

275-295: LGTM - Clean helper function.

The pack_group_marker helper correctly encodes group markers with negative timestamps and appropriate metadata/attributes structure for testing.


297-459: Comprehensive test coverage for skip_groups behavior.

This test properly validates that group markers are transparently skipped when read_groups=false, verifying:

  • Only normal records are returned
  • Correct timestamp sequence
  • Group markers never leak through as zeroed events

461-707: Thorough coverage of corrupted group marker scenarios.

Tests validate proper handling of unmatched GROUP_START/GROUP_END, multiple consecutive markers, and mixed invalid states, ensuring the decoder gracefully skips problematic markers.


1023-1158: Good coverage for invalid negative timestamps.

This test confirms that timestamps like -3, -10, and -1000 are correctly identified as invalid (not the reserved -1 or -2) and skipped by the decoder.


1160-1245: Critical test for state preservation.

Validates that when an invalid marker (e.g., -3) is encountered mid-group, the decoder skips the invalid marker but preserves the active group metadata for subsequent normal records. This is important for resilience against partial corruption.

src/flb_log_event_decoder.c (5)

23-25: LGTM - Good safety infrastructure.

Adding the flb_log header and defining a recursion depth limit provides necessary infrastructure for the recursion guard implemented below.


355-361: Effective recursion depth guard.

The check prevents stack overflow from excessive recursion when processing long sequences of group markers or corrupted data. The limit of 1000 is reasonable and the warning message is helpful for debugging.


366-381: Proper handling of invalid group markers.

The code correctly identifies invalid negative timestamps (not -1 or -2), logs them for debugging, preserves the current group state, and recursively fetches the next valid record with proper depth tracking.


435-447: Past review issue has been resolved.

The previous review comment noted that group markers were not being skipped properly when read_groups=false, returning a zeroed event without advancing the decoder. The current implementation correctly recurses to fetch the next record (lines 442-446) with proper depth tracking, addressing the regression.


396-428: The review comment's concern is unfounded.

After msgpack_unpacked_release_zone(&context->unpacked_event) at line 402, the structure is not left in an undefined state. The msgpack library's msgpack_unpack_next function explicitly handles this case:

  1. It calls msgpack_unpacked_destroy(result) first, which safely checks if(result->zone != NULL) before freeing—so zone=NULL is handled correctly.
  2. A fresh zone is then created during unpacking via ctx.user.z = &result->zone.

This is the documented usage pattern for msgpack. Adding msgpack_unpacked_init() after msgpack_unpacked_release_zone() is unnecessary and inconsistent with the library's design, which intentionally handles zone cleanup and reinit implicitly through msgpack_unpack_next.

Likely an incorrect or invalid review comment.

src/flb_conditionals.c (3)

28-55: LGTM! Clean refactoring to pluggable variant getter.

The default_get_record_variant function correctly handles the base contexts (METADATA and BODY) with appropriate NULL guards. The default case returning NULL is correct, as extended contexts (group, OTEL) are handled by custom variant getters in the router layer.


363-438: LGTM! Solid evaluation logic with proper short-circuiting.

The pluggable variant getter architecture is well-designed. Key highlights:

  • NULL checks are defensive and correct (NULL cond passes, NULL getter fails)
  • Empty rule set handling follows standard logic (AND vacuously true, OR false)
  • Missing variant handling is correct: fail-fast for AND, skip for OR
  • Short-circuit optimization works correctly for both operators

440-452: LGTM! Past review comment properly addressed.

The wrapper correctly handles the NULL record case that was flagged in the previous review:

  • NULL cond returns TRUE (no conditions to evaluate)
  • NULL record now returns FALSE (cannot evaluate without data)

This prevents conditions from incorrectly passing when records are unavailable due to decoder failures or other errors.

@edsiper edsiper merged commit 668358c into master Nov 1, 2025
57 checks passed
@edsiper edsiper deleted the router-conditionals-contexts branch November 1, 2025 13:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants