From e67d452dda4aa519fc277160fba84ca75a8adf83 Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Mon, 10 Feb 2025 19:46:08 +0100 Subject: [PATCH] log: added a new management signal to overcome a deadlock Signed-off-by: Leonardo Alminana --- include/fluent-bit/flb_log.h | 5 ++ src/flb_log.c | 89 +++++++++++++++++++++++++++++++----- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/include/fluent-bit/flb_log.h b/include/fluent-bit/flb_log.h index 48e8ff74966..96cbf68326e 100644 --- a/include/fluent-bit/flb_log.h +++ b/include/fluent-bit/flb_log.h @@ -54,6 +54,11 @@ extern FLB_TLS_DEFINE(struct flb_log, flb_log_ctx) #define FLB_LOG_EVENT MK_EVENT_NOTIFICATION #define FLB_LOG_MNG 1024 + +#define FLB_LOG_MNG_TERMINATION_SIGNAL 1 +#define FLB_LOG_MNG_REFRESH_SIGNAL 2 + + #define FLB_LOG_CACHE_ENTRIES 10 #define FLB_LOG_CACHE_TEXT_BUF_SIZE 1024 diff --git a/src/flb_log.c b/src/flb_log.c index f9c7bfbc15d..c5c73d40ad0 100644 --- a/src/flb_log.c +++ b/src/flb_log.c @@ -47,19 +47,43 @@ struct log_message { char msg[4096 - sizeof(size_t)]; }; -static inline int consume_byte(flb_pipefd_t fd) +static inline int64_t flb_log_consume_signal(struct flb_log *context) { - int ret; - uint64_t val; + int64_t signal_value; + int result; + + result = flb_pipe_r(context->ch_mng[0], + &signal_value, + sizeof(signal_value)); - /* We need to consume the byte */ - ret = flb_pipe_r(fd, &val, sizeof(val)); - if (ret <= 0) { + if (result <= 0) { flb_errno(); + return -1; } - return 0; + return signal_value; +} + +static inline int flb_log_enqueue_signal(struct flb_log *context, + int64_t signal_value) +{ + int result; + + result = flb_pipe_w(context->ch_mng[1], + &signal_value, + sizeof(signal_value)); + + if (result <= 0) { + flb_errno(); + + result = 1; + } + else { + result = 0; + } + + return result; } static inline int log_push(struct log_message *msg, struct flb_log *log) @@ -95,15 +119,19 @@ static inline int log_read(flb_pipefd_t fd, struct flb_log *log) * we can trust 
we will always get a full message on each read(2). */ bytes = flb_pipe_read_all(fd, &msg, sizeof(struct log_message)); + if (bytes <= 0) { flb_errno(); + return -1; } if (msg.size > sizeof(msg.msg)) { fprintf(stderr, "[log] message too long: %zi > %zi", msg.size, sizeof(msg.msg)); + return -1; } + log_push(&msg, log); return bytes; @@ -115,6 +143,7 @@ static void log_worker_collector(void *data) int run = FLB_TRUE; struct mk_event *event = NULL; struct flb_log *log = data; + int64_t signal_value; FLB_TLS_INIT(flb_log_ctx); FLB_TLS_SET(flb_log_ctx, log); @@ -129,13 +158,31 @@ static void log_worker_collector(void *data) while (run) { mk_event_wait(log->evl); + mk_event_foreach(event, log->evl) { if (event->type == FLB_LOG_EVENT) { log_read(event->fd, log); } else if (event->type == FLB_LOG_MNG) { - consume_byte(event->fd); - run = FLB_FALSE; + signal_value = flb_log_consume_signal(log); + + if (signal_value == FLB_LOG_MNG_TERMINATION_SIGNAL) { + run = FLB_FALSE; + } + else if (signal_value == FLB_LOG_MNG_REFRESH_SIGNAL) { + /* This signal is only used to + * break the loop when a new client is + * added in order to prevent a deadlock + * that happens if the newly added pipes capacity + * is exceeded during the initialization process + * of a threaded input plugin which causes write + * to block (until the logger thread consumes + * the buffered data) which in turn keeps the + * thread from triggering the status set + * condition which causes the main thread to + * lock indefinitely as described in issue 9667. 
+ */ + } } } } @@ -326,18 +373,35 @@ int flb_log_worker_init(struct flb_worker *worker) /* Register the read-end of the pipe (log[0]) into the event loop */ ret = mk_event_add(log->evl, worker->log[0], FLB_LOG_EVENT, MK_EVENT_READ, &worker->event); + if (ret == -1) { flb_pipe_destroy(worker->log); + + return -1; + } + + ret = flb_log_enqueue_signal(log, FLB_LOG_MNG_REFRESH_SIGNAL); + + if (ret == -1) { + mk_event_del(log->evl, &worker->event); + + flb_pipe_destroy(worker->log); + return -1; } /* Log cache to reduce noise */ cache = flb_log_cache_create(10, FLB_LOG_CACHE_ENTRIES); if (!cache) { + mk_event_del(log->evl, &worker->event); + flb_pipe_destroy(worker->log); + return -1; } + worker->log_cache = cache; + return 0; } @@ -431,6 +495,7 @@ struct flb_log *flb_log_create(struct flb_config *config, int type, /* Register channel manager into the event loop */ ret = mk_event_add(log->evl, log->ch_mng[0], FLB_LOG_MNG, MK_EVENT_READ, &log->event); + if (ret == -1) { fprintf(stderr, "[log] could not register event\n"); mk_event_loop_destroy(log->evl); @@ -650,6 +715,7 @@ void flb_log_print(int type, const char *file, int line, const char *fmt, ...) w = flb_worker_get(); if (w) { n = flb_pipe_write_all(w->log[1], &msg, sizeof(msg)); + if (n == -1) { fprintf(stderr, "%s", (char *) msg.msg); perror("write"); @@ -681,10 +747,9 @@ int flb_errno_print(int errnum, const char *file, int line) int flb_log_destroy(struct flb_log *log, struct flb_config *config) { - uint64_t val = FLB_TRUE; - /* Signal the child worker, stop working */ - flb_pipe_w(log->ch_mng[1], &val, sizeof(val)); + flb_log_enqueue_signal(log, FLB_LOG_MNG_TERMINATION_SIGNAL); + pthread_join(log->tid, NULL); /* Release resources */