Skip to content

Commit 0817e00

Browse files
committed
out_loki: prevent race conditions when multiple workers use the Remove_Keys
Signed-off-by: Eduardo Silva <[email protected]>
1 parent c872957 commit 0817e00

File tree

2 files changed

+105
-8
lines changed

2 files changed

+105
-8
lines changed

plugins/out_loki/loki.c

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,16 @@ pthread_once_t initialization_guard = PTHREAD_ONCE_INIT;
4444

4545
FLB_TLS_DEFINE(struct flb_loki_dynamic_tenant_id_entry,
4646
thread_local_tenant_id);
47+
struct flb_loki_remove_mpa_entry {
48+
struct flb_mp_accessor *mpa;
49+
struct cfl_list _head;
50+
};
51+
FLB_TLS_DEFINE(struct flb_loki_remove_mpa_entry, thread_local_remove_mpa);
4752

4853
void initialize_thread_local_storage()
4954
{
5055
FLB_TLS_INIT(thread_local_tenant_id);
56+
FLB_TLS_INIT(thread_local_remove_mpa);
5157
}
5258

5359
static struct flb_loki_dynamic_tenant_id_entry *dynamic_tenant_id_create() {
@@ -81,6 +87,43 @@ static void dynamic_tenant_id_destroy(struct flb_loki_dynamic_tenant_id_entry *e
8187
}
8288
}
8389

90+
static struct flb_loki_remove_mpa_entry *remove_mpa_entry_create(struct flb_loki *ctx)
91+
{
92+
struct flb_loki_remove_mpa_entry *entry;
93+
94+
entry = flb_calloc(1, sizeof(struct flb_loki_remove_mpa_entry));
95+
if (!entry) {
96+
flb_errno();
97+
return NULL;
98+
}
99+
100+
entry->mpa = flb_mp_accessor_create(&ctx->remove_keys_derived);
101+
if (!entry->mpa) {
102+
flb_free(entry);
103+
return NULL;
104+
}
105+
106+
cfl_list_entry_init(&entry->_head);
107+
108+
return entry;
109+
}
110+
111+
static void remove_mpa_entry_destroy(struct flb_loki_remove_mpa_entry *entry)
112+
{
113+
if (entry) {
114+
if (entry->mpa) {
115+
flb_mp_accessor_destroy(entry->mpa);
116+
entry->mpa = NULL;
117+
}
118+
119+
if (!cfl_list_entry_is_orphan(&entry->_head)) {
120+
cfl_list_del(&entry->_head);
121+
}
122+
123+
flb_free(entry);
124+
}
125+
}
126+
84127
static void flb_loki_kv_init(struct mk_list *list)
85128
{
86129
mk_list_init(list);
@@ -1371,7 +1414,8 @@ static int get_tenant_id_from_record(struct flb_loki *ctx, msgpack_object *map,
13711414

13721415
static int pack_record(struct flb_loki *ctx,
13731416
msgpack_packer *mp_pck, msgpack_object *rec,
1374-
flb_sds_t *dynamic_tenant_id)
1417+
flb_sds_t *dynamic_tenant_id,
1418+
struct flb_mp_accessor *remove_mpa)
13751419
{
13761420
int i;
13771421
int skip = 0;
@@ -1397,8 +1441,8 @@ static int pack_record(struct flb_loki *ctx,
13971441

13981442
/* Remove keys in remove_keys */
13991443
msgpack_unpacked_init(&mp_buffer);
1400-
if (ctx->remove_mpa) {
1401-
ret = flb_mp_accessor_keys_remove(ctx->remove_mpa, rec,
1444+
if (remove_mpa) {
1445+
ret = flb_mp_accessor_keys_remove(remove_mpa, rec,
14021446
(void *) &tmp_sbuf_data, &tmp_sbuf_size);
14031447
if (ret == FLB_TRUE) {
14041448
ret = msgpack_unpack_next(&mp_buffer, tmp_sbuf_data, tmp_sbuf_size, &off);
@@ -1564,6 +1608,15 @@ static int cb_loki_init(struct flb_output_instance *ins,
15641608
}
15651609

15661610
cfl_list_init(&ctx->dynamic_tenant_list);
1611+
result = pthread_mutex_init(&ctx->remove_mpa_list_lock, NULL);
1612+
if (result != 0) {
1613+
flb_errno();
1614+
flb_plg_error(ins, "cannot initialize remove_mpa list lock");
1615+
loki_config_destroy(ctx);
1616+
return -1;
1617+
}
1618+
1619+
cfl_list_init(&ctx->remove_mpa_list);
15671620

15681621
/*
15691622
* This plugin instance uses the HTTP client interface, let's register
@@ -1581,7 +1634,8 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
15811634
int total_records,
15821635
char *tag, int tag_len,
15831636
const void *data, size_t bytes,
1584-
flb_sds_t *dynamic_tenant_id)
1637+
flb_sds_t *dynamic_tenant_id,
1638+
struct flb_mp_accessor *remove_mpa)
15851639
{
15861640
// int mp_ok = MSGPACK_UNPACK_SUCCESS;
15871641
// size_t off = 0;
@@ -1672,7 +1726,7 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
16721726

16731727
/* Append the timestamp */
16741728
pack_timestamp(&mp_pck, &log_event.timestamp);
1675-
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
1729+
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id, remove_mpa);
16761730
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
16771731
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
16781732
}
@@ -1709,7 +1763,7 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
17091763

17101764
/* Append the timestamp */
17111765
pack_timestamp(&mp_pck, &log_event.timestamp);
1712-
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
1766+
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id, remove_mpa);
17131767
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
17141768
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
17151769
}
@@ -1752,13 +1806,30 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk,
17521806
struct flb_connection *u_conn;
17531807
struct flb_http_client *c;
17541808
struct flb_loki_dynamic_tenant_id_entry *dynamic_tenant_id;
1809+
struct flb_loki_remove_mpa_entry *remove_mpa_entry;
17551810
struct mk_list *head;
17561811
struct flb_config_map_val *mv;
17571812
struct flb_slist_entry *key = NULL;
17581813
struct flb_slist_entry *val = NULL;
17591814

17601815
dynamic_tenant_id = FLB_TLS_GET(thread_local_tenant_id);
17611816

1817+
remove_mpa_entry = FLB_TLS_GET(thread_local_remove_mpa);
1818+
1819+
if (remove_mpa_entry == NULL) {
1820+
remove_mpa_entry = remove_mpa_entry_create(ctx);
1821+
if (!remove_mpa_entry) {
1822+
flb_plg_error(ctx->ins, "cannot allocate remove_mpa entry");
1823+
FLB_OUTPUT_RETURN(FLB_RETRY);
1824+
}
1825+
1826+
FLB_TLS_SET(thread_local_remove_mpa, remove_mpa_entry);
1827+
1828+
pthread_mutex_lock(&ctx->remove_mpa_list_lock);
1829+
cfl_list_add(&remove_mpa_entry->_head, &ctx->remove_mpa_list);
1830+
pthread_mutex_unlock(&ctx->remove_mpa_list_lock);
1831+
}
1832+
17621833
if (dynamic_tenant_id == NULL) {
17631834
dynamic_tenant_id = dynamic_tenant_id_create();
17641835

@@ -1784,7 +1855,8 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk,
17841855
(char *) event_chunk->tag,
17851856
flb_sds_len(event_chunk->tag),
17861857
event_chunk->data, event_chunk->size,
1787-
&dynamic_tenant_id->value);
1858+
&dynamic_tenant_id->value,
1859+
remove_mpa_entry->mpa);
17881860

17891861
if (!payload) {
17901862
flb_plg_error(ctx->ins, "cannot compose request payload");
@@ -1982,6 +2054,21 @@ static void release_dynamic_tenant_ids(struct cfl_list *dynamic_tenant_list)
19822054
}
19832055
}
19842056

2057+
static void release_remove_mpa_entries(struct cfl_list *remove_mpa_list)
2058+
{
2059+
struct cfl_list *iterator;
2060+
struct cfl_list *backup;
2061+
struct flb_loki_remove_mpa_entry *entry;
2062+
2063+
cfl_list_foreach_safe(iterator, backup, remove_mpa_list) {
2064+
entry = cfl_list_entry(iterator,
2065+
struct flb_loki_remove_mpa_entry,
2066+
_head);
2067+
2068+
remove_mpa_entry_destroy(entry);
2069+
}
2070+
}
2071+
19852072
static int cb_loki_exit(void *data, struct flb_config *config)
19862073
{
19872074
struct flb_loki *ctx = data;
@@ -1996,6 +2083,12 @@ static int cb_loki_exit(void *data, struct flb_config *config)
19962083

19972084
pthread_mutex_unlock(&ctx->dynamic_tenant_list_lock);
19982085

2086+
pthread_mutex_lock(&ctx->remove_mpa_list_lock);
2087+
2088+
release_remove_mpa_entries(&ctx->remove_mpa_list);
2089+
2090+
pthread_mutex_unlock(&ctx->remove_mpa_list_lock);
2091+
19992092
loki_config_destroy(ctx);
20002093

20012094
return 0;
@@ -2147,7 +2240,8 @@ static int cb_loki_format_test(struct flb_config *config,
21472240

21482241
payload = loki_compose_payload(ctx, total_records,
21492242
(char *) tag, tag_len, data, bytes,
2150-
&dynamic_tenant_id);
2243+
&dynamic_tenant_id,
2244+
ctx->remove_mpa);
21512245
if (payload == NULL) {
21522246
if (dynamic_tenant_id != NULL) {
21532247
flb_sds_destroy(dynamic_tenant_id);

plugins/out_loki/loki.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ struct flb_loki {
100100
struct cfl_list dynamic_tenant_list;
101101
pthread_mutex_t dynamic_tenant_list_lock;
102102

103+
struct cfl_list remove_mpa_list;
104+
pthread_mutex_t remove_mpa_list_lock;
105+
103106
/* Upstream Context */
104107
struct flb_upstream *u;
105108

0 commit comments

Comments
 (0)