2727#include "doris.h"
2828#include "doris_conf.h"
2929
30+ #ifdef FLB_SYSTEM_WINDOWS
31+ #include <winnt.h>
32+ #endif
33+
34+ static inline void atomic_store (volatile int * dest , int val ) {
35+ #ifdef FLB_SYSTEM_WINDOWS
36+ InterlockedExchange ((LONG volatile * ) dest , (LONG ) val );
37+ #else
38+ __atomic_store_n (dest , val , __ATOMIC_RELEASE );
39+ #endif
40+ }
41+
42+ static inline int atomic_load (volatile int * dest ) {
43+ #ifdef FLB_SYSTEM_WINDOWS
44+ return (int ) InterlockedCompareExchange ((LONG volatile * ) dest , 0 , 0 );
45+ #else
46+ return __atomic_load_n (dest , __ATOMIC_ACQUIRE );
47+ #endif
48+ }
49+
3050void * report (void * c ) {
3151 struct flb_out_doris * ctx = (struct flb_out_doris * ) c ;
3252
@@ -40,7 +60,7 @@ void *report(void *c) {
4060
4161 flb_plg_info (ctx -> ins , "Start progress reporter with interval %d" , ctx -> log_progress_interval );
4262
43- while (ctx -> reporter -> running && ctx -> log_progress_interval > 0 ) {
63+ while (atomic_load ( & ctx -> reporter -> running ) && ctx -> log_progress_interval > 0 ) {
4464 flb_time_msleep (ctx -> log_progress_interval * 1000 );
4565
4666 cur_time = cfl_time_now () / 1000000000L ;
@@ -200,7 +220,7 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
200220 flb_doris_conf_destroy (ctx );
201221 return NULL ;
202222 }
203- reporter -> running = 1 ;
223+ atomic_store ( & reporter -> running , 1 ) ;
204224 reporter -> total_bytes = 0 ;
205225 reporter -> total_rows = 0 ;
206226 reporter -> failed_rows = 0 ;
@@ -223,7 +243,7 @@ void flb_doris_conf_destroy(struct flb_out_doris *ctx)
223243 }
224244
225245 if (ctx -> reporter ) {
226- ctx -> reporter -> running = 0 ;
246+ atomic_store ( & ctx -> reporter -> running , 0 ) ;
227247 pthread_join (ctx -> reporter_thread , NULL );
228248 flb_free (ctx -> reporter );
229249 }
0 commit comments