From 3d423f1434727372c0c8364ec10f7aacc54f9cdc Mon Sep 17 00:00:00 2001 From: Thiago Padilha Date: Fri, 22 Jul 2022 07:51:36 -0300 Subject: [PATCH 1/3] http_server/health: Implement throughput health check Signed-off-by: Thiago Padilha --- include/fluent-bit/flb_config.h | 10 ++ include/fluent-bit/flb_time.h | 1 + src/flb_config.c | 19 +++ src/flb_time.c | 5 + src/http_server/api/v1/health.c | 282 ++++++++++++++++++++++++++++++-- 5 files changed, 301 insertions(+), 16 deletions(-) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 0d75d76df40..65ddd9f4427 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -177,6 +177,11 @@ struct flb_config { int hc_errors_count; /* health check error counts as unhealthy*/ int hc_retry_failure_count; /* health check retry failures count as unhealthy*/ int health_check_period; /* period by second for health status check */ + int hc_throughput; /* if throughput check is enabled */ + char *hc_throughput_input_plugins; /* which input plugins should be considered for checking throughput */ + char *hc_throughput_output_plugins;/* which output plugins should be considered for checking throughput */ + double hc_throughput_ratio_threshold; /* output/input ratio threshold to consider a failure */ + int hc_throughput_min_failures; /* minimum amount of failures to cause error condition */ #endif /* @@ -317,6 +322,11 @@ enum conf_type { #define FLB_CONF_STR_HC_ERRORS_COUNT "HC_Errors_Count" #define FLB_CONF_STR_HC_RETRIES_FAILURE_COUNT "HC_Retry_Failure_Count" #define FLB_CONF_STR_HC_PERIOD "HC_Period" +#define FLB_CONF_STR_HC_THROUGHPUT "HC_Throughput" +#define FLB_CONF_STR_HC_THROUGHPUT_IN_PLUGINS "HC_Throughput_Input_Plugins" +#define FLB_CONF_STR_HC_THROUGHPUT_OUT_PLUGINS "HC_Throughput_Output_Plugins" +#define FLB_CONF_STR_HC_THROUGHPUT_RATIO_THRESHOLD "HC_Throughput_Ratio_Threshold" +#define FLB_CONF_STR_HC_THROUGHPUT_MIN_FAILURES "HC_Throughput_Min_Failures" #endif /* !FLB_HAVE_HTTP_SERVER */ #ifdef FLB_HAVE_CHUNK_TRACE diff --git a/include/fluent-bit/flb_time.h b/include/fluent-bit/flb_time.h index cf672dcd64f..5555829db26 100644 --- a/include/fluent-bit/flb_time.h +++ b/include/fluent-bit/flb_time.h @@ -90,6 +90,7 @@ int flb_time_msleep(uint32_t ms); double flb_time_to_double(struct flb_time *tm); uint64_t flb_time_to_nanosec(struct flb_time *tm); uint64_t flb_time_to_millisec(struct flb_time *tm); +uint64_t flb_time_to_seconds(struct flb_time *tm); int flb_time_add(struct flb_time *base, struct flb_time *duration, struct flb_time *result); int flb_time_diff(struct flb_time *time1, diff --git a/src/flb_config.c b/src/flb_config.c index 5ef2e75ef83..8fa4243324b 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -110,6 +110,25 @@ struct flb_service_config service_configs[] = { FLB_CONF_TYPE_INT, offsetof(struct flb_config, health_check_period)}, + {FLB_CONF_STR_HC_THROUGHPUT, + FLB_CONF_TYPE_BOOL, + offsetof(struct flb_config, hc_throughput)}, + + {FLB_CONF_STR_HC_THROUGHPUT_IN_PLUGINS, + FLB_CONF_TYPE_STR, + offsetof(struct flb_config, hc_throughput_input_plugins)}, + + {FLB_CONF_STR_HC_THROUGHPUT_OUT_PLUGINS, + FLB_CONF_TYPE_STR, + offsetof(struct flb_config, hc_throughput_output_plugins)}, + + {FLB_CONF_STR_HC_THROUGHPUT_RATIO_THRESHOLD, + FLB_CONF_TYPE_DOUBLE, + offsetof(struct flb_config, hc_throughput_ratio_threshold)}, + + {FLB_CONF_STR_HC_THROUGHPUT_MIN_FAILURES, + FLB_CONF_TYPE_INT, + offsetof(struct flb_config, hc_throughput_min_failures)}, #endif /* DNS*/ {FLB_CONF_DNS_MODE, diff --git a/src/flb_time.c b/src/flb_time.c index 92482124857..450646b6fa3 100644 --- a/src/flb_time.c +++ b/src/flb_time.c @@ -101,6 +101,11 @@ uint64_t flb_time_to_millisec(struct flb_time *tm) return (((uint64_t)tm->tm.tv_sec * 1000L) + tm->tm.tv_nsec / 1000000L); } +uint64_t flb_time_to_seconds(struct flb_time *tm) +{ + return (uint64_t)tm->tm.tv_sec; +} + int flb_time_add(struct flb_time *base, struct flb_time *duration, struct flb_time *result) { if (base == NULL || duration == NULL|| result == NULL) { diff --git a/src/http_server/api/v1/health.c b/src/http_server/api/v1/health.c index 713d4b87749..0c2b5016fa5 100644 --- a/src/http_server/api/v1/health.c +++ b/src/http_server/api/v1/health.c @@ -24,11 +24,33 @@ #include #include #include +#include #include #include #include "health.h" +/* + * in/out records sample at a certain timestamp. + */ +struct flb_hs_throughput_sample { + uint64_t in_records; + uint64_t out_records; + uint64_t timestamp_seconds; + struct mk_list _head; +}; + +struct { + int enabled; + struct mk_list *input_plugins; + struct mk_list *output_plugins; + double out_in_ratio_threshold; + int min_failures; + + struct mk_list *sample_list; + bool healthy; +} throughput_check_state = {0}; + struct flb_health_check_metrics_counter *metrics_counter; pthread_key_t hs_health_key; @@ -88,6 +110,25 @@ static void counter_init(struct flb_hs *hs) { } +static bool contains_str(struct mk_list *items, msgpack_object_str name) +{ + struct mk_list *head; + struct flb_split_entry *entry; + + if (!items) { + return false; + } + + mk_list_foreach(head, items) { + entry = mk_list_entry(head, struct flb_split_entry, _head); + if (!strncmp(name.ptr, entry->value, name.size)) { + return true; + } + } + + return false; +} + /* * tell what's the current status for health check * One default background is that the metrics received and saved into @@ -113,7 +154,7 @@ static int is_healthy() { } if (mk_list_is_empty(metrics_list) == 0) { - return FLB_TRUE; + return FLB_TRUE && throughput_check_state.healthy; } /* Get the error metrics entry from the start time of current period */ @@ -141,12 +182,18 @@ static int is_healthy() { return FLB_FALSE; } - return FLB_TRUE; + return FLB_TRUE && throughput_check_state.healthy; } /* read the metrics from message queue and update the counter*/ -static void read_metrics(void *data, size_t size, int* error_count, - int* retry_failure_count) +static void read_metrics(void *data, + size_t size, + struct mk_list *input_plugins, + struct mk_list *output_plugins, + int* error_count, + int* retry_failure_count, + uint64_t *input_records, + uint64_t *output_records) { int i; int j; @@ -156,6 +203,8 @@ static void read_metrics(void *data, size_t size, int* error_count, size_t off = 0; int errors = 0; int retry_failure = 0; + uint64_t in_recs = 0; + uint64_t out_recs = 0; msgpack_unpacked_init(&result); msgpack_unpack_next(&result, data, size, &off); @@ -168,16 +217,14 @@ static void read_metrics(void *data, size_t size, int* error_count, /* Keys: input, output */ k = map.via.map.ptr[i].key; v = map.via.map.ptr[i].val; - if (k.via.str.size != sizeof("output") - 1 || - strncmp(k.via.str.ptr, "output", k.via.str.size) != 0) { - continue; - } /* Iterate sub-map */ for (j = 0; j < v.via.map.size; j++) { + msgpack_object sk; msgpack_object sv; /* Keys: plugin name , values: metrics */ + sk = v.via.map.ptr[j].key; sv = v.via.map.ptr[j].val; for (m = 0; m < sv.via.map.size; m++) { @@ -187,14 +234,23 @@ static void read_metrics(void *data, size_t size, int* error_count, mk = sv.via.map.ptr[m].key; mv = sv.via.map.ptr[m].val; - if (mk.via.str.size == sizeof("errors") - 1 && - strncmp(mk.via.str.ptr, "errors", mk.via.str.size) == 0) { - errors += mv.via.u64; + if (!strncmp(k.via.str.ptr, "output", k.via.str.size)) { + if (!strncmp(mk.via.str.ptr, "errors", mk.via.str.size)) { + errors += mv.via.u64; + } + else if (!strncmp(mk.via.str.ptr, "retries_failed", mk.via.str.size)) { + retry_failure += mv.via.u64; + } + else if (!strncmp(mk.via.str.ptr, "proc_records", mk.via.str.size) && + contains_str(output_plugins, sk.via.str)) { + out_recs += mv.via.u64; + } } - else if (mk.via.str.size == sizeof("retries_failed") - 1 && - strncmp(mk.via.str.ptr, "retries_failed", - mk.via.str.size) == 0) { - retry_failure += mv.via.u64; + + if (!strncmp(k.via.str.ptr, "input", k.via.str.size) && + !strncmp(mk.via.str.ptr, "records", mk.via.str.size) && + contains_str(input_plugins, sk.via.str)) { + in_recs += mv.via.u64; } } } @@ -202,6 +258,8 @@ static void read_metrics(void *data, size_t size, int* error_count, *error_count = errors; *retry_failure_count = retry_failure; + *input_records = in_recs; + *output_records = out_recs; msgpack_unpacked_destroy(&result); } @@ -245,6 +303,109 @@ static int cleanup_metrics() return c; } +static int check_throughput_health(uint64_t in_records, + uint64_t out_records, + struct mk_list *sample_list, + int sample_count, + double out_in_ratio_threshold) { + struct flb_time tp; + uint64_t timestamp_seconds; + uint64_t in_rate; + uint64_t out_rate; + struct mk_list *tmp; + struct mk_list *head; + double out_in_ratio; + struct flb_hs_throughput_sample *entry; + struct flb_hs_throughput_sample *prev; + struct flb_hs_throughput_sample *sample; + struct flb_hs_throughput_sample *last_sample = NULL; + int count; + bool healthy; + bool rv; + + flb_time_get(&tp); + timestamp_seconds = flb_time_to_seconds(&tp); + + if (mk_list_is_empty(sample_list) != 0) { + last_sample = mk_list_entry_last(sample_list, + struct flb_hs_throughput_sample, + _head); + } + + if (!last_sample || + in_records != last_sample->in_records || + out_records != last_sample->out_records) { + + sample = flb_malloc(sizeof(struct flb_hs_throughput_sample)); + + if (sample) { + sample->timestamp_seconds = timestamp_seconds; + sample->in_records = in_records; + sample->out_records = out_records; + mk_list_add(&sample->_head, sample_list); + } else { + flb_error("[api/v1/health/throughput]: failed to allocate sample"); + } + + } else { + /* don't collect another sample unless either in_records or out_records have + * changed since last check */ + flb_debug("[api/v1/health/throughput]: no changes since last check"); + } + + flb_debug("[api/v1/health/throughput]: check samples start %d %f", + sample_count, + out_in_ratio_threshold); + + healthy = false; + mk_list_foreach_safe_r(head, tmp, sample_list) { + entry = mk_list_entry(head, struct flb_hs_throughput_sample, _head); + if (entry == mk_list_entry_first(sample_list, + struct flb_hs_throughput_sample, + _head)) { + break; + } + + prev = mk_list_entry(entry->_head.prev, + struct flb_hs_throughput_sample, + _head); + in_rate = (entry->in_records - prev->in_records) / + (entry->timestamp_seconds - prev->timestamp_seconds); + out_rate = (entry->out_records - prev->out_records) / + (entry->timestamp_seconds - prev->timestamp_seconds); + out_in_ratio = (double)out_rate / (double)in_rate; + healthy = healthy || out_in_ratio > out_in_ratio_threshold; + + flb_debug("[api/v1/health/throughput]: out: %"PRIu64" in: %"PRIu64" ratio: %f\n", + out_in_ratio, + out_rate, + in_rate); + + if (healthy) { + break; + } + } + + count = 0; + mk_list_foreach_safe_r(head, tmp, sample_list) { + entry = mk_list_entry(head, struct flb_hs_throughput_sample, _head); + if (count == sample_count) { + mk_list_del(&entry->_head); + flb_free(entry); + } + else { + count++; + } + } + + rv = count < sample_count || healthy; + flb_debug("checking throughput samples stop, result: %s", + rv ? "healthy" :"unhealthy"); + + return rv; +} + + /* * Callback invoked every time some metrics are received through a * message queue channel. This function runs in a Monkey HTTP thread @@ -263,6 +424,8 @@ static void cb_mq_health(mk_mq_t *queue, void *data, size_t size) struct mk_list *metrics_list = NULL; int error_count = 0; int retry_failure_count = 0; + uint64_t input_records = 0; + uint64_t output_records = 0; metrics_list = pthread_getspecific(hs_health_key); @@ -286,7 +449,24 @@ static void cb_mq_health(mk_mq_t *queue, void *data, size_t size) buf->users = 0; - read_metrics(data, size, &error_count, &retry_failure_count); + read_metrics(data, + size, + throughput_check_state.input_plugins, + throughput_check_state.output_plugins, + &error_count, + &retry_failure_count, + &input_records, + &output_records); + + + if (throughput_check_state.enabled) { + throughput_check_state.healthy = + check_throughput_health(input_records, + output_records, + throughput_check_state.sample_list, + throughput_check_state.min_failures, + throughput_check_state.out_in_ratio_threshold); + } metrics_counter->error_counter = error_count; metrics_counter->retry_failure_counter = retry_failure_count; @@ -314,9 +494,79 @@ static void cb_health(mk_request_t *request, void *data) } } +static void configure_throughput_check(struct flb_config *config) +{ + bool enabled = config->hc_throughput; + + throughput_check_state.enabled = false; + throughput_check_state.healthy = true; + + if (!enabled) { + return; + } + + if (!config->hc_throughput_input_plugins) { + flb_warn("[api/v1/health/throughput]: " FLB_CONF_STR_HC_THROUGHPUT_IN_PLUGINS " is required"); + return; + } + if (!config->hc_throughput_output_plugins) { + flb_warn("[api/v1/health/throughput]: " FLB_CONF_STR_HC_THROUGHPUT_OUT_PLUGINS " is required"); + return; + } + if (!config->hc_throughput_ratio_threshold) { + flb_warn("[api/v1/health/throughput]: " FLB_CONF_STR_HC_THROUGHPUT_RATIO_THRESHOLD " is required"); + return; + } + if (!config->hc_throughput_min_failures) { + flb_warn("[api/v1/health/throughput]: " FLB_CONF_STR_HC_THROUGHPUT_MIN_FAILURES " is required"); + return; + } + + throughput_check_state.sample_list = flb_malloc(sizeof(struct mk_list)); + if (!throughput_check_state.sample_list) { + flb_errno(); + return; + } + mk_list_init(throughput_check_state.sample_list); + + throughput_check_state.input_plugins = + flb_utils_split(config->hc_throughput_input_plugins, ',', 0); + + if (!throughput_check_state.input_plugins) { + flb_free(throughput_check_state.sample_list); + flb_errno(); + return; + } + + throughput_check_state.output_plugins = + flb_utils_split(config->hc_throughput_output_plugins, ',', 0); + + if (!throughput_check_state.output_plugins) { + flb_free(throughput_check_state.sample_list); + flb_free(throughput_check_state.input_plugins); + flb_errno(); + return; + } + + throughput_check_state.out_in_ratio_threshold = config->hc_throughput_ratio_threshold; + throughput_check_state.min_failures = config->hc_throughput_min_failures; + throughput_check_state.enabled = true; + + flb_info("[api/v1/health/throughput]: configuration complete. " + "input plugins: %s | " + "output plugins: %s | " + "ratio threshold: %f | " + "min failures: %d", + config->hc_throughput_input_plugins, + config->hc_throughput_output_plugins, + config->hc_throughput_ratio_threshold, + config->hc_throughput_min_failures); +} + /* Perform registration */ int api_v1_health(struct flb_hs *hs) { + configure_throughput_check(hs->config); pthread_key_create(&hs_health_key, hs_health_key_destroy); From 28360dd6330899a3a3a4b3f9b87e4d1f4e715dd5 Mon Sep 17 00:00:00 2001 From: Thiago Padilha Date: Fri, 22 Jul 2022 11:39:31 -0300 Subject: [PATCH 2/3] http_server/health: Implement throughput health check using ring buffer Use a ring buffer for storing samples as per Leonardo's suggestion. Signed-off-by: Thiago Padilha --- src/http_server/api/v1/health.c | 153 ++++++++++++++++---------------- 1 file changed, 75 insertions(+), 78 deletions(-) diff --git a/src/http_server/api/v1/health.c b/src/http_server/api/v1/health.c index 0c2b5016fa5..46198578267 100644 --- a/src/http_server/api/v1/health.c +++ b/src/http_server/api/v1/health.c @@ -37,17 +37,57 @@ struct flb_hs_throughput_sample { uint64_t in_records; uint64_t out_records; uint64_t timestamp_seconds; - struct mk_list _head; }; +/* ring buffer + helper functions for storing samples */ +struct flb_hs_throughput_samples { + struct flb_hs_throughput_sample *items; + int size; + int count; + int insert; +}; + +static struct flb_hs_throughput_sample *samples_add( + struct flb_hs_throughput_samples *samples) +{ + struct flb_hs_throughput_sample *sample = samples->items + samples->insert; + samples->insert = (samples->insert + 1) % samples->size; + if (samples->count < samples->size) { + samples->count++; + } + return sample; +} + +static int samples_translate_index( + struct flb_hs_throughput_samples *samples, int index) +{ + if (index >= samples->count || index < 0) { + return -1; + } + int end_index = samples->insert; + int start_index = end_index - samples->count; + int modulo = (start_index + index) % samples->size; + return modulo < 0 ? modulo + samples->size : modulo; +} + +static struct flb_hs_throughput_sample *samples_get( + struct flb_hs_throughput_samples *samples, int index) +{ + int real_index = samples_translate_index(samples, index); + if (real_index < 0) { + return NULL; + } + + return samples->items + real_index; +} + struct { int enabled; struct mk_list *input_plugins; struct mk_list *output_plugins; double out_in_ratio_threshold; - int min_failures; - struct mk_list *sample_list; + struct flb_hs_throughput_samples samples; bool healthy; } throughput_check_state = {0}; @@ -305,78 +345,45 @@ static int cleanup_metrics() static int check_throughput_health(uint64_t in_records, uint64_t out_records, - struct mk_list *sample_list, - int sample_count, + struct flb_hs_throughput_samples *samples, double out_in_ratio_threshold) { + int i; struct flb_time tp; - uint64_t timestamp_seconds; uint64_t in_rate; uint64_t out_rate; - struct mk_list *tmp; - struct mk_list *head; double out_in_ratio; struct flb_hs_throughput_sample *entry; struct flb_hs_throughput_sample *prev; struct flb_hs_throughput_sample *sample; - struct flb_hs_throughput_sample *last_sample = NULL; - int count; bool healthy; bool rv; flb_time_get(&tp); - timestamp_seconds = flb_time_to_seconds(&tp); - - if (mk_list_is_empty(sample_list) != 0) { - last_sample = mk_list_entry_last(sample_list, - struct flb_hs_throughput_sample, - _head); - } - if (!last_sample || - in_records != last_sample->in_records || - out_records != last_sample->out_records) { - - sample = flb_malloc(sizeof(struct flb_hs_throughput_sample)); - - if (sample) { - sample->timestamp_seconds = timestamp_seconds; - sample->in_records = in_records; - sample->out_records = out_records; - mk_list_add(&sample->_head, sample_list); - } else { - flb_error("[api/v1/health/throughput]: failed to allocate sample"); - } - - } else { - /* don't collect another sample unless either in_records or out_records have - * changed since last check */ - flb_debug("[api/v1/health/throughput]: no changes since last check"); - } + sample = samples_add(samples); + sample->timestamp_seconds = flb_time_to_seconds(&tp); + sample->in_records = in_records; + sample->out_records = out_records; flb_debug("[api/v1/health/throughput]: check samples start %d %f", - sample_count, + samples->size, out_in_ratio_threshold); healthy = false; - mk_list_foreach_safe_r(head, tmp, sample_list) { - entry = mk_list_entry(head, struct flb_hs_throughput_sample, _head); - if (entry == mk_list_entry_first(sample_list, - struct flb_hs_throughput_sample, - _head)) { - break; + for (i = samples->count - 1; i > 0; i--) { + entry = samples_get(samples, i); + prev = samples_get(samples, i - 1); + uint64_t timestamp_delta = entry->timestamp_seconds - prev->timestamp_seconds; + if (timestamp_delta == 0) { + /* check against divide by zero */ + continue; } - - prev = mk_list_entry(entry->_head.prev, - struct flb_hs_throughput_sample, - _head); - in_rate = (entry->in_records - prev->in_records) / - (entry->timestamp_seconds - prev->timestamp_seconds); - out_rate = (entry->out_records - prev->out_records) / - (entry->timestamp_seconds - prev->timestamp_seconds); + in_rate = (entry->in_records - prev->in_records) / timestamp_delta; + out_rate = (entry->out_records - prev->out_records) / timestamp_delta; out_in_ratio = (double)out_rate / (double)in_rate; healthy = healthy || out_in_ratio > out_in_ratio_threshold; - flb_debug("[api/v1/health/throughput]: out: %"PRIu64" in: %"PRIu64" ratio: %f\n", + flb_debug("[api/v1/health/throughput]: out: %"PRIu64" in: %"PRIu64" ratio: %f", out_in_ratio, out_rate, in_rate); @@ -386,19 +393,7 @@ static int check_throughput_health(uint64_t in_records, } } - count = 0; - mk_list_foreach_safe_r(head, tmp, sample_list) { - entry = mk_list_entry(head, struct flb_hs_throughput_sample, _head); - if (count == sample_count) { - mk_list_del(&entry->_head); - flb_free(entry); - } - else { - count++; - } - } - - rv = count < sample_count || healthy; + rv = samples->count < samples->size || healthy; flb_debug("checking throughput samples stop, result: %s", rv ? "healthy" :"unhealthy"); @@ -463,8 +458,7 @@ static void cb_mq_health(mk_mq_t *queue, void *data, size_t size) throughput_check_state.healthy = check_throughput_health(input_records, output_records, - throughput_check_state.sample_list, - throughput_check_state.min_failures, + &throughput_check_state.samples, throughput_check_state.out_in_ratio_threshold); } @@ -498,6 +492,7 @@ static void configure_throughput_check(struct flb_config *config) { bool enabled = config->hc_throughput; + memset(&throughput_check_state, 0, sizeof(throughput_check_state)); throughput_check_state.enabled = false; throughput_check_state.healthy = true; @@ -522,18 +517,10 @@ static void configure_throughput_check(struct flb_config *config) return; } - throughput_check_state.sample_list = flb_malloc(sizeof(struct mk_list)); - if (!throughput_check_state.sample_list) { - flb_errno(); - return; - } - mk_list_init(throughput_check_state.sample_list); - throughput_check_state.input_plugins = flb_utils_split(config->hc_throughput_input_plugins, ',', 0); if (!throughput_check_state.input_plugins) { - flb_free(throughput_check_state.sample_list); flb_errno(); return; } @@ -542,16 +529,26 @@ static void configure_throughput_check(struct flb_config *config) flb_utils_split(config->hc_throughput_output_plugins, ',', 0); if (!throughput_check_state.output_plugins) { - flb_free(throughput_check_state.sample_list); flb_free(throughput_check_state.input_plugins); flb_errno(); return; } throughput_check_state.out_in_ratio_threshold = config->hc_throughput_ratio_threshold; - throughput_check_state.min_failures = config->hc_throughput_min_failures; throughput_check_state.enabled = true; + throughput_check_state.samples.items = flb_calloc( + config->hc_throughput_min_failures, + sizeof(struct flb_hs_throughput_sample)); + + if (!throughput_check_state.samples.items) { + flb_free(throughput_check_state.input_plugins); + flb_free(throughput_check_state.output_plugins); + flb_errno(); + return; + } + throughput_check_state.samples.size = config->hc_throughput_min_failures; + flb_info("[api/v1/health/throughput]: configuration complete. " "input plugins: %s | " "output plugins: %s | " From c280b8dd6b8dc75c475abfc068c534eb0bd8650a Mon Sep 17 00:00:00 2001 From: Thiago Padilha Date: Fri, 22 Jul 2022 16:10:46 -0300 Subject: [PATCH 3/3] http_server/health: Implement throughput health check Move struct definitions to header. Signed-off-by: Thiago Padilha --- src/http_server/api/v1/health.c | 34 +++++---------------------------- src/http_server/api/v1/health.h | 28 +++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/src/http_server/api/v1/health.c b/src/http_server/api/v1/health.c index 46198578267..5e403e989b1 100644 --- a/src/http_server/api/v1/health.c +++ b/src/http_server/api/v1/health.c @@ -30,22 +30,11 @@ #include "health.h" -/* - * in/out records sample at a certain timestamp. - */ -struct flb_hs_throughput_sample { - uint64_t in_records; - uint64_t out_records; - uint64_t timestamp_seconds; -}; - -/* ring buffer + helper functions for storing samples */ -struct flb_hs_throughput_samples { - struct flb_hs_throughput_sample *items; - int size; - int count; - int insert; -}; +struct flb_hs_throughput_state throughput_check_state = {0}; + +struct flb_health_check_metrics_counter *metrics_counter; + +pthread_key_t hs_health_key; static struct flb_hs_throughput_sample *samples_add( struct flb_hs_throughput_samples *samples) @@ -81,19 +70,6 @@ static struct flb_hs_throughput_sample *samples_get( return samples->items + real_index; } -struct { - int enabled; - struct mk_list *input_plugins; - struct mk_list *output_plugins; - double out_in_ratio_threshold; - - struct flb_hs_throughput_samples samples; - bool healthy; -} throughput_check_state = {0}; - -struct flb_health_check_metrics_counter *metrics_counter; - -pthread_key_t hs_health_key; static struct mk_list *hs_health_key_create() { diff --git a/src/http_server/api/v1/health.h b/src/http_server/api/v1/health.h index 27a826f4321..b6fe4b22c7e 100644 --- a/src/http_server/api/v1/health.h +++ b/src/http_server/api/v1/health.h @@ -65,6 +65,34 @@ struct flb_hs_hc_buf { struct mk_list _head; }; +/* + * in/out records sample at a certain timestamp. + */ +struct flb_hs_throughput_sample { + uint64_t in_records; + uint64_t out_records; + uint64_t timestamp_seconds; +}; + +/* ring buffer + helper functions for storing samples */ +struct flb_hs_throughput_samples { + struct flb_hs_throughput_sample *items; + int size; + int count; + int insert; +}; + +/* throughput health check state */ +struct flb_hs_throughput_state { + int enabled; + struct mk_list *input_plugins; + struct mk_list *output_plugins; + double out_in_ratio_threshold; + + struct flb_hs_throughput_samples samples; + int healthy; +}; + /* health endpoint*/ int api_v1_health(struct flb_hs *hs);