diff --git a/plugins/filter_throttle/throttle.c b/plugins/filter_throttle/throttle.c index a7284dbf1fd..8f4211667ce 100644 --- a/plugins/filter_throttle/throttle.c +++ b/plugins/filter_throttle/throttle.c @@ -83,11 +83,11 @@ void *time_ticker(void *args) if (ctx->print_status) { flb_plg_info(ctx->ins, - "%ld: limit is %0.2f per %s with window size of %i, " + "%ld: limit is %0.2f per %s with window size of %i, retain is %d, " "current rate is: %i per interval", - timestamp, ctx->max_rate, ctx->slide_interval, - ctx->window_size, - ctx->hash->total / ctx->hash->size); + timestamp, t->ctx->max_rate, t->ctx->slide_interval, + t->ctx->window_size, t->ctx->retain_data, + t->ctx->hash->total / t->ctx->hash->size); } pthread_mutex_unlock(&throttle_mut); /* sleep is a cancelable function */ @@ -107,6 +107,15 @@ static inline int throttle_data(struct flb_filter_throttle_ctx *ctx) return THROTTLE_RET_KEEP; } +static inline bool get_retain(struct flb_filter_throttle_ctx *ctx) +{ + if (ctx->retain_data) { + return true; + } + + return false; +} + static int configure(struct flb_filter_throttle_ctx *ctx, struct flb_filter_instance *f_ins) { int ret; @@ -193,6 +202,8 @@ static int cb_throttle_filter(const void *data, size_t bytes, struct flb_config *config) { int ret; + /* Do not drop some messages if rate limit is exceeded */ + bool retain = false; int old_size = 0; int new_size = 0; msgpack_unpacked result; @@ -220,13 +231,19 @@ static int cb_throttle_filter(const void *data, size_t bytes, old_size++; pthread_mutex_lock(&throttle_mut); ret = throttle_data(context); + retain = get_retain(context); pthread_mutex_unlock(&throttle_mut); if (ret == THROTTLE_RET_KEEP) { msgpack_pack_object(&tmp_pck, root); new_size++; } else if (ret == THROTTLE_RET_DROP) { - /* Do nothing */ + /* If Retain is false, Do nothing */ + if (retain) { + usleep(10 * 1000); + msgpack_pack_object(&tmp_pck, root); + new_size++; + } } } msgpack_unpacked_destroy(&result); @@ -275,6 +292,7 @@ static struct flb_config_map config_map[] = { // rate // window // print_status + // retain // interval { FLB_CONFIG_MAP_DOUBLE, "rate", THROTTLE_DEFAULT_RATE, @@ -291,6 +309,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, print_status), "Set whether or not to print status information" }, + { + FLB_CONFIG_MAP_BOOL, "retain", THROTTLE_DEFAULT_RETAIN, + 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, retain_data), + "Set whether or not to drop some messages if rate limit is exceeded" + }, { FLB_CONFIG_MAP_STR, "interval", THROTTLE_DEFAULT_INTERVAL, 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, slide_interval), @@ -308,4 +331,4 @@ struct flb_filter_plugin filter_throttle_plugin = { .cb_exit = cb_throttle_exit, .config_map = config_map, .flags = 0 -}; +}; \ No newline at end of file diff --git a/plugins/filter_throttle/throttle.h b/plugins/filter_throttle/throttle.h index 30ca318c1da..d2cf255f467 100644 --- a/plugins/filter_throttle/throttle.h +++ b/plugins/filter_throttle/throttle.h @@ -33,6 +33,7 @@ #define THROTTLE_DEFAULT_WINDOW "5" #define THROTTLE_DEFAULT_INTERVAL "1" #define THROTTLE_DEFAULT_STATUS "false" +#define THROTTLE_DEFAULT_RETAIN "false" struct ticker { pthread_t thr; @@ -44,6 +45,7 @@ struct flb_filter_throttle_ctx { unsigned int window_size; const char *slide_interval; int print_status; + int retain_data; /* internal */ struct throttle_window *hash; @@ -53,4 +55,4 @@ struct flb_filter_throttle_ctx { -#endif +#endif \ No newline at end of file