Skip to content

Commit 82273d5

Browse files
committed
Introduce Controller#wait_until_free for race-free clean-up.
1 parent 5d30423 commit 82273d5

File tree

5 files changed

+68
-15
lines changed

5 files changed

+68
-15
lines changed

async-pool.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ Gem::Specification.new do |spec|
2525

2626
spec.required_ruby_version = ">= 3.2"
2727

28-
spec.add_dependency "async", ">= 1.25"
28+
spec.add_dependency "async", ">= 2.0"
2929
end

benchmark/acquire.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
2+
require "sus/fixtures/benchmark"
3+
require "sus/fixtures/async/scheduler_context"
4+
5+
require "async/pool"
6+
require "async/pool/resource"
7+
8+
include Sus::Fixtures::Benchmark
9+
include Sus::Fixtures::Async::SchedulerContext
10+
11+
describe Async::Pool::Controller do
12+
let(:pool) {subject.new(Async::Pool::Resource)}
13+
measure Async::Pool::Controller do |repeats|
14+
pool = self.pool
15+
16+
repeats.times do
17+
pool.acquire do |resource|
18+
end
19+
end
20+
end
21+
end

gems.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
gem "traces"
2525
gem "sus-fixtures-async"
26+
gem "sus-fixtures-benchmark"
2627

2728
gem "bake-test"
2829
gem "bake-test-external"

lib/async/pool/controller.rb

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99
require "console/logger"
1010

1111
require "async"
12-
require "async/notification"
1312
require "async/semaphore"
1413

14+
require "thread"
15+
1516
module Async
1617
module Pool
1718
# A resource pool controller.
@@ -47,7 +48,8 @@ def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil,
4748
@available = []
4849

4950
# Used to signal when a resource has been released:
50-
@notification = Async::Notification.new
51+
@mutex = Thread::Mutex.new
52+
@condition = Thread::ConditionVariable.new
5153
end
5254

5355
# @attribute [Proc] The constructor used to create new resources.
@@ -124,8 +126,23 @@ def available?
124126
end
125127

126128
# Wait until a pool resource has been freed.
129+
# @deprecated Use {wait_until_free} instead.
127130
def wait
128-
@notification.wait
131+
@mutex.synchronize do
132+
@condition.wait(@mutex)
133+
end
134+
end
135+
136+
# Wait until the pool is not busy (no resources in use).
137+
def wait_until_free
138+
@mutex.synchronize do
139+
if busy?
140+
yield self if block_given?
141+
142+
# Wait until the pool is not busy:
143+
@condition.wait(@mutex) while busy?
144+
end
145+
end
129146
end
130147

131148
# Whether the pool is empty.
@@ -221,7 +238,7 @@ def retire(resource)
221238

222239
resource.close
223240

224-
@notification.signal
241+
@mutex.synchronize {@condition.broadcast}
225242

226243
return true
227244
end
@@ -285,19 +302,17 @@ def reuse(resource)
285302

286303
@resources[resource] = usage - 1
287304

288-
@notification.signal
305+
@mutex.synchronize {@condition.broadcast}
289306

290307
return true
291308
end
292309

293310
def wait_for_resource
294311
# If we fail to create a resource (below), we will end up waiting for one to become resources.
295312
until resource = available_resource
296-
@notification.wait
313+
@mutex.synchronize {@condition.wait(@mutex)}
297314
end
298-
299315
# Be careful not to context switch or fail here.
300-
301316
return resource
302317
end
303318

@@ -345,10 +360,8 @@ def acquire_existing_resource
345360
return resource
346361
end
347362
end
348-
349-
@notification.wait
363+
@mutex.synchronize {@condition.wait(@mutex)}
350364
end
351-
352365
# Only when the pool has been completely drained, return nil:
353366
return nil
354367
end

test/async/pool/controller.rb

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,28 @@ def failures(repeats: 500, time_scale: 0.001, &block)
407407
expect(pool).not.to be(:busy?)
408408
end
409409
end
410-
end
410+
411+
with "#wait_until_free" do
412+
it "waits until the pool is not busy and yields if waited" do
413+
pool = subject.new(Async::Pool::Resource, limit: 1)
414+
resource = pool.acquire
415+
waited = false
416+
finished = false
411417

412-
describe Async::Pool::Controller do
413-
let(:pool) {subject.new(Async::Pool::Resource)}
418+
Async do
419+
Async::Task.current.sleep(0.01)
420+
pool.release(resource)
421+
end
422+
423+
pool.wait_until_free do
424+
waited = true
425+
end
426+
427+
finished = true
428+
expect(waited).to be == true
429+
expect(finished).to be == true
430+
end
431+
end
414432

415433
with "#close" do
416434
it "closes all resources when going out of scope" do

0 commit comments

Comments
 (0)