Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions plugins/out_opensearch/opensearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
{
int i, j, k;
int ret;
int check = FLB_FALSE;
int check = 0;
int root_type;
char *out_buf;
size_t off = 0;
Expand All @@ -737,17 +737,20 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
if (ret == -1) {
/* Is this an incomplete HTTP Request ? */
if (c->resp.payload_size <= 0) {
return FLB_TRUE;
check |= FLB_OS_STATUS_INCOMPLETE;
return check;
}

/* Lookup error field */
if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) {
return FLB_FALSE;
check |= FLB_OS_STATUS_SUCCESS;
return check;
}

flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s",
c->resp.payload);
return FLB_TRUE;
check |= FLB_OS_STATUS_BAD_RESPONSE;
return check;
}

/* Lookup error field */
Expand All @@ -756,14 +759,15 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
if (ret != MSGPACK_UNPACK_SUCCESS) {
flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
c->resp.payload);
return FLB_TRUE;
check |= FLB_OS_STATUS_ERROR_UNPACK;
goto done;
}

root = result.data;
if (root.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected payload type=%i",
root.type);
check = FLB_TRUE;
check |= FLB_OS_STATUS_BAD_TYPE;
goto done;
}

Expand All @@ -772,7 +776,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
if (key.type != MSGPACK_OBJECT_STR) {
flb_plg_error(ctx->ins, "unexpected key type=%i",
key.type);
check = FLB_TRUE;
check |= FLB_OS_STATUS_INVALID_ARGUMENT;
goto done;
}

Expand All @@ -781,14 +785,14 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
if (val.type != MSGPACK_OBJECT_BOOLEAN) {
flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
val.type);
check = FLB_TRUE;
check |= FLB_OS_STATUS_BAD_TYPE;
goto done;
}

/* If error == false, we are OK (no errors = FLB_FALSE) */
if (!val.via.boolean) {
/* no errors */
check = FLB_FALSE;
check |= FLB_OS_STATUS_SUCCESS;
goto done;
}
}
Expand All @@ -797,7 +801,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
if (val.type != MSGPACK_OBJECT_ARRAY) {
flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
val.type);
check = FLB_TRUE;
check |= FLB_OS_STATUS_BAD_TYPE;
goto done;
}

Expand All @@ -806,22 +810,22 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
if (item.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
item.type);
check = FLB_TRUE;
check |= FLB_OS_STATUS_BAD_TYPE;
goto done;
}

if (item.via.map.size != 1) {
flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
item.via.map.size);
check = FLB_TRUE;
check |= FLB_OS_STATUS_INVALID_ARGUMENT;
goto done;
}

item = item.via.map.ptr[0].val;
if (item.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
item.type);
check = FLB_TRUE;
check |= FLB_OS_STATUS_BAD_TYPE;
goto done;
}

Expand All @@ -830,7 +834,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
if (item_key.type != MSGPACK_OBJECT_STR) {
flb_plg_error(ctx->ins, "unexpected key type=%i",
item_key.type);
check = FLB_TRUE;
check |= FLB_OS_STATUS_BAD_TYPE;
goto done;
}

Expand All @@ -840,13 +844,16 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
item_val.type);
check = FLB_TRUE;
check |= FLB_OS_STATUS_BAD_TYPE;
goto done;
}
/* Check for success responses */
if ((item_val.via.i64 >= 200 && item_val.via.i64 < 300) || item_val.via.i64 == 409) {
check |= FLB_OS_STATUS_SUCCESS;
}
/* Check for errors other than version conflict (document already exists) */
if (item_val.via.i64 != 409) {
check = FLB_TRUE;
goto done;
if (item_val.via.i64 >= 400 && item_val.via.i64 != 409) {
check |= FLB_OS_STATUS_ERROR;
Comment on lines 854 to +856

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Mark 409-only bulk responses as success

The new status-flag logic never sets any flag for status == 409: the error path explicitly excludes 409, but there is no corresponding success/duplicate flag, so a response containing only 409s leaves check at 0. Since cb_opensearch_flush only accepts ret == FLB_OS_STATUS_SUCCESS, that 409-only batch will be treated as an error and retried indefinitely even though it is a valid “document already exists” outcome. This regresses the previously-accepted behavior for conflict-only batches and reintroduces unnecessary retries for that scenario.

Useful? React with 👍 / 👎.

}
}
}
Expand Down Expand Up @@ -1005,8 +1012,14 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
* and lookup the 'error' field.
*/
ret = opensearch_error_check(ctx, c);
if (ret == FLB_TRUE) {
/* we got an error */
if (ret == FLB_OS_STATUS_SUCCESS) {
/* Only the SUCCESS flag was set => the batch was completely accepted by OpenSearch. */
flb_plg_debug(ctx->ins, "OpenSearch response\n%s",
c->resp.payload);
}
else {
/* Some errors were discovered while parsing the response.
* Any error that may coexist with the SUCCESS flag should cause a retry. */
if (ctx->trace_error) {
/*
* If trace_error is set, trace the actual
Expand Down Expand Up @@ -1035,10 +1048,6 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
}
goto retry;
}
else {
flb_plg_debug(ctx->ins, "OpenSearch response\n%s",
c->resp.payload);
}
}
else {
if (signature) {
Expand Down
9 changes: 9 additions & 0 deletions plugins/out_opensearch/opensearch.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
#define FLB_OS_COMPRESSION_NONE 0
#define FLB_OS_COMPRESSION_GZIP 1

#define FLB_OS_STATUS_SUCCESS (1 << 0)
#define FLB_OS_STATUS_INCOMPLETE (1 << 1)
#define FLB_OS_STATUS_ERROR_UNPACK (1 << 2)
#define FLB_OS_STATUS_BAD_TYPE (1 << 3)
#define FLB_OS_STATUS_INVALID_ARGUMENT (1 << 4)
#define FLB_OS_STATUS_BAD_RESPONSE (1 << 5)
#define FLB_OS_STATUS_DUPLICATES (1 << 6)
#define FLB_OS_STATUS_ERROR (1 << 7)

struct flb_opensearch {
/* OpenSearch index (database) and type (table) */
flb_sds_t index;
Expand Down