@@ -713,7 +713,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
713713{
714714 int i , j , k ;
715715 int ret ;
716- int check = FLB_FALSE ;
716+ int check = 0 ;
717717 int root_type ;
718718 char * out_buf ;
719719 size_t off = 0 ;
@@ -737,17 +737,20 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
737737 if (ret == -1 ) {
738738 /* Is this an incomplete HTTP Request ? */
739739 if (c -> resp .payload_size <= 0 ) {
740- return FLB_TRUE ;
740+ check |= FLB_OS_STATUS_INCOMPLETE ;
741+ return check ;
741742 }
742743
743744 /* Lookup error field */
744745 if (strstr (c -> resp .payload , "\"errors\":false,\"items\":[" )) {
745- return FLB_FALSE ;
746+ check |= FLB_OS_STATUS_SUCCESS ;
747+ return check ;
746748 }
747749
748750 flb_plg_error (ctx -> ins , "could not pack/validate JSON response\n%s" ,
749751 c -> resp .payload );
750- return FLB_TRUE ;
752+ check |= FLB_OS_STATUS_BAD_RESPONSE ;
753+ return check ;
751754 }
752755
753756 /* Lookup error field */
@@ -756,14 +759,15 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
756759 if (ret != MSGPACK_UNPACK_SUCCESS ) {
757760 flb_plg_error (ctx -> ins , "Cannot unpack response to find error\n%s" ,
758761 c -> resp .payload );
759- return FLB_TRUE ;
762+ check |= FLB_OS_STATUS_ERROR_UNPACK ;
763+ return check ;
760764 }
761765
762766 root = result .data ;
763767 if (root .type != MSGPACK_OBJECT_MAP ) {
764768 flb_plg_error (ctx -> ins , "unexpected payload type=%i" ,
765769 root .type );
766- check = FLB_TRUE ;
770+ check |= FLB_OS_STATUS_BAD_TYPE ;
767771 goto done ;
768772 }
769773
@@ -772,7 +776,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
772776 if (key .type != MSGPACK_OBJECT_STR ) {
773777 flb_plg_error (ctx -> ins , "unexpected key type=%i" ,
774778 key .type );
775- check = FLB_TRUE ;
779+ check |= FLB_OS_STATUS_INVALID_ARGUMENT ;
776780 goto done ;
777781 }
778782
@@ -781,14 +785,14 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
781785 if (val .type != MSGPACK_OBJECT_BOOLEAN ) {
782786 flb_plg_error (ctx -> ins , "unexpected 'error' value type=%i" ,
783787 val .type );
784- check = FLB_TRUE ;
788+ check |= FLB_OS_STATUS_BAD_TYPE ;
785789 goto done ;
786790 }
787791
788792 /* If error == false, we are OK (no errors = FLB_FALSE) */
789793 if (!val .via .boolean ) {
790794 /* no errors */
791- check = FLB_FALSE ;
795+ check |= FLB_OS_STATUS_SUCCESS ;
792796 goto done ;
793797 }
794798 }
@@ -797,7 +801,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
797801 if (val .type != MSGPACK_OBJECT_ARRAY ) {
798802 flb_plg_error (ctx -> ins , "unexpected 'items' value type=%i" ,
799803 val .type );
800- check = FLB_TRUE ;
804+ check |= FLB_OS_STATUS_BAD_TYPE ;
801805 goto done ;
802806 }
803807
@@ -806,22 +810,22 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
806810 if (item .type != MSGPACK_OBJECT_MAP ) {
807811 flb_plg_error (ctx -> ins , "unexpected 'item' outer value type=%i" ,
808812 item .type );
809- check = FLB_TRUE ;
813+ check |= FLB_OS_STATUS_BAD_TYPE ;
810814 goto done ;
811815 }
812816
813817 if (item .via .map .size != 1 ) {
814818 flb_plg_error (ctx -> ins , "unexpected 'item' size=%i" ,
815819 item .via .map .size );
816- check = FLB_TRUE ;
820+ check |= FLB_OS_STATUS_INVALID_ARGUMENT ;
817821 goto done ;
818822 }
819823
820824 item = item .via .map .ptr [0 ].val ;
821825 if (item .type != MSGPACK_OBJECT_MAP ) {
822826 flb_plg_error (ctx -> ins , "unexpected 'item' inner value type=%i" ,
823827 item .type );
824- check = FLB_TRUE ;
828+ check |= FLB_OS_STATUS_BAD_TYPE ;
825829 goto done ;
826830 }
827831
@@ -830,7 +834,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
830834 if (item_key .type != MSGPACK_OBJECT_STR ) {
831835 flb_plg_error (ctx -> ins , "unexpected key type=%i" ,
832836 item_key .type );
833- check = FLB_TRUE ;
837+ check |= FLB_OS_STATUS_BAD_TYPE ;
834838 goto done ;
835839 }
836840
@@ -840,13 +844,16 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
840844 if (item_val .type != MSGPACK_OBJECT_POSITIVE_INTEGER ) {
841845 flb_plg_error (ctx -> ins , "unexpected 'status' value type=%i" ,
842846 item_val .type );
843- check = FLB_TRUE ;
847+ check |= FLB_OS_STATUS_BAD_TYPE ;
844848 goto done ;
845849 }
850+ /* Check for success responses */
851+ if ((item_val .via .i64 >= 200 && item_val .via .i64 < 300 ) || item_val .via .i64 == 409 ) {
852+ check |= FLB_OS_STATUS_SUCCESS ;
853+ }
846854 /* Check for errors other than version conflict (document already exists) */
847- if (item_val .via .i64 != 409 ) {
848- check = FLB_TRUE ;
849- goto done ;
855+ if (item_val .via .i64 >= 400 && item_val .via .i64 != 409 ) {
856+ check |= FLB_OS_STATUS_ERROR ;
850857 }
851858 }
852859 }
@@ -1005,8 +1012,14 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
10051012 * and lookup the 'error' field.
10061013 */
10071014 ret = opensearch_error_check (ctx , c );
1008- if (ret == FLB_TRUE ) {
1009- /* we got an error */
1015+ if (ret == FLB_OS_STATUS_SUCCESS ) {
1016+ /* Only the SUCCESS flag was set => the batch was completely accepted by OpenSearch. */
1017+ flb_plg_debug (ctx -> ins , "OpenSearch response\n%s" ,
1018+ c -> resp .payload );
1019+ }
1020+ else {
1021+ /* Some errors were discovered while parsing the response.
1022+ * Any error that may coexist with the SUCCESS flag should cause a retry. */
10101023 if (ctx -> trace_error ) {
10111024 /*
10121025 * If trace_error is set, trace the actual
@@ -1035,10 +1048,6 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
10351048 }
10361049 goto retry ;
10371050 }
1038- else {
1039- flb_plg_debug (ctx -> ins , "OpenSearch response\n%s" ,
1040- c -> resp .payload );
1041- }
10421051 }
10431052 else {
10441053 if (signature ) {
0 commit comments