Skip to content

Commit 821d47e

Browse files
committed
Merge pull request #206 from adamruzicka/counting-semaphores
Adds counting semaphores
2 parents 1d3e005 + 8a1f8c6 commit 821d47e

File tree

3 files changed

+393
-0
lines changed

3 files changed

+393
-0
lines changed

lib/concurrent/atomic/semaphore.rb

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
require 'concurrent/atomic/condition'
2+
3+
module Concurrent
4+
class MutexSemaphore
5+
# @!macro [attach] semaphore_method_initialize
6+
#
7+
# Create a new `Semaphore` with the initial `count`.
8+
#
9+
# @param [Fixnum] count the initial count
10+
#
11+
# @raise [ArgumentError] if `count` is not an integer or is less than zero
12+
def initialize(count)
13+
unless count.is_a?(Fixnum) && count >= 0
14+
fail ArgumentError, 'count must be an non-negative integer'
15+
end
16+
@mutex = Mutex.new
17+
@condition = Condition.new
18+
@free = count
19+
end
20+
21+
# @!macro [attach] semaphore_method_acquire
22+
#
23+
# Acquires the given number of permits from this semaphore,
24+
# blocking until all are available.
25+
#
26+
# @param [Fixnum] permits Number of permits to acquire
27+
#
28+
# @raise [ArgumentError] if `permits` is not an integer or is less than
29+
# one
30+
#
31+
# @return [Nil]
32+
def acquire(permits = 1)
33+
unless permits.is_a?(Fixnum) && permits > 0
34+
fail ArgumentError, 'permits must be an integer greater than zero'
35+
end
36+
@mutex.synchronize do
37+
try_acquire_timed(permits, nil)
38+
nil
39+
end
40+
end
41+
42+
# @!macro [attach] semaphore_method_available_permits
43+
#
44+
# Returns the current number of permits available in this semaphore.
45+
#
46+
# @return [Integer]
47+
def available_permits
48+
@mutex.synchronize { @free }
49+
end
50+
51+
# @!macro [attach] semaphore_method_drain_permits
52+
#
53+
# Acquires and returns all permits that are immediately available.
54+
#
55+
# @return [Integer]
56+
def drain_permits
57+
@mutex.synchronize do
58+
@free.tap { |_| @free = 0 }
59+
end
60+
end
61+
62+
# @!macro [attach] semaphore_method_try_acquire
63+
#
64+
# Acquires the given number of permits from this semaphore,
65+
# only if all are available at the time of invocation or within
66+
# `timeout` interval
67+
#
68+
# @param [Fixnum] permits the number of permits to acquire
69+
#
70+
# @param [Fixnum] timeout the number of seconds to wait for the counter
71+
# or `nil` to return immediately
72+
#
73+
# @raise [ArgumentError] if `permits` is not an integer or is less than
74+
# one
75+
#
76+
# @return [Boolean] `false` if no permits are available, `true` when
77+
# acquired a permit
78+
def try_acquire(permits = 1, timeout = nil)
79+
unless permits.is_a?(Fixnum) && permits > 0
80+
fail ArgumentError, 'permits must be an integer greater than zero'
81+
end
82+
@mutex.synchronize do
83+
if timeout.nil?
84+
try_acquire_now(permits)
85+
else
86+
try_acquire_timed(permits, timeout)
87+
end
88+
end
89+
end
90+
91+
# @!macro [attach] semaphore_method_release
92+
#
93+
# Releases the given number of permits, returning them to the semaphore.
94+
#
95+
# @param [Fixnum] permits Number of permits to return to the semaphore.
96+
#
97+
# @raise [ArgumentError] if `permits` is not a number or is less than one
98+
#
99+
# @return [Nil]
100+
def release(permits = 1)
101+
unless permits.is_a?(Fixnum) && permits > 0
102+
fail ArgumentError, 'permits must be an integer greater than zero'
103+
end
104+
@mutex.synchronize do
105+
@free += permits
106+
permits.times { @condition.signal }
107+
end
108+
nil
109+
end
110+
111+
# @!macro [attach] semaphore_method_reduce_permits
112+
#
113+
# @api private
114+
#
115+
# Shrinks the number of available permits by the indicated reduction.
116+
#
117+
# @param [Fixnum] reduction Number of permits to remove.
118+
#
119+
# @raise [ArgumentError] if `reduction` is not an integer or is negative
120+
#
121+
# @raise [ArgumentError] if `@free` - `@reduction` is less than zero
122+
#
123+
# @return [Nil]
124+
def reduce_permits(reduction)
125+
unless reduction.is_a?(Fixnum) && reduction >= 0
126+
fail ArgumentError, 'reduction must be an non-negative integer'
127+
end
128+
@mutex.synchronize { @free -= reduction }
129+
nil
130+
end
131+
132+
private
133+
134+
def try_acquire_now(permits)
135+
if @free >= permits
136+
@free -= permits
137+
true
138+
else
139+
false
140+
end
141+
end
142+
143+
def try_acquire_timed(permits, timeout)
144+
remaining = Condition::Result.new(timeout)
145+
while !try_acquire_now(permits) && remaining.can_wait?
146+
@condition.signal
147+
remaining = @condition.wait(@mutex, remaining.remaining_time)
148+
end
149+
remaining.can_wait? ? true : false
150+
end
151+
end
152+
153+
if RUBY_PLATFORM == 'java'
154+
155+
# @!macro semaphore
156+
class JavaSemaphore
157+
# @!macro semaphore_method_initialize
158+
def initialize(count)
159+
unless count.is_a?(Fixnum) && count >= 0
160+
fail(ArgumentError,
161+
'count must be in integer greater than or equal zero')
162+
end
163+
@semaphore = java.util.concurrent.Semaphore.new(count)
164+
end
165+
166+
# @!macro semaphore_method_acquire
167+
def acquire(permits = 1)
168+
unless permits.is_a?(Fixnum) && permits > 0
169+
fail ArgumentError, 'permits must be an integer greater than zero'
170+
end
171+
@semaphore.acquire(permits)
172+
end
173+
174+
# @!macro semaphore_method_available_permits
175+
def available_permits
176+
@semaphore.availablePermits
177+
end
178+
179+
# @!macro semaphore_method_drain_permits
180+
def drain_permits
181+
@semaphore.drainPermits
182+
end
183+
184+
# @!macro semaphore_method_try_acquire
185+
def try_acquire(permits = 1, timeout = nil)
186+
unless permits.is_a?(Fixnum) && permits > 0
187+
fail ArgumentError, 'permits must be an integer greater than zero'
188+
end
189+
if timeout.nil?
190+
@semaphore.tryAcquire(permits)
191+
else
192+
@semaphore.tryAcquire(permits,
193+
timeout,
194+
java.util.concurrent.TimeUnit::SECONDS)
195+
end
196+
end
197+
198+
# @!macro semaphore_method_release
199+
def release(permits = 1)
200+
unless permits.is_a?(Fixnum) && permits > 0
201+
fail ArgumentError, 'permits must be an integer greater than zero'
202+
end
203+
@semaphore.release(permits)
204+
true
205+
end
206+
207+
# @!macro semaphore_method_reduce_permits
208+
def reduce_permits(reduction)
209+
unless reduction.is_a?(Fixnum) && reduction >= 0
210+
fail ArgumentError, 'reduction must be an non-negative integer'
211+
end
212+
@semaphore.reducePermits(reduction)
213+
end
214+
end
215+
216+
# @!macro semaphore
217+
class Semaphore < JavaSemaphore
218+
end
219+
220+
else
221+
222+
# @!macro semaphore
223+
class Semaphore < MutexSemaphore
224+
end
225+
end
226+
end

lib/concurrent/atomics.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@
88
require 'concurrent/atomic/count_down_latch'
99
require 'concurrent/atomic/event'
1010
require 'concurrent/atomic/synchronization'
11+
require 'concurrent/atomic/semaphore'
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
require 'spec_helper'
2+
3+
shared_examples :semaphore do
4+
let(:semaphore) { described_class.new(3) }
5+
6+
context '#initialize' do
7+
it 'raises an exception if the initial count is not an integer' do
8+
expect {
9+
described_class.new('foo')
10+
}.to raise_error(ArgumentError)
11+
end
12+
end
13+
14+
describe '#acquire' do
15+
context 'permits available' do
16+
it 'should return true immediately' do
17+
result = semaphore.acquire
18+
expect(result).to be_nil
19+
end
20+
end
21+
22+
context 'not enough permits available' do
23+
it 'should block thread until permits are available' do
24+
semaphore.drain_permits
25+
Thread.new { sleep(0.2) && semaphore.release }
26+
27+
result = semaphore.acquire
28+
expect(result).to be_nil
29+
expect(semaphore.available_permits).to eq 0
30+
end
31+
end
32+
end
33+
34+
describe '#drain_permits' do
35+
it 'drains all available permits' do
36+
drained = semaphore.drain_permits
37+
expect(drained).to eq 3
38+
expect(semaphore.available_permits).to eq 0
39+
end
40+
41+
it 'drains nothing in no permits are available' do
42+
semaphore.reduce_permits 3
43+
drained = semaphore.drain_permits
44+
expect(drained).to eq 0
45+
end
46+
end
47+
48+
describe '#try_acquire' do
49+
context 'without timeout' do
50+
it 'acquires immediately if permits are available' do
51+
result = semaphore.try_acquire(1)
52+
expect(result).to be_truthy
53+
end
54+
55+
it 'returns false immediately in no permits are available' do
56+
result = semaphore.try_acquire(20)
57+
expect(result).to be_falsey
58+
end
59+
end
60+
61+
context 'with timeout' do
62+
it 'acquires immediately if permits are available' do
63+
result = semaphore.try_acquire(1, 5)
64+
expect(result).to be_truthy
65+
end
66+
67+
it 'acquires when permits are available within timeout' do
68+
semaphore.drain_permits
69+
Thread.new { sleep 0.1 && semaphore.release }
70+
result = semaphore.try_acquire(1, 1)
71+
expect(result).to be_truthy
72+
end
73+
74+
it 'returns false on timeout' do
75+
semaphore.drain_permits
76+
result = semaphore.try_acquire(1, 0.1)
77+
expect(result).to be_falsey
78+
end
79+
end
80+
end
81+
82+
describe '#reduce_permits' do
83+
it 'raises ArgumentError if reducing by negative number' do
84+
expect {
85+
semaphore.reduce_permits(-1)
86+
}.to raise_error(ArgumentError)
87+
end
88+
89+
it 'reduces permits below zero' do
90+
semaphore.reduce_permits 1003
91+
expect(semaphore.available_permits).to eq -1000
92+
end
93+
94+
it 'reduces permits' do
95+
semaphore.reduce_permits 1
96+
expect(semaphore.available_permits).to eq 2
97+
semaphore.reduce_permits 2
98+
expect(semaphore.available_permits).to eq 0
99+
end
100+
end
101+
end
102+
103+
module Concurrent
104+
describe MutexSemaphore do
105+
it_should_behave_like :semaphore
106+
107+
context 'spurious wake ups' do
108+
subject { described_class.new(1) }
109+
110+
before(:each) do
111+
def subject.simulate_spurious_wake_up
112+
@mutex.synchronize do
113+
@condition.signal
114+
@condition.broadcast
115+
end
116+
end
117+
subject.drain_permits
118+
end
119+
120+
it 'should resist to spurious wake ups without timeout' do
121+
@expected = true
122+
# would set @expected to false
123+
Thread.new { @expected = subject.acquire }
124+
125+
sleep(0.1)
126+
subject.simulate_spurious_wake_up
127+
128+
sleep(0.1)
129+
expect(@expected).to be_truthy
130+
end
131+
132+
it 'should resist to spurious wake ups with timeout' do
133+
@expected = true
134+
# sets @expected to false in another thread
135+
t = Thread.new { @expected = subject.try_acquire(1, 0.3) }
136+
137+
sleep(0.1)
138+
subject.simulate_spurious_wake_up
139+
140+
sleep(0.1)
141+
expect(@expected).to be_truthy
142+
143+
t.join
144+
expect(@expected).to be_falsey
145+
end
146+
end
147+
end
148+
149+
if TestHelpers.jruby?
150+
describe JavaSemaphore do
151+
it_should_behave_like :semaphore
152+
end
153+
end
154+
155+
describe Semaphore do
156+
if jruby?
157+
it 'inherits from JavaSemaphore' do
158+
expect(Semaphore.ancestors).to include(JavaSemaphore)
159+
end
160+
else
161+
it 'inherits from MutexSemaphore' do
162+
expect(Semaphore.ancestors).to include(MutexSemaphore)
163+
end
164+
end
165+
end
166+
end

0 commit comments

Comments
 (0)