Skip to content

Commit 21136ba

Browse files
committed
refactor!: require TTL upon expirable elements add
The final goal is to allow specify cooldown_period as Proc: Sidekiq::Throttled.configure do |c| c.cooldown_period = lambda do |unit_of_work| unit_of_work.queue == "default" ? 2.0 : 10.0 end end
1 parent 02704ab commit 21136ba

File tree

3 files changed

+46
-30
lines changed

3 files changed

+46
-30
lines changed

lib/sidekiq/throttled/cooldown.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,18 @@ def [](config)
2323

2424
# @param config [Config]
2525
def initialize(config)
26-
@queues = ExpirableSet.new(config.cooldown_period)
27-
@threshold = config.cooldown_threshold
26+
@queues = ExpirableSet.new
2827
@tracker = Concurrent::Map.new
28+
@period = config.cooldown_period
29+
@threshold = config.cooldown_threshold
2930
end
3031

3132
# Notify that given queue returned job that was throttled.
3233
#
3334
# @param queue [String]
3435
# @return [void]
3536
def notify_throttled(queue)
36-
@queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ)
37+
@queues.add(queue, ttl: @period) if @threshold <= @tracker.merge_pair(queue, 1, &:succ)
3738
end
3839

3940
# Notify that given queue returned job that was not throttled.

lib/sidekiq/throttled/expirable_set.rb

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,35 +9,36 @@ module Throttled
99
# Set of elements with expirations.
1010
#
1111
# @example
12-
# set = ExpirableSet.new(10.0)
13-
# set.add("a")
12+
# set = ExpirableSet.new
13+
# set.add("a", ttl: 10.0)
1414
# sleep(5)
15-
# set.add("b")
15+
# set.add("b", ttl: 10.0)
1616
# set.to_a # => ["a", "b"]
1717
# sleep(5)
1818
# set.to_a # => ["b"]
1919
class ExpirableSet
2020
include Enumerable
2121

22-
# @param ttl [Float] expiration is seconds
23-
# @raise [ArgumentError] if `ttl` is not positive Float
24-
def initialize(ttl)
25-
raise ArgumentError, "ttl must be positive Float" unless ttl.is_a?(Float) && ttl.positive?
26-
22+
def initialize
2723
@elements = Concurrent::Map.new
28-
@ttl = ttl
2924
end
3025

3126
# @param element [Object]
27+
# @param ttl [Float] expiration is seconds
28+
# @raise [ArgumentError] if `ttl` is not positive Float
3229
# @return [ExpirableSet] self
33-
def add(element)
34-
# cleanup expired elements to avoid mem-leak
30+
def add(element, ttl:)
31+
raise ArgumentError, "ttl must be positive Float" unless ttl.is_a?(Float) && ttl.positive?
32+
3533
horizon = now
34+
35+
# Cleanup expired elements
3636
expired = @elements.each_pair.select { |(_, sunset)| expired?(sunset, horizon) }
3737
expired.each { |pair| @elements.delete_pair(*pair) }
3838

39-
# add new element
40-
@elements[element] = now + @ttl
39+
# Add or update an element
40+
sunset = horizon + ttl
41+
@elements.merge_pair(element, sunset) { |old_sunset| [old_sunset, sunset].max }
4142

4243
self
4344
end

spec/lib/sidekiq/throttled/expirable_set_spec.rb

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,44 @@
33
require "sidekiq/throttled/expirable_set"
44

55
RSpec.describe Sidekiq::Throttled::ExpirableSet do
6-
subject(:expirable_set) { described_class.new(2.0) }
6+
subject(:expirable_set) { described_class.new }
77

88
it { is_expected.to be_an Enumerable }
99

10-
describe ".new" do
10+
describe "#add" do
1111
it "raises ArgumentError if given TTL is not Float" do
12-
expect { described_class.new(42) }.to raise_error(ArgumentError)
12+
expect { expirable_set.add("a", ttl: 42) }.to raise_error(ArgumentError)
1313
end
1414

1515
it "raises ArgumentError if given TTL is not positive" do
16-
expect { described_class.new(0.0) }.to raise_error(ArgumentError)
16+
expect { expirable_set.add("a", ttl: 0.0) }.to raise_error(ArgumentError)
1717
end
18-
end
1918

20-
describe "#add" do
2119
it "returns self" do
22-
expect(expirable_set.add("a")).to be expirable_set
20+
expect(expirable_set.add("a", ttl: 1.0)).to be expirable_set
2321
end
2422

2523
it "adds uniq elements to the set" do
26-
expirable_set.add("a").add("b").add("b").add("a")
24+
expirable_set.add("a", ttl: 1.0).add("b", ttl: 1.0).add("b", ttl: 1.0).add("a", ttl: 1.0)
2725

2826
expect(expirable_set).to contain_exactly("a", "b")
2927
end
28+
29+
it "uses longest sunset" do
30+
monotonic_time = 0.0
31+
allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time }
32+
33+
expirable_set.add("a", ttl: 1.0).add("b", ttl: 42.0).add("b", ttl: 1.0).add("a", ttl: 2.0)
34+
35+
monotonic_time += 0.5
36+
expect(expirable_set).to contain_exactly("a", "b")
37+
38+
monotonic_time += 1.0
39+
expect(expirable_set).to contain_exactly("a", "b")
40+
41+
monotonic_time += 0.5
42+
expect(expirable_set).to contain_exactly("b")
43+
end
3044
end
3145

3246
describe "#each" do
@@ -37,16 +51,16 @@
3751

3852
allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time }
3953

40-
expirable_set.add("lorem")
41-
expirable_set.add("ipsum")
54+
expirable_set.add("lorem", ttl: 1.0)
55+
expirable_set.add("ipsum", ttl: 1.0)
4256

43-
monotonic_time += 1
57+
monotonic_time += 0.5
4458

45-
expirable_set.add("ipsum")
59+
expirable_set.add("ipsum", ttl: 1.0)
4660

47-
monotonic_time += 1
61+
monotonic_time += 0.5
4862

49-
expirable_set.add("dolor")
63+
expirable_set.add("dolor", ttl: 1.0)
5064
end
5165

5266
it { is_expected.to be_an(Enumerator) }

0 commit comments

Comments
 (0)