Skip to content

Commit 0347d89

Browse files
authored
Allow assigning to semaphore limit. (#215)
1 parent c5911ec commit 0347d89

File tree

2 files changed

+63
-0
lines changed

2 files changed

+63
-0
lines changed

lib/async/semaphore.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,25 @@ def initialize(limit = 1, parent: nil)
2828
# The tasks waiting on this semaphore.
2929
attr :waiting
3030

31+
# Allow setting the limit. This is useful for cases where the semaphore is used to limit the number of concurrent tasks, but the number of tasks is not known in advance or needs to be modified.
32+
#
33+
# On increasing the limit, some tasks may be immediately resumed. On decreasing the limit, some tasks may execute until the count is < than the limit.
34+
#
35+
# @parameter limit [Integer] The new limit.
36+
def limit= limit
37+
difference = limit - @limit
38+
@limit = limit
39+
40+
# We can't suspend
41+
if difference > 0
42+
difference.times do
43+
break unless node = @waiting.first
44+
45+
node.resume
46+
end
47+
end
48+
end
49+
3150
# Is the semaphore currently acquired?
3251
def empty?
3352
@count.zero?

test/async/semaphore.rb

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,5 +151,49 @@
151151
end
152152
end
153153

154+
with '#limit' do
155+
it "has a default limit of 1" do
156+
expect(semaphore).to have_attributes(limit: be == 1)
157+
expect(semaphore).not.to be(:blocking?)
158+
end
159+
end
160+
161+
with '#limit=' do
162+
it "releases tasks when limit is increased" do
163+
semaphore.acquire
164+
expect(semaphore).to have_attributes(count: be == 1)
165+
expect(semaphore).to be(:blocking?)
166+
167+
task = Async do
168+
semaphore.acquire
169+
end
170+
171+
semaphore.limit = 2
172+
task.wait
173+
174+
expect(semaphore.count).to be == 2
175+
end
176+
177+
it "blocks tasks when limit is decreased" do
178+
semaphore.limit = 2
179+
semaphore.acquire
180+
semaphore.acquire
181+
182+
expect(semaphore).to have_attributes(count: be == 2)
183+
expect(semaphore).to be(:blocking?)
184+
185+
task = Async do
186+
semaphore.acquire
187+
end
188+
189+
semaphore.limit = 1
190+
semaphore.release
191+
semaphore.release
192+
task.wait
193+
194+
expect(semaphore.count).to be == 1
195+
end
196+
end
197+
154198
it_behaves_like ChainableAsync
155199
end

0 commit comments

Comments
 (0)