Skip to content

Commit b7bee7f

Browse files
committed
out_opensearch: fix error treatment on response parsing
The error status set was adopted from the out_elasticsearch module and the opensearch_error_check function was modified. Previously, any response containing 'errors=true' was considered faulty if there were any message status different from 409 (particularly, bulk was marked faulty when it contained only 200/201/409 statuses). This behavior caused retries for successfully ingested batches. This commit introduces fixed logic for error treatment. The opensearch_error_check function now iterates over all message statuses in bulk and marks status bits in the 'check' flag. Afterward, the message batch is considered successful if there were only 2xx statuses (including 200, 201) or 409 (version conflict), and scheduled for retry if there were any errors (4xx/5xx statuses except 409, or failed response parsing). Signed-off-by: Castor Sky <csky57@gmail.com>
1 parent 797031c commit b7bee7f

File tree

2 files changed

+43
-24
lines changed

2 files changed

+43
-24
lines changed

plugins/out_opensearch/opensearch.c

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
713713
{
714714
int i, j, k;
715715
int ret;
716-
int check = FLB_FALSE;
716+
int check = 0;
717717
int root_type;
718718
char *out_buf;
719719
size_t off = 0;
@@ -737,17 +737,20 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
737737
if (ret == -1) {
738738
/* Is this an incomplete HTTP Request ? */
739739
if (c->resp.payload_size <= 0) {
740-
return FLB_TRUE;
740+
check |= FLB_OS_STATUS_INCOMPLETE;
741+
return check;
741742
}
742743

743744
/* Lookup error field */
744745
if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) {
745-
return FLB_FALSE;
746+
check |= FLB_OS_STATUS_SUCCESS;
747+
return check;
746748
}
747749

748750
flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s",
749751
c->resp.payload);
750-
return FLB_TRUE;
752+
check |= FLB_OS_STATUS_BAD_RESPONSE;
753+
return check;
751754
}
752755

753756
/* Lookup error field */
@@ -756,14 +759,15 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
756759
if (ret != MSGPACK_UNPACK_SUCCESS) {
757760
flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
758761
c->resp.payload);
759-
return FLB_TRUE;
762+
check |= FLB_OS_STATUS_ERROR_UNPACK;
763+
return check;
760764
}
761765

762766
root = result.data;
763767
if (root.type != MSGPACK_OBJECT_MAP) {
764768
flb_plg_error(ctx->ins, "unexpected payload type=%i",
765769
root.type);
766-
check = FLB_TRUE;
770+
check |= FLB_OS_STATUS_BAD_TYPE;
767771
goto done;
768772
}
769773

@@ -772,7 +776,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
772776
if (key.type != MSGPACK_OBJECT_STR) {
773777
flb_plg_error(ctx->ins, "unexpected key type=%i",
774778
key.type);
775-
check = FLB_TRUE;
779+
check |= FLB_OS_STATUS_INVALID_ARGUMENT;
776780
goto done;
777781
}
778782

@@ -781,14 +785,14 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
781785
if (val.type != MSGPACK_OBJECT_BOOLEAN) {
782786
flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
783787
val.type);
784-
check = FLB_TRUE;
788+
check |= FLB_OS_STATUS_BAD_TYPE;
785789
goto done;
786790
}
787791

788792
/* If error == false, we are OK (no errors = FLB_FALSE) */
789793
if (!val.via.boolean) {
790794
/* no errors */
791-
check = FLB_FALSE;
795+
check |= FLB_OS_STATUS_SUCCESS;
792796
goto done;
793797
}
794798
}
@@ -797,7 +801,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
797801
if (val.type != MSGPACK_OBJECT_ARRAY) {
798802
flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
799803
val.type);
800-
check = FLB_TRUE;
804+
check |= FLB_OS_STATUS_BAD_TYPE;
801805
goto done;
802806
}
803807

@@ -806,22 +810,22 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
806810
if (item.type != MSGPACK_OBJECT_MAP) {
807811
flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
808812
item.type);
809-
check = FLB_TRUE;
813+
check |= FLB_OS_STATUS_BAD_TYPE;
810814
goto done;
811815
}
812816

813817
if (item.via.map.size != 1) {
814818
flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
815819
item.via.map.size);
816-
check = FLB_TRUE;
820+
check |= FLB_OS_STATUS_INVALID_ARGUMENT;
817821
goto done;
818822
}
819823

820824
item = item.via.map.ptr[0].val;
821825
if (item.type != MSGPACK_OBJECT_MAP) {
822826
flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
823827
item.type);
824-
check = FLB_TRUE;
828+
check |= FLB_OS_STATUS_BAD_TYPE;
825829
goto done;
826830
}
827831

@@ -830,7 +834,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
830834
if (item_key.type != MSGPACK_OBJECT_STR) {
831835
flb_plg_error(ctx->ins, "unexpected key type=%i",
832836
item_key.type);
833-
check = FLB_TRUE;
837+
check |= FLB_OS_STATUS_BAD_TYPE;
834838
goto done;
835839
}
836840

@@ -840,13 +844,17 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
840844
if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
841845
flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
842846
item_val.type);
843-
check = FLB_TRUE;
847+
check |= FLB_OS_STATUS_BAD_TYPE;
844848
goto done;
845849
}
850+
/* Check for success responses */
851+
if ((item_val.via.i64 >= 200 && item_val.via.i64 < 300) || item_val.via.i64 == 409) {
852+
{
853+
check |= FLB_OS_STATUS_SUCCESS;
854+
}
846855
/* Check for errors other than version conflict (document already exists) */
847-
if (item_val.via.i64 != 409) {
848-
check = FLB_TRUE;
849-
goto done;
856+
if (item_val.via.i64 >= 400 && item_val.via.i64 != 409) {
857+
check |= FLB_OS_STATUS_ERROR;
850858
}
851859
}
852860
}
@@ -1005,8 +1013,14 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
10051013
* and lookup the 'error' field.
10061014
*/
10071015
ret = opensearch_error_check(ctx, c);
1008-
if (ret == FLB_TRUE) {
1009-
/* we got an error */
1016+
if (ret == FLB_OS_STATUS_SUCCESS) {
1017+
/* Only the SUCCESS flag was set => the batch was completely accepted by OpenSearch. */
1018+
flb_plg_debug(ctx->ins, "OpenSearch response\n%s",
1019+
c->resp.payload);
1020+
}
1021+
else {
1022+
/* Some errors were discovered while parsing the response.
1023+
* Any error that may coexist with the SUCCESS flag should cause a retry. */
10101024
if (ctx->trace_error) {
10111025
/*
10121026
* If trace_error is set, trace the actual
@@ -1035,10 +1049,6 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
10351049
}
10361050
goto retry;
10371051
}
1038-
else {
1039-
flb_plg_debug(ctx->ins, "OpenSearch response\n%s",
1040-
c->resp.payload);
1041-
}
10421052
}
10431053
else {
10441054
if (signature) {

plugins/out_opensearch/opensearch.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@
5555
#define FLB_OS_COMPRESSION_NONE 0
5656
#define FLB_OS_COMPRESSION_GZIP 1
5757

58+
#define FLB_OS_STATUS_SUCCESS (1 << 0)
59+
#define FLB_OS_STATUS_INCOMPLETE (1 << 1)
60+
#define FLB_OS_STATUS_ERROR_UNPACK (1 << 2)
61+
#define FLB_OS_STATUS_BAD_TYPE (1 << 3)
62+
#define FLB_OS_STATUS_INVALID_ARGUMENT (1 << 4)
63+
#define FLB_OS_STATUS_BAD_RESPONSE (1 << 5)
64+
#define FLB_OS_STATUS_DUPLICATES (1 << 6)
65+
#define FLB_OS_STATUS_ERROR (1 << 7)
66+
5867
struct flb_opensearch {
5968
/* OpenSearch index (database) and type (table) */
6069
flb_sds_t index;

0 commit comments

Comments
 (0)