Skip to content

Commit 724940e

Browse files
drbugfinder-workedsiper
authored andcommitted
filter_log_to_metrics: Add timer callback for emitting metrics
This commit will change the log_to_metrics filter to use a timer based metric inject and not directly inject metrics on every incoming log record anymore. This will lower the overall load and memory consumption especially in high-volume and high-cardinality situations. Signed-off-by: Richard Treu <[email protected]>
1 parent 0665dd9 commit 724940e

File tree

2 files changed

+105
-9
lines changed

2 files changed

+105
-9
lines changed

plugins/filter_log_to_metrics/log_to_metrics.c

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,35 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values,
450450
return label_counter;
451451
}
452452

453+
/* Timer callback to inject metrics into the pipeline */
454+
static void cb_send_metric_chunk(struct flb_config *config, void *data)
455+
{
456+
int ret;
457+
struct log_to_metrics_ctx *ctx = data;
458+
459+
/* Check that metric context is not empty */
460+
if (ctx->cmt == NULL || ctx->input_ins == NULL) {
461+
return;
462+
}
463+
464+
if (ctx->new_data) {
465+
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag,
466+
strlen(ctx->tag), ctx->cmt);
467+
if (ret != 0) {
468+
flb_plg_error(ctx->ins, "could not append metrics");
469+
}
470+
}
471+
472+
/* Check if we are shutting down. If so, stop our timer */
473+
if (config->is_shutting_down) {
474+
if(ctx->timer && ctx->timer->active) {
475+
flb_plg_debug(ctx->ins, "Stopping callback timer");
476+
flb_sched_timer_cb_disable(ctx->timer);
477+
}
478+
}
479+
ctx->new_data = FLB_FALSE;
480+
}
481+
453482
static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
454483
struct flb_config *config, void *data)
455484
{
@@ -462,6 +491,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
462491
char metric_subsystem[MAX_METRIC_LENGTH];
463492
char value_field[MAX_METRIC_LENGTH];
464493
struct flb_input_instance *input_ins;
494+
struct flb_sched *sched;
465495

466496

467497
int i;
@@ -676,7 +706,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
676706
}
677707

678708
ret = flb_input_name_exists(ctx->emitter_name, config);
679-
if (ret == FLB_TRUE) {
709+
if (ret) {
680710
flb_plg_error(f_ins, "emitter_name '%s' already exists",
681711
ctx->emitter_name);
682712
flb_sds_destroy(ctx->emitter_name);
@@ -732,6 +762,43 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
732762
}
733763
ctx->input_ins = input_ins;
734764

765+
766+
if (ctx->flush_interval_sec <= 0) {
767+
ctx->flush_interval_sec = strtol(DEFAULT_INTERVAL_SEC, NULL, 10);
768+
}
769+
if (ctx->flush_interval_nsec <= 0) {
770+
ctx->flush_interval_nsec = strtol(DEFAULT_INTERVAL_NSEC, NULL, 10);
771+
}
772+
if (ctx->flush_interval_sec == 0 && ctx->flush_interval_nsec == 0) {
773+
flb_plg_debug(ctx->ins, "Interval is set to 0, will not use timer and "
774+
"send metrics immediately");
775+
ctx->timer_mode = FLB_FALSE;
776+
return 0;
777+
}
778+
779+
/* Initialize timer for scheduled metric updates */
780+
sched = flb_sched_ctx_get();
781+
if(sched == 0) {
782+
flb_plg_error(f_ins, "could not get scheduler context");
783+
log_to_metrics_destroy(ctx);
784+
return -1;
785+
}
786+
/* Convert flush_interval_sec and flush_interval_nsec to milliseconds */
787+
ctx->timer_interval = (ctx->flush_interval_sec * 1000) +
788+
(ctx->flush_interval_nsec / 1000000);
789+
flb_plg_debug(ctx->ins,
790+
"Creating metric timer with frequency %d ms",
791+
ctx->timer_interval);
792+
793+
ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
794+
ctx->timer_interval, cb_send_metric_chunk,
795+
ctx, &ctx->timer);
796+
if (ret < 0) {
797+
flb_plg_error(f_ins, "could not create timer callback");
798+
log_to_metrics_destroy(ctx);
799+
return -1;
800+
}
801+
ctx->timer_mode = FLB_TRUE;
735802
return 0;
736803
}
737804

@@ -923,9 +990,17 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
923990
return -1;
924991
}
925992

926-
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt);
927-
if (ret != 0) {
928-
flb_plg_error(ctx->ins, "could not append metrics");
993+
if (!ctx->timer_mode) {
994+
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag,
995+
strlen(ctx->tag), ctx->cmt);
996+
997+
if (ret != 0) {
998+
flb_plg_error(ctx->ins, "could not append metrics. "
999+
"Please consider to use flush_interval_sec and flush_interval_nsec");
1000+
}
1001+
}
1002+
else {
1003+
ctx->new_data = FLB_TRUE;
9291004
}
9301005

9311006
/* Cleanup */
@@ -944,6 +1019,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
9441019
}
9451020
}
9461021

1022+
9471023
if (ctx->discard_logs) {
9481024
*out_buf = NULL;
9491025
*out_size = 0;
@@ -961,7 +1037,10 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
9611037
static int cb_log_to_metrics_exit(void *data, struct flb_config *config)
9621038
{
9631039
struct log_to_metrics_ctx *ctx = data;
964-
1040+
if(ctx->timer != NULL) {
1041+
flb_plg_debug(ctx->ins, "Destroying callback timer");
1042+
flb_sched_timer_destroy(ctx->timer);
1043+
}
9651044
return log_to_metrics_destroy(ctx);
9661045
}
9671046

@@ -1040,13 +1119,24 @@ static struct flb_config_map config_map[] = {
10401119
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_name),
10411120
"Name of the emitter (advanced users)"
10421121
},
1043-
10441122
{
10451123
FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_MEM_BUF_LIMIT_DEFAULT,
10461124
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_mem_buf_limit),
10471125
"set a buffer limit to restrict memory usage of metrics emitter"
10481126
},
1049-
1127+
{
1128+
FLB_CONFIG_MAP_INT, "flush_interval_sec", DEFAULT_INTERVAL_SEC,
1129+
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, flush_interval_sec),
1130+
"Set the timer interval for metrics emission. If flush_interval_sec and "
1131+
"flush_interval_nsec are set to 0, the timer is disabled (default)."
1132+
},
1133+
{
1134+
FLB_CONFIG_MAP_INT, "flush_interval_nsec", DEFAULT_INTERVAL_NSEC,
1135+
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, flush_interval_nsec),
1136+
"Set the timer interval (subseconds) for metrics emission. "
1137+
"If flush_interval_sec and flush_interval_nsec are set to 0, the timer is disabled "
1138+
"(default). Final precision is milliseconds."
1139+
},
10501140
{
10511141
FLB_CONFIG_MAP_BOOL, "discard_logs", "false",
10521142
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, discard_logs),

plugins/filter_log_to_metrics/log_to_metrics.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@
5050

5151
#define FLB_MEM_BUF_LIMIT_DEFAULT "10M"
5252
#define DEFAULT_LOG_TO_METRICS_NAMESPACE "log_metric"
53-
53+
#define DEFAULT_INTERVAL_SEC "0"
54+
#define DEFAULT_INTERVAL_NSEC "0"
5455

5556
struct log_to_metrics_ctx {
5657
struct mk_list rules;
5758
struct flb_filter_instance *ins;
5859
struct cmt *cmt;
59-
6060
struct flb_input_instance *input_ins;
6161

6262
char **label_keys;
@@ -84,6 +84,12 @@ struct log_to_metrics_ctx {
8484
flb_sds_t tag;
8585
flb_sds_t emitter_name;
8686
size_t emitter_mem_buf_limit;
87+
long flush_interval_sec;
88+
long flush_interval_nsec;
89+
int timer_interval;
90+
int timer_mode;
91+
struct flb_sched_timer *timer;
92+
int new_data;
8793
};
8894

8995
struct grep_rule

0 commit comments

Comments
 (0)