Skip to content

Commit 2ff7c87

Browse files
committed
feat: doris be connection pool
Signed-off-by: composer <[email protected]>
1 parent 600e596 commit 2ff7c87

File tree

3 files changed

+75
-18
lines changed

3 files changed

+75
-18
lines changed

plugins/out_doris/doris.c

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -113,17 +113,36 @@ static int http_put(struct flb_out_doris *ctx,
113113
msgpack_object msg_key;
114114
msgpack_object msg_val;
115115

116+
char address[1024] = {0};
117+
int len = 0;
118+
116119
/* Get upstream context and connection */
117-
if (strcmp(host, ctx->host) == 0 && port == ctx->port) {
120+
if (strcmp(host, ctx->host) == 0 && port == ctx->port) { // address in config
118121
u = ctx->u;
119122
}
120-
else {
121-
// TODO cache
122-
u = flb_upstream_create(ctx->u->base.config,
123-
host,
124-
port,
125-
ctx->u->base.flags,
126-
ctx->u->base.tls_context);
123+
else { // redirected address
124+
len = snprintf(address, sizeof(address), "%s:%i", host, port);
125+
u = flb_hash_table_get_ptr(ctx->u_pool, address, len);
126+
if (!u) { // first check
127+
pthread_mutex_lock(&ctx->mutex); // lock
128+
u = flb_hash_table_get_ptr(ctx->u_pool, address, len);
129+
if (!u) { // second check
130+
u = flb_upstream_create(ctx->u->base.config,
131+
host,
132+
port,
133+
ctx->u->base.flags,
134+
ctx->u->base.tls_context);
135+
if (u) {
136+
flb_hash_table_add(ctx->u_pool, address, len, u, 0);
137+
}
138+
}
139+
pthread_mutex_unlock(&ctx->mutex); // unlock
140+
if (!u) {
141+
flb_plg_error(ctx->ins, "no doris be connections available to %s:%i",
142+
host, port);
143+
return FLB_RETRY;
144+
}
145+
}
127146
}
128147
u_conn = flb_upstream_conn_get(u);
129148
if (!u_conn) {
@@ -276,11 +295,6 @@ static int http_put(struct flb_out_doris *ctx,
276295
/* Release the TCP connection */
277296
flb_upstream_conn_release(u_conn);
278297

279-
/* Release flb_upstream */
280-
if (u != ctx->u) {
281-
flb_upstream_destroy(u);
282-
}
283-
284298
return out_ret;
285299
}
286300

@@ -341,7 +355,7 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk,
341355

342356
if (ctx->add_label) {
343357
/* {label_prefix}_{db}_{table}_{timestamp}_{uuid} */
344-
len = snprintf(label, sizeof(label) - 1, "%s_%s_%s_%lu_", ctx->label_prefix, ctx->database, ctx->table, cfl_time_now() / 1000000000L);
358+
len = snprintf(label, sizeof(label), "%s_%s_%s_%lu_", ctx->label_prefix, ctx->database, ctx->table, cfl_time_now() / 1000000000L);
345359
flb_utils_uuid_v4_gen(label + len);
346360
len += 36;
347361
}

plugins/out_doris/doris.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ struct flb_out_doris {
6262
/* Upstream connection to the backend server */
6363
struct flb_upstream *u;
6464

65+
/* doris be connection pool, key: string* be_address, value: flb_upstream* u */
66+
struct flb_hash_table *u_pool;
67+
pthread_mutex_t mutex;
68+
int mutex_initialized;
69+
6570
/* Plugin instance */
6671
struct flb_output_instance *ins;
6772
};

plugins/out_doris/doris_conf.c

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
7575
int io_flags = 0;
7676
const char *tmp;
7777
struct flb_upstream *upstream;
78+
struct flb_hash_table *u_pool;
7879
struct flb_out_doris *ctx = NULL;
7980
struct flb_doris_progress_reporter *reporter = NULL;
8081

@@ -177,6 +178,20 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
177178
/* Set instance flags into upstream */
178179
flb_output_upstream_set(ctx->u, ins);
179180

181+
/* doris be connection pool */
182+
u_pool = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 16, -1);
183+
if (!u_pool) {
184+
flb_doris_conf_destroy(ctx);
185+
return NULL;
186+
}
187+
ctx->u_pool = u_pool;
188+
if (pthread_mutex_init(&ctx->mutex, NULL) == 0) {
189+
ctx->mutex_initialized = 1;
190+
} else {
191+
flb_doris_conf_destroy(ctx);
192+
return NULL;
193+
}
194+
180195
/* create and start the progress reporter */
181196
if (ctx->log_progress_interval > 0) {
182197
reporter = flb_calloc(1, sizeof(struct flb_doris_progress_reporter));
@@ -207,15 +222,38 @@ void flb_doris_conf_destroy(struct flb_out_doris *ctx)
207222
return;
208223
}
209224

210-
if (ctx->u) {
211-
flb_upstream_destroy(ctx->u);
212-
}
213-
214225
if (ctx->reporter) {
215226
ctx->reporter->running = 0;
216227
pthread_join(ctx->reporter_thread, NULL);
217228
flb_free(ctx->reporter);
218229
}
219230

231+
if (ctx->u) {
232+
flb_upstream_destroy(ctx->u);
233+
}
234+
235+
if (ctx->u_pool) {
236+
int i;
237+
struct mk_list *tmp;
238+
struct mk_list *head;
239+
struct flb_hash_table_entry *entry;
240+
struct flb_hash_table_chain *table;
241+
242+
for (i = 0; i < ctx->u_pool->size; i++) {
243+
table = &ctx->u_pool->table[i];
244+
mk_list_foreach_safe(head, tmp, &table->chains) {
245+
entry = mk_list_entry(head, struct flb_hash_table_entry, _head);
246+
flb_upstream_destroy((struct flb_upstream*) entry->val);
247+
entry->val = NULL;
248+
}
249+
}
250+
251+
flb_hash_table_destroy(ctx->u_pool);
252+
}
253+
254+
if (ctx->mutex_initialized) {
255+
pthread_mutex_destroy(&ctx->mutex);
256+
}
257+
220258
flb_free(ctx);
221259
}

0 commit comments

Comments
 (0)