Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/fluent-bit/flb_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ extern FLB_TLS_DEFINE(struct flb_log, flb_log_ctx)
#define FLB_LOG_EVENT MK_EVENT_NOTIFICATION
#define FLB_LOG_MNG 1024


#define FLB_LOG_MNG_TERMINATION_SIGNAL 1
#define FLB_LOG_MNG_REFRESH_SIGNAL 2


#define FLB_LOG_CACHE_ENTRIES 10
#define FLB_LOG_CACHE_TEXT_BUF_SIZE 1024

Expand Down
89 changes: 77 additions & 12 deletions src/flb_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,43 @@ struct log_message {
char msg[4096 - sizeof(size_t)];
};

static inline int consume_byte(flb_pipefd_t fd)
static inline int64_t flb_log_consume_signal(struct flb_log *context)
{
int ret;
uint64_t val;
int64_t signal_value;
int result;

result = flb_pipe_r(context->ch_mng[0],
&signal_value,
sizeof(signal_value));

/* We need to consume the byte */
ret = flb_pipe_r(fd, &val, sizeof(val));
if (ret <= 0) {
if (result <= 0) {
flb_errno();

return -1;
}

return 0;
return signal_value;
}

static inline int flb_log_enqueue_signal(struct flb_log *context,
int64_t signal_value)
{
int result;

result = flb_pipe_w(context->ch_mng[1],
&signal_value,
sizeof(signal_value));

if (result <= 0) {
flb_errno();

result = 1;
}
else {
result = 0;
}

return result;
}

static inline int log_push(struct log_message *msg, struct flb_log *log)
Expand Down Expand Up @@ -95,15 +119,19 @@ static inline int log_read(flb_pipefd_t fd, struct flb_log *log)
* we can trust we will always get a full message on each read(2).
*/
bytes = flb_pipe_read_all(fd, &msg, sizeof(struct log_message));

if (bytes <= 0) {
flb_errno();

return -1;
}
if (msg.size > sizeof(msg.msg)) {
fprintf(stderr, "[log] message too long: %zi > %zi",
msg.size, sizeof(msg.msg));

return -1;
}

log_push(&msg, log);

return bytes;
Expand All @@ -115,6 +143,7 @@ static void log_worker_collector(void *data)
int run = FLB_TRUE;
struct mk_event *event = NULL;
struct flb_log *log = data;
int64_t signal_value;

FLB_TLS_INIT(flb_log_ctx);
FLB_TLS_SET(flb_log_ctx, log);
Expand All @@ -129,13 +158,31 @@ static void log_worker_collector(void *data)

while (run) {
mk_event_wait(log->evl);

mk_event_foreach(event, log->evl) {
if (event->type == FLB_LOG_EVENT) {
log_read(event->fd, log);
}
else if (event->type == FLB_LOG_MNG) {
consume_byte(event->fd);
run = FLB_FALSE;
signal_value = flb_log_consume_signal(log);

if (signal_value == FLB_LOG_MNG_TERMINATION_SIGNAL) {
run = FLB_FALSE;
}
else if (signal_value == FLB_LOG_MNG_REFRESH_SIGNAL) {
/* This signal is only used to
* break the loop when a new client is
* added in order to prevent a deadlock
* that happens if the newly added pipes capacity
* is exceeded during the initialization process
* of a threaded input plugin which causes write
* to block (until the logger thread consumes
* the buffered data) which in turn keeps the
* thread from triggering the status set
* condition which causes the main thread to
* lock indefinitely as described in issue 9667.
*/
}
}
}
}
Expand Down Expand Up @@ -326,18 +373,35 @@ int flb_log_worker_init(struct flb_worker *worker)
/* Register the read-end of the pipe (log[0]) into the event loop */
ret = mk_event_add(log->evl, worker->log[0],
FLB_LOG_EVENT, MK_EVENT_READ, &worker->event);

if (ret == -1) {
flb_pipe_destroy(worker->log);

return -1;
}

ret = flb_log_enqueue_signal(log, FLB_LOG_MNG_REFRESH_SIGNAL);

if (ret == -1) {
mk_event_del(log->evl, &worker->event);

flb_pipe_destroy(worker->log);

return -1;
}

/* Log cache to reduce noise */
cache = flb_log_cache_create(10, FLB_LOG_CACHE_ENTRIES);
if (!cache) {
mk_event_del(log->evl, &worker->event);

flb_pipe_destroy(worker->log);

return -1;
}

worker->log_cache = cache;

return 0;
}

Expand Down Expand Up @@ -431,6 +495,7 @@ struct flb_log *flb_log_create(struct flb_config *config, int type,
/* Register channel manager into the event loop */
ret = mk_event_add(log->evl, log->ch_mng[0],
FLB_LOG_MNG, MK_EVENT_READ, &log->event);

if (ret == -1) {
fprintf(stderr, "[log] could not register event\n");
mk_event_loop_destroy(log->evl);
Expand Down Expand Up @@ -650,6 +715,7 @@ void flb_log_print(int type, const char *file, int line, const char *fmt, ...)
w = flb_worker_get();
if (w) {
n = flb_pipe_write_all(w->log[1], &msg, sizeof(msg));

if (n == -1) {
fprintf(stderr, "%s", (char *) msg.msg);
perror("write");
Expand Down Expand Up @@ -681,10 +747,9 @@ int flb_errno_print(int errnum, const char *file, int line)

int flb_log_destroy(struct flb_log *log, struct flb_config *config)
{
uint64_t val = FLB_TRUE;

/* Signal the child worker, stop working */
flb_pipe_w(log->ch_mng[1], &val, sizeof(val));
flb_log_enqueue_signal(log, FLB_LOG_MNG_TERMINATION_SIGNAL);

pthread_join(log->tid, NULL);

/* Release resources */
Expand Down