@@ -713,7 +713,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
713713{
714714 int i , j , k ;
715715 int ret ;
716- int check = FLB_FALSE ;
716+ int check = 0 ;
717717 int root_type ;
718718 char * out_buf ;
719719 size_t off = 0 ;
@@ -737,17 +737,20 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
737737 if (ret == -1 ) {
738738 /* Is this an incomplete HTTP Request ? */
739739 if (c -> resp .payload_size <= 0 ) {
740- return FLB_TRUE ;
740+ check |= FLB_OS_STATUS_INCOMPLETE ;
741+ return check ;
741742 }
742743
743744 /* Lookup error field */
744745 if (strstr (c -> resp .payload , "\"errors\":false,\"items\":[" )) {
745- return FLB_FALSE ;
746+ check |= FLB_OS_STATUS_SUCCESS ;
747+ return check ;
746748 }
747749
748750 flb_plg_error (ctx -> ins , "could not pack/validate JSON response\n%s" ,
749751 c -> resp .payload );
750- return FLB_TRUE ;
752+ check |= FLB_OS_STATUS_BAD_RESPONSE ;
753+ return check ;
751754 }
752755
753756 /* Lookup error field */
@@ -756,14 +759,15 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
756759 if (ret != MSGPACK_UNPACK_SUCCESS ) {
757760 flb_plg_error (ctx -> ins , "Cannot unpack response to find error\n%s" ,
758761 c -> resp .payload );
759- return FLB_TRUE ;
762+ check |= FLB_OS_STATUS_ERROR_UNPACK ;
763+ return check ;
760764 }
761765
762766 root = result .data ;
763767 if (root .type != MSGPACK_OBJECT_MAP ) {
764768 flb_plg_error (ctx -> ins , "unexpected payload type=%i" ,
765769 root .type );
766- check = FLB_TRUE ;
770+ check |= FLB_OS_STATUS_BAD_TYPE ;
767771 goto done ;
768772 }
769773
@@ -772,7 +776,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
772776 if (key .type != MSGPACK_OBJECT_STR ) {
773777 flb_plg_error (ctx -> ins , "unexpected key type=%i" ,
774778 key .type );
775- check = FLB_TRUE ;
779+ check |= FLB_OS_STATUS_INVAILD_ARGUMENT ;
776780 goto done ;
777781 }
778782
@@ -781,14 +785,14 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
781785 if (val .type != MSGPACK_OBJECT_BOOLEAN ) {
782786 flb_plg_error (ctx -> ins , "unexpected 'error' value type=%i" ,
783787 val .type );
784- check = FLB_TRUE ;
788+ check |= FLB_OS_STATUS_BAD_TYPE ;
785789 goto done ;
786790 }
787791
788792 /* If error == false, we are OK (no errors = FLB_FALSE) */
789793 if (!val .via .boolean ) {
790794 /* no errors */
791- check = FLB_FALSE ;
795+ check |= FLB_OS_STATUS_SUCCESS ;
792796 goto done ;
793797 }
794798 }
@@ -797,7 +801,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
797801 if (val .type != MSGPACK_OBJECT_ARRAY ) {
798802 flb_plg_error (ctx -> ins , "unexpected 'items' value type=%i" ,
799803 val .type );
800- check = FLB_TRUE ;
804+ check |= FLB_OS_STATUS_BAD_TYPE ;
801805 goto done ;
802806 }
803807
@@ -806,22 +810,22 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
806810 if (item .type != MSGPACK_OBJECT_MAP ) {
807811 flb_plg_error (ctx -> ins , "unexpected 'item' outer value type=%i" ,
808812 item .type );
809- check = FLB_TRUE ;
813+ check |= FLB_OS_STATUS_BAD_TYPE ;
810814 goto done ;
811815 }
812816
813817 if (item .via .map .size != 1 ) {
814818 flb_plg_error (ctx -> ins , "unexpected 'item' size=%i" ,
815819 item .via .map .size );
816- check = FLB_TRUE ;
820+ check |= FLB_OS_STATUS_INVAILD_ARGUMENT ;
817821 goto done ;
818822 }
819823
820824 item = item .via .map .ptr [0 ].val ;
821825 if (item .type != MSGPACK_OBJECT_MAP ) {
822826 flb_plg_error (ctx -> ins , "unexpected 'item' inner value type=%i" ,
823827 item .type );
824- check = FLB_TRUE ;
828+ check |= FLB_OS_STATUS_BAD_TYPE ;
825829 goto done ;
826830 }
827831
@@ -830,7 +834,7 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
830834 if (item_key .type != MSGPACK_OBJECT_STR ) {
831835 flb_plg_error (ctx -> ins , "unexpected key type=%i" ,
832836 item_key .type );
833- check = FLB_TRUE ;
837+ check |= FLB_OS_STATUS_BAD_TYPE ;
834838 goto done ;
835839 }
836840
@@ -840,13 +844,17 @@ static int opensearch_error_check(struct flb_opensearch *ctx,
840844 if (item_val .type != MSGPACK_OBJECT_POSITIVE_INTEGER ) {
841845 flb_plg_error (ctx -> ins , "unexpected 'status' value type=%i" ,
842846 item_val .type );
843- check = FLB_TRUE ;
847+ check |= FLB_OS_STATUS_BAD_TYPE ;
844848 goto done ;
845849 }
850+ /* Check for success responses */
851+ if (item_val .via .i64 >= 200 && item_val .via .i64 < 300 )
852+ {
853+ check |= FLB_OS_STATUS_SUCCESS ;
854+ }
846855 /* Check for errors other than version conflict (document already exists) */
847- if (item_val .via .i64 != 409 ) {
848- check = FLB_TRUE ;
849- goto done ;
856+ if (item_val .via .i64 >= 400 && item_val .via .i64 != 409 ) {
857+ check |= FLB_OS_STATUS_ERROR ;
850858 }
851859 }
852860 }
@@ -1005,8 +1013,14 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
10051013 * and lookup the 'error' field.
10061014 */
10071015 ret = opensearch_error_check (ctx , c );
1008- if (ret == FLB_TRUE ) {
1009- /* we got an error */
1016+ if (ret == FLB_OS_STATUS_SUCCESS ) {
1017+ /* Only the SUCCESS flag was set => the batch was completely accepted by OpenSearch. */
1018+ flb_plg_debug (ctx -> ins , "OpenSearch response\n%s" ,
1019+ c -> resp .payload );
1020+ }
1021+ else {
1022+ /* Some errors were discovered while parsing the response.
1023+ * Any error that may coexist with the SUCCESS flag should cause a retry. */
10101024 if (ctx -> trace_error ) {
10111025 /*
10121026 * If trace_error is set, trace the actual
@@ -1035,10 +1049,6 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
10351049 }
10361050 goto retry ;
10371051 }
1038- else {
1039- flb_plg_debug (ctx -> ins , "OpenSearch response\n%s" ,
1040- c -> resp .payload );
1041- }
10421052 }
10431053 else {
10441054 if (signature ) {
0 commit comments