diff --git a/plugins/out_opensearch/opensearch.c b/plugins/out_opensearch/opensearch.c index db654be6d73..d895380408b 100644 --- a/plugins/out_opensearch/opensearch.c +++ b/plugins/out_opensearch/opensearch.c @@ -713,7 +713,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx, { int i, j, k; int ret; - int check = FLB_FALSE; + int check = 0; int root_type; char *out_buf; size_t off = 0; @@ -737,17 +737,20 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (ret == -1) { /* Is this an incomplete HTTP Request ? */ if (c->resp.payload_size <= 0) { - return FLB_TRUE; + check |= FLB_OS_STATUS_INCOMPLETE; + return check; } /* Lookup error field */ if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) { - return FLB_FALSE; + check |= FLB_OS_STATUS_SUCCESS; + return check; } flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s", c->resp.payload); - return FLB_TRUE; + check |= FLB_OS_STATUS_BAD_RESPONSE; + return check; } /* Lookup error field */ @@ -756,14 +759,15 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (ret != MSGPACK_UNPACK_SUCCESS) { flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s", c->resp.payload); - return FLB_TRUE; + check |= FLB_OS_STATUS_ERROR_UNPACK; + goto done; } root = result.data; if (root.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected payload type=%i", root.type); - check = FLB_TRUE; + check |= FLB_OS_STATUS_BAD_TYPE; goto done; } @@ -772,7 +776,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (key.type != MSGPACK_OBJECT_STR) { flb_plg_error(ctx->ins, "unexpected key type=%i", key.type); - check = FLB_TRUE; + check |= FLB_OS_STATUS_INVALID_ARGUMENT; goto done; } @@ -781,14 +785,14 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (val.type != MSGPACK_OBJECT_BOOLEAN) { flb_plg_error(ctx->ins, "unexpected 'error' value type=%i", val.type); - check = FLB_TRUE; + check |= FLB_OS_STATUS_BAD_TYPE; goto done; } /* If error == false, we are OK (no errors = FLB_FALSE) */ if (!val.via.boolean) { /* no errors */ - check = FLB_FALSE; + check |= FLB_OS_STATUS_SUCCESS; goto done; } } @@ -797,7 +801,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (val.type != MSGPACK_OBJECT_ARRAY) { flb_plg_error(ctx->ins, "unexpected 'items' value type=%i", val.type); - check = FLB_TRUE; + check |= FLB_OS_STATUS_BAD_TYPE; goto done; } @@ -806,14 +810,14 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (item.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i", item.type); - check = FLB_TRUE; + check |= FLB_OS_STATUS_BAD_TYPE; goto done; } if (item.via.map.size != 1) { flb_plg_error(ctx->ins, "unexpected 'item' size=%i", item.via.map.size); - check = FLB_TRUE; + check |= FLB_OS_STATUS_INVALID_ARGUMENT; goto done; } @@ -821,7 +825,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (item.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i", item.type); - check = FLB_TRUE; + check |= FLB_OS_STATUS_BAD_TYPE; goto done; } @@ -830,7 +834,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (item_key.type != MSGPACK_OBJECT_STR) { flb_plg_error(ctx->ins, "unexpected key type=%i", item_key.type); - check = FLB_TRUE; + check |= FLB_OS_STATUS_BAD_TYPE; goto done; } @@ -840,13 +844,16 @@ static int opensearch_error_check(struct flb_opensearch *ctx, if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { flb_plg_error(ctx->ins, "unexpected 'status' value type=%i", item_val.type); - check = FLB_TRUE; + check |= FLB_OS_STATUS_BAD_TYPE; goto done; } + /* Check for success responses */ + if ((item_val.via.i64 >= 200 && item_val.via.i64 < 300) || item_val.via.i64 == 409) { + check |= FLB_OS_STATUS_SUCCESS; + } /* Check for errors other than version conflict (document already exists) */ - if (item_val.via.i64 != 409) { - check = FLB_TRUE; - goto done; + if (item_val.via.i64 >= 400 && item_val.via.i64 != 409) { + check |= FLB_OS_STATUS_ERROR; } } } @@ -1005,8 +1012,14 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk, * and lookup the 'error' field. */ ret = opensearch_error_check(ctx, c); - if (ret == FLB_TRUE) { - /* we got an error */ + if (ret == FLB_OS_STATUS_SUCCESS) { + /* Only the SUCCESS flag was set => the batch was completely accepted by OpenSearch. */ + flb_plg_debug(ctx->ins, "OpenSearch response\n%s", + c->resp.payload); + } + else { + /* Some errors were discovered while parsing the response. + * Any error that may coexist with the SUCCESS flag should cause a retry. */ if (ctx->trace_error) { /* * If trace_error is set, trace the actual @@ -1035,10 +1048,6 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk, } goto retry; } - else { - flb_plg_debug(ctx->ins, "OpenSearch response\n%s", - c->resp.payload); - } } else { if (signature) { diff --git a/plugins/out_opensearch/opensearch.h b/plugins/out_opensearch/opensearch.h index a0453fb019d..0d9c64ed8db 100644 --- a/plugins/out_opensearch/opensearch.h +++ b/plugins/out_opensearch/opensearch.h @@ -55,6 +55,15 @@ #define FLB_OS_COMPRESSION_NONE 0 #define FLB_OS_COMPRESSION_GZIP 1 +#define FLB_OS_STATUS_SUCCESS (1 << 0) +#define FLB_OS_STATUS_INCOMPLETE (1 << 1) +#define FLB_OS_STATUS_ERROR_UNPACK (1 << 2) +#define FLB_OS_STATUS_BAD_TYPE (1 << 3) +#define FLB_OS_STATUS_INVALID_ARGUMENT (1 << 4) +#define FLB_OS_STATUS_BAD_RESPONSE (1 << 5) +#define FLB_OS_STATUS_DUPLICATES (1 << 6) +#define FLB_OS_STATUS_ERROR (1 << 7) + struct flb_opensearch { /* OpenSearch index (database) and type (table) */ flb_sds_t index;