out_doris: add new doris out plugin #9514
Conversation
Does this need additional dependencies?
This component has no new additional dependencies.
It seems that msvc does not have
Yes, you have to support all legacy targets I'm afraid so the various vendored libraries have atomic support in place. I'm not sure if there is a general way in Fluent Bit to do it @cosmo0920 ?
We need to use InterlockedAdd in Windows for the equivalent operation of __sync_fetch_and_add.
Thank you both!
Signed-off-by: composer <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (1)
plugins/out_doris/doris.c (1)
259-272: CRITICAL: Redirect parsing crashes on missing/malformed Location header.
Past review comments marked this as "Addressed in commits 9b08297 to 41a623c" but the unsafe parsing is still present. This code will segfault if:
- Location header is missing (strstr returns NULL at line 261, then line 262 dereferences it)
- "@" is absent (line 262: strstr returns NULL, adding 1 gives invalid pointer)
- The format doesn't match expectations
This is also brittle for IPv6 addresses and doesn't handle authentication tokens in URLs correctly.
Apply defensive parsing with NULL checks at each step:
 if (c->resp.status == 307) { // redirect
     // example: Location: http://admin:[email protected]:8040/api/d_fb/t_fb/_stream_load?
     char* location = strstr(c->resp.data, "Location:");
+    if (!location) {
+        flb_plg_error(ctx->ins, "307 redirect missing Location header");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
+    location += 9; // skip "Location:"
+    while (*location == ' ') location++; // skip whitespace
+    char* start = strstr(location, "@");
+    if (!start) {
+        flb_plg_error(ctx->ins, "malformed Location header (no @)");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
+    start += 1;
+    char* mid = strstr(start, ":");
+    if (!mid) {
+        flb_plg_error(ctx->ins, "malformed Location header (no port)");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
+    char* end = strstr(mid, "/api");
+    if (!end) {
+        flb_plg_error(ctx->ins, "malformed Location header (no /api)");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
+    char redirect_host[1024] = {0};
+    size_t host_len = mid - start;
+    if (host_len >= sizeof(redirect_host)) {
+        flb_plg_error(ctx->ins, "redirect host too long");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
     memcpy(redirect_host, start, mid - start);
     char redirect_port[10] = {0};
+    size_t port_len = end - (mid + 1);
+    if (port_len >= sizeof(redirect_port)) {
+        flb_plg_error(ctx->ins, "redirect port too long");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
     memcpy(redirect_port, mid + 1, end - (mid + 1));
     out_ret = http_put(ctx, redirect_host, atoi(redirect_port),
                        body, body_len, tag, tag_len, label, label_len, "be");
+cleanup:;
 }
Better yet, consider using Fluent Bit's URL parsing utilities (e.g., flb_utils_url_split) if available for more robust parsing.
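The defensive parsing this comment asks for can also be isolated into a small, testable function. The sketch below is only an illustration: parse_redirect_target is a hypothetical helper, not part of Fluent Bit or the PR, and it assumes the redirect URL always carries an explicit port and never an IPv6 literal.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper (not a Fluent Bit API): extract host and port from a
 * URL shaped like "http://user:pass@host:8040/api/db/tbl/_stream_load".
 * Returns 0 on success and -1 on malformed input. IPv6 literals and URLs
 * without an explicit port are rejected by this sketch.
 */
static int parse_redirect_target(const char *url, char *host,
                                 size_t host_size, int *port)
{
    const char *p;
    const char *path;
    const char *at;
    const char *colon;
    size_t authority_len;
    size_t host_len;
    long value;
    char *endptr;

    assert(url != NULL && host != NULL && port != NULL && host_size > 0);

    p = strstr(url, "://");
    if (p == NULL) {
        return -1;
    }
    p += 3;

    /* The authority ends at the first '/' or at the end of the string. */
    path = strchr(p, '/');
    authority_len = path ? (size_t) (path - p) : strlen(p);

    /* Skip optional userinfo, but only when '@' lies inside the authority. */
    at = memchr(p, '@', authority_len);
    if (at != NULL) {
        authority_len -= (size_t) (at + 1 - p);
        p = at + 1;
    }

    colon = memchr(p, ':', authority_len);
    if (colon == NULL) {
        return -1;
    }

    host_len = (size_t) (colon - p);
    if (host_len == 0 || host_len >= host_size) {
        return -1;
    }

    /* The port must consume the rest of the authority and be in range. */
    value = strtol(colon + 1, &endptr, 10);
    if (endptr == colon + 1 || endptr != p + authority_len ||
        value <= 0 || value > 65535) {
        return -1;
    }

    memcpy(host, p, host_len);
    host[host_len] = '\0';
    *port = (int) value;
    return 0;
}
```

Every failure path returns before anything is copied, so the caller can map -1 to FLB_RETRY without inspecting partially written buffers.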
🧹 Nitpick comments (3)
plugins/out_doris/doris.h (1)
31-36: Consider documenting atomic access requirement for the running field.
Based on past review comments, the running field is accessed via platform-specific atomic wrapper functions in the implementation. Adding a comment here would help prevent future bugs from direct non-atomic access.
 struct flb_doris_progress_reporter {
+    /* Accessed via atomic operations - do not read/write directly */
     int running;
     size_t total_bytes;
tests/runtime/out_doris.c (1)
88-92: Remove redundant NULL check for res_data.
The res_data parameter is already checked for NULL at lines 74-77. This duplicate check is harmless but unnecessary.
     if (!TEST_CHECK(res_ret == 0)) {
         TEST_MSG("callback ret=%d", res_ret);
     }
-    if (!TEST_CHECK(res_data != NULL)) {
-        TEST_MSG("res_data is NULL");
-        flb_sds_destroy(out_line);
-        return;
-    }
plugins/out_doris/doris.c (1)
45-59: Minor: Document return value semantics mismatch in sync_fetch_and_add.
InterlockedAdd returns the new value (after addition), while __sync_fetch_and_add returns the old value (before addition). This semantic inconsistency doesn't cause bugs currently since the return value is never used (lines 408, 425, 426, 428), but could confuse future developers or cause bugs if the return value is later used.
Consider either:
- Documenting this mismatch with a comment, or
- Using InterlockedExchangeAdd on Windows to match __sync_fetch_and_add semantics
+/* Note: Return value semantics differ between platforms - not used in this code */
 static inline void sync_fetch_and_add(size_t *dest, size_t value) {
 #ifdef FLB_SYSTEM_WINDOWS
   #ifdef _WIN64
     InterlockedAdd64((LONG64 volatile *) dest, (LONG64) value);
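The second option can be sketched as a wrapper that returns the previous value on every platform. This is an illustrative sketch rather than the PR's code: counter_fetch_add is a hypothetical name, and the plain _WIN32/_WIN64 guards stand in for the FLB_SYSTEM_WINDOWS macro used in the plugin.

```c
#include <assert.h>
#include <stddef.h>

#if defined(_WIN32)
#include <windows.h>
#endif

/*
 * Hypothetical wrapper: atomically add `value` to `*dest` and return the
 * value stored before the addition, regardless of platform.
 */
static inline size_t counter_fetch_add(volatile size_t *dest, size_t value)
{
    assert(dest != NULL);
#if defined(_WIN64)
    /* InterlockedExchangeAdd64 returns the initial value of the addend. */
    return (size_t) InterlockedExchangeAdd64((volatile LONG64 *) dest,
                                             (LONG64) value);
#elif defined(_WIN32)
    return (size_t) InterlockedExchangeAdd((volatile LONG *) dest,
                                           (LONG) value);
#else
    /* GCC/Clang builtin: also returns the value before the addition. */
    return __sync_fetch_and_add(dest, value);
#endif
}
```

With matching semantics on both branches, a later caller that starts using the return value would see the same result on Windows and on POSIX targets.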
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- cmake/plugins_options.cmake (1 hunks)
- cmake/windows-setup.cmake (1 hunks)
- plugins/CMakeLists.txt (1 hunks)
- plugins/out_doris/CMakeLists.txt (1 hunks)
- plugins/out_doris/doris.c (1 hunks)
- plugins/out_doris/doris.h (1 hunks)
- plugins/out_doris/doris_conf.c (1 hunks)
- plugins/out_doris/doris_conf.h (1 hunks)
- tests/runtime/CMakeLists.txt (1 hunks)
- tests/runtime/out_doris.c (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- cmake/plugins_options.cmake
- plugins/CMakeLists.txt
- cmake/windows-setup.cmake
- plugins/out_doris/doris_conf.c
- plugins/out_doris/doris_conf.h
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-08T11:21:33.975Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 10851
File: include/fluent-bit/flb_simd.h:60-66
Timestamp: 2025-09-08T11:21:33.975Z
Learning: Fluent Bit currently only supports MSVC compiler on Windows, so additional compiler compatibility guards may be unnecessary for Windows-specific code paths.
Applied to files:
- plugins/out_doris/doris.h
- plugins/out_doris/doris.c
📚 Learning: 2025-09-04T12:32:46.030Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10825
File: plugins/out_s3/s3.c:0-0
Timestamp: 2025-09-04T12:32:46.030Z
Learning: In Fluent Bit plugins, avoid creating plugin-local config_map entries that duplicate core functionality. The core already provides Retry_Limit parsed into ins->retry_limit, so plugins should use that directly rather than defining their own retry_limit configuration option.
Applied to files:
plugins/out_doris/doris.c
🧬 Code graph analysis (2)
tests/runtime/out_doris.c (2)
- src/flb_sds.c (1): flb_sds_destroy (389-399)
- src/flb_lib.c (10): flb_create (143-225), flb_service_set (652-678), flb_input (266-276), flb_output (279-289), flb_stop (1011-1055), flb_destroy (228-263), flb_output_set (520-551), flb_output_set_test (584-615), flb_start (983-994), flb_lib_push (843-870)
plugins/out_doris/doris.c (6)
- plugins/out_doris/doris_conf.c (2): flb_doris_conf_create (91-222), flb_doris_conf_destroy (224-241)
- src/flb_hash_table.c (4): flb_hash_table_create (99-137), flb_hash_table_destroy (197-215), flb_hash_table_get_ptr (591-603), flb_hash_table_add (401-494)
- src/flb_upstream.c (4): flb_upstream_destroy (656-698), flb_upstream_create (290-363), flb_upstream_conn_get (711-844), flb_upstream_conn_release (862-947)
- src/flb_http_client.c (5): flb_http_client (814-859), flb_http_add_header (963-995), flb_http_basic_auth (1323-1327), flb_http_do (1572-1632), flb_http_client_destroy (1688-1695)
- src/flb_pack.c (2): flb_pack_json (530-535), flb_pack_msgpack_to_json_format (1169-1450)
- src/flb_sds.c (1): flb_sds_destroy (389-399)
🔇 Additional comments (12)
plugins/out_doris/CMakeLists.txt (1)
1-6: LGTM! Clean CMake configuration.
The build configuration correctly defines source files and registers the out_doris plugin following the standard Fluent Bit plugin pattern.
tests/runtime/CMakeLists.txt (1)
242-242: LGTM! Test registration follows established pattern.
The Doris runtime test is correctly registered in the output plugins section, consistent with other output plugin tests like Chronicle (line 241).
plugins/out_doris/doris.h (1)
23-29: LGTM! Header is now self-contained with proper includes and forward declarations.
The header correctly includes flb_sds.h for the flb_sds_t typedef and provides forward declarations for all opaque pointer types used in the structs. This addresses previous review concerns about header self-containment.
tests/runtime/out_doris.c (2)
65-103: LGTM! Test callback properly validates output.
The callback correctly:
- Validates input parameters with TEST_CHECK
- Searches for expected substrings in the output
- Properly cleans up with flb_sds_destroy
- Thread-safely increments the output counter
148-207: LGTM! Well-structured runtime tests.
Both test functions (flb_test_json and flb_test_time_key) properly:
- Create and configure the test context
- Set up the Doris output with appropriate parameters
- Register the formatter test callback to validate output
- Push test data and verify results
- Clean up resources
The tests effectively validate JSON formatting and time_key configuration.
Also applies to: 209-265
plugins/out_doris/doris.c (7)
64-87: LGTM! Proper plugin initialization.
The initialization correctly:
- Initializes TLS for BE pool
- Creates the Doris configuration context
- Sets the plugin context
- Registers HTTP debug callbacks
89-137: LGTM! Worker lifecycle properly manages per-worker BE connection pools.
The worker init/exit callbacks correctly:
- Create a TLS hash table for each worker's BE connection pool
- Clean up by destroying all upstreams in the pool before destroying the hash table
- Properly iterate with mk_list_foreach_safe for safe deletion during iteration
This design cleanly isolates connection pools per worker thread.
273-326: LGTM! Response parsing properly guards against failures.
The HTTP 200 response handling correctly:
- Bails out early if flb_pack_json fails (line 277-280)
- Uses goto labels for proper cleanup (free_buf, parse_done)
- Parses the Status field from the response
- Treats "Success" and "Publish Timeout" as success
- Properly cleans up msgpack resources
This addresses previous review concerns about using uninitialized buffers.
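The status rule described above ("Success" and "Publish Timeout" both count as delivered) can be expressed as a small predicate. This is a sketch under assumptions, not the plugin's code: stream_load_succeeded and status_equals are hypothetical names, and the length parameter reflects that msgpack string values are not NUL-terminated.

```c
#include <assert.h>
#include <string.h>

/* Compare a length-delimited string against a NUL-terminated literal. */
static int status_equals(const char *status, size_t len, const char *expected)
{
    return len == strlen(expected) && memcmp(status, expected, len) == 0;
}

/*
 * Hypothetical predicate: returns 1 when the Status field of a stream load
 * response should be treated as a successful delivery, 0 otherwise.
 */
static int stream_load_succeeded(const char *status, size_t len)
{
    assert(status != NULL || len == 0);
    return status_equals(status, len, "Success") ||
           status_equals(status, len, "Publish Timeout");
}
```

Comparing lengths before bytes avoids accepting prefixes such as "Succ" or longer strings such as "Successful".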
356-386: LGTM! Payload composition is straightforward.
The function correctly converts msgpack to JSON format with proper error handling and logging based on the log_request configuration.
388-431: LGTM! Flush callback properly orchestrates delivery and tracks progress.
The flush implementation correctly:
- Composes the payload from event chunk data
- Generates unique labels when enabled (with timestamp and UUID)
- Calls http_put to deliver the data
- Updates progress counters atomically using the sync_fetch_and_add wrapper
- Tracks both successful and failed rows appropriately
- Properly cleans up the payload buffer
442-506: LGTM! Configuration map provides comprehensive options.
The config map exposes all necessary parameters for the Doris output plugin with sensible defaults:
- Required fields: endpoint_type, user, database, table
- Optional fields with defaults: password (""), label_prefix ("fluentbit"), time_key ("date")
- Behavioral controls: log_request (true), log_progress_interval (10)
- Multiple custom headers supported via FLB_CONFIG_MAP_MULT
529-545: LGTM! Plugin descriptor correctly configured.
The plugin registration specifies:
- Lifecycle callbacks (init, flush, exit, worker_init, worker_exit)
- Test formatter for runtime validation
- Network and TLS capability flags
- 2 workers for concurrent processing
This properly exposes the Doris output plugin to the Fluent Bit engine.
ready for ci and review
Hey folks, please excuse the intrusion but I noticed the It seems to me that those are used to track some metrics across instance workers (that's ok) but I can't see a reason not to use a If using a
Please don't merge this PR, I've noticed issues that need to be fixed and I'll submit a formal review today.
Signed-off-by: composer <[email protected]>
Actionable comments posted: 1
♻️ Duplicate comments (2)
plugins/out_doris/doris.c (2)
259-272: CRITICAL: Unsafe redirect parsing with multiple NULL pointer dereference risks.
The redirect handling code has severe safety issues that will cause crashes in common scenarios:
- No NULL checks: If the Location: header is missing, location will be NULL and the subsequent strstr(location, "@") will crash. Similarly, missing @, :, or /api will cause NULL pointer dereferences.
- Assumes userinfo: The code always looks for @ to skip credentials, but the Location header may not contain userinfo (e.g., http://host:port/api/...).
- No bounds checking: The memcpy operations don't validate that mid - start or end - (mid + 1) are positive or fit within the destination buffers.
- No IPv6 support: IPv6 addresses in brackets ([::1]) will break the : parsing.
- No redirect loop prevention: Recursive calls without a hop counter can infinite-loop if the server returns circular redirects.
Replace the unsafe string parsing with robust header extraction and URL parsing:
 if (c->resp.status == 307) { // redirect
-    // example: Location: http://admin:[email protected]:8040/api/d_fb/t_fb/_stream_load?
-    char* location = strstr(c->resp.data, "Location:");
-    char* start = strstr(location, "@") + 1;
-    char* mid = strstr(start, ":");
-    char* end = strstr(mid, "/api");
-    char redirect_host[1024] = {0};
-    memcpy(redirect_host, start, mid - start);
-    char redirect_port[10] = {0};
-    memcpy(redirect_port, mid + 1, end - (mid + 1));
-
-    out_ret = http_put(ctx, redirect_host, atoi(redirect_port),
-                       body, body_len, tag, tag_len, label, label_len, "be");
+    /* Extract Location header safely */
+    char redirect_host[256] = {0};
+    int redirect_port = 0;
+    char *location = strstr(c->resp.data, "Location:");
+
+    if (!location) {
+        flb_plg_error(ctx->ins, "307 redirect missing Location header");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
+
+    location += 9; /* skip "Location:" */
+    while (*location == ' ' || *location == '\t') location++;
+
+    /* Parse URL: find host and port after scheme and optional userinfo */
+    char *scheme_end = strstr(location, "://");
+    if (!scheme_end) {
+        flb_plg_error(ctx->ins, "invalid Location URL format");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
+
+    char *host_start = scheme_end + 3;
+    /* Skip optional userinfo (user:pass@) */
+    char *at_sign = strchr(host_start, '@');
+    char *slash = strchr(host_start, '/');
+    if (at_sign && (!slash || at_sign < slash)) {
+        host_start = at_sign + 1;
+    }
+
+    /* Extract host and port */
+    char *port_start = NULL;
+    char *host_end = NULL;
+
+    if (*host_start == '[') { /* IPv6 */
+        host_end = strchr(host_start, ']');
+        if (!host_end) {
+            flb_plg_error(ctx->ins, "malformed IPv6 address in Location");
+            out_ret = FLB_RETRY;
+            goto cleanup;
+        }
+        host_start++; /* skip '[' */
+        if (host_end[1] == ':') {
+            port_start = host_end + 2;
+        }
+    } else { /* IPv4 or hostname */
+        char *colon = strchr(host_start, ':');
+        slash = strchr(host_start, '/');
+        if (colon && (!slash || colon < slash)) {
+            host_end = colon;
+            port_start = colon + 1;
+        } else {
+            host_end = slash ? slash : host_start + strlen(host_start);
+        }
+    }
+
+    size_t host_len = host_end - host_start;
+    if (host_len >= sizeof(redirect_host)) {
+        flb_plg_error(ctx->ins, "redirect host too long");
+        out_ret = FLB_RETRY;
+        goto cleanup;
+    }
+    memcpy(redirect_host, host_start, host_len);
+    redirect_host[host_len] = '\0';
+
+    if (port_start) {
+        redirect_port = atoi(port_start);
+        if (redirect_port <= 0 || redirect_port > 65535) {
+            flb_plg_error(ctx->ins, "invalid redirect port: %d", redirect_port);
+            out_ret = FLB_RETRY;
+            goto cleanup;
+        }
+    } else {
+        redirect_port = 80; /* default HTTP port */
+    }
+
+    flb_plg_debug(ctx->ins, "redirect to %s:%d", redirect_host, redirect_port);
+    out_ret = http_put(ctx, redirect_host, redirect_port,
+                       body, body_len, tag, tag_len, label, label_len, "be");
+cleanup:;
 }
Additionally, consider adding a redirect hop counter (pass as parameter, max 5) to prevent infinite loops.
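The hop counter suggested at the end of this comment can also be implemented as a bounded loop instead of recursion. The sketch below is hypothetical throughout: put_with_redirects, the send_fn callback type, and the two demo callbacks are illustrative stand-ins for the plugin's http_put, and the limit of 5 is taken from the comment above.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_REDIRECT_HOPS 5

/*
 * Hypothetical transport callback: performs one request, returns the HTTP
 * status, and on 307 stores the next host/port to contact.
 */
typedef int (*send_fn)(const char *host, int port,
                       const char **next_host, int *next_port, void *user);

/* Follow 307 redirects iteratively; -1 means the hop limit was exceeded. */
static int put_with_redirects(send_fn send, const char *host, int port,
                              void *user)
{
    int hops;
    int status;
    const char *next_host;
    int next_port;

    assert(send != NULL && host != NULL);

    /* One initial request plus at most MAX_REDIRECT_HOPS redirects. */
    for (hops = 0; hops <= MAX_REDIRECT_HOPS; hops++) {
        next_host = NULL;
        next_port = 0;
        status = send(host, port, &next_host, &next_port, user);
        if (status != 307) {
            return status;
        }
        if (next_host == NULL) {
            return -1;          /* redirect without a usable target */
        }
        host = next_host;
        port = next_port;
    }
    return -1;
}

/* Demo callback: a server that redirects forever. */
static int always_redirect(const char *host, int port,
                           const char **next_host, int *next_port, void *user)
{
    (void) port;
    (*(int *) user)++;
    *next_host = host;
    *next_port = 8040;
    return 307;
}

/* Demo callback: one redirect to port 8040, then success. */
static int redirect_once(const char *host, int port,
                         const char **next_host, int *next_port, void *user)
{
    int *calls = user;

    (void) host;
    (*calls)++;
    if (*calls == 1) {
        *next_host = "be-node";
        *next_port = 8040;
        return 307;
    }
    return port == 8040 ? 200 : 500;
}
```

A loop keeps the stack depth constant and makes the limit explicit; a circular redirect ends after six requests instead of recursing until the process fails.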
45-59: Use cmt_atomic functions instead of platform-specific atomics.
Based on comments from leonardo-albertovich in the PR discussion, you should use the existing cmt_atomic functions from the vendored cmetrics library rather than implementing platform-specific atomic operations here. This avoids spreading #ifdef blocks throughout the plugin code and provides a portable atomic API across all platforms.
Replace the custom sync_fetch_and_add with cmt_atomic APIs:
-#ifdef FLB_SYSTEM_WINDOWS
-#include <windows.h>
-#endif
-
-static inline void sync_fetch_and_add(size_t *dest, size_t value) {
-#ifdef FLB_SYSTEM_WINDOWS
-  #ifdef _WIN64
-    InterlockedAdd64((LONG64 volatile *) dest, (LONG64) value);
-  #else
-    InterlockedAdd((LONG volatile *) dest, (LONG) value);
-  #endif
-#else
-    __sync_fetch_and_add(dest, value);
-#endif
-}
+#include <cmetrics/cmt_atomic.h>
Then update the call sites (lines 408, 425-426, 428) to use cmt_atomic_fetch_add or the appropriate cmt_atomic function.
Based on learnings.
🧹 Nitpick comments (1)
plugins/out_doris/doris.c (1)
225-227: Consider making the "Expect: 100-continue" header configurable.
Based on earlier feedback from yunsur, the Expect: 100-continue header can cause log duplication in Doris group commit mode with Duplicate Key Model due to retry/retransmission behavior. While you indicated this header is necessary for the Doris HTTP stream load API, consider adding a configuration option to allow users to disable it when it causes issues in their specific Doris setup.
Add a boolean config option (e.g., use_100_continue) defaulting to true for backwards compatibility, and conditionally add the header:
     flb_http_add_header(c, "format", 6, "json", 4);
     flb_http_add_header(c, "read_json_by_line", 17, "true", 4);
-    if (strcasecmp(endpoint_type, "fe") == 0) {
+    if (strcasecmp(endpoint_type, "fe") == 0 && ctx->use_100_continue) {
         flb_http_add_header(c, "Expect", 6, "100-continue", 12);
     }
     flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
And add to config_map:
    {
     FLB_CONFIG_MAP_BOOL, "use_100_continue", "true",
     0, FLB_TRUE, offsetof(struct flb_out_doris, use_100_continue),
     "Use 'Expect: 100-continue' header for FE requests (may cause duplicates in some Doris configurations)"
    },
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- plugins/out_doris/doris.c (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-04T12:32:46.030Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10825
File: plugins/out_s3/s3.c:0-0
Timestamp: 2025-09-04T12:32:46.030Z
Learning: In Fluent Bit plugins, avoid creating plugin-local config_map entries that duplicate core functionality. The core already provides Retry_Limit parsed into ins->retry_limit, so plugins should use that directly rather than defining their own retry_limit configuration option.
Applied to files:
plugins/out_doris/doris.c
📚 Learning: 2025-09-08T11:21:33.975Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 10851
File: include/fluent-bit/flb_simd.h:60-66
Timestamp: 2025-09-08T11:21:33.975Z
Learning: Fluent Bit currently only supports MSVC compiler on Windows, so additional compiler compatibility guards may be unnecessary for Windows-specific code paths.
Applied to files:
plugins/out_doris/doris.c
🧬 Code graph analysis (1)
plugins/out_doris/doris.c (6)
- plugins/out_doris/doris_conf.c (2): flb_doris_conf_create (91-222), flb_doris_conf_destroy (224-241)
- src/flb_hash_table.c (4): flb_hash_table_create (99-137), flb_hash_table_destroy (197-215), flb_hash_table_get_ptr (591-603), flb_hash_table_add (401-494)
- src/flb_upstream.c (4): flb_upstream_destroy (656-698), flb_upstream_create (290-363), flb_upstream_conn_get (711-844), flb_upstream_conn_release (862-947)
- src/flb_http_client.c (5): flb_http_client (814-859), flb_http_add_header (963-995), flb_http_basic_auth (1323-1327), flb_http_do (1572-1632), flb_http_client_destroy (1688-1695)
- src/flb_pack.c (1): flb_pack_msgpack_to_json_format (1169-1450)
- src/flb_sds.c (1): flb_sds_destroy (389-399)
    u = flb_upstream_create(ctx->u->base.config,
                            host,
                            port,
                            ctx->u->base.flags,
                            ctx->u->base.tls_context);
    if (u) {
        flb_hash_table_add(be_pool, address, len, u, 0);
    } else {
        flb_plg_error(ctx->ins, "no doris be connections available to %s:%i",
                      host, port);
        return FLB_RETRY;
    }
Fix potential upstream leak when hash table insertion fails.
If flb_hash_table_add fails (returns -1), the newly created upstream u is leaked because it's neither stored in the pool nor destroyed. Check the return value and destroy the upstream on failure.
Apply this fix:
u = flb_upstream_create(ctx->u->base.config,
host,
port,
ctx->u->base.flags,
ctx->u->base.tls_context);
if (u) {
- flb_hash_table_add(be_pool, address, len, u, 0);
+ ret = flb_hash_table_add(be_pool, address, len, u, 0);
+ if (ret == -1) {
+ flb_upstream_destroy(u);
+ flb_plg_error(ctx->ins, "failed to add upstream to BE pool for %s:%i",
+ host, port);
+ return FLB_RETRY;
+ }
} else {
flb_plg_error(ctx->ins, "no doris be connections available to %s:%i",
host, port);
return FLB_RETRY;
}
🤖 Prompt for AI Agents
In plugins/out_doris/doris.c around lines 185 to 196, the code creates a new
upstream u and calls flb_hash_table_add without checking its return value, which
can leak u on insertion failure; update the code to check flb_hash_table_add's
return value, and if it returns -1, call flb_upstream_destroy(u) to free the
upstream, log the error, and return FLB_RETRY (instead of leaving u alive),
otherwise proceed as before when insertion succeeds.
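The ownership rule behind this fix (whoever creates a resource must release it if no container accepts it) can be shown without any Fluent Bit types. In the sketch below, struct resource, struct pool, pool_add, and pool_create_and_add are hypothetical stand-ins for the upstream, the BE hash table, flb_hash_table_add, and the lookup-or-create path.

```c
#include <assert.h>
#include <stdlib.h>

/* Stand-in for an upstream connection object. */
struct resource {
    int id;
};

/* Stand-in for the BE pool: a tiny fixed-capacity container. */
struct pool {
    struct resource *slots[2];
    size_t used;
};

/* Returns 0 on success, -1 when the pool cannot take another entry. */
static int pool_add(struct pool *p, struct resource *r)
{
    if (p->used >= sizeof(p->slots) / sizeof(p->slots[0])) {
        return -1;
    }
    p->slots[p->used++] = r;
    return 0;
}

/*
 * Create a resource and hand it to the pool. If the pool rejects it, the
 * resource is released here, because nothing else holds a pointer to it.
 */
static struct resource *pool_create_and_add(struct pool *p, int id)
{
    struct resource *r;

    assert(p != NULL);

    r = malloc(sizeof(*r));
    if (r == NULL) {
        return NULL;
    }
    r->id = id;

    if (pool_add(p, r) == -1) {
        free(r);
        return NULL;
    }
    return r;
}
```

Returning NULL on both failure paths lets the caller handle allocation failure and insertion failure the same way, which corresponds to returning FLB_RETRY in the suggested fix.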


Add new doris out plugin.
#9501
Enter [N/A] in the box, if an item is not applicable to your change.
Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test label to test for all targets (requires maintainer to do).
Documentation
fluent-bit-docs/pull/1483
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Tests
Chores