Skip to content

Commit 8e33525

Browse files
committed
refactor: global be pool -> tls be pool
1 parent 480885a commit 8e33525

File tree

3 files changed

+78
-67
lines changed

3 files changed

+78
-67
lines changed

plugins/out_doris/doris.c

Lines changed: 78 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,17 @@ static inline void sync_fetch_and_add(size_t *dest, size_t value) {
5858
#endif
5959
}
6060

61+
/* doris be connection pool, key: be_address, value: flb_upstream with single thread mode */
62+
FLB_TLS_DEFINE(struct flb_hash_table, doris_be_pool);
63+
6164
static int cb_doris_init(struct flb_output_instance *ins,
6265
struct flb_config *config, void *data)
6366
{
6467
struct flb_out_doris *ctx = NULL;
6568
(void) data;
6669

70+
FLB_TLS_INIT(be_pool);
71+
6772
ctx = flb_doris_conf_create(ins, config);
6873
if (!ctx) {
6974
return -1;
@@ -81,6 +86,56 @@ static int cb_doris_init(struct flb_output_instance *ins,
8186
return 0;
8287
}
8388

89+
static int cb_doris_worker_init(void *data, struct flb_config *config)
90+
{
91+
struct flb_hash_table *be_pool;
92+
struct flb_out_doris *ctx = data;
93+
94+
flb_plg_info(ctx->ins, "worker initializing");
95+
96+
be_pool = FLB_TLS_GET(doris_be_pool);
97+
if (!be_pool) {
98+
be_pool = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 16, -1);
99+
if (!be_pool) {
100+
flb_errno();
101+
return -1;
102+
}
103+
FLB_TLS_SET(doris_be_pool, be_pool);
104+
}
105+
106+
return 0;
107+
}
108+
109+
static int cb_doris_worker_exit(void *data, struct flb_config *config)
110+
{
111+
struct flb_hash_table *be_pool;
112+
struct flb_out_doris *ctx = data;
113+
114+
int i;
115+
struct mk_list *tmp;
116+
struct mk_list *head;
117+
struct flb_hash_table_entry *entry;
118+
struct flb_hash_table_chain *table;
119+
120+
flb_plg_info(ctx->ins, "worker exiting");
121+
122+
be_pool = FLB_TLS_GET(doris_be_pool);
123+
if (be_pool) {
124+
for (i = 0; i < be_pool->size; i++) {
125+
table = &be_pool->table[i];
126+
mk_list_foreach_safe(head, tmp, &table->chains) {
127+
entry = mk_list_entry(head, struct flb_hash_table_entry, _head);
128+
flb_upstream_destroy((struct flb_upstream*) entry->val);
129+
entry->val = NULL;
130+
}
131+
}
132+
flb_hash_table_destroy(be_pool);
133+
FLB_TLS_SET(doris_be_pool, NULL);
134+
}
135+
136+
return 0;
137+
}
138+
84139
static int http_put(struct flb_out_doris *ctx,
85140
const char *host, int port,
86141
const void *body, size_t body_len,
@@ -102,6 +157,7 @@ static int http_put(struct flb_out_doris *ctx,
102157
struct flb_config_map_val *mv;
103158
struct flb_slist_entry *key = NULL;
104159
struct flb_slist_entry *val = NULL;
160+
struct flb_hash_table *be_pool;
105161

106162
int i;
107163
int root_type;
@@ -116,28 +172,24 @@ static int http_put(struct flb_out_doris *ctx,
116172
char address[1024] = {0};
117173
int len = 0;
118174

175+
be_pool = FLB_TLS_GET(doris_be_pool);
176+
119177
/* Get upstream context and connection */
120178
if (strcmp(host, ctx->host) == 0 && port == ctx->port) { // address in config
121179
u = ctx->u;
122180
}
123181
else { // redirected address
124182
len = snprintf(address, sizeof(address), "%s:%i", host, port);
125-
u = flb_hash_table_get_ptr(ctx->u_pool, address, len);
126-
if (!u) { // first check
127-
pthread_mutex_lock(&ctx->mutex); // lock
128-
u = flb_hash_table_get_ptr(ctx->u_pool, address, len);
129-
if (!u) { // second check
130-
u = flb_upstream_create(ctx->u->base.config,
131-
host,
132-
port,
133-
ctx->u->base.flags,
134-
ctx->u->base.tls_context);
135-
if (u) {
136-
flb_hash_table_add(ctx->u_pool, address, len, u, 0);
137-
}
138-
}
139-
pthread_mutex_unlock(&ctx->mutex); // unlock
140-
if (!u) {
183+
u = flb_hash_table_get_ptr(be_pool, address, len);
184+
if (!u) {
185+
u = flb_upstream_create(ctx->u->base.config,
186+
host,
187+
port,
188+
ctx->u->base.flags,
189+
ctx->u->base.tls_context);
190+
if (u) {
191+
flb_hash_table_add(be_pool, address, len, u, 0);
192+
} else {
141193
flb_plg_error(ctx->ins, "no doris be connections available to %s:%i",
142194
host, port);
143195
return FLB_RETRY;
@@ -270,7 +322,7 @@ static int http_put(struct flb_out_doris *ctx,
270322
free_buf:
271323
flb_free(out_buf);
272324
msgpack_unpacked_destroy(&result);
273-
parse_done:
325+
parse_done:;
274326
}
275327
else {
276328
out_ret = FLB_RETRY;
@@ -475,13 +527,15 @@ static int cb_doris_format_test(struct flb_config *config,
475527

476528
/* Plugin reference */
477529
struct flb_output_plugin out_doris_plugin = {
478-
.name = "doris",
479-
.description = "Doris Output",
480-
.cb_init = cb_doris_init,
481-
.cb_pre_run = NULL,
482-
.cb_flush = cb_doris_flush,
483-
.cb_exit = cb_doris_exit,
484-
.config_map = config_map,
530+
.name = "doris",
531+
.description = "Doris Output",
532+
.cb_init = cb_doris_init,
533+
.cb_pre_run = NULL,
534+
.cb_flush = cb_doris_flush,
535+
.cb_exit = cb_doris_exit,
536+
.cb_worker_init = cb_doris_worker_init,
537+
.cb_worker_exit = cb_doris_worker_exit,
538+
.config_map = config_map,
485539

486540
/* for testing */
487541
.test_formatter.callback = cb_doris_format_test,

plugins/out_doris/doris.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,6 @@ struct flb_out_doris {
6868
/* Upstream connection to the backend server */
6969
struct flb_upstream *u;
7070

71-
/* doris be connection pool, key: string* be_address, value: flb_upstream* u */
72-
struct flb_hash_table *u_pool;
73-
pthread_mutex_t mutex;
74-
int mutex_initialized;
75-
7671
/* Plugin instance */
7772
struct flb_output_instance *ins;
7873
};

plugins/out_doris/doris_conf.c

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
9595
int io_flags = 0;
9696
const char *tmp;
9797
struct flb_upstream *upstream;
98-
struct flb_hash_table *u_pool;
9998
struct flb_out_doris *ctx = NULL;
10099
struct flb_doris_progress_reporter *reporter = NULL;
101100

@@ -198,20 +197,6 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
198197
/* Set instance flags into upstream */
199198
flb_output_upstream_set(ctx->u, ins);
200199

201-
/* doris be connection pool */
202-
u_pool = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 16, -1);
203-
if (!u_pool) {
204-
flb_doris_conf_destroy(ctx);
205-
return NULL;
206-
}
207-
ctx->u_pool = u_pool;
208-
if (pthread_mutex_init(&ctx->mutex, NULL) == 0) {
209-
ctx->mutex_initialized = 1;
210-
} else {
211-
flb_doris_conf_destroy(ctx);
212-
return NULL;
213-
}
214-
215200
/* create and start the progress reporter */
216201
if (ctx->log_progress_interval > 0) {
217202
reporter = flb_calloc(1, sizeof(struct flb_doris_progress_reporter));
@@ -252,28 +237,5 @@ void flb_doris_conf_destroy(struct flb_out_doris *ctx)
252237
flb_upstream_destroy(ctx->u);
253238
}
254239

255-
if (ctx->u_pool) {
256-
int i;
257-
struct mk_list *tmp;
258-
struct mk_list *head;
259-
struct flb_hash_table_entry *entry;
260-
struct flb_hash_table_chain *table;
261-
262-
for (i = 0; i < ctx->u_pool->size; i++) {
263-
table = &ctx->u_pool->table[i];
264-
mk_list_foreach_safe(head, tmp, &table->chains) {
265-
entry = mk_list_entry(head, struct flb_hash_table_entry, _head);
266-
flb_upstream_destroy((struct flb_upstream*) entry->val);
267-
entry->val = NULL;
268-
}
269-
}
270-
271-
flb_hash_table_destroy(ctx->u_pool);
272-
}
273-
274-
if (ctx->mutex_initialized) {
275-
pthread_mutex_destroy(&ctx->mutex);
276-
}
277-
278240
flb_free(ctx);
279241
}

0 commit comments

Comments
 (0)