Skip to content

Commit b2eef87

Browse files
Strict rate limits :: Comments
1 parent 0f50bae commit b2eef87

File tree

1 file changed

+72
-11
lines changed

1 file changed

+72
-11
lines changed

lib/resque/throttler.rb

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,52 +3,90 @@
33

44
module Resque
55
module Plugins
6+
# The Throttler module provides rate-limiting capabilities for Resque queues.
67
module Throttler
78
extend self
89

10+
# Prefix used for Redis keys related to throttling.
911
LOCK_KEYS_PREFIX = 'throttler'.freeze
1012

11-
# Initialize rate limits
13+
# Initializes rate limits when the module is extended.
14+
#
15+
# @param [Object] other The object extending this module.
1216
def self.extended(other)
1317
other.instance_variable_set(:@rate_limits, {})
1418
end
1519

16-
# Method to set rate limits on a specific queue
20+
# Sets rate limits for a specific queue.
21+
#
22+
# @param [Symbol, String] queue The name of the queue to rate limit.
23+
# @param [Hash] options The rate limit options.
24+
# @option options [Integer] :at The maximum number of jobs allowed in the time window.
25+
# @option options [Integer] :per The time window in seconds.
26+
#
27+
# @example
28+
# Resque.rate_limit(:my_queue, at: 5, per: 60)
29+
#
30+
# @raise [ArgumentError] If either :at or :per option is missing.
1731
def rate_limit(queue, options = {})
18-
if options.keys.sort != [:at, :per]
32+
unless options.keys.sort == [:at, :per]
1933
raise ArgumentError.new("Missing either :at or :per in options")
2034
end
2135

2236
@rate_limits[queue.to_s] = options
2337
end
2438

25-
# Get rate limit configuration for a queue
39+
# Retrieves the rate limit configuration for a queue.
40+
#
41+
# @param [Symbol, String] queue The name of the queue.
42+
# @return [Hash, nil] The rate limit options for the queue, or nil if not set.
2643
def rate_limit_for(queue)
2744
@rate_limits[queue.to_s]
2845
end
2946

47+
# Returns all queues that have rate limits configured.
48+
#
49+
# @return [Array<String>] List of queue names as strings.
3050
def rate_limited_queues
3151
@rate_limits.keys
3252
end
3353

34-
# Check if a queue has a rate limit configured
54+
# Checks if a queue has a rate limit configured.
55+
#
56+
# @param [Symbol, String] queue The name of the queue.
57+
# @return [Boolean] True if the queue has a rate limit, false otherwise.
3558
def queue_rate_limited?(queue)
3659
!!@rate_limits[queue.to_s]
3760
end
3861

62+
# Generates the Redis key for the lock of a specific queue.
63+
#
64+
# @param [Symbol, String] queue The name of the queue.
65+
# @return [String] The Redis key for the queue lock.
3966
def rate_limiting_queue_lock_key(queue)
4067
"#{LOCK_KEYS_PREFIX}:lock:#{queue}"
4168
end
4269

70+
# Generates the Redis key for the rate limit counter of a specific queue.
71+
#
72+
# @param [Symbol, String] queue The name of the queue.
73+
# @return [String] The Redis key for the rate limit counter.
4374
def rate_limit_key_for(queue)
4475
"#{LOCK_KEYS_PREFIX}:rate_limit:#{queue}"
4576
end
4677

78+
# Retrieves the number of jobs processed in the current rate limit window for a queue.
79+
#
80+
# @param [Symbol, String] queue The name of the queue.
81+
# @return [Integer] The number of jobs processed.
4782
def processed_job_count_in_rate_limit_window(queue)
4883
Resque.redis.get(rate_limit_key_for(queue)).to_i
4984
end
5085

51-
# Check if a queue has exceeded its rate limit
86+
# Checks if a queue has reached or exceeded its rate limit.
87+
#
88+
# @param [Symbol, String] queue The name of the queue.
89+
# @return [Boolean] True if the queue is at or over its rate limit, false otherwise.
5290
def queue_at_or_over_rate_limit?(queue)
5391
if queue_rate_limited?(queue)
5492
processed_job_count_in_rate_limit_window(queue) >= rate_limit_for(queue)[:at]
@@ -57,6 +95,9 @@ def queue_at_or_over_rate_limit?(queue)
5795
end
5896
end
5997

98+
# Resets throttling data for one or all rate-limited queues.
99+
#
100+
# @param [Symbol, String, nil] queue The name of the queue to reset, or nil to reset all.
60101
def reset_throttling(queue = nil)
61102
if queue
62103
reset_queue_throttling(queue)
@@ -69,6 +110,9 @@ def reset_throttling(queue = nil)
69110

70111
private
71112

113+
# Resets throttling data for a specific queue.
114+
#
115+
# @param [Symbol, String] queue The name of the queue.
72116
def reset_queue_throttling(queue)
73117
lock_key = rate_limiting_queue_lock_key(queue)
74118
rate_limit_key = rate_limit_key_for(queue)
@@ -85,7 +129,12 @@ def reset_queue_throttling(queue)
85129
# Extend Resque::Worker to manage rate-limited queue jobs
86130
module Resque
87131
class Worker
88-
# Override the `reserve` method to implement rate-limiting logic
132+
# Overrides the `reserve` method to implement rate-limiting logic.
133+
#
134+
# This method attempts to reserve a job from the queues in order,
135+
# applying rate limits where configured.
136+
#
137+
# @return [Resque::Job, nil] The next job to process, or nil if none are available.
89138
def reserve
90139
queues.each do |queue|
91140
log_with_severity :debug, "Checking #{queue}"
@@ -144,17 +193,29 @@ def reserve
144193

145194
private
146195

147-
# Helper method to acquire a Redis lock
196+
# Acquires a Redis lock for the given queue.
197+
#
198+
# @param [Symbol, String] queue The name of the queue.
199+
# @return [Boolean] True if the lock was acquired, false otherwise.
148200
def acquire_lock(queue)
149-
Resque.redis.set(Resque.rate_limiting_queue_lock_key(queue), "locked", ex: 30, nx: true) # NX means "set if not exists"
201+
Resque.redis.set(
202+
Resque.rate_limiting_queue_lock_key(queue),
203+
"locked",
204+
ex: 30, # Set expiration to 30 seconds.
205+
nx: true # NX option ensures the key is set only if it does not exist.
206+
)
150207
end
151208

152-
# Helper method to release the Redis lock
209+
# Releases the Redis lock for the given queue.
210+
#
211+
# @param [Symbol, String] queue The name of the queue.
153212
def release_lock(queue)
154213
Resque.redis.del(Resque.rate_limiting_queue_lock_key(queue))
155214
end
156215

157-
# Helper method to increment the job counter for rate-limiting
216+
# Increments the job counter for rate limiting on a queue.
217+
#
218+
# @param [Symbol, String] queue The name of the queue.
158219
def increment_job_counter(queue)
159220
Resque.redis.incr(Resque.rate_limit_key_for(queue))
160221
limit = Resque.rate_limit_for(queue)

0 commit comments

Comments
 (0)