Skip to content

Commit dc2570c

Browse files
authored
out_http: Implement "body_key" and "headers_key" options (fluent#5439)
* out_http: Implement "body_key" and "headers_key" options Add two new options to out_http that allows greater flexibility in controlling the http request body and headers. "body_key" specifies a key in the record that contains the actual http body. This key's value must be a msgpack string or binary. "headers_key" specifies a key in the record that contains headers the should be added to the request. This key's value must be a msgpack map. Signed-off-by: Thiago Padilha <[email protected]>
1 parent 02eba59 commit dc2570c

File tree

3 files changed

+256
-6
lines changed

3 files changed

+256
-6
lines changed

plugins/out_http/http.c

Lines changed: 212 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <fluent-bit/flb_pack.h>
2828
#include <fluent-bit/flb_sds.h>
2929
#include <fluent-bit/flb_gzip.h>
30+
#include <fluent-bit/flb_record_accessor.h>
3031
#include <msgpack.h>
3132

3233
#ifdef FLB_HAVE_SIGNV4
@@ -38,6 +39,7 @@
3839

3940
#include <stdio.h>
4041
#include <stdlib.h>
42+
#include <string.h>
4143
#include <assert.h>
4244
#include <errno.h>
4345

@@ -69,9 +71,43 @@ static int cb_http_init(struct flb_output_instance *ins,
6971
return 0;
7072
}
7173

74+
static void append_headers(struct flb_http_client *c,
75+
char **headers)
76+
{
77+
int i;
78+
char *header_key;
79+
char *header_value;
80+
81+
i = 0;
82+
header_key = NULL;
83+
header_value = NULL;
84+
while (*headers) {
85+
if (i % 2 == 0) {
86+
header_key = *headers;
87+
}
88+
else {
89+
header_value = *headers;
90+
}
91+
if (header_key && header_value) {
92+
flb_http_add_header(c,
93+
header_key,
94+
strlen(header_key),
95+
header_value,
96+
strlen(header_value));
97+
flb_free(header_key);
98+
flb_free(header_value);
99+
header_key = NULL;
100+
header_value = NULL;
101+
}
102+
headers++;
103+
i++;
104+
}
105+
}
106+
72107
static int http_post(struct flb_out_http *ctx,
73108
const void *body, size_t body_len,
74-
const char *tag, int tag_len)
109+
const char *tag, int tag_len,
110+
char **headers)
75111
{
76112
int ret;
77113
int out_ret = FLB_OK;
@@ -136,7 +172,10 @@ static int http_post(struct flb_out_http *ctx,
136172
c->cb_ctx = ctx->ins->callback;
137173

138174
/* Append headers */
139-
if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
175+
if (headers) {
176+
append_headers(c, headers);
177+
}
178+
else if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
140179
(ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) ||
141180
(ctx->out_format == FLB_PACK_JSON_FORMAT_LINES) ||
142181
(ctx->out_format == FLB_HTTP_OUT_GELF)) {
@@ -328,13 +367,162 @@ static int http_gelf(struct flb_out_http *ctx,
328367
s = tmp;
329368
}
330369

331-
ret = http_post(ctx, s, flb_sds_len(s), tag, tag_len);
370+
ret = http_post(ctx, s, flb_sds_len(s), tag, tag_len, NULL);
332371
flb_sds_destroy(s);
333372
msgpack_unpacked_destroy(&result);
334373

335374
return ret;
336375
}
337376

377+
static char **extract_headers(msgpack_object *obj) {
378+
size_t i;
379+
char **headers = NULL;
380+
size_t str_count;
381+
msgpack_object_map map;
382+
msgpack_object_str k;
383+
msgpack_object_str v;
384+
385+
if (obj->type != MSGPACK_OBJECT_MAP) {
386+
goto err;
387+
}
388+
389+
map = obj->via.map;
390+
str_count = map.size * 2 + 1;
391+
headers = flb_calloc(str_count, sizeof *headers);
392+
393+
if (!headers) {
394+
goto err;
395+
}
396+
397+
for (i = 0; i < map.size; i++) {
398+
if (map.ptr[i].key.type != MSGPACK_OBJECT_STR ||
399+
map.ptr[i].val.type != MSGPACK_OBJECT_STR) {
400+
continue;
401+
}
402+
403+
k = map.ptr[i].key.via.str;
404+
v = map.ptr[i].val.via.str;
405+
406+
headers[i * 2] = strndup(k.ptr, k.size);
407+
408+
if (!headers[i]) {
409+
goto err;
410+
}
411+
412+
headers[i * 2 + 1] = strndup(v.ptr, v.size);
413+
414+
if (!headers[i]) {
415+
goto err;
416+
}
417+
}
418+
419+
return headers;
420+
421+
err:
422+
if (headers) {
423+
for (i = 0; i < str_count; i++) {
424+
if (headers[i]) {
425+
flb_free(headers[i]);
426+
}
427+
}
428+
flb_free(headers);
429+
}
430+
return NULL;
431+
}
432+
static int post_all_requests(struct flb_out_http *ctx,
433+
const char *data, size_t size,
434+
flb_sds_t body_key,
435+
flb_sds_t headers_key,
436+
struct flb_event_chunk *event_chunk)
437+
{
438+
struct flb_time t;
439+
msgpack_unpacked result;
440+
msgpack_object root;
441+
msgpack_object map;
442+
msgpack_object *obj;
443+
msgpack_object *k;
444+
msgpack_object *v;
445+
msgpack_object *start_key;
446+
const char *body;
447+
size_t body_size;
448+
bool body_found;
449+
bool headers_found;
450+
char **headers;
451+
size_t off = 0;
452+
size_t record_count = 0;
453+
int ret = 0;
454+
455+
msgpack_unpacked_init(&result);
456+
457+
while (msgpack_unpack_next(&result, data, size, &off) == MSGPACK_UNPACK_SUCCESS) {
458+
headers = NULL;
459+
body_found = false;
460+
headers_found = false;
461+
root = result.data;
462+
463+
if (root.type != MSGPACK_OBJECT_ARRAY) {
464+
ret = -1;
465+
break;
466+
}
467+
468+
if (root.via.array.size != 2) {
469+
ret = -1;
470+
break;
471+
}
472+
473+
flb_time_pop_from_msgpack(&t, &result, &obj);
474+
475+
map = root.via.array.ptr[1];
476+
if (map.type != MSGPACK_OBJECT_MAP) {
477+
ret = -1;
478+
break;
479+
}
480+
481+
if (!flb_ra_get_kv_pair(ctx->body_ra, map, &start_key, &k, &v)) {
482+
if (v->type == MSGPACK_OBJECT_STR || v->type == MSGPACK_OBJECT_BIN) {
483+
body = v->via.str.ptr;
484+
body_size = v->via.str.size;
485+
body_found = true;
486+
}
487+
else {
488+
flb_plg_warn(ctx->ins,
489+
"failed to extract body using pattern \"%s\" "
490+
"(must be a msgpack string or bin)", ctx->body_key);
491+
}
492+
}
493+
494+
if (!flb_ra_get_kv_pair(ctx->headers_ra, map, &start_key, &k, &v)) {
495+
headers = extract_headers(v);
496+
if (headers) {
497+
headers_found = true;
498+
}
499+
else {
500+
flb_plg_warn(ctx->ins,
501+
"error extracting headers using pattern \"%s\"",
502+
ctx->headers_key);
503+
}
504+
}
505+
506+
if (body_found && headers_found) {
507+
flb_plg_trace(ctx->ins, "posting record %d", record_count++);
508+
ret = http_post(ctx, body, body_size, event_chunk->tag,
509+
flb_sds_len(event_chunk->tag), headers);
510+
}
511+
else {
512+
flb_plg_warn(ctx->ins,
513+
"failed to extract body/headers using patterns "
514+
"\"%s\" and \"%s\"", ctx->body_key, ctx->headers_key);
515+
ret = -1;
516+
continue;
517+
}
518+
519+
flb_free(headers);
520+
}
521+
522+
msgpack_unpacked_destroy(&result);
523+
return ret;
524+
}
525+
338526
static void cb_http_flush(struct flb_event_chunk *event_chunk,
339527
struct flb_output_flush *out_flush,
340528
struct flb_input_instance *i_ins,
@@ -346,7 +534,15 @@ static void cb_http_flush(struct flb_event_chunk *event_chunk,
346534
struct flb_out_http *ctx = out_context;
347535
(void) i_ins;
348536

349-
if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
537+
if (ctx->body_key) {
538+
ret = post_all_requests(ctx, event_chunk->data, event_chunk->size,
539+
ctx->body_key, ctx->headers_key, event_chunk);
540+
if (ret < 0) {
541+
flb_plg_error(ctx->ins,
542+
"failed to post requests body key \"%s\"", ctx->body_key);
543+
}
544+
}
545+
else if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
350546
(ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) ||
351547
(ctx->out_format == FLB_PACK_JSON_FORMAT_LINES)) {
352548

@@ -357,7 +553,7 @@ static void cb_http_flush(struct flb_event_chunk *event_chunk,
357553
ctx->date_key);
358554
if (json != NULL) {
359555
ret = http_post(ctx, json, flb_sds_len(json),
360-
event_chunk->tag, flb_sds_len(event_chunk->tag));
556+
event_chunk->tag, flb_sds_len(event_chunk->tag), NULL);
361557
flb_sds_destroy(json);
362558
}
363559
}
@@ -369,7 +565,7 @@ static void cb_http_flush(struct flb_event_chunk *event_chunk,
369565
else {
370566
ret = http_post(ctx,
371567
event_chunk->data, event_chunk->size,
372-
event_chunk->tag, flb_sds_len(event_chunk->tag));
568+
event_chunk->tag, flb_sds_len(event_chunk->tag), NULL);
373569
}
374570

375571
FLB_OUTPUT_RETURN(ret);
@@ -487,6 +683,16 @@ static struct flb_config_map config_map[] = {
487683
0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.level_key),
488684
"Specify the key to use for the 'level' in gelf format"
489685
},
686+
{
687+
FLB_CONFIG_MAP_STR, "body_key", NULL,
688+
0, FLB_TRUE, offsetof(struct flb_out_http, body_key),
689+
"Specify the key which contains the body"
690+
},
691+
{
692+
FLB_CONFIG_MAP_STR, "headers_key", NULL,
693+
0, FLB_TRUE, offsetof(struct flb_out_http, headers_key),
694+
"Specify the key which contains the headers"
695+
},
490696

491697
/* EOF */
492698
{0}

plugins/out_http/http.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ struct flb_out_http {
6868
/* GELF fields */
6969
struct flb_gelf_fields gelf_fields;
7070

71+
/* which record key to use as body */
72+
flb_sds_t body_key;
73+
74+
struct flb_record_accessor *body_ra;
75+
76+
/* override headers with contents of the map in the key specified here */
77+
flb_sds_t headers_key;
78+
79+
struct flb_record_accessor *headers_ra;
80+
7181
/* Include tag in header */
7282
flb_sds_t header_tag;
7383

plugins/out_http/http_conf.c

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <fluent-bit/flb_pack.h>
2323
#include <fluent-bit/flb_sds.h>
2424
#include <fluent-bit/flb_kv.h>
25+
#include <fluent-bit/flb_record_accessor.h>
2526
#ifdef FLB_HAVE_SIGNV4
2627
#ifdef FLB_HAVE_AWS
2728
#include <fluent-bit/flb_aws_credentials.h>
@@ -59,6 +60,34 @@ struct flb_out_http *flb_http_conf_create(struct flb_output_instance *ins,
5960
return NULL;
6061
}
6162

63+
if (ctx->headers_key && !ctx->body_key) {
64+
flb_plg_error(ctx->ins, "when setting headers_key, body_key is also required");
65+
flb_free(ctx);
66+
return NULL;
67+
}
68+
69+
if (ctx->body_key && !ctx->headers_key) {
70+
flb_plg_error(ctx->ins, "when setting body_key, headers_key is also required");
71+
flb_free(ctx);
72+
return NULL;
73+
}
74+
75+
if (ctx->body_key && ctx->headers_key) {
76+
ctx->body_ra = flb_ra_create(ctx->body_key, FLB_FALSE);
77+
if (!ctx->body_ra) {
78+
flb_plg_error(ctx->ins, "failed to allocate body record accessor");
79+
flb_free(ctx);
80+
return NULL;
81+
}
82+
83+
ctx->headers_ra = flb_ra_create(ctx->headers_key, FLB_FALSE);
84+
if (!ctx->headers_ra) {
85+
flb_plg_error(ctx->ins, "failed to allocate headers record accessor");
86+
flb_free(ctx);
87+
return NULL;
88+
}
89+
}
90+
6291
/*
6392
* Check if a Proxy have been set, if so the Upstream manager will use
6493
* the Proxy end-point and then we let the HTTP client know about it, so
@@ -246,6 +275,11 @@ void flb_http_conf_destroy(struct flb_out_http *ctx)
246275
return;
247276
}
248277

278+
if (ctx->body_ra && ctx->headers_ra) {
279+
flb_ra_destroy(ctx->body_ra);
280+
flb_ra_destroy(ctx->headers_ra);
281+
}
282+
249283
if (ctx->u) {
250284
flb_upstream_destroy(ctx->u);
251285
}

0 commit comments

Comments
 (0)