Skip to content

Commit b1b0635

Browse files
committed
fix: reporter
Signed-off-by: composer <[email protected]>
1 parent ada01de commit b1b0635

File tree

3 files changed

+20
-7
lines changed

3 files changed

+20
-7
lines changed

plugins/out_doris/doris.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@
4848

4949
static inline void sync_fetch_and_add(size_t *dest, size_t value) {
5050
#ifdef FLB_SYSTEM_WINDOWS
51-
InterlockedAdd(dest, value);
51+
#ifdef _WIN64
52+
InterlockedAdd64((LONG64*)dest, (LONG64)value);
53+
#else
54+
InterlockedAdd((LONG*)dest, (LONG)value);
55+
#endif
5256
#else
5357
__sync_fetch_and_add(dest, value);
5458
#endif

plugins/out_doris/doris.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <fluent-bit/flb_pthread.h>
2424

2525
struct flb_doris_progress_reporter {
26+
volatile int running;
2627
size_t total_bytes;
2728
size_t total_rows;
2829
size_t failed_rows;

plugins/out_doris/doris_conf.c

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,10 @@ void *report(void *c) {
3838
size_t cur_time, cur_bytes, cur_rows, total_time, total_speed_mbps, total_speed_rps;
3939
size_t inc_bytes, inc_rows, inc_time, inc_speed_mbps, inc_speed_rps;
4040

41-
pthread_detach(pthread_self());
42-
4341
flb_plg_info(ctx->ins, "Start progress reporter with interval %d", ctx->log_progress_interval);
4442

45-
while (ctx->log_progress_interval > 0) {
46-
sleep(ctx->log_progress_interval);
43+
while (ctx->reporter->running && ctx->log_progress_interval > 0) {
44+
flb_time_msleep(ctx->log_progress_interval * 1000);
4745

4846
cur_time = cfl_time_now() / 1000000000L;
4947
cur_bytes = ctx->reporter->total_bytes;
@@ -100,15 +98,23 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
10098
/* Validate */
10199
if (!ctx->endpoint_type || (strcasecmp(ctx->endpoint_type, "fe") != 0 && strcasecmp(ctx->endpoint_type, "be") != 0)) {
102100
flb_plg_error(ins, "endpoint_type is invalid");
101+
flb_free(ctx);
102+
return NULL;
103103
}
104104
if (!ctx->user) {
105105
flb_plg_error(ins, "user is not set");
106+
flb_free(ctx);
107+
return NULL;
106108
}
107109
if (!ctx->database) {
108110
flb_plg_error(ins, "database is not set");
111+
flb_free(ctx);
112+
return NULL;
109113
}
110114
if (!ctx->table) {
111115
flb_plg_error(ins, "table is not set");
116+
flb_free(ctx);
117+
return NULL;
112118
}
113119

114120
/* Check if SSL/TLS is enabled */
@@ -179,12 +185,13 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
179185
flb_doris_conf_destroy(ctx);
180186
return NULL;
181187
}
188+
reporter->running = 1;
182189
reporter->total_bytes = 0;
183190
reporter->total_rows = 0;
184191
reporter->failed_rows = 0;
185192
ctx->reporter = reporter;
186193

187-
if(pthread_create(&ctx->reporter_thread, NULL, report, (void *) ctx)) {
194+
if (pthread_create(&ctx->reporter_thread, NULL, report, (void *) ctx)) {
188195
flb_plg_error(ins, "failed to create progress reporter");
189196
flb_doris_conf_destroy(ctx);
190197
return NULL;
@@ -205,7 +212,8 @@ void flb_doris_conf_destroy(struct flb_out_doris *ctx)
205212
}
206213

207214
if (ctx->reporter) {
208-
pthread_cancel(ctx->reporter_thread);
215+
ctx->reporter->running = 0;
216+
pthread_join(ctx->reporter_thread, NULL);
209217
flb_free(ctx->reporter);
210218
}
211219

0 commit comments

Comments
 (0)