diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h
index 0d75d76df40..65ddd9f4427 100644
--- a/include/fluent-bit/flb_config.h
+++ b/include/fluent-bit/flb_config.h
@@ -177,6 +177,11 @@ struct flb_config {
     int hc_errors_count;                /* health check error counts as unhealthy*/
     int hc_retry_failure_count;         /* health check retry failures count as unhealthy*/
     int health_check_period;            /* period by second for health status check */
+    int hc_throughput;                  /* boolean: throughput health check is enabled */
+    char *hc_throughput_input_plugins;  /* comma-separated input plugins whose records count as input */
+    char *hc_throughput_output_plugins;/* comma-separated output plugins whose records count as output */
+    double hc_throughput_ratio_threshold; /* an interval passes when its output/input ratio is above this */
+    int hc_throughput_min_failures;     /* minimum amount of failures to cause error condition */
 #endif

 /*
@@ -317,6 +322,11 @@ enum conf_type {
 #define FLB_CONF_STR_HC_ERRORS_COUNT "HC_Errors_Count"
 #define FLB_CONF_STR_HC_RETRIES_FAILURE_COUNT "HC_Retry_Failure_Count"
 #define FLB_CONF_STR_HC_PERIOD "HC_Period"
+#define FLB_CONF_STR_HC_THROUGHPUT "HC_Throughput"
+#define FLB_CONF_STR_HC_THROUGHPUT_IN_PLUGINS "HC_Throughput_Input_Plugins"
+#define FLB_CONF_STR_HC_THROUGHPUT_OUT_PLUGINS "HC_Throughput_Output_Plugins"
+#define FLB_CONF_STR_HC_THROUGHPUT_RATIO_THRESHOLD "HC_Throughput_Ratio_Threshold"
+#define FLB_CONF_STR_HC_THROUGHPUT_MIN_FAILURES "HC_Throughput_Min_Failures"
 #endif /* !FLB_HAVE_HTTP_SERVER */

 #ifdef FLB_HAVE_CHUNK_TRACE
diff --git a/include/fluent-bit/flb_time.h b/include/fluent-bit/flb_time.h
index cf672dcd64f..5555829db26 100644
--- a/include/fluent-bit/flb_time.h
+++ b/include/fluent-bit/flb_time.h
@@ -90,6 +90,7 @@ int flb_time_msleep(uint32_t ms);
 double flb_time_to_double(struct flb_time *tm);
 uint64_t flb_time_to_nanosec(struct flb_time *tm);
 uint64_t flb_time_to_millisec(struct flb_time *tm);
+uint64_t flb_time_to_seconds(struct flb_time *tm);
 int flb_time_add(struct flb_time *base, struct flb_time *duration,
                  struct flb_time *result);
 int flb_time_diff(struct flb_time *time1,
diff --git a/src/flb_config.c b/src/flb_config.c
index 5ef2e75ef83..8fa4243324b 100644
--- a/src/flb_config.c
+++ b/src/flb_config.c
@@ -110,6 +110,25 @@ struct flb_service_config service_configs[] = {
      FLB_CONF_TYPE_INT,
      offsetof(struct flb_config, health_check_period)},

+    {FLB_CONF_STR_HC_THROUGHPUT,
+     FLB_CONF_TYPE_BOOL,
+     offsetof(struct flb_config, hc_throughput)},
+
+    {FLB_CONF_STR_HC_THROUGHPUT_IN_PLUGINS,
+     FLB_CONF_TYPE_STR,
+     offsetof(struct flb_config, hc_throughput_input_plugins)},
+
+    {FLB_CONF_STR_HC_THROUGHPUT_OUT_PLUGINS,
+     FLB_CONF_TYPE_STR,
+     offsetof(struct flb_config, hc_throughput_output_plugins)},
+
+    {FLB_CONF_STR_HC_THROUGHPUT_RATIO_THRESHOLD,
+     FLB_CONF_TYPE_DOUBLE,
+     offsetof(struct flb_config, hc_throughput_ratio_threshold)},
+
+    {FLB_CONF_STR_HC_THROUGHPUT_MIN_FAILURES,
+     FLB_CONF_TYPE_INT,
+     offsetof(struct flb_config, hc_throughput_min_failures)},
 #endif
     /* DNS*/
     {FLB_CONF_DNS_MODE,
diff --git a/src/flb_time.c b/src/flb_time.c
index 92482124857..450646b6fa3 100644
--- a/src/flb_time.c
+++ b/src/flb_time.c
@@ -101,6 +101,11 @@ uint64_t flb_time_to_millisec(struct flb_time *tm)
     return (((uint64_t)tm->tm.tv_sec * 1000L) + tm->tm.tv_nsec / 1000000L);
 }

+uint64_t flb_time_to_seconds(struct flb_time *tm)
+{
+    return (uint64_t)tm->tm.tv_sec;
+}
+
 int flb_time_add(struct flb_time *base, struct flb_time *duration, struct flb_time *result)
 {
     if (base == NULL || duration == NULL|| result == NULL) {
diff --git a/src/http_server/api/v1/health.c b/src/http_server/api/v1/health.c
index 713d4b87749..5e403e989b1 100644
--- a/src/http_server/api/v1/health.c
+++ b/src/http_server/api/v1/health.c
@@ -24,15 +24,53 @@
 #include
 #include
 #include
+#include
 #include
 #include

 #include "health.h"

+struct flb_hs_throughput_state throughput_check_state = {0};
+
 struct flb_health_check_metrics_counter *metrics_counter;

pthread_key_t hs_health_key; +static struct flb_hs_throughput_sample *samples_add( + struct flb_hs_throughput_samples *samples) +{ + struct flb_hs_throughput_sample *sample = samples->items + samples->insert; + samples->insert = (samples->insert + 1) % samples->size; + if (samples->count < samples->size) { + samples->count++; + } + return sample; +} + +static int samples_translate_index( + struct flb_hs_throughput_samples *samples, int index) +{ + if (index >= samples->count || index < 0) { + return -1; + } + int end_index = samples->insert; + int start_index = end_index - samples->count; + int modulo = (start_index + index) % samples->size; + return modulo < 0 ? modulo + samples->size : modulo; +} + +static struct flb_hs_throughput_sample *samples_get( + struct flb_hs_throughput_samples *samples, int index) +{ + int real_index = samples_translate_index(samples, index); + if (real_index < 0) { + return NULL; + } + + return samples->items + real_index; +} + + static struct mk_list *hs_health_key_create() { struct mk_list *metrics_list = NULL; @@ -88,6 +126,25 @@ static void counter_init(struct flb_hs *hs) { } +static bool contains_str(struct mk_list *items, msgpack_object_str name) +{ + struct mk_list *head; + struct flb_split_entry *entry; + + if (!items) { + return false; + } + + mk_list_foreach(head, items) { + entry = mk_list_entry(head, struct flb_split_entry, _head); + if (!strncmp(name.ptr, entry->value, name.size)) { + return true; + } + } + + return false; +} + /* * tell what's the current status for health check * One default background is that the metrics received and saved into @@ -113,7 +170,7 @@ static int is_healthy() { } if (mk_list_is_empty(metrics_list) == 0) { - return FLB_TRUE; + return FLB_TRUE && throughput_check_state.healthy; } /* Get the error metrics entry from the start time of current period */ @@ -141,12 +198,18 @@ static int is_healthy() { return FLB_FALSE; } - return FLB_TRUE; + return FLB_TRUE && throughput_check_state.healthy; } /* 
read the metrics from message queue and update the counter*/ -static void read_metrics(void *data, size_t size, int* error_count, - int* retry_failure_count) +static void read_metrics(void *data, + size_t size, + struct mk_list *input_plugins, + struct mk_list *output_plugins, + int* error_count, + int* retry_failure_count, + uint64_t *input_records, + uint64_t *output_records) { int i; int j; @@ -156,6 +219,8 @@ static void read_metrics(void *data, size_t size, int* error_count, size_t off = 0; int errors = 0; int retry_failure = 0; + uint64_t in_recs = 0; + uint64_t out_recs = 0; msgpack_unpacked_init(&result); msgpack_unpack_next(&result, data, size, &off); @@ -168,16 +233,14 @@ static void read_metrics(void *data, size_t size, int* error_count, /* Keys: input, output */ k = map.via.map.ptr[i].key; v = map.via.map.ptr[i].val; - if (k.via.str.size != sizeof("output") - 1 || - strncmp(k.via.str.ptr, "output", k.via.str.size) != 0) { - continue; - } /* Iterate sub-map */ for (j = 0; j < v.via.map.size; j++) { + msgpack_object sk; msgpack_object sv; /* Keys: plugin name , values: metrics */ + sk = v.via.map.ptr[j].key; sv = v.via.map.ptr[j].val; for (m = 0; m < sv.via.map.size; m++) { @@ -187,14 +250,23 @@ static void read_metrics(void *data, size_t size, int* error_count, mk = sv.via.map.ptr[m].key; mv = sv.via.map.ptr[m].val; - if (mk.via.str.size == sizeof("errors") - 1 && - strncmp(mk.via.str.ptr, "errors", mk.via.str.size) == 0) { - errors += mv.via.u64; + if (!strncmp(k.via.str.ptr, "output", k.via.str.size)) { + if (!strncmp(mk.via.str.ptr, "errors", mk.via.str.size)) { + errors += mv.via.u64; + } + else if (!strncmp(mk.via.str.ptr, "retries_failed", mk.via.str.size)) { + retry_failure += mv.via.u64; + } + else if (!strncmp(mk.via.str.ptr, "proc_records", mk.via.str.size) && + contains_str(output_plugins, sk.via.str)) { + out_recs += mv.via.u64; + } } - else if (mk.via.str.size == sizeof("retries_failed") - 1 && - strncmp(mk.via.str.ptr, "retries_failed", - 
mk.via.str.size) == 0) { - retry_failure += mv.via.u64; + + if (!strncmp(k.via.str.ptr, "input", k.via.str.size) && + !strncmp(mk.via.str.ptr, "records", mk.via.str.size) && + contains_str(input_plugins, sk.via.str)) { + in_recs += mv.via.u64; } } } @@ -202,6 +274,8 @@ static void read_metrics(void *data, size_t size, int* error_count, *error_count = errors; *retry_failure_count = retry_failure; + *input_records = in_recs; + *output_records = out_recs; msgpack_unpacked_destroy(&result); } @@ -245,6 +319,64 @@ static int cleanup_metrics() return c; } +static int check_throughput_health(uint64_t in_records, + uint64_t out_records, + struct flb_hs_throughput_samples *samples, + double out_in_ratio_threshold) { + int i; + struct flb_time tp; + uint64_t in_rate; + uint64_t out_rate; + double out_in_ratio; + struct flb_hs_throughput_sample *entry; + struct flb_hs_throughput_sample *prev; + struct flb_hs_throughput_sample *sample; + bool healthy; + bool rv; + + flb_time_get(&tp); + + sample = samples_add(samples); + sample->timestamp_seconds = flb_time_to_seconds(&tp); + sample->in_records = in_records; + sample->out_records = out_records; + + flb_debug("[api/v1/health/throughput]: check samples start %d %f", + samples->size, + out_in_ratio_threshold); + + healthy = false; + for (i = samples->count - 1; i > 0; i--) { + entry = samples_get(samples, i); + prev = samples_get(samples, i - 1); + uint64_t timestamp_delta = entry->timestamp_seconds - prev->timestamp_seconds; + if (timestamp_delta == 0) { + /* check against divide by zero */ + continue; + } + in_rate = (entry->in_records - prev->in_records) / timestamp_delta; + out_rate = (entry->out_records - prev->out_records) / timestamp_delta; + out_in_ratio = (double)out_rate / (double)in_rate; + healthy = healthy || out_in_ratio > out_in_ratio_threshold; + + flb_debug("[api/v1/health/throughput]: out: %"PRIu64" in: %"PRIu64" ratio: %f", + out_in_ratio, + out_rate, + in_rate); + + if (healthy) { + break; + } + } + + rv = 
samples->count < samples->size || healthy; + flb_debug("checking throughput samples stop, result: %s", + rv ? "healthy" :"unhealthy"); + + return rv; +} + + /* * Callback invoked every time some metrics are received through a * message queue channel. This function runs in a Monkey HTTP thread @@ -263,6 +395,8 @@ static void cb_mq_health(mk_mq_t *queue, void *data, size_t size) struct mk_list *metrics_list = NULL; int error_count = 0; int retry_failure_count = 0; + uint64_t input_records = 0; + uint64_t output_records = 0; metrics_list = pthread_getspecific(hs_health_key); @@ -286,7 +420,23 @@ static void cb_mq_health(mk_mq_t *queue, void *data, size_t size) buf->users = 0; - read_metrics(data, size, &error_count, &retry_failure_count); + read_metrics(data, + size, + throughput_check_state.input_plugins, + throughput_check_state.output_plugins, + &error_count, + &retry_failure_count, + &input_records, + &output_records); + + + if (throughput_check_state.enabled) { + throughput_check_state.healthy = + check_throughput_health(input_records, + output_records, + &throughput_check_state.samples, + throughput_check_state.out_in_ratio_threshold); + } metrics_counter->error_counter = error_count; metrics_counter->retry_failure_counter = retry_failure_count; @@ -314,9 +464,82 @@ static void cb_health(mk_request_t *request, void *data) } } +static void configure_throughput_check(struct flb_config *config) +{ + bool enabled = config->hc_throughput; + + memset(&throughput_check_state, 0, sizeof(throughput_check_state)); + throughput_check_state.enabled = false; + throughput_check_state.healthy = true; + + if (!enabled) { + return; + } + + if (!config->hc_throughput_input_plugins) { + flb_warn("[api/v1/health/throughput]: " FLB_CONF_STR_HC_THROUGHPUT_IN_PLUGINS " is required"); + return; + } + if (!config->hc_throughput_output_plugins) { + flb_warn("[api/v1/health/throughput]: " FLB_CONF_STR_HC_THROUGHPUT_OUT_PLUGINS " is required"); + return; + } + if 
(!config->hc_throughput_ratio_threshold) { + flb_warn("[api/v1/health/throughput]: " FLB_CONF_STR_HC_THROUGHPUT_RATIO_THRESHOLD " is required"); + return; + } + if (!config->hc_throughput_min_failures) { + flb_warn("[api/v1/health/throughput]: " FLB_CONF_STR_HC_THROUGHPUT_MIN_FAILURES " is required"); + return; + } + + throughput_check_state.input_plugins = + flb_utils_split(config->hc_throughput_input_plugins, ',', 0); + + if (!throughput_check_state.input_plugins) { + flb_errno(); + return; + } + + throughput_check_state.output_plugins = + flb_utils_split(config->hc_throughput_output_plugins, ',', 0); + + if (!throughput_check_state.output_plugins) { + flb_free(throughput_check_state.input_plugins); + flb_errno(); + return; + } + + throughput_check_state.out_in_ratio_threshold = config->hc_throughput_ratio_threshold; + throughput_check_state.enabled = true; + + throughput_check_state.samples.items = flb_calloc( + config->hc_throughput_min_failures, + sizeof(struct flb_hs_throughput_sample)); + + if (!throughput_check_state.samples.items) { + flb_free(throughput_check_state.input_plugins); + flb_free(throughput_check_state.output_plugins); + flb_errno(); + return; + } + throughput_check_state.samples.size = config->hc_throughput_min_failures; + + flb_info("[api/v1/health/throughput]: configuration complete. 
" + "input plugins: %s | " + "output plugins: %s | " + "ratio threshold: %f | " + "min failures: %d", + config->hc_throughput_input_plugins, + config->hc_throughput_output_plugins, + config->hc_throughput_ratio_threshold, + config->hc_throughput_min_failures); +} + /* Perform registration */ int api_v1_health(struct flb_hs *hs) { + configure_throughput_check(hs->config); pthread_key_create(&hs_health_key, hs_health_key_destroy); diff --git a/src/http_server/api/v1/health.h b/src/http_server/api/v1/health.h index 27a826f4321..b6fe4b22c7e 100644 --- a/src/http_server/api/v1/health.h +++ b/src/http_server/api/v1/health.h @@ -65,6 +65,34 @@ struct flb_hs_hc_buf { struct mk_list _head; }; +/* + * in/out records sample at a certain timestamp. + */ +struct flb_hs_throughput_sample { + uint64_t in_records; + uint64_t out_records; + uint64_t timestamp_seconds; +}; + +/* ring buffer + helper functions for storing samples */ +struct flb_hs_throughput_samples { + struct flb_hs_throughput_sample *items; + int size; + int count; + int insert; +}; + +/* throughput health check state */ +struct flb_hs_throughput_state { + int enabled; + struct mk_list *input_plugins; + struct mk_list *output_plugins; + double out_in_ratio_threshold; + + struct flb_hs_throughput_samples samples; + int healthy; +}; + /* health endpoint*/ int api_v1_health(struct flb_hs *hs);