From b62db57d48c14985e7b83d610d3a2de80c3bbcbc Mon Sep 17 00:00:00 2001 From: Jason Martens Date: Mon, 13 Jul 2020 16:46:23 -0700 Subject: [PATCH 1/2] Add individually configurable throttle groups Added a group_override config parameter which allows setting different throttle limits per group. --- README.md | 15 +++ lib/fluent/plugin/filter_throttle.rb | 120 ++++++++++++++------- test/fluent/plugin/filter_throttle_test.rb | 38 +++++++ 3 files changed, 137 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index ecf2167..8bbecc2 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,21 @@ When a group reaches its limit and as long as it is not reset, a warning message with the current log rate of the group is emitted repeatedly. This is the delay between every repetition. +#### group\_override + +Allows overriding the defaults per group. To use this value, configure a JSON +object with the group name value as the object name, and the above group_* +parameters as entries. For example: +``` +group_override {"group_bucket_1": { + "group_bucket_period_s": 1, + "group_bucket_limit": 7, + "group_drop_logs": true + }} +``` +This will configure an override for a group name of `group_bucket_1` with +different throttling limits than others. + ## License Apache License, Version 2.0 diff --git a/lib/fluent/plugin/filter_throttle.rb b/lib/fluent/plugin/filter_throttle.rb index 3defe8d..a69f758 100644 --- a/lib/fluent/plugin/filter_throttle.rb +++ b/lib/fluent/plugin/filter_throttle.rb @@ -35,39 +35,86 @@ class ThrottleFilter < Filter DESC config_param :group_warning_delay_s, :integer, :default => 10 + desc <<~DESC + Override the default rate limit for a specific group. 
Example hash: + {"group_key1_value,group_key2_value": { # comma separated if multiple group_key values are given + "group_bucket_period_s": 60, # Remaining values match the default value names + "group_bucket_limit": 10000, + "group_drop_logs": true} + } + DESC + config_param :group_override, :hash, :default => {} + + BucketConfig = Struct.new( + :period_s, + :limit, + :drop_logs, + :rate_limit, + :gc_timeout_s, + :reset_rate_s, + :warning_delay_s) + + Group = Struct.new( :rate_count, :rate_last_reset, :aprox_rate, :bucket_count, :bucket_last_reset, - :last_warning) + :last_warning, + :config) + + def create_override_bucket_configs(config_hash) + config_hash.each do |group_key, config| + group_key_value = group_key.split(',') + period = config.fetch("group_bucket_period_s", @group_bucket_period_s) + limit = config.fetch("group_bucket_limit", @group_bucket_limit) + drop_logs = config.fetch("group_drop_logs", @group_drop_logs) + rate_limit = (limit / period) + gc_timeout_s = 2 * period + reset_rate_s = config.fetch("group_reset_rate_s", rate_limit) + warning_delay_s = config.fetch("group_warning_delay_s", @group_warning_delay_s) + b = BucketConfig.new(period, limit, drop_logs, rate_limit, gc_timeout_s, reset_rate_s, warning_delay_s) + @bucket_configs[group_key_value] = b + end + end + + def validate_bucket(n, b) + raise "#{n} period_s must be > 0" unless b.period_s > 0 + raise "#{n} limit must be > 0" unless b.limit > 0 + raise "#{n} reset_rate_s must be >= -1" unless b.reset_rate_s >= -1 + raise "#{n} reset_rate_s must be <= limit / period_s" unless b.reset_rate_s <= b.rate_limit + raise "#{n} warning_delay_s must be >= 1" unless b.warning_delay_s >= 1 + end def configure(conf) super - @group_key_paths = group_key.map { |key| key.split(".") } - - raise "group_bucket_period_s must be > 0" \ - unless @group_bucket_period_s > 0 - - @group_gc_timeout_s = 2 * @group_bucket_period_s - - raise "group_bucket_limit must be > 0" \ - unless @group_bucket_limit > 0 - - 
@group_rate_limit = (@group_bucket_limit / @group_bucket_period_s) - - @group_reset_rate_s = @group_rate_limit \ - if @group_reset_rate_s == nil + # Set up default bucket & calculate derived values + default_rate_limit = (@group_bucket_limit / @group_bucket_period_s) + default_reset_rate_s = @group_reset_rate_s.nil? ? default_rate_limit : @group_reset_rate_s + default_gc_timeout_s = 2 * @group_bucket_period_s + default_bucket_config = BucketConfig.new( + @group_bucket_period_s, + @group_bucket_limit, + @group_drop_logs, + default_rate_limit, + default_gc_timeout_s, + default_reset_rate_s, + @group_warning_delay_s) + + validate_bucket("default", default_bucket_config) + @bucket_configs = Hash.new(default_bucket_config) + # Parse override configs and add to bucket_configs + create_override_bucket_configs(@group_override) + + # Make sure the config for each bucket are valid + @bucket_configs.each do |key_path, config| + validate_bucket(key_path, config) + end - raise "group_reset_rate_s must be >= -1" \ - unless @group_reset_rate_s >= -1 - raise "group_reset_rate_s must be <= group_bucket_limit / group_bucket_period_s" \ - unless @group_reset_rate_s <= @group_rate_limit - raise "group_warning_delay_s must be >= 1" \ - unless @group_warning_delay_s >= 1 + @group_key_paths = group_key.map { |key| key.split(".") } end def start @@ -83,12 +130,13 @@ def shutdown def filter(tag, time, record) now = Time.now - rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record group = extract_group(record) - - # Ruby hashes are ordered by insertion. + bucket_config = @bucket_configs[group] + rate_limit_exceeded = bucket_config.drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record + + # Ruby hashes are ordered by insertion. 
# Deleting and inserting moves the item to the end of the hash (most recently used) - counter = @counters[group] = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil) + counter = @counters[group] = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil, bucket_config) counter.rate_count += 1 since_last_rate_reset = now - counter.rate_last_reset @@ -101,17 +149,17 @@ def filter(tag, time, record) # try to evict the least recently used group lru_group, lru_counter = @counters.first - if !lru_group.nil? && now - lru_counter.rate_last_reset > @group_gc_timeout_s + if !lru_group.nil? && now - lru_counter.rate_last_reset > lru_counter.config.gc_timeout_s @counters.delete(lru_group) end - if (now.to_i / @group_bucket_period_s) \ - > (counter.bucket_last_reset.to_i / @group_bucket_period_s) + if (now.to_i / counter.config.period_s) \ + > (counter.bucket_last_reset.to_i / counter.config.period_s) # next time period reached. # wait until rate drops back down (if enabled). - if counter.bucket_count == -1 and @group_reset_rate_s != -1 - if counter.aprox_rate < @group_reset_rate_s + if counter.bucket_count == -1 and counter.config.reset_rate_s != -1 + if counter.aprox_rate < counter.config.reset_rate_s log_rate_back_down(now, group, counter) else log_rate_limit_exceeded(now, group, counter) @@ -133,7 +181,7 @@ def filter(tag, time, record) counter.bucket_count += 1 # if we are out of credit, we drop logs for the rest of the time period. - if counter.bucket_count > @group_bucket_limit + if counter.bucket_count > counter.config.limit log_rate_limit_exceeded(now, group, counter) counter.bucket_count = -1 return rate_limit_exceeded @@ -152,7 +200,7 @@ def extract_group(record) def log_rate_limit_exceeded(now, group, counter) emit = counter.last_warning == nil ? 
true \ - : (now - counter.last_warning) >= @group_warning_delay_s + : (now - counter.last_warning) >= counter.config.warning_delay_s if emit log.warn("rate exceeded", log_items(now, group, counter)) counter.last_warning = now @@ -171,10 +219,10 @@ def log_items(now, group, counter) {'group_key': group, 'rate_s': rate, - 'period_s': @group_bucket_period_s, - 'limit': @group_bucket_limit, - 'rate_limit_s': @group_rate_limit, - 'reset_rate_s': @group_reset_rate_s} + 'period_s': counter.config.period_s, + 'limit': counter.config.limit, + 'rate_limit_s': counter.config.rate_limit, + 'reset_rate_s': counter.config.reset_rate_s} end end end diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb index 188051f..1bb39fe 100644 --- a/test/fluent/plugin/filter_throttle_test.rb +++ b/test/fluent/plugin/filter_throttle_test.rb @@ -61,6 +61,44 @@ def create_driver(conf='') assert_equal(5, groups["b"].size) end + it 'rejects override configurations with invalid values' do + assert_raises { create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 5 + group_override {"group_bucket_1":{ + "group_bucket_period_s": -1, + "group_bucket_limit": 7, + "group_drop_logs": true + }} + CONF + } + end + + it 'throttles with different rates in override configs' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 5 + group_override {"group_bucket_1":{ + "group_bucket_period_s": 1, + "group_bucket_limit": 7, + "group_drop_logs": true + }} + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test", "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test", "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test", "group": "group_bucket_1"}]] * 10) + end + + groups = driver.filtered_records.group_by { |r| r[:group] } + assert_equal(5, groups["a"].size) + assert_equal(5, groups["b"].size) + assert_equal(7, 
groups["group_bucket_1"].size) + end + it 'allows composite group keys' do driver = create_driver <<~CONF group_key "group1,group2" From 636bcd272ac3e7d6a42511e941359e656d0e8e06 Mon Sep 17 00:00:00 2001 From: Jason Martens Date: Tue, 14 Jul 2020 15:56:23 -0700 Subject: [PATCH 2/2] Add test coverage for lru group removal --- test/fluent/plugin/filter_throttle_test.rb | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb index 1bb39fe..d6640a2 100644 --- a/test/fluent/plugin/filter_throttle_test.rb +++ b/test/fluent/plugin/filter_throttle_test.rb @@ -155,6 +155,22 @@ def create_driver(conf='') ], messages_per_minute end + it 'removes lru groups after 2*period' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 2 + group_bucket_limit 6 + group_reset_rate_s 2 + CONF + + driver.run(default_tag: "test") do + Time.stubs(now: Time.at(1)) + driver.feed([[event_time, {"msg": "test", "group": "a"}]] * 2) + Time.stubs(now: Time.at(10)) + driver.feed([[event_time, {"msg": "test", "group": "b"}]] * 2) + end + # TODO: Figure out how to assert the group was removed from the private variable + end it 'does not throttle when in log only mode' do driver = create_driver <<~CONF