|
10 | 10 | # capacity is updated through update_settings |
11 | 11 | module SolarWindsAPM |
12 | 12 | class TokenBucket |
13 | | - # Maximum value of a signed 32-bit integer |
14 | | - MAX_INTERVAL = (2**31) - 1 |
15 | | - |
16 | | - attr_reader :capacity, :rate, :interval, :tokens, :type |
| 13 | + attr_reader :type |
17 | 14 |
|
18 | 15 | def initialize(token_bucket_settings) |
19 | | - self.capacity = token_bucket_settings.capacity || 0 |
20 | | - self.rate = token_bucket_settings.rate || 0 |
21 | | - self.interval = token_bucket_settings.interval || MAX_INTERVAL |
22 | | - self.tokens = @capacity |
| 16 | + @capacity = token_bucket_settings.capacity || 0 |
| 17 | + @rate = token_bucket_settings.rate || 0 |
| 18 | + @tokens = @capacity |
| 19 | + @last_update_time = Time.now.to_f |
23 | 20 | @type = token_bucket_settings.type |
24 | | - @timer = nil |
| 21 | + @lock = ::Mutex.new |
25 | 22 | end |
26 | 23 |
|
27 | | - # oboe sampler update_settings will update the token |
28 | | - # (thread safe as update_settings is guarded by mutex from oboe sampler) |
29 | | - def update(settings) |
30 | | - settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_token_bucket_settings(settings) |
| 24 | + def capacity |
| 25 | + @lock.synchronize { @capacity } |
31 | 26 | end |
32 | 27 |
|
33 | | - def update_from_hash(settings) |
34 | | - if settings[:capacity] |
35 | | - difference = settings[:capacity] - @capacity |
36 | | - self.capacity = settings[:capacity] |
37 | | - self.tokens = @tokens + difference |
38 | | - end |
39 | | - |
40 | | - self.rate = settings[:rate] if settings[:rate] |
41 | | - |
42 | | - return unless settings[:interval] |
43 | | - |
44 | | - self.interval = settings[:interval] |
45 | | - return unless running |
46 | | - |
47 | | - stop |
48 | | - start |
| 28 | + def rate |
| 29 | + @lock.synchronize { @rate } |
49 | 30 | end |
50 | 31 |
|
51 | | - def update_from_token_bucket_settings(settings) |
52 | | - if settings.capacity |
53 | | - difference = settings.capacity - @capacity |
54 | | - self.capacity = settings.capacity |
55 | | - self.tokens = @tokens + difference |
| 32 | + def tokens |
| 33 | + @lock.synchronize do |
| 34 | + calculate_tokens |
| 35 | + @tokens |
56 | 36 | end |
57 | | - |
58 | | - self.rate = settings.rate if settings.rate |
59 | | - |
60 | | - return unless settings.interval |
61 | | - |
62 | | - self.interval = settings.interval |
63 | | - return unless running |
64 | | - |
65 | | - stop |
66 | | - start |
67 | | - end |
68 | | - |
69 | | - def capacity=(capacity) |
70 | | - @capacity = [0, capacity].max |
71 | | - end |
72 | | - |
73 | | - def rate=(rate) |
74 | | - @rate = [0, rate].max |
75 | | - end |
76 | | - |
77 | | - # self.interval= sets the @interval and @sleep_interval |
78 | | - # @sleep_interval is used in the timer thread to sleep between replenishing the bucket |
79 | | - def interval=(interval) |
80 | | - @interval = interval.clamp(0, MAX_INTERVAL) |
81 | | - @sleep_interval = @interval / 1000.0 |
82 | 37 | end |
83 | 38 |
|
84 | | - def tokens=(tokens) |
85 | | - @tokens = tokens.clamp(0, @capacity) |
| 39 | + # oboe sampler update_settings will update the token |
| 40 | + def update(settings) |
| 41 | + settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_hash(tb_to_hash(settings)) |
86 | 42 | end |
87 | 43 |
|
88 | 44 | # Attempts to consume tokens from the bucket |
89 | | - # @param n [Integer] Number of tokens to consume |
| 45 | + # @param token [Integer] Number of tokens to consume |
90 | 46 | # @return [Boolean] Whether there were enough tokens |
91 | | - # TODO: we need to include thread-safety here since sampler is shared across threads |
92 | | - # and we may have multiple threads trying to consume tokens at the same time |
93 | 47 | def consume(token = 1) |
94 | | - if @tokens >= token |
95 | | - self.tokens = @tokens - token |
96 | | - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] #{@type} Consumed #{token} from total #{@tokens} (#{(@tokens.to_f / @capacity * 100).round(1)}% remaining)" } |
97 | | - true |
98 | | - else |
99 | | - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] #{@type} Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } |
100 | | - false |
101 | | - end |
102 | | - end |
103 | | - |
104 | | - # Starts replenishing the bucket |
105 | | - def start |
106 | | - return if running |
107 | | - |
108 | | - @timer = Thread.new do |
109 | | - loop do |
110 | | - task |
111 | | - sleep(@sleep_interval) |
| 48 | + @lock.synchronize do |
| 49 | + calculate_tokens |
| 50 | + if @tokens >= token |
| 51 | + @tokens -= token |
| 52 | + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] #{@type} Consumed #{token} (#{(@tokens.to_f / @capacity * 100).round(1)}% remaining)" } |
| 53 | + true |
| 54 | + else |
| 55 | + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] #{@type} Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } |
| 56 | + false |
112 | 57 | end |
113 | 58 | end |
114 | 59 | end |
115 | 60 |
|
116 | | - # Stops replenishing the bucket |
117 | | - def stop |
118 | | - return unless running |
| 61 | + private |
119 | 62 |
|
120 | | - @timer.kill |
121 | | - @timer = nil |
| 63 | + def calculate_tokens |
| 64 | + now = Time.now.to_f |
| 65 | + elapsed = now - @last_update_time |
| 66 | + @last_update_time = now |
| 67 | + @tokens += elapsed * @rate |
| 68 | + @tokens = [@tokens, @capacity].min |
122 | 69 | end |
123 | 70 |
|
124 | | - # Whether the bucket is actively being replenished |
125 | | - def running |
126 | | - !@timer.nil? |
127 | | - end |
| 71 | + # settings is from json sampler |
| 72 | + def update_from_hash(settings) |
| 73 | + @lock.synchronize do |
| 74 | + calculate_tokens |
| 75 | + |
| 76 | + if settings[:capacity] |
| 77 | + new_capacity = [0, settings[:capacity]].max |
| 78 | + difference = new_capacity - @capacity |
| 79 | + @capacity = new_capacity |
| 80 | + @tokens += difference |
| 81 | + @tokens = [0, @tokens].max |
| 82 | + end |
128 | 83 |
|
129 | | - private |
| 84 | + @rate = [0, settings[:rate]].max if settings[:rate] |
| 85 | + end |
| 86 | + end |
130 | 87 |
|
131 | | - def task |
132 | | - self.tokens = tokens + @rate |
| 88 | + # settings is from http sampler |
| 89 | + def tb_to_hash(settings) |
| 90 | + tb_hash = {} |
| 91 | + tb_hash[:capacity] = settings.capacity if settings.respond_to?(:capacity) |
| 92 | + tb_hash[:rate] = settings.rate if settings.respond_to?(:rate) |
| 93 | + tb_hash[:type] = settings.type if settings.respond_to?(:type) |
| 94 | + tb_hash |
133 | 95 | end |
134 | 96 | end |
135 | 97 | end |
0 commit comments