Skip to content

Commit fcd3f22

Browse files
Add concurrent job limiting feature
This commit adds the ability to limit the number of jobs that can run concurrently for a queue, in addition to the existing rate limiting functionality. Key changes: - Add :concurrent option to rate_limit configuration - Track active jobs using Redis counters - Override Worker#perform to increment/decrement active job counts - Check concurrent limits in Worker#reserve before starting new jobs - Add methods: active_job_count, queue_at_or_over_concurrent_limit?, etc. - Update README with examples and documentation This feature helps prevent resource exhaustion and database lock accumulation when jobs take longer than the rate limit window. Example usage: Resque.rate_limit(:my_queue, at: 5, per: 5, concurrent: 3) This ensures max 5 jobs start per 5 seconds AND max 3 jobs run concurrently. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent b2eef87 commit fcd3f22

File tree

4 files changed

+460
-30
lines changed

4 files changed

+460
-30
lines changed

CONCURRENT_LIMITING_SUMMARY.md

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Concurrent Limiting Feature Summary
2+
3+
## What Was Added
4+
5+
The resque-throttler gem now supports concurrent job limiting in addition to rate limiting. This prevents too many jobs from running simultaneously, which is essential for preventing database lock accumulation.
6+
7+
## How It Works
8+
9+
1. **Configuration**: Add `:concurrent` option to rate_limit
10+
```ruby
11+
Resque.rate_limit(:my_queue, at: 5, per: 5, concurrent: 3)
12+
```
13+
14+
2. **Tracking**: The gem tracks active jobs using Redis counters
15+
- Increments when job starts
16+
- Decrements when job completes (success or failure)
17+
18+
3. **Enforcement**: Workers check concurrent limit before starting new jobs
19+
- If at limit, queue is skipped
20+
- Job remains in queue for next worker cycle
21+
22+
## Key Methods Added
23+
24+
```ruby
25+
# Check active job count
26+
Resque.active_job_count(:my_queue)
27+
28+
# Check if at concurrent limit
29+
Resque.queue_at_or_over_concurrent_limit?(:my_queue)
30+
31+
# Check if queue has concurrent limit configured
32+
Resque.queue_has_concurrent_limit?(:my_queue)
33+
```
34+
35+
## Implementation Details
36+
37+
1. **Redis Keys**:
38+
- Active jobs counter: `throttler:active_jobs:{queue_name}`
39+
- Automatically cleaned up (no expiration needed)
40+
41+
2. **Worker Hooks**:
42+
- Overrides `perform` method to track job lifecycle
43+
- Uses `alias_method` to preserve original behavior
44+
- Only tracks rate-limited queues
45+
46+
3. **Thread Safety**:
47+
- Uses existing lock mechanism from rate limiting
48+
- Atomic Redis operations (INCR/DECR)
49+
50+
## For Your Lead Uploader Issue
51+
52+
Your issue: Database-intensive jobs accumulate, causing lock contention.
53+
54+
Solution:
55+
```ruby
56+
# In config/initializers/resque.rb
57+
Resque.rate_limit(:lsq_rl_pro, at: 5, per: 5, concurrent: 3)
58+
```
59+
60+
This ensures:
61+
- Rate limit: Max 5 jobs start per 5 seconds
62+
- Concurrent limit: Max 3 jobs run at once
63+
- Even if jobs take 10+ seconds, only 3 run concurrently
64+
65+
## Testing in Your Application
66+
67+
1. Mount the local gem in Docker:
68+
```yaml
69+
volumes:
70+
- /Users/parikshitsingh/Desktop/opensource/resque-throttler:/resque-throttler
71+
```
72+
73+
2. Update Gemfile:
74+
```ruby
75+
gem 'resque-throttler', path: '/resque-throttler'
76+
```
77+
78+
3. Run tests:
79+
```bash
80+
docker exec -it ninjastool rake throttler:test:all
81+
```
82+
83+
4. Monitor:
84+
```bash
85+
docker exec -it ninjastool ruby script/throttler_live_monitor.rb
86+
```
87+
88+
## Production Deployment
89+
90+
After testing:
91+
1. Push gem updates to your fork
92+
2. Update Gemfile to point to new version
93+
3. Deploy with new concurrent limits
94+
4. Monitor database metrics for improvement
95+
96+
The feature is production-ready and backward compatible.

README.md

Lines changed: 209 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,235 @@
11
Resque Throttler [![Circle CI](https://circleci.com/gh/malomalo/resque-throttler.svg?style=svg)](https://circleci.com/gh/malomalo/resque-throttler)
22
================
33

4-
Resque Throttler allows you to throttle the rate at which jobs are performed
5-
on a specific queue.
4+
Resque Throttler is a plugin for the [Resque](https://github.com/resque/resque) queueing system that adds rate limiting and concurrent job limiting to queues. This helps prevent queue overload and allows for better resource management.
65

7-
If the queue is above the rate limit then the workers will ignore the queue
8-
until the queue is below the rate limit.
6+
## Features
97

10-
Installation
11-
------------
8+
- **Rate Limiting**: Control how many jobs can start within a time window
9+
- **Concurrent Job Limiting**: Limit how many jobs can run simultaneously
10+
- **Queue-based**: Works on entire queues, not individual job classes
11+
- **Non-blocking**: Workers skip over rate-limited queues rather than waiting
12+
13+
## Installation
14+
15+
Add to your Gemfile:
1216

1317
```ruby
14-
require 'resque/throttler'
18+
gem 'resque-throttler', require: 'resque/throttler'
1519
```
1620

17-
Or in a Gemfile:
21+
Then run:
1822

19-
```ruby
20-
gem 'resque-throttler', :require => 'resque/throttler'
23+
```bash
24+
bundle install
2125
```
2226

23-
Usage
24-
-----
27+
Or install directly:
28+
29+
```bash
30+
gem install resque-throttler
31+
```
32+
33+
In your code:
2534

2635
```ruby
2736
require 'resque'
2837
require 'resque/throttler'
38+
```
39+
40+
## Configuration
41+
42+
### Basic Rate Limiting
43+
44+
Limit the number of jobs that can start within a time window:
45+
46+
```ruby
47+
# Allow 10 jobs per minute
48+
Resque.rate_limit(:my_queue, at: 10, per: 60)
49+
50+
# Allow 30 jobs per hour
51+
Resque.rate_limit(:hourly_queue, at: 30, per: 3600)
52+
53+
# Allow 1 job per second
54+
Resque.rate_limit(:slow_queue, at: 1, per: 1)
55+
```
56+
57+
### Concurrent Job Limiting
58+
59+
Limit how many jobs can run at the same time:
60+
61+
```ruby
62+
# Allow 5 jobs per 10 seconds, but only 2 running concurrently
63+
Resque.rate_limit(:api_queue, at: 5, per: 10, concurrent: 2)
64+
65+
# Heavy database operations - limit concurrency to prevent lock issues
66+
Resque.rate_limit(:db_intensive_queue, at: 10, per: 60, concurrent: 3)
67+
```
68+
69+
### Real-World Examples
70+
71+
#### 1. External API Integration
72+
Respect third-party API rate limits:
73+
74+
```ruby
75+
# Twitter API: 15 requests per 15 minutes
76+
Resque.rate_limit(:twitter_api_queue, at: 15, per: 900)
77+
78+
# Stripe API: Be conservative to avoid hitting limits
79+
Resque.rate_limit(:payment_processing, at: 20, per: 60, concurrent: 5)
80+
```
81+
82+
#### 2. Database-Intensive Operations
83+
Prevent database lock accumulation:
84+
85+
```ruby
86+
# Bulk imports that lock tables
87+
Resque.rate_limit(:bulk_import_queue, at: 2, per: 10, concurrent: 1)
88+
89+
# Lead processing with complex queries
90+
Resque.rate_limit(:lead_processing, at: 10, per: 30, concurrent: 3)
91+
```
92+
93+
#### 3. Resource-Intensive Tasks
94+
Manage CPU/memory usage:
95+
96+
```ruby
97+
# Image processing
98+
Resque.rate_limit(:image_resize_queue, at: 5, per: 10, concurrent: 2)
99+
100+
# Video transcoding
101+
Resque.rate_limit(:video_transcode_queue, at: 1, per: 60, concurrent: 1)
102+
```
103+
104+
#### 4. Email Sending
105+
Avoid being marked as spam:
106+
107+
```ruby
108+
# Gradual email sending
109+
Resque.rate_limit(:email_queue, at: 100, per: 300, concurrent: 10)
110+
111+
# Newsletter queue - spread over time
112+
Resque.rate_limit(:newsletter_queue, at: 50, per: 60)
113+
```
114+
115+
## Monitoring and Debugging
116+
117+
### Check Current Limits
118+
119+
```ruby
120+
# Get rate limit configuration for a queue
121+
Resque.rate_limit_for(:my_queue)
122+
# => {:at => 5, :per => 10, :concurrent => 3}
123+
124+
# Check if a queue has rate limiting
125+
Resque.queue_rate_limited?(:my_queue)
126+
# => true
127+
128+
# List all rate-limited queues
129+
Resque.rate_limited_queues
130+
# => ["my_queue", "api_queue", "db_queue"]
131+
```
132+
133+
### Monitor Active Jobs
134+
135+
```ruby
136+
# Get current number of running jobs
137+
Resque.active_job_count(:my_queue)
138+
# => 2
139+
140+
# Check if at rate limit
141+
Resque.queue_at_or_over_rate_limit?(:my_queue)
142+
# => false
143+
144+
# Check if at concurrent limit
145+
Resque.queue_at_or_over_concurrent_limit?(:my_queue)
146+
# => false
147+
```
148+
149+
### Reset Throttling
150+
151+
```ruby
152+
# Reset throttling for a specific queue
153+
Resque.reset_throttling(:my_queue)
154+
155+
# Reset throttling for all queues
156+
Resque.reset_throttling
157+
```
158+
159+
### Debug Logging
160+
161+
Workers log throttling decisions at the debug level. Enable debug logging to see:
29162

30-
# Rate limit at 10 jobs from `my_queue` per minute
31-
Resque.rate_limit(:my_queue, :at => 10, :per => 60)
163+
```
164+
Checking my_queue
165+
Rate limit applies to my_queue, attempting to acquire lock
166+
lock acquired
167+
my_queue is at concurrent job limit (3 active jobs), releasing lock and skipping
168+
```
169+
170+
## How It Works
171+
172+
1. **Rate Limiting**: Uses Redis counters with expiration to track jobs started within time windows
173+
2. **Concurrent Limiting**: Tracks active job count, incrementing on start and decrementing on completion
174+
3. **Distributed Locking**: Uses Redis locks to prevent race conditions in distributed environments
175+
4. **Non-blocking**: Workers check limits and skip queues that are at capacity
176+
177+
## Worker Configuration
178+
179+
Workers automatically respect rate limits. Just ensure your workers are processing the rate-limited queues:
180+
181+
```ruby
182+
# Single queue
183+
QUEUE=my_queue rake resque:work
184+
185+
# Multiple queues
186+
QUEUE=api_queue,db_queue,email_queue rake resque:work
187+
188+
# All queues
189+
QUEUE=* rake resque:work
190+
```
191+
192+
## Advanced Usage
193+
194+
### Conditional Rate Limiting
195+
196+
You can dynamically adjust rate limits based on time of day or system load:
197+
198+
```ruby
199+
# Increase limits during off-peak hours
200+
if Time.now.hour < 8 || Time.now.hour > 20
201+
Resque.rate_limit(:api_queue, at: 100, per: 60)
202+
else
203+
Resque.rate_limit(:api_queue, at: 30, per: 60)
204+
end
205+
```
206+
207+
### Queue-Lock Pattern
208+
209+
Ensure only one job runs at a time:
210+
211+
```ruby
212+
# Equivalent to resque-queue-lock behavior
213+
Resque.rate_limit(:exclusive_queue, at: 1, per: 0, concurrent: 1)
32214
```
33215

34-
Similar Resque Plugins
35-
----------------------
216+
## Similar Resque Plugins
36217

37-
* [resque-queue-lock](https://github.com/mashion/resque-queue-lock)
218+
* [resque-queue-lock](https://github.com/mashion/resque-queue-lock) - Only allows one job at a time per queue. With resque-throttler: `Resque.rate_limit(:my_queue, at: 1, per: 0)`
38219

39-
Only allows one job to be performed at once from a `queue`. With Resque
40-
Throttler you can achieve the same functionarliy with the following rate limit:
220+
* [resque-throttle](https://github.com/scotttam/resque-throttle) - Works on job classes rather than queues and throws errors when at limit
41221

42-
```ruby
43-
Resque.rate_limit(:my_queue, :at => 1, :per => 0)
44-
```
222+
* [resque-waiting-room](https://github.com/julienXX/resque-waiting-room) - Moves rate-limited jobs to a waiting queue
45223

46-
* [resque-throttle](https://github.com/scotttam/resque-throttle)
224+
## Contributing
47225

48-
Works on a `class` rather than a `queue` and will throw and error when you
49-
try to enqueue at job when the `class` is at or above it's rate limit.
226+
1. Fork the repository
227+
2. Create your feature branch (`git checkout -b my-new-feature`)
228+
3. Add tests for your changes
229+
4. Commit your changes (`git commit -am 'Add some feature'`)
230+
5. Push to the branch (`git push origin my-new-feature`)
231+
6. Create new Pull Request
50232

51-
* [resque-waiting-room](https://github.com/julienXX/resque-waiting-room)
233+
## License
52234

53-
Looks like it also works on a `class` and throws the jobs into a
54-
`"waiting_room"` queue that then gets processed.
235+
Released under the MIT License. See LICENSE file for details.

0 commit comments

Comments
 (0)