2727#include <fluent-bit/flb_mem.h>
2828#include <fluent-bit/flb_log.h>
2929#include <fluent-bit/flb_pipe.h>
30+ #include <fluent-bit/flb_pthread.h>
3031#include <fluent-bit/flb_ring_buffer.h>
3132#include <fluent-bit/flb_engine_macros.h>
3233
@@ -52,6 +53,7 @@ struct flb_ring_buffer *flb_ring_buffer_create(uint64_t size)
5253 return NULL ;
5354 }
5455 rb -> data_size = size ;
56+ pthread_mutex_init (& rb -> pth_mutex , NULL );
5557
5658 /* lwrb context */
5759 lwrb = flb_malloc (sizeof (lwrb_t ));
@@ -165,15 +167,19 @@ int flb_ring_buffer_write(struct flb_ring_buffer *rb, void *ptr, size_t size)
165167 size_t ret ;
166168 size_t av ;
167169
170+ pthread_mutex_lock (& rb -> pth_mutex );
171+
168172 /* make sure there is enough space available */
169173 av = lwrb_get_free (rb -> ctx );
170174 if (av < size ) {
175+ pthread_mutex_unlock (& rb -> pth_mutex );
171176 return -1 ;
172177 }
173178
174179 /* write the content */
175180 ret = lwrb_write (rb -> ctx , ptr , size );
176181 if (ret == 0 ) {
182+ pthread_mutex_unlock (& rb -> pth_mutex );
177183 return -1 ;
178184 }
179185
@@ -186,15 +192,16 @@ int flb_ring_buffer_write(struct flb_ring_buffer *rb, void *ptr, size_t size)
186192 flb_pipe_write_all (rb -> signal_channels [1 ], "." , 1 );
187193 }
188194 }
189-
195+ pthread_mutex_unlock ( & rb -> pth_mutex );
190196 return 0 ;
191197}
192198
193199int flb_ring_buffer_read (struct flb_ring_buffer * rb , void * ptr , size_t size )
194200{
195201 size_t ret ;
196-
202+ pthread_mutex_lock ( & rb -> pth_mutex );
197203 ret = lwrb_read (rb -> ctx , ptr , size );
204+ pthread_mutex_unlock (& rb -> pth_mutex );
198205 if (ret == 0 ) {
199206 return -1 ;
200207 }
0 commit comments