Skip to content

Commit 41414f5

Browse files
committed
es_out: support multiple output nodes in round-robin by using upstream servers configuration
1 parent 54f1c6f commit 41414f5

File tree

4 files changed

+487
-134
lines changed

4 files changed

+487
-134
lines changed

plugins/out_es/es.c

Lines changed: 113 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <fluent-bit/flb_utils.h>
2424
#include <fluent-bit/flb_network.h>
2525
#include <fluent-bit/flb_http_client.h>
26+
#include <fluent-bit/flb_upstream.h>
27+
#include <fluent-bit/flb_upstream_ha.h>
2628
#include <fluent-bit/flb_pack.h>
2729
#include <fluent-bit/flb_time.h>
2830
#include <msgpack.h>
@@ -38,7 +40,7 @@ struct flb_output_plugin out_es_plugin;
3840

3941
static inline int es_pack_map_content(msgpack_packer *tmp_pck,
4042
msgpack_object map,
41-
struct flb_elasticsearch *ctx)
43+
struct flb_elasticsearch_config *ec)
4244
{
4345
int i;
4446
char *ptr_key = NULL;
@@ -87,7 +89,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
8789
*
8890
* https://goo.gl/R5NMTr
8991
*/
90-
if (ctx->replace_dots == FLB_TRUE) {
92+
if (ec->replace_dots == FLB_TRUE) {
9193
char *p = ptr_key;
9294
char *end = ptr_key + key_size;
9395
while (p != end) {
@@ -112,7 +114,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
112114
*/
113115
if (v->type == MSGPACK_OBJECT_MAP) {
114116
msgpack_pack_map(tmp_pck, v->via.map.size);
115-
es_pack_map_content(tmp_pck, *v, ctx);
117+
es_pack_map_content(tmp_pck, *v, ec);
116118
}
117119
else {
118120
msgpack_pack_object(tmp_pck, *v);
@@ -130,7 +132,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
130132
*/
131133
static char *elasticsearch_format(const void *data, size_t bytes,
132134
const char *tag, int tag_len, int *out_size,
133-
struct flb_elasticsearch *ctx)
135+
struct flb_elasticsearch_config *ec)
134136
{
135137
int ret;
136138
int len;
@@ -200,9 +202,9 @@ static char *elasticsearch_format(const void *data, size_t bytes,
200202
msgpack_unpacked_init(&result);
201203

202204
/* Copy logstash prefix if logstash format is enabled */
203-
if (ctx->logstash_format == FLB_TRUE) {
204-
memcpy(logstash_index, ctx->logstash_prefix, ctx->logstash_prefix_len);
205-
logstash_index[ctx->logstash_prefix_len] = '\0';
205+
if (ec->logstash_format == FLB_TRUE) {
206+
memcpy(logstash_index, ec->logstash_prefix, ec->logstash_prefix_len);
207+
logstash_index[ec->logstash_prefix_len] = '\0';
206208
}
207209

208210
/*
@@ -212,17 +214,17 @@ static char *elasticsearch_format(const void *data, size_t bytes,
212214
* The header stored in 'j_index' will be used for the all records on
213215
* this payload.
214216
*/
215-
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
217+
if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) {
216218
flb_time_get(&tms);
217219
gmtime_r(&tms.tm.tv_sec, &tm);
218220
s = strftime(index_formatted, sizeof(index_formatted) - 1,
219-
ctx->index, &tm);
221+
ec->index, &tm);
220222
es_index = index_formatted;
221223

222224
index_len = snprintf(j_index,
223225
ES_BULK_HEADER,
224226
ES_BULK_INDEX_FMT,
225-
es_index, ctx->type);
227+
es_index, ec->type);
226228
}
227229

228230
/*
@@ -231,7 +233,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
231233
* in order to prevent generating millions of indexes
232234
* we can set to always use current time for index generation
233235
*/
234-
if (ctx->current_time_index == FLB_TRUE) {
236+
if (ec->current_time_index == FLB_TRUE) {
235237
flb_time_get(&tms);
236238
}
237239

@@ -248,7 +250,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
248250
}
249251

250252
/* Only pop time from record if current_time_index is disabled */
251-
if (ctx->current_time_index == FLB_FALSE) {
253+
if (ec->current_time_index == FLB_FALSE) {
252254
flb_time_pop_from_msgpack(&tms, &result, &obj);
253255
}
254256

@@ -263,16 +265,16 @@ static char *elasticsearch_format(const void *data, size_t bytes,
263265
map_size = map.via.map.size;
264266

265267
es_index_custom_len = 0;
266-
if (ctx->logstash_prefix_key_len != 0) {
268+
if (ec->logstash_prefix_key_len != 0) {
267269
for (i = 0; i < map_size; i++) {
268270
key = map.via.map.ptr[i].key;
269271
if (key.type != MSGPACK_OBJECT_STR) {
270272
continue;
271273
}
272-
if (key.via.str.size != ctx->logstash_prefix_key_len) {
274+
if (key.via.str.size != ec->logstash_prefix_key_len) {
273275
continue;
274276
}
275-
if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key, ctx->logstash_prefix_key_len) != 0) {
277+
if (strncmp(key.via.str.ptr, ec->logstash_prefix_key, ec->logstash_prefix_key_len) != 0) {
276278
continue;
277279
}
278280
val = map.via.map.ptr[i].val;
@@ -294,62 +296,62 @@ static char *elasticsearch_format(const void *data, size_t bytes,
294296
msgpack_sbuffer_init(&tmp_sbuf);
295297
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
296298

297-
if (ctx->include_tag_key == FLB_TRUE) {
299+
if (ec->include_tag_key == FLB_TRUE) {
298300
map_size++;
299301
}
300302

301303
/* Set the new map size */
302304
msgpack_pack_map(&tmp_pck, map_size + 1);
303305

304306
/* Append the time key */
305-
msgpack_pack_str(&tmp_pck, ctx->time_key_len);
306-
msgpack_pack_str_body(&tmp_pck, ctx->time_key, ctx->time_key_len);
307+
msgpack_pack_str(&tmp_pck, ec->time_key_len);
308+
msgpack_pack_str_body(&tmp_pck, ec->time_key, ec->time_key_len);
307309

308310
/* Format the time */
309311
gmtime_r(&tms.tm.tv_sec, &tm);
310312
s = strftime(time_formatted, sizeof(time_formatted) - 1,
311-
ctx->time_key_format, &tm);
313+
ec->time_key_format, &tm);
312314
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
313315
".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
314316

315317
s += len;
316318
msgpack_pack_str(&tmp_pck, s);
317319
msgpack_pack_str_body(&tmp_pck, time_formatted, s);
318320

319-
es_index = ctx->index;
320-
if (ctx->logstash_format == FLB_TRUE) {
321+
es_index = ec->index;
322+
if (ec->logstash_format == FLB_TRUE) {
321323
/* Compose Index header */
322324
if (es_index_custom_len > 0) {
323325
p = logstash_index + es_index_custom_len;
324326
} else {
325-
p = logstash_index + ctx->logstash_prefix_len;
327+
p = logstash_index + ec->logstash_prefix_len;
326328
}
327329
*p++ = '-';
328330

329331
len = p - logstash_index;
330332
s = strftime(p, sizeof(logstash_index) - len - 1,
331-
ctx->logstash_dateformat, &tm);
333+
ec->logstash_dateformat, &tm);
332334
p += s;
333335
*p++ = '\0';
334336
es_index = logstash_index;
335-
if (ctx->generate_id == FLB_FALSE) {
337+
if (ec->generate_id == FLB_FALSE) {
336338
index_len = snprintf(j_index,
337339
ES_BULK_HEADER,
338340
ES_BULK_INDEX_FMT,
339-
es_index, ctx->type);
341+
es_index, ec->type);
340342
}
341343
}
342-
else if (ctx->current_time_index == FLB_TRUE) {
344+
else if (ec->current_time_index == FLB_TRUE) {
343345
/* Make sure we handle index time format for index */
344346
strftime(index_formatted, sizeof(index_formatted) - 1,
345-
ctx->index, &tm);
347+
ec->index, &tm);
346348
es_index = index_formatted;
347349
}
348350

349351
/* Tag Key */
350-
if (ctx->include_tag_key == FLB_TRUE) {
351-
msgpack_pack_str(&tmp_pck, ctx->tag_key_len);
352-
msgpack_pack_str_body(&tmp_pck, ctx->tag_key, ctx->tag_key_len);
352+
if (ec->include_tag_key == FLB_TRUE) {
353+
msgpack_pack_str(&tmp_pck, ec->tag_key_len);
354+
msgpack_pack_str_body(&tmp_pck, ec->tag_key, ec->tag_key_len);
353355
msgpack_pack_str(&tmp_pck, tag_len);
354356
msgpack_pack_str_body(&tmp_pck, tag, tag_len);
355357
}
@@ -361,15 +363,15 @@ static char *elasticsearch_format(const void *data, size_t bytes,
361363
* Elasticsearch have a restriction that key names cannot contain
362364
* a dot; if some dot is found, it's replaced with an underscore.
363365
*/
364-
ret = es_pack_map_content(&tmp_pck, map, ctx);
366+
ret = es_pack_map_content(&tmp_pck, map, ec);
365367
if (ret == -1) {
366368
msgpack_unpacked_destroy(&result);
367369
msgpack_sbuffer_destroy(&tmp_sbuf);
368370
es_bulk_destroy(bulk);
369371
return NULL;
370372
}
371373

372-
if (ctx->generate_id == FLB_TRUE) {
374+
if (ec->generate_id == FLB_TRUE) {
373375
MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash);
374376
snprintf(es_uuid, sizeof(es_uuid),
375377
"%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
@@ -378,7 +380,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
378380
index_len = snprintf(j_index,
379381
ES_BULK_HEADER,
380382
ES_BULK_INDEX_FMT_ID,
381-
es_index, ctx->type, es_uuid);
383+
es_index, ec->type, es_uuid);
382384
}
383385

384386
/* Convert msgpack to JSON */
@@ -412,7 +414,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
412414
* return the bulk->ptr buffer
413415
*/
414416
flb_free(bulk);
415-
if (ctx->trace_output) {
417+
if (ec->trace_output) {
416418
fwrite(buf, 1, *out_size, stdout);
417419
fflush(stdout);
418420
}
@@ -423,20 +425,30 @@ int cb_es_init(struct flb_output_instance *ins,
423425
struct flb_config *config,
424426
void *data)
425427
{
428+
int ret;
429+
const char *tmp;
426430
struct flb_elasticsearch *ctx;
431+
(void) data;
427432

428-
ctx = flb_es_conf_create(ins, config);
433+
ctx = flb_calloc(1, sizeof(struct flb_elasticsearch));
429434
if (!ctx) {
430-
flb_error("[out_es] cannot initialize plugin");
435+
flb_errno();
431436
return -1;
432437
}
433438

434-
flb_debug("[out_es] host=%s port=%i uri=%s index=%s type=%s",
435-
ins->host.name, ins->host.port, ctx->uri,
436-
ctx->index, ctx->type);
437-
439+
mk_list_init(&ctx->configs);
438440
flb_output_set_context(ins, ctx);
439-
return 0;
441+
442+
/* Configure HA or simple mode ? */
443+
tmp = flb_output_get_property("upstream", ins);
444+
if (tmp) {
445+
ret = es_config_ha(tmp, ctx, config);
446+
}
447+
else {
448+
ret = es_config_simple(ins, ctx, config);
449+
}
450+
451+
return ret;
440452
}
441453

442454
static int elasticsearch_error_check(struct flb_http_client *c)
@@ -546,46 +558,72 @@ void cb_es_flush(const void *data, size_t bytes,
546558
char *pack;
547559
size_t b_sent;
548560
struct flb_elasticsearch *ctx = out_context;
561+
struct flb_elasticsearch_config *ec = NULL;
549562
struct flb_upstream_conn *u_conn;
550563
struct flb_http_client *c;
564+
struct flb_upstream_node *node;
551565
(void) i_ins;
552566
(void) tag;
553567
(void) tag_len;
554568

569+
if (ctx->ha_mode == FLB_TRUE) {
570+
node = flb_upstream_ha_node_get(ctx->ha);
571+
if (!node) {
572+
flb_error("[out_es] cannot get an Upstream HA node");
573+
FLB_OUTPUT_RETURN(FLB_RETRY);
574+
}
575+
576+
/* Get forward_config stored in node opaque data */
577+
ec = flb_upstream_node_get_data(node);
578+
}
579+
else {
580+
ec = mk_list_entry_first(&ctx->configs,
581+
struct flb_elasticsearch_config,
582+
_head);
583+
}
584+
585+
flb_debug("[out_es] trying node %s", node->name);
586+
555587
/* Get upstream connection */
556-
u_conn = flb_upstream_conn_get(ctx->u);
588+
if (ctx->ha_mode == FLB_TRUE) {
589+
u_conn = flb_upstream_conn_get(node->u);
590+
}
591+
else {
592+
u_conn = flb_upstream_conn_get(ctx->u);
593+
}
557594
if (!u_conn) {
595+
flb_error("[out_es] no upstream connections available");
558596
FLB_OUTPUT_RETURN(FLB_RETRY);
559597
}
560598

561599
/* Convert format */
562-
pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ctx);
600+
pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ec);
563601
if (!pack) {
564602
flb_upstream_conn_release(u_conn);
565603
FLB_OUTPUT_RETURN(FLB_ERROR);
566604
}
567605

568606
/* Compose HTTP Client request */
569-
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
607+
c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri,
570608
pack, bytes_out, NULL, 0, NULL, 0);
571609

572-
flb_http_buffer_size(c, ctx->buffer_size);
610+
flb_http_buffer_size(c, ec->buffer_size);
573611

574612
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
575613
flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20);
576614

577-
if (ctx->http_user && ctx->http_passwd) {
578-
flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
615+
if (ec->http_user && ec->http_passwd) {
616+
flb_http_basic_auth(c, ec->http_user, ec->http_passwd);
579617
}
580618

581619
ret = flb_http_do(c, &b_sent);
582620
if (ret != 0) {
583-
flb_warn("[out_es] http_do=%i URI=%s", ret, ctx->uri);
621+
flb_warn("[out_es] http_do=%i URI=%s", ret, ec->uri);
584622
goto retry;
585623
}
586624
else {
587625
/* The request was issued successfully, validate the 'error' field */
588-
flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
626+
flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ec->uri);
589627
if (c->resp.status != 200 && c->resp.status != 201) {
590628
goto retry;
591629
}
@@ -598,7 +636,7 @@ void cb_es_flush(const void *data, size_t bytes,
598636
ret = elasticsearch_error_check(c);
599637
if (ret == FLB_TRUE) {
600638
/* we got an error */
601-
if (ctx->trace_error) {
639+
if (ec->trace_error) {
602640
/*
603641
* If trace_error is set, trace the actual
604642
* input/output to Elasticsearch that caused the problem.
@@ -635,8 +673,32 @@ void cb_es_flush(const void *data, size_t bytes,
635673
int cb_es_exit(void *data, struct flb_config *config)
636674
{
637675
struct flb_elasticsearch *ctx = data;
676+
struct flb_elasticsearch_config *ec;
677+
struct mk_list *head;
678+
struct mk_list *tmp;
679+
(void) config;
680+
681+
if (!ctx) {
682+
return 0;
683+
}
684+
685+
/* Destroy elasticsearch_config contexts */
686+
mk_list_foreach_safe(head, tmp, &ctx->configs) {
687+
ec = mk_list_entry(head, struct flb_elasticsearch_config, _head);
688+
mk_list_del(&ec->_head);
689+
flb_es_conf_destroy(ec);
690+
}
691+
692+
if (ctx->ha_mode == FLB_TRUE) {
693+
if (ctx->ha) {
694+
flb_upstream_ha_destroy(ctx->ha);
695+
}
696+
}
697+
else {
698+
flb_upstream_destroy(ctx->u);
699+
}
700+
flb_free(ctx);
638701

639-
flb_es_conf_destroy(ctx);
640702
return 0;
641703
}
642704

0 commit comments

Comments
 (0)