From cf25d050afa810aaa8d1ff3c91f244320d2fe7da Mon Sep 17 00:00:00 2001 From: wangzhuzhen Date: Mon, 25 Apr 2022 17:28:05 +0800 Subject: [PATCH] filter_throttle: Support a feature that do not drop messages if rate limit is exceeded Signed-off-by: wangzhuzhen --- plugins/filter_throttle/throttle.c | 31 ++++++++++++++++++++++++++---- plugins/filter_throttle/throttle.h | 4 +++- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/plugins/filter_throttle/throttle.c b/plugins/filter_throttle/throttle.c index 6abe59c0934..159d0ddb1d9 100644 --- a/plugins/filter_throttle/throttle.c +++ b/plugins/filter_throttle/throttle.c @@ -81,10 +81,10 @@ void *time_ticker(void *args) if (t->ctx->print_status) { flb_plg_info(ctx->ins, - "%ld: limit is %0.2f per %s with window size of %i, " + "%ld: limit is %0.2f per %s with window size of %i, retain is %d, " "current rate is: %i per interval", timestamp, t->ctx->max_rate, t->ctx->slide_interval, - t->ctx->window_size, + t->ctx->window_size, t->ctx->retain_data, t->ctx->hash->total / t->ctx->hash->size); } sleep(t->seconds); @@ -105,6 +105,15 @@ static inline int throttle_data(struct flb_filter_throttle_ctx *ctx) return THROTTLE_RET_KEEP; } +static inline bool get_retain(struct flb_filter_throttle_ctx *ctx) +{ + if (ctx->retain_data) { + return true; + } + + return false; +} + static int configure(struct flb_filter_throttle_ctx *ctx, struct flb_filter_instance *f_ins) { int ret; @@ -200,6 +209,8 @@ static int cb_throttle_filter(const void *data, size_t bytes, struct flb_config *config) { int ret; + /* Do not drop some messages if rate limit is exceeded */ + bool retain = false; int old_size = 0; int new_size = 0; msgpack_unpacked result; @@ -227,12 +238,18 @@ static int cb_throttle_filter(const void *data, size_t bytes, old_size++; ret = throttle_data(context); + retain = get_retain(context); if (ret == THROTTLE_RET_KEEP) { msgpack_pack_object(&tmp_pck, root); new_size++; } else if (ret == THROTTLE_RET_DROP) { - /* Do nothing */ + /* If Retain is false, Do nothing */ + if (retain) { + usleep(10 * 1000); + msgpack_pack_object(&tmp_pck, root); + new_size++; + } } } msgpack_unpacked_destroy(&result); @@ -265,6 +282,7 @@ static struct flb_config_map config_map[] = { // rate // window // print_status + // retain // interval { FLB_CONFIG_MAP_DOUBLE, "rate", THROTTLE_DEFAULT_RATE, @@ -281,6 +299,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, print_status), "Set whether or not to print status information" }, + { + FLB_CONFIG_MAP_BOOL, "retain", THROTTLE_DEFAULT_RETAIN, + 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, retain_data), + "Set whether or not to drop some messages if rate limit is exceeded" + }, { FLB_CONFIG_MAP_STR, "interval", THROTTLE_DEFAULT_INTERVAL, 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, slide_interval), @@ -298,4 +321,4 @@ struct flb_filter_plugin filter_throttle_plugin = { .cb_exit = cb_throttle_exit, .config_map = config_map, .flags = 0 -}; +}; \ No newline at end of file diff --git a/plugins/filter_throttle/throttle.h b/plugins/filter_throttle/throttle.h index c3a570cf724..6a862d1c90a 100644 --- a/plugins/filter_throttle/throttle.h +++ b/plugins/filter_throttle/throttle.h @@ -32,12 +32,14 @@ #define THROTTLE_DEFAULT_WINDOW "5" #define THROTTLE_DEFAULT_INTERVAL "1" #define THROTTLE_DEFAULT_STATUS "false" +#define THROTTLE_DEFAULT_RETAIN "false" struct flb_filter_throttle_ctx { double max_rate; unsigned int window_size; const char *slide_interval; int print_status; + int retain_data; /* internal */ struct throttle_window *hash; @@ -50,4 +52,4 @@ struct ticker { double seconds; }; -#endif +#endif \ No newline at end of file