Skip to content

Conversation

@jinyongchoi
Copy link
Contributor

@jinyongchoi jinyongchoi commented Jan 9, 2026

Move mk_list_init(&ctx->topics) to early initialization to prevent SIGSEGV crash when rd_kafka_new() fails and flb_out_kafka_destroy() is called with an uninitialized topics list.

Also fix memory leak of rd_kafka_conf_t by calling rd_kafka_conf_destroy() on failure path and in flb_out_kafka_destroy() for other error paths. Set ctx->conf to NULL after successful rd_kafka_new() since ownership is transferred to the rd_kafka_t handle.

Fixes #11358


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • [N/A] Example configuration file for the change
  • [N/A] Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found
cmake -DFLB_CHUNK_TRACE=Off -DFLB_SIMD=Yes -DFLB_OUT_KAFKA=On -DFLB_RELEASE=On -DFLB_DEBUG=On -DFLB_JEMALLOC=On ..
Fluent Bit v5.0.0
* Copyright (C) 2015-2025 The Fluent Bit Authors
* Fluent Bit is a CNCF graduated project under the Fluent organization
* https://fluentbit.io

______ _                  _    ______ _ _           _____  _____           _            
|  ___| |                | |   | ___ (_) |         |  ___||  _  |         | |           
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   _|___ \ | |/' |______ __| | _____   __
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \|  /| |______/ _` |/ _ \ \ / /
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V //\__/ /\ |_/ /     | (_| |  __/\ V / 
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)\___/       \__,_|\___| \_/


[2026/01/09 16:35:52.311371051] [ info] [fluent bit] version=5.0.0, commit=7d0f98ef1b, pid=583636
[2026/01/09 16:35:52.343602487] [ info] [storage] ver=1.5.4, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2026/01/09 16:35:52.344007684] [ info] [simd    ] SSE2
[2026/01/09 16:35:52.344790280] [ info] [cmetrics] version=1.0.6
[2026/01/09 16:35:52.345063476] [ info] [ctraces ] version=0.6.6
[2026/01/09 16:35:52.356656738] [ info] [input:dummy:dummy.0] initializing
[2026/01/09 16:35:52.357212303] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2026/01/09 16:35:52.469694003] [error] [output:kafka:kafka.0] failed to create producer: `max.in.flight` must be set <= 5 when `enable.idempotence` is true
[2026/01/09 16:35:52.472582803] [error] [output:kafka:kafka.0] failed to initialize
[2026/01/09 16:35:52.472910412] [error] [output] failed to initialize 'kafka' plugin
[2026/01/09 16:35:52.478619952] [error] [engine] output initialization failed
[2026/01/09 16:35:53.484799683] [ info] [input] pausing dummy.0
==583636== 
==583636== HEAP SUMMARY:
==583636==     in use at exit: 0 bytes in 0 blocks
==583636==   total heap usage: 6,618 allocs, 6,618 frees, 668,313 bytes allocated
==583636== 
==583636== All heap blocks were freed -- no leaks are possible
==583636== 
==583636== For lists of detected and suppressed errors, rerun with: -s
==583636== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [N/A] Documentation required for this feature

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

Bug Fixes

  • Improved Kafka plugin resource initialization and cleanup procedures to enhance stability during error scenarios and prevent potential resource leaks.

✏️ Tip: You can customize this high-level summary in your review settings.

Move mk_list_init(&ctx->topics) to early initialization to prevent
SIGSEGV crash when rd_kafka_new() fails and flb_out_kafka_destroy()
is called with an uninitialized topics list.

Also fix memory leak of rd_kafka_conf_t by calling rd_kafka_conf_destroy()
on failure path and in flb_out_kafka_destroy() for other error paths.
Set ctx->conf to NULL after successful rd_kafka_new() since ownership
is transferred to the rd_kafka_t handle.

Signed-off-by: jinyong.choi <[email protected]>
@coderabbitai
Copy link

coderabbitai bot commented Jan 9, 2026

📝 Walkthrough

Walkthrough

This change fixes a SIGSEGV crash in the Kafka output plugin that occurs when rd_kafka_new() fails during initialization. It reorders initialization steps to ensure the topics list is initialized before potential error cleanup paths, and adds proper resource cleanup on configuration creation failures.

Changes

Cohort / File(s) Summary
Kafka Output Plugin Configuration
plugins/out_kafka/kafka_config.c
Moved topics list initialization to context creation time; added cleanup of configuration on producer creation failure; implemented ownership transfer of configuration after successful rd_kafka_new() call; added safe destruction of existing configuration in destroy function.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Poem

A rabbit hops through init's land,
Where topics lists now proudly stand—
Before the producer's call takes flight,
All resources cleaned up right! 🐰✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately and specifically describes the main fixes: preventing SIGSEGV crash and memory leak when rd_kafka_new() fails.
Linked Issues check ✅ Passed The code changes directly address all objectives from issue #11358: initializing ctx->topics before rd_kafka_new() prevents SIGSEGV, adding rd_kafka_conf_destroy() on failure paths fixes memory leaks, and setting ctx->conf to NULL after successful rd_kafka_new() ensures proper ownership transfer.
Out of Scope Changes check ✅ Passed All changes are directly scoped to fixing the rd_kafka_new() failure handling in the Kafka output plugin initialization with no extraneous modifications.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e19e07e and 7d0f98e.

📒 Files selected for processing (1)
  • plugins/out_kafka/kafka_config.c
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (32)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: Agent
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COMPILER_STRICT_POINTER_TYPES=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-centos-7
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-without-cxx (3.31.6)
  • GitHub Check: PR - fuzzing test
🔇 Additional comments (4)
plugins/out_kafka/kafka_config.c (4)

52-52: LGTM - Fixes SIGSEGV on initialization failure.

Early initialization of the topics list prevents a crash when rd_kafka_new() fails and flb_kafka_topic_destroy_all() (called from flb_out_kafka_destroy() at line 305) attempts to iterate an uninitialized list.


311-313: LGTM - Completes the memory leak fix.

This safety check ensures ctx->conf is destroyed on early failure paths (e.g., if flb_out_kafka_destroy() is called after flb_kafka_conf_create() succeeds but before rd_kafka_new() is attempted). The null check prevents double-free in cases where the configuration was already destroyed or ownership was transferred.


238-249: LGTM - Correct ownership handling and memory leak fix.

The changes properly handle configuration ownership in both success and failure cases according to librdkafka semantics:

  • Failure path (lines 243-244): When rd_kafka_new() fails, the configuration is destroyed since ownership was never transferred. Setting ctx->conf = NULL prevents double-free in flb_out_kafka_destroy().

  • Success path (lines 248-249): The configuration pointer is nulled because ownership transfers to ctx->kafka.rk, which will destroy it when rd_kafka_destroy() is called.


203-208: Cleanup coverage is properly handled for all error paths.

The error path at line 206 correctly calls flb_out_kafka_destroy(ctx), which will properly clean up ctx->conf (lines 311-313). All other error paths in this section are accounted for: the opaque creation failure at line 206 uses the cleanup function, and no additional allocation/error paths exist between conf creation (line 95) and rd_kafka_new() (line 238) that lack cleanup. The review is complete as written.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a SIGSEGV crash and memory leak in the Kafka output plugin when rd_kafka_new() fails. The fix ensures proper resource cleanup and initialization order.

Key changes:

  • Move mk_list_init(&ctx->topics) to early initialization to prevent SIGSEGV when the uninitialized list is accessed during cleanup
  • Add rd_kafka_conf_destroy() calls on error paths to fix memory leaks when configuration object ownership is not transferred
  • Set ctx->conf to NULL after successful rd_kafka_new() to track ownership transfer

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

@cosmo0920 cosmo0920 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this handling is correct. Good catch!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

out_kafka: Crash(SIGSEGV) on Kafka producer initialization failure

2 participants