Skip to content

Commit 837efb0

Browse files
committed
filter_throttle: Support a feature that do not drop messages if rate limit is exceeded
1 parent 9ca777d commit 837efb0

File tree

2 files changed

+30
-5
lines changed

2 files changed

+30
-5
lines changed

plugins/filter_throttle/throttle.c

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ void *time_ticker(void *args)
8181

8282
if (t->ctx->print_status) {
8383
flb_plg_info(ctx->ins,
84-
"%ld: limit is %0.2f per %s with window size of %i, "
84+
"%ld: limit is %0.2f per %s with window size of %i, retain is %d, "
8585
"current rate is: %i per interval",
8686
timestamp, t->ctx->max_rate, t->ctx->slide_interval,
87-
t->ctx->window_size,
87+
t->ctx->window_size, t->ctx->retain_data,
8888
t->ctx->hash->total / t->ctx->hash->size);
8989
}
9090
sleep(t->seconds);
@@ -105,6 +105,15 @@ static inline int throttle_data(struct flb_filter_throttle_ctx *ctx)
105105
return THROTTLE_RET_KEEP;
106106
}
107107

108+
static inline bool get_retain(struct flb_filter_throttle_ctx *ctx)
109+
{
110+
if (ctx->retain_data) {
111+
return true;
112+
}
113+
114+
return false;
115+
}
116+
108117
static int configure(struct flb_filter_throttle_ctx *ctx, struct flb_filter_instance *f_ins)
109118
{
110119
int ret;
@@ -200,6 +209,8 @@ static int cb_throttle_filter(const void *data, size_t bytes,
200209
struct flb_config *config)
201210
{
202211
int ret;
212+
/* Do not drop some messages if rate limit is exceeded */
213+
bool retain = false;
203214
int old_size = 0;
204215
int new_size = 0;
205216
msgpack_unpacked result;
@@ -227,12 +238,18 @@ static int cb_throttle_filter(const void *data, size_t bytes,
227238
old_size++;
228239

229240
ret = throttle_data(context);
241+
retain = get_retain(context);
230242
if (ret == THROTTLE_RET_KEEP) {
231243
msgpack_pack_object(&tmp_pck, root);
232244
new_size++;
233245
}
234246
else if (ret == THROTTLE_RET_DROP) {
235-
/* Do nothing */
247+
/* If Retain is false, Do nothing */
248+
if (retain) {
249+
usleep(10 * 1000);
250+
msgpack_pack_object(&tmp_pck, root);
251+
new_size++;
252+
}
236253
}
237254
}
238255
msgpack_unpacked_destroy(&result);
@@ -265,6 +282,7 @@ static struct flb_config_map config_map[] = {
265282
// rate
266283
// window
267284
// print_status
285+
// retain
268286
// interval
269287
{
270288
FLB_CONFIG_MAP_DOUBLE, "rate", THROTTLE_DEFAULT_RATE,
@@ -281,6 +299,11 @@ static struct flb_config_map config_map[] = {
281299
0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, print_status),
282300
"Set whether or not to print status information"
283301
},
302+
{
303+
FLB_CONFIG_MAP_BOOL, "retain", THROTTLE_DEFAULT_RETAIN,
304+
0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, retain_data),
305+
"Set whether or not to drop some messages if rate limit is exceeded"
306+
},
284307
{
285308
FLB_CONFIG_MAP_STR, "interval", THROTTLE_DEFAULT_INTERVAL,
286309
0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, slide_interval),
@@ -298,4 +321,4 @@ struct flb_filter_plugin filter_throttle_plugin = {
298321
.cb_exit = cb_throttle_exit,
299322
.config_map = config_map,
300323
.flags = 0
301-
};
324+
};

plugins/filter_throttle/throttle.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@
3232
#define THROTTLE_DEFAULT_WINDOW "5"
3333
#define THROTTLE_DEFAULT_INTERVAL "1"
3434
#define THROTTLE_DEFAULT_STATUS "false"
35+
#define THROTTLE_DEFAULT_RETAIN "false"
3536

3637
struct flb_filter_throttle_ctx {
3738
double max_rate;
3839
unsigned int window_size;
3940
const char *slide_interval;
4041
int print_status;
42+
int retain_data;
4143

4244
/* internal */
4345
struct throttle_window *hash;
@@ -50,4 +52,4 @@ struct ticker {
5052
double seconds;
5153
};
5254

53-
#endif
55+
#endif

0 commit comments

Comments
 (0)