Skip to content

Commit 1eb324a

Browse files
cosmo0920edsiper
authored andcommitted
out_es: tests: Add HTTP response testing
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 69d148e commit 1eb324a

File tree

3 files changed

+268
-0
lines changed

3 files changed

+268
-0
lines changed

plugins/out_es/es.c

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,78 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
985985
FLB_OUTPUT_RETURN(FLB_RETRY);
986986
}
987987

988+
static int elasticsearch_response_test(struct flb_config *config,
989+
void *plugin_context,
990+
int status,
991+
const void *data, size_t bytes,
992+
void **out_data, size_t *out_size)
993+
{
994+
int ret = 0;
995+
struct flb_elasticsearch *ctx = plugin_context;
996+
struct flb_connection *u_conn;
997+
struct flb_http_client *c;
998+
size_t b_sent;
999+
1000+
/* Not retrieve upstream connection */
1001+
u_conn = NULL;
1002+
1003+
/* Compose HTTP Client request (dummy client) */
1004+
c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ctx->uri,
1005+
NULL, 0, NULL, 0, NULL, 0);
1006+
1007+
flb_http_buffer_size(c, ctx->buffer_size);
1008+
1009+
/* Just stubbing the HTTP responses */
1010+
flb_http_set_response_test(c, "response", data, bytes, status, NULL, NULL);
1011+
1012+
ret = flb_http_do(c, &b_sent);
1013+
if (ret != 0) {
1014+
flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
1015+
goto error;
1016+
}
1017+
if (ret != 0) {
1018+
flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
1019+
goto error;
1020+
}
1021+
else {
1022+
/* The request was issued successfully, validate the 'error' field */
1023+
flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
1024+
if (c->resp.status != 200 && c->resp.status != 201) {
1025+
if (c->resp.payload_size > 0) {
1026+
flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n",
1027+
c->resp.status, ctx->uri, c->resp.payload);
1028+
}
1029+
else {
1030+
flb_plg_error(ctx->ins, "HTTP status=%i URI=%s",
1031+
c->resp.status, ctx->uri);
1032+
}
1033+
goto error;
1034+
}
1035+
1036+
if (c->resp.payload_size > 0) {
1037+
/*
1038+
* Elasticsearch payload should be JSON, we convert it to msgpack
1039+
* and lookup the 'error' field.
1040+
*/
1041+
ret = elasticsearch_error_check(ctx, c);
1042+
}
1043+
else {
1044+
goto error;
1045+
}
1046+
}
1047+
1048+
/* Cleanup */
1049+
flb_http_client_destroy(c);
1050+
1051+
return ret;
1052+
1053+
error:
1054+
/* Cleanup */
1055+
flb_http_client_destroy(c);
1056+
1057+
return -2;
1058+
}
1059+
9881060
static int cb_es_exit(void *data, struct flb_config *config)
9891061
{
9901062
struct flb_elasticsearch *ctx = data;
@@ -1231,6 +1303,7 @@ struct flb_output_plugin out_es_plugin = {
12311303

12321304
/* Test */
12331305
.test_formatter.callback = elasticsearch_format,
1306+
.test_response.callback = elasticsearch_response_test,
12341307

12351308
/* Plugin flags */
12361309
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,

tests/runtime/data/es/json_es.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,37 @@
1515
#define JSON_DOTS \
1616
"[1448403340," \
1717
"{\".le.vel\":\"error\", \".fo.o\":[{\".o.k\": [{\".b.ar\": \"baz\"}]}]}]"
18+
19+
#define JSON_RESPONSE_SUCCESSES "{\"errors\":false,\"took\":0,\"items\":[" \
20+
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dcfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \
21+
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":6,\"_primary_term\":1,\"status\":201}}," \
22+
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dsfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \
23+
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \
24+
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"d8fJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \
25+
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}," \
26+
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"eMfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \
27+
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":9,\"_primary_term\":1,\"status\":201}}]}"
28+
29+
#define JSON_RESPONSE_SUCCESSES_SIZE 783
30+
31+
#define JSON_RESPONSE_PARTIALLY_SUCCESS "{\"errors\":true,\"took\":316737025,\"items\":" \
32+
"[{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"hxELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \
33+
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \
34+
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iBELapEB_XqxG5Ydupgb\",\"status\":400," \
35+
"\"error\":{\"type\":\"document_parsing_exception\"," \
36+
"\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iBELapEB_XqxG5Ydupgb'. " \
37+
"Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \
38+
"\"caused_by\":{\"type\":\"document_parsing_exception\"," \
39+
"\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \
40+
"Use the index API request parameters.\"}}}}," \
41+
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iRELapEB_XqxG5Ydupgb\",\"status\":400," \
42+
"\"error\":{\"type\":\"document_parsing_exception\"," \
43+
"\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iRELapEB_XqxG5Ydupgb'. " \
44+
"Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \
45+
"\"caused_by\":{\"type\":\"document_parsing_exception\"," \
46+
"\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \
47+
"Use the index API request parameters.\"}}}}," \
48+
"{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"ihELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \
49+
"\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}]}"
50+
51+
#define JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE 1322

tests/runtime/out_elasticsearch.c

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,164 @@ void flb_test_logstash_prefix_separator()
799799
flb_destroy(ctx);
800800
}
801801

802+
static void cb_check_response_success(void *ctx, int ffd,
803+
int res_ret, void *res_data,
804+
size_t res_size, void *data)
805+
{
806+
TEST_CHECK(res_ret == 1);
807+
}
808+
809+
void flb_test_response_success()
810+
{
811+
int ret;
812+
char *response = "{\"took\":1,\"errors\":false,\"items\":[]}";
813+
int size = 37;
814+
flb_ctx_t *ctx;
815+
int in_ffd;
816+
int out_ffd;
817+
818+
/* Create context, flush every second (some checks omitted here) */
819+
ctx = flb_create();
820+
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
821+
822+
/* Lib input mode */
823+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
824+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
825+
826+
/* Elasticsearch output */
827+
out_ffd = flb_output(ctx, (char *) "es", NULL);
828+
flb_output_set(ctx, out_ffd,
829+
"match", "test",
830+
NULL);
831+
832+
/* Override defaults of index and type */
833+
flb_output_set(ctx, out_ffd,
834+
"write_operation", "create",
835+
NULL);
836+
837+
/* Enable test mode */
838+
ret = flb_output_set_http_test(ctx, out_ffd, "response",
839+
cb_check_response_success,
840+
NULL);
841+
842+
/* Start */
843+
ret = flb_start(ctx);
844+
TEST_CHECK(ret == 0);
845+
846+
/* Ingest data sample */
847+
ret = flb_lib_response(ctx, out_ffd, 200, response, size);
848+
TEST_CHECK(ret == 0);
849+
850+
sleep(2);
851+
flb_stop(ctx);
852+
flb_destroy(ctx);
853+
}
854+
855+
void flb_test_response_successes()
856+
{
857+
int ret;
858+
char *response = JSON_RESPONSE_SUCCESSES;
859+
int size = JSON_RESPONSE_SUCCESSES_SIZE;
860+
flb_ctx_t *ctx;
861+
int in_ffd;
862+
int out_ffd;
863+
864+
/* Create context, flush every second (some checks omitted here) */
865+
ctx = flb_create();
866+
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
867+
868+
/* Lib input mode */
869+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
870+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
871+
872+
/* Elasticsearch output */
873+
out_ffd = flb_output(ctx, (char *) "es", NULL);
874+
flb_output_set(ctx, out_ffd,
875+
"match", "test",
876+
NULL);
877+
878+
/* Override defaults of index and type */
879+
flb_output_set(ctx, out_ffd,
880+
"write_operation", "create",
881+
NULL);
882+
883+
/* Enable test mode */
884+
ret = flb_output_set_http_test(ctx, out_ffd, "response",
885+
cb_check_response_success,
886+
NULL);
887+
888+
/* Start */
889+
ret = flb_start(ctx);
890+
TEST_CHECK(ret == 0);
891+
892+
/* Ingest data sample */
893+
ret = flb_lib_response(ctx, out_ffd, 200, response, size);
894+
TEST_CHECK(ret == 0);
895+
896+
sleep(2);
897+
flb_stop(ctx);
898+
flb_destroy(ctx);
899+
}
900+
901+
static void cb_check_response_partially_success(void *ctx, int ffd,
902+
int res_ret, void *res_data,
903+
size_t res_size, void *data)
904+
{
905+
int composed_ret = 0;
906+
composed_ret |= (1 << 0);
907+
composed_ret |= (1 << 7);
908+
909+
TEST_CHECK(res_ret == composed_ret);
910+
/* Check whether contains a success flag or not */
911+
TEST_CHECK((res_ret & (1 << 0)));
912+
}
913+
914+
void flb_test_response_partially_success()
915+
{
916+
int ret;
917+
char *response = JSON_RESPONSE_PARTIALLY_SUCCESS;
918+
int size = JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE;
919+
flb_ctx_t *ctx;
920+
int in_ffd;
921+
int out_ffd;
922+
923+
/* Create context, flush every second (some checks omitted here) */
924+
ctx = flb_create();
925+
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
926+
927+
/* Lib input mode */
928+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
929+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
930+
931+
/* Elasticsearch output */
932+
out_ffd = flb_output(ctx, (char *) "es", NULL);
933+
flb_output_set(ctx, out_ffd,
934+
"match", "test",
935+
NULL);
936+
937+
/* Override defaults of index and type */
938+
flb_output_set(ctx, out_ffd,
939+
"write_operation", "create",
940+
NULL);
941+
942+
/* Enable test mode */
943+
ret = flb_output_set_http_test(ctx, out_ffd, "response",
944+
cb_check_response_partially_success,
945+
NULL);
946+
947+
/* Start */
948+
ret = flb_start(ctx);
949+
TEST_CHECK(ret == 0);
950+
951+
/* Ingest data sample */
952+
ret = flb_lib_response(ctx, out_ffd, 200, response, size);
953+
TEST_CHECK(ret == 0);
954+
955+
sleep(2);
956+
flb_stop(ctx);
957+
flb_destroy(ctx);
958+
}
959+
802960
/* Test list */
803961
TEST_LIST = {
804962
{"long_index" , flb_test_long_index },
@@ -814,5 +972,8 @@ TEST_LIST = {
814972
{"replace_dots" , flb_test_replace_dots },
815973
{"id_key" , flb_test_id_key },
816974
{"logstash_prefix_separator" , flb_test_logstash_prefix_separator },
975+
{"response_success" , flb_test_response_success },
976+
{"response_successes", flb_test_response_successes },
977+
{"response_partially_success" , flb_test_response_partially_success },
817978
{NULL, NULL}
818979
};

0 commit comments

Comments
 (0)