Skip to content

Commit 592d401

Browse files
nokute78edsiper
authored andcommitted
out_loki: support new option tenant_id_key(#2935)
Signed-off-by: Takahiro Yamashita <[email protected]>
1 parent 8eb2f8f commit 592d401

File tree

2 files changed

+91
-1
lines changed

2 files changed

+91
-1
lines changed

plugins/out_loki/loki.c

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,13 @@ static void loki_config_destroy(struct flb_loki *ctx)
599599
if (ctx->ra_k8s) {
600600
flb_ra_destroy(ctx->ra_k8s);
601601
}
602+
if (ctx->ra_tenant_id_key) {
603+
flb_ra_destroy(ctx->ra_tenant_id_key);
604+
if (ctx->dynamic_tenant_id) {
605+
flb_sds_destroy(ctx->dynamic_tenant_id);
606+
ctx->dynamic_tenant_id = NULL;
607+
}
608+
}
602609

603610
if (ctx->remove_mpa) {
604611
flb_mp_accessor_destroy(ctx->remove_mpa);
@@ -653,6 +660,16 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
653660
return NULL;
654661
}
655662

663+
/* tenant_id_key */
664+
if (ctx->tenant_id_key_config) {
665+
ctx->ra_tenant_id_key = flb_ra_create(ctx->tenant_id_key_config, FLB_FALSE);
666+
if (!ctx->ra_tenant_id_key) {
667+
flb_plg_error(ctx->ins,
668+
"could not create record accessor for Tenant ID");
669+
}
670+
ctx->dynamic_tenant_id = NULL;
671+
}
672+
656673
/* Line Format */
657674
if (strcasecmp(ctx->line_format, "json") == 0) {
658675
ctx->out_line_format = FLB_LOKI_FMT_JSON;
@@ -796,6 +813,57 @@ static void pack_format_line_value(flb_sds_t buf, msgpack_object *val)
796813
}
797814
}
798815

816+
// seek tenant id from map and set it to ctx->dynamic_tenant_id
817+
static int get_tenant_id_from_record(struct flb_loki *ctx, msgpack_object *map)
818+
{
819+
struct flb_ra_value *rval = NULL;
820+
flb_sds_t tmp_str;
821+
int cmp_len;
822+
823+
rval = flb_ra_get_value_object(ctx->ra_tenant_id_key, *map);
824+
825+
if (rval == NULL) {
826+
flb_plg_warn(ctx->ins, "the value of %s is missing",
827+
ctx->tenant_id_key_config);
828+
return -1;
829+
}
830+
else if (rval->o.type != MSGPACK_OBJECT_STR) {
831+
flb_plg_warn(ctx->ins, "the value of %s is not string",
832+
ctx->tenant_id_key_config);
833+
return -1;
834+
}
835+
836+
tmp_str = flb_sds_create_len(rval->o.via.str.ptr,
837+
rval->o.via.str.size);
838+
if (tmp_str == NULL) {
839+
flb_plg_warn(ctx->ins, "cannot create tenant ID string from record");
840+
flb_ra_key_value_destroy(rval);
841+
return -1;
842+
}
843+
844+
// check if already dynamic_tenant_id is set.
845+
if (ctx->dynamic_tenant_id) {
846+
cmp_len = flb_sds_len(ctx->dynamic_tenant_id);
847+
if ((rval->o.via.str.size == cmp_len) &&
848+
flb_sds_cmp(tmp_str, ctx->dynamic_tenant_id, cmp_len) == 0) {
849+
// tenant_id is same. nothing to do.
850+
flb_ra_key_value_destroy(rval);
851+
flb_sds_destroy(tmp_str);
852+
return 0;
853+
}
854+
flb_plg_warn(ctx->ins, "Tenant ID is overwritten %s -> %s",
855+
ctx->dynamic_tenant_id, tmp_str);
856+
flb_sds_destroy(ctx->dynamic_tenant_id);
857+
}
858+
859+
// this sds will be released after setting http header.
860+
ctx->dynamic_tenant_id = tmp_str;
861+
flb_plg_debug(ctx->ins, "Tenant ID is %s", ctx->dynamic_tenant_id);
862+
863+
flb_ra_key_value_destroy(rval);
864+
return 0;
865+
}
866+
799867
static int pack_record(struct flb_loki *ctx,
800868
msgpack_packer *mp_pck, msgpack_object *rec)
801869
{
@@ -829,6 +897,11 @@ static int pack_record(struct flb_loki *ctx,
829897
}
830898
}
831899

900+
// Get tenant id from record.
901+
if (ctx->ra_tenant_id_key && rec->type == MSGPACK_OBJECT_MAP) {
902+
get_tenant_id_from_record(ctx, rec);
903+
}
904+
832905
if (ctx->out_line_format == FLB_LOKI_FMT_JSON) {
833906
line = flb_msgpack_to_json_str(size_hint, rec);
834907
if (!line) {
@@ -1103,7 +1176,15 @@ static void cb_loki_flush(const void *data, size_t bytes,
11031176
FLB_LOKI_CT_JSON, sizeof(FLB_LOKI_CT_JSON) - 1);
11041177

11051178
/* Add X-Scope-OrgID header */
1106-
if (ctx->tenant_id) {
1179+
if (ctx->dynamic_tenant_id) {
1180+
flb_http_add_header(c,
1181+
FLB_LOKI_HEADER_SCOPE, sizeof(FLB_LOKI_HEADER_SCOPE) - 1,
1182+
ctx->dynamic_tenant_id,
1183+
flb_sds_len(ctx->dynamic_tenant_id));
1184+
flb_sds_destroy(ctx->dynamic_tenant_id);
1185+
ctx->dynamic_tenant_id = NULL; // clear for next flush
1186+
}
1187+
else if (ctx->tenant_id) {
11071188
flb_http_add_header(c,
11081189
FLB_LOKI_HEADER_SCOPE, sizeof(FLB_LOKI_HEADER_SCOPE) - 1,
11091190
ctx->tenant_id, flb_sds_len(ctx->tenant_id));
@@ -1183,6 +1264,12 @@ static struct flb_config_map config_map[] = {
11831264
"it assumes Loki is running in single-tenant mode and no X-Scope-OrgID "
11841265
"header is sent."
11851266
},
1267+
{
1268+
FLB_CONFIG_MAP_STR, "tenant_id_key", NULL,
1269+
0, FLB_TRUE, offsetof(struct flb_loki, tenant_id_key_config),
1270+
"If set, X-Scope-OrgID will be the value of the key from incoming record. "
1271+
"It is useful to set X-Scode-OrgID dynamically."
1272+
},
11861273

11871274
{
11881275
FLB_CONFIG_MAP_CLIST, "labels", NULL,

plugins/out_loki/loki.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct flb_loki {
5555
int auto_kubernetes_labels;
5656
flb_sds_t line_format;
5757
flb_sds_t tenant_id;
58+
flb_sds_t tenant_id_key_config;
5859

5960
/* HTTP Auth */
6061
flb_sds_t http_user;
@@ -74,6 +75,8 @@ struct flb_loki {
7475
struct mk_list labels_list; /* list of flb_loki_kv nodes */
7576
struct mk_list remove_keys_derived; /* remove_keys with label RAs */
7677
struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */
78+
struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */
79+
flb_sds_t dynamic_tenant_id; /* temporary buffer for tenant id */
7780

7881
/* Upstream Context */
7982
struct flb_upstream *u;

0 commit comments

Comments
 (0)