@castorsky castorsky commented Jan 20, 2026

The set of error status flags was adopted from the out_elasticsearch module, and the opensearch_error_check function was modified to use it.

Previously, any response containing 'errors=true' was considered faulty if any item status differed from 409. In particular, a bulk response was marked faulty even when it contained only 200/201/409 statuses. This caused retries for batches that had already been ingested successfully: every response with mixed 200/201/409 statuses was scheduled for a retry, and that retry always succeeded on the first attempt. The logs filled up with lines like these:

[2026/01/15 20:46:06.385812697] [debug] [output:opensearch:opensearch.0] HTTP Status=200 URI=/_bulk
{"took":195,"errors":true,<redacted>}
[2026/01/15 20:46:06.402135598] [debug] [retry] new retry created for task_id=0 attempts=1
[2026/01/15 20:46:06.402043602] [debug] [out flush] cb_destroy coro_id=14
[2026/01/15 20:46:06.403448167] [ warn] [engine] failed to flush chunk '1-1768509961.200379496.flb', retry in 7 seconds: task_id=0, input=kafka.0 > output=opensearch.0 (out_id=0)
...
[2026/01/15 20:46:13.295076881] [debug] [output:opensearch:opensearch.0] HTTP Status=200 URI=/_bulk
[2026/01/15 20:46:13.308202365] [debug] [output:opensearch:opensearch.0] OpenSearch response
{"took":82,"errors":true,<redacted>}
[2026/01/15 20:46:13.308491510] [debug] [upstream] KA connection #148 to ingest.opensearch.ev.local:9200 is now available
[2026/01/15 20:46:13.308559145] [debug] [out flush] cb_destroy coro_id=15
[2026/01/15 20:46:13.310492200] [ info] [engine] flush chunk '1-1768509961.200379496.flb' succeeded at retry 1: task_id=0, input=kafka.0 > output=opensearch.0 (out_id=0)
[2026/01/15 20:46:13.310540993] [debug] [task] destroy task=0x7f79c6a298c0 (task_id=0)
[2026/01/15 20:46:13.310569102] [debug] [input chunk] remove chunk 1-1768509961.200379496.flb with 2166784 bytes from plugin opensearch.0, the updated fs_chunks_size is 790528 bytes

Examples:

This response was considered "faulty" and scheduled for retry.
{
  "took": 195,
  "errors": true,
  "items": [
      {
          "create": {
              "_index": ".ds-test-stream-000681",
              "_id": "61A24A3D-5B91-C85C-E5C1-D7EFF24CB9BC",
              "_version": 1,
              "result": "created",
              "_shards": {
                  "total": 2,
                  "successful": 2,
                  "failed": 0
              },
              "_seq_no": 12783141,
              "_primary_term": 1,
              "status": 201
          }
      },
      {
          "create": {
              "_index": ".ds-test-inventory-000678",
              "_id": "45952ABA-9330-DBC8-3668-DDB6631921D2",
              "status": 409,
              "error": {
                  "type": "version_conflict_engine_exception",
                  "reason": "[45952ABA-9330-DBC8-3668-DDB6631921D2]: version conflict, document already exists (current version [1])",
                  "index": ".ds-test-inventory-000678",
                  "shard": "0",
                  "index_uuid": "9pz3nYu9Qi2KUX2lIK9uLQ"
              }
          }
      }
  ]
}
This is the response to the retried task. It contains only 409 statuses.
{
  "took": 82,
  "errors": true,
  "items": [
      {
          "create": {
              "_index": ".ds-test-stream-000681",
              "_id": "61A24A3D-5B91-C85C-E5C1-D7EFF24CB9BC",
              "status": 409,
              "error": {
                  "reason": "[61A24A3D-5B91-C85C-E5C1-D7EFF24CB9BC]: version conflict, document already exists (current version [1])",
                  "index": ".ds-test-stream-000681",
                  "shard": "0",
                  "index_uuid": "gBhqBE5hS9iwb2DDxh8QyQ"
              }
          }
      },
      {
          "create": {
              "_index": ".ds-test-inventory-000678",
              "_id": "45952ABA-9330-DBC8-3668-DDB6631921D2",
              "status": 409,
              "error": {
                  "type": "version_conflict_engine_exception",
                  "reason": "[45952ABA-9330-DBC8-3668-DDB6631921D2]: version conflict, document already exists (current version [1])",
                  "index": ".ds-test-inventory-000678",
                  "shard": "0",
                  "index_uuid": "9pz3nYu9Qi2KUX2lIK9uLQ"
              }
          }
      }
  ]
}

This commit fixes the error handling. The opensearch_error_check function now iterates over every item status in the bulk response and sets the corresponding status bits in the 'check' flag. Afterward, the batch is considered successful if it contained only 2xx statuses (such as 200 or 201) or 409 (version conflict), and it is scheduled for retry if it contained any errors (4xx/5xx statuses other than 409, or a response that failed to parse).

The first example response above is now considered successful and is not scheduled for retry.
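
The accumulation described here can be sketched as follows. This is a simplified illustration, not the plugin's implementation: the per-item statuses are passed in as a plain array instead of being read from the msgpack response, and the `SKETCH_` macro names and bit positions are assumptions (the real flags are defined in plugins/out_opensearch/opensearch.h).

```c
#include <stddef.h>

/* Assumed bit positions for this sketch only. */
#define SKETCH_STATUS_SUCCESS (1 << 0)
#define SKETCH_STATUS_ERROR   (1 << 1)

/* Accumulate status bits over every item instead of stopping at the first. */
static int sketch_error_check(const int *statuses, size_t count)
{
    int check = 0;
    size_t i;

    for (i = 0; i < count; i++) {
        /* 2xx and 409 (document already exists) both count as delivered */
        if ((statuses[i] >= 200 && statuses[i] < 300) || statuses[i] == 409) {
            check |= SKETCH_STATUS_SUCCESS;
        }
        /* any other 4xx/5xx status marks the batch for retry */
        if (statuses[i] >= 400 && statuses[i] != 409) {
            check |= SKETCH_STATUS_ERROR;
        }
    }
    return check;
}

/* The batch is finished only when SUCCESS is the sole bit set. */
static int sketch_needs_retry(int check)
{
    return check != SKETCH_STATUS_SUCCESS;
}
```

Under these assumptions a batch with statuses 201 and 409 is not retried, and neither is a batch of 409s only, while a single 429 or 500 in the batch sets the error bit and triggers a retry.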


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • Bug Fixes

    • More accurate detection of partial, malformed, or mixed-result OpenSearch responses; non-pure-success results now trigger retries and safer handling.
  • Improvements

    • Richer status reporting and refined retry behavior for more reliable deliveries.
    • Improved diagnostic tracing and clearer error logging to aid troubleshooting and surface parsing issues.


@castorsky castorsky requested a review from a team as a code owner January 20, 2026 01:37
@coderabbitai

coderabbitai bot commented Jan 20, 2026

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough


The OpenSearch plugin replaces boolean error returns with a bit-flag status accumulator (FLB_OS_STATUS_*). opensearch_error_check() now sets flag bits instead of returning booleans; cb_opensearch_flush() treats only FLB_OS_STATUS_SUCCESS as pure success and triggers diagnostics/retry on any other flag combination.

Changes

Cohort / File(s) | Summary
Status flag definitions (plugins/out_opensearch/opensearch.h) | Adds bit-flag macros: FLB_OS_STATUS_SUCCESS, FLB_OS_STATUS_INCOMPLETE, FLB_OS_STATUS_ERROR_UNPACK, FLB_OS_STATUS_BAD_TYPE, FLB_OS_STATUS_INVALID_ARGUMENT, FLB_OS_STATUS_BAD_RESPONSE, FLB_OS_STATUS_DUPLICATES, FLB_OS_STATUS_ERROR.
Error handling refactor (plugins/out_opensearch/opensearch.c) | Refactors opensearch_error_check() to accumulate status flags (removes early boolean returns); updates cb_opensearch_flush() to treat only FLB_OS_STATUS_SUCCESS as success, log response on pure success, and route any other flag combination into diagnostic handling and retry paths.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested labels

backport to v4.1.x

Suggested reviewers

  • cosmo0920

Poem

🐇 I nibbled flags beneath the moon,
Bits and hops in tidy tune,
SUCCESS glowed single, others stood,
I hopped the paths of check and brood,
A rabbit sings: the status blooms 🌙✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title check | ✅ Passed | The title clearly and specifically describes the main change: fixing error treatment during response parsing in the OpenSearch output plugin.



@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 29d8c002e8


Comment on lines 855 to +856
 /* Check for errors other than version conflict (document already exists) */
-if (item_val.via.i64 != 409) {
-    check = FLB_TRUE;
-    goto done;
+if (item_val.via.i64 >= 400 && item_val.via.i64 != 409) {
+    check |= FLB_OS_STATUS_ERROR;


P2 Badge Mark 409-only bulk responses as success

The new status-flag logic never sets any flag for status == 409: the error path explicitly excludes 409, but there is no corresponding success/duplicate flag, so a response containing only 409s leaves check at 0. Since cb_opensearch_flush only accepts ret == FLB_OS_STATUS_SUCCESS, that 409-only batch will be treated as an error and retried indefinitely even though it is a valid “document already exists” outcome. This regresses the previously-accepted behavior for conflict-only batches and reintroduces unnecessary retries for that scenario.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/out_opensearch/opensearch.c (1)

756-763: Avoid leaking the msgpack buffer on unpack failure.

msgpack_unpack_next failure returns before out_buf is freed and result is destroyed, leaking memory on malformed responses.

🐛 Proposed fix
-    ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
-    if (ret != MSGPACK_UNPACK_SUCCESS) {
-        flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
-                      c->resp.payload);
-        check |= FLB_OS_STATUS_ERROR_UNPACK;
-        return check;
-    }
+    ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
+    if (ret != MSGPACK_UNPACK_SUCCESS) {
+        flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
+                      c->resp.payload);
+        check |= FLB_OS_STATUS_ERROR_UNPACK;
+        goto done;
+    }
🤖 Fix all issues with AI agents
In `@plugins/out_opensearch/opensearch.c`:
- Around line 850-858: The code treats 409 responses as non-success so a batch
of only 409s yields zero and triggers retry; modify the success detection so
that item_val.via.i64 values in the 200–299 range OR equal to 409 set
FLB_OS_STATUS_SUCCESS (i.e., update the first if to include == 409), while
keeping the existing error detection (the second if) that sets
FLB_OS_STATUS_ERROR only for item_val.via.i64 >= 400 && item_val.via.i64 != 409;
update the checks around item_val.via.i64, FLB_OS_STATUS_SUCCESS and
FLB_OS_STATUS_ERROR accordingly so 409-only batches are considered successful.
🧹 Nitpick comments (1)
plugins/out_opensearch/opensearch.h (1)

58-65: Fix typo in new status macro before it becomes public API debt.

FLB_OS_STATUS_INVAILD_ARGUMENT is misspelled; since this is newly introduced and public, consider renaming to FLB_OS_STATUS_INVALID_ARGUMENT and updating all usages to avoid propagating the typo.

♻️ Proposed rename
-#define FLB_OS_STATUS_INVAILD_ARGUMENT (1 << 4)
+#define FLB_OS_STATUS_INVALID_ARGUMENT (1 << 4)
-            check |= FLB_OS_STATUS_INVAILD_ARGUMENT;
+            check |= FLB_OS_STATUS_INVALID_ARGUMENT;
...
-                    check |= FLB_OS_STATUS_INVAILD_ARGUMENT;
+                    check |= FLB_OS_STATUS_INVALID_ARGUMENT;

@castorsky castorsky force-pushed the out_opensearch_bulk_errors_treat branch 2 times, most recently from d0b49fe to c138ab5 Compare January 20, 2026 09:03

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/out_opensearch/opensearch.c (1)

757-764: Memory leak on early return.

When msgpack_unpack_next fails, the function returns without freeing out_buf which was allocated by flb_pack_json on line 735.

Proposed fix
     ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
     if (ret != MSGPACK_UNPACK_SUCCESS) {
         flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
                       c->resp.payload);
+        flb_free(out_buf);
         check |= FLB_OS_STATUS_ERROR_UNPACK;
         return check;
     }
🤖 Fix all issues with AI agents
In `@plugins/out_opensearch/opensearch.c`:
- Line 779: The status flag name FLB_OS_STATUS_INVAILD_ARGUMENT is misspelled;
update its definition in the header (opensearch.h) to
FLB_OS_STATUS_INVALID_ARGUMENT and then replace all usages (e.g., the occurrence
in opensearch.c where check |= FLB_OS_STATUS_INVAILD_ARGUMENT) to use the
corrected symbol FLB_OS_STATUS_INVALID_ARGUMENT so the identifier is consistent
across the codebase.
♻️ Duplicate comments (1)
plugins/out_opensearch/opensearch.c (1)

850-858: 409-only batches still trigger retry (previously flagged).

The logic correctly excludes 409 from the ERROR flag, but 409 responses also don't set the SUCCESS flag. A batch with only 409s returns check = 0, which fails the ret == FLB_OS_STATUS_SUCCESS check in cb_opensearch_flush and triggers unnecessary retries.

Per the PR objectives, 409 (version conflict / document already exists) should be treated as a successful outcome.

Proposed fix
                         /* Check for success responses */
-                        if (item_val.via.i64 >= 200 && item_val.via.i64 < 300)
+                        if ((item_val.via.i64 >= 200 && item_val.via.i64 < 300) ||
+                            item_val.via.i64 == 409)
                         {
                             check |= FLB_OS_STATUS_SUCCESS;
                         }


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@plugins/out_opensearch/opensearch.c`:
- Around line 762-763: The early return leaks the JSON buffer allocated by
flb_pack_json (out_buf) when msgpack_unpack_next fails; before returning
FLB_OS_STATUS_ERROR_UNPACK in the failure path near the msgpack_unpack_next
check, free out_buf (and set pointer to NULL or ensure no double-free), and
perform any other necessary cleanup of resources allocated after flb_pack_json
so the function (the block handling
msgpack_unpack_next/FLB_OS_STATUS_ERROR_UNPACK) always frees out_buf on all
return paths.
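
The single-exit cleanup this comment asks for can be sketched in plain C. Everything here is a stand-in: `sketch_alloc` plays the role of the buffer produced by flb_pack_json, the check on the first byte plays the role of a failing msgpack_unpack_next, and the `live_buffers` counter exists only so the sketch can show that no buffer outlives the call. The bit positions are assumptions as well.

```c
#include <stdlib.h>
#include <string.h>

/* Assumed bit positions for this sketch only. */
#define SKETCH_STATUS_SUCCESS      (1 << 0)
#define SKETCH_STATUS_ERROR_UNPACK (1 << 2)

static int live_buffers = 0;   /* sketch-only counter of unfreed buffers */

static char *sketch_alloc(const char *src)
{
    char *p = malloc(strlen(src) + 1);

    if (p != NULL) {
        strcpy(p, src);
        live_buffers++;
    }
    return p;
}

static void sketch_free(char *p)
{
    if (p != NULL) {
        free(p);
        live_buffers--;
    }
}

static int sketch_check(const char *payload)
{
    int check = 0;
    char *out_buf = sketch_alloc(payload);

    if (out_buf == NULL) {
        return SKETCH_STATUS_ERROR_UNPACK;   /* nothing was allocated */
    }

    if (out_buf[0] != '{') {                 /* stand-in for an unpack failure */
        check |= SKETCH_STATUS_ERROR_UNPACK;
        goto done;                           /* jump to cleanup, do not return */
    }
    check |= SKETCH_STATUS_SUCCESS;

done:
    sketch_free(out_buf);                    /* reached on every path after allocation */
    return check;
}
```

Routing the failure through the `done` label keeps the release of the buffer in one place, so adding another early exit later does not reintroduce the leak.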
♻️ Duplicate comments (2)
plugins/out_opensearch/opensearch.h (1)

62-62: Typo: INVAILD should be INVALID.

The macro FLB_OS_STATUS_INVAILD_ARGUMENT contains a typo. This should be corrected to FLB_OS_STATUS_INVALID_ARGUMENT for clarity and consistency.

plugins/out_opensearch/opensearch.c (1)

850-858: 409-only batches will be retried indefinitely.

The current logic sets FLB_OS_STATUS_SUCCESS only for 2xx statuses (lines 851-854). While 409 is correctly excluded from FLB_OS_STATUS_ERROR (line 856), it doesn't set any flag. A batch containing only 409 responses will return check = 0, which fails the ret == FLB_OS_STATUS_SUCCESS check at line 1016, causing an infinite retry loop.

Per the PR objective, 409 (version conflict) should be treated as success. Either include 409 in the success condition or use the defined FLB_OS_STATUS_DUPLICATES flag and update the flush logic accordingly.

Proposed fix
                         /* Check for success responses */
-                        if (item_val.via.i64 >= 200 && item_val.via.i64 < 300)
+                        if ((item_val.via.i64 >= 200 && item_val.via.i64 < 300) ||
+                            item_val.via.i64 == 409)
                         {
                             check |= FLB_OS_STATUS_SUCCESS;
                         }
🧹 Nitpick comments (1)
plugins/out_opensearch/opensearch.h (1)

58-65: Consider using FLB_OS_STATUS_DUPLICATES for 409 (version conflict) responses.

The FLB_OS_STATUS_DUPLICATES flag is defined but never used in the implementation. Based on the PR objective to treat 409 (version conflict / document already exists) as a success condition, this flag seems designed for that purpose but was not integrated into the error checking logic.
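
One way that flag could be wired in, sketched under assumptions: 409 items set a separate duplicates bit, and the flush decision accepts any combination of the success and duplicates bits. The `SKETCH_` names, the bit positions, and the array input are hypothetical; only the idea of a dedicated duplicates flag comes from this review.

```c
#include <stddef.h>

/* Assumed bit positions; the values in opensearch.h may differ. */
#define SKETCH_STATUS_SUCCESS    (1 << 0)
#define SKETCH_STATUS_DUPLICATES (1 << 6)
#define SKETCH_STATUS_ERROR      (1 << 7)

/* Bits that are acceptable in a delivered batch. */
#define SKETCH_STATUS_OK_MASK \
    (SKETCH_STATUS_SUCCESS | SKETCH_STATUS_DUPLICATES)

static int sketch_error_check(const int *statuses, size_t count)
{
    int check = 0;
    size_t i;

    for (i = 0; i < count; i++) {
        if (statuses[i] >= 200 && statuses[i] < 300) {
            check |= SKETCH_STATUS_SUCCESS;
        }
        else if (statuses[i] == 409) {
            check |= SKETCH_STATUS_DUPLICATES;   /* document already exists */
        }
        else if (statuses[i] >= 400) {
            check |= SKETCH_STATUS_ERROR;
        }
    }
    return check;
}

/* Delivered: at least one acceptable bit and nothing outside the mask. */
static int sketch_is_delivered(int check)
{
    return check != 0 && (check & ~SKETCH_STATUS_OK_MASK) == 0;
}
```

Compared with folding 409 into the success bit, this variant lets the flush callback tell duplicates apart from fresh writes, for example to log them at debug level, at the cost of a mask comparison instead of a single equality test.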

The error status set was adopted from the out_elasticsearch module and the
opensearch_error_check function was modified.

Previously, any response containing 'errors=true' was considered faulty
if any item status differed from 409 (in particular, a bulk response was
marked faulty even when it contained only 200/201/409 statuses). This behavior
caused retries for successfully ingested batches.

This commit introduces fixed logic for error treatment. The
opensearch_error_check function now iterates over all message statuses in bulk
and marks status bits in the 'check' flag. Afterward, the message batch is
considered successful if there were only 2xx statuses (including 200, 201) or
409 (version conflict), and scheduled for retry if there were any errors
(4xx/5xx statuses except 409, or failed response parsing).

Signed-off-by: Castor Sky <csky57@gmail.com>
Use the cleanup path for 'out_buf' and 'result' when a JSON parsing error occurs.

Signed-off-by: Castor Sky <csky57@gmail.com>
@castorsky castorsky force-pushed the out_opensearch_bulk_errors_treat branch from ec2cd6c to 3abd99f Compare January 22, 2026 01:30