11# frozen_string_literal: true
22require 'fluent/plugin/filter'
3+ require 'fluent/plugin/prometheus'
34
45module Fluent ::Plugin
56 class ThrottleFilter < Filter
67 Fluent ::Plugin . register_filter ( 'throttle' , self )
8+ include Fluent ::Plugin ::Prometheus
79
810 desc "Used to group logs. Groups are rate limited independently"
911 config_param :group_key , :array , :default => [ 'kubernetes.container_name' ]
@@ -41,7 +43,13 @@ class ThrottleFilter < Filter
4143 :aprox_rate ,
4244 :bucket_count ,
4345 :bucket_last_reset ,
44- :last_warning )
46+ :last_warning ,
47+ :rate_count_last_exceeded )
48+
# Sets up the plugin instance and captures the Prometheus client registry
# that get_counter later uses to look up or register metrics.
def initialize
  super
  # NOTE(review): presumably the process-wide default registry of the
  # prometheus-client gem — confirm against the gem's documentation.
  @registry = ::Prometheus::Client.registry
end
4553
4654 def configure ( conf )
4755 super
@@ -68,12 +76,15 @@ def configure(conf)
6876
6977 raise "group_warning_delay_s must be >= 1" \
7078 unless @group_warning_delay_s >= 1
79+
80+ @base_labels = { }
7181 end
7282
# Plugin start hook: initialises the per-group counter table and resolves
# the Prometheus counter that records rate-limit overruns.
def start
  super

  # Group key => Group struct; starts empty and is filled lazily by filter.
  @counters = {}
  # Looked up under :throttle_rate_limit_exceeded when a group exceeds its rate.
  @metrics = {
    throttle_rate_limit_exceeded: get_counter(
      :fluentd_throttle_rate_limit_exceeded,
      "The exceeded rate of pods in the group"
    )
  }
end
7889
7990 def shutdown
@@ -85,17 +96,18 @@ def filter(tag, time, record)
8596 now = Time . now
8697 rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record
8798 group = extract_group ( record )
88-
89- # Ruby hashes are ordered by insertion.
99+
100+ # Ruby hashes are ordered by insertion.
90101 # Deleting and inserting moves the item to the end of the hash (most recently used)
91- counter = @counters [ group ] = @counters . delete ( group ) || Group . new ( 0 , now , 0 , 0 , now , nil )
102+ counter = @counters [ group ] = @counters . delete ( group ) || Group . new ( 0 , now , 0 , 0 , now , nil , 0 )
92103
93104 counter . rate_count += 1
94105 since_last_rate_reset = now - counter . rate_last_reset
95106 if since_last_rate_reset >= 1
96107 # compute and store rate/s at most every second
97108 counter . aprox_rate = ( counter . rate_count / since_last_rate_reset ) . round ( )
98109 counter . rate_count = 0
110+ counter . rate_count_last_exceeded = 0
99111 counter . rate_last_reset = now
100112 end
101113
@@ -151,6 +163,10 @@ def extract_group(record)
151163 end
152164
153165 def log_rate_limit_exceeded ( now , group , counter )
166+ metric = @metrics [ :throttle_rate_limit_exceeded ]
167+ log . debug ( "current rate" , counter . rate_count , "current metric" , metric . get ( labels : @base_labels . merge ( podname : group ) ) )
168+ metric . increment ( by : counter . rate_count - counter . rate_count_last_exceeded , labels : @base_labels . merge ( podname : group ) )
169+ counter . rate_count_last_exceeded = counter . rate_count
154170 emit = counter . last_warning == nil ? true \
155171 : ( now - counter . last_warning ) >= @group_warning_delay_s
156172 if emit
@@ -176,5 +192,13 @@ def log_items(now, group, counter)
176192 'rate_limit_s' : @group_rate_limit ,
177193 'reset_rate_s' : @group_reset_rate_s }
178194 end
195+
# Returns the counter registered under +name+, registering it first if the
# registry does not know it yet.
#
# @param name [Symbol] metric name used as the registry key
# @param docstring [String] description passed on when the counter is created
# @return the metric object handed back by the registry
def get_counter(name, docstring)
  # Reuse an existing metric rather than registering the same name twice.
  return @registry.get(name) if @registry.exist?(name)

  # Every sample carries the configured base labels plus the group name,
  # which is reported under :podname.
  label_names = @base_labels.keys + [:podname]
  @registry.counter(name, docstring: docstring, labels: label_names)
end
179203 end
180204end
0 commit comments