|
9 | 9 | require "console/logger" |
10 | 10 |
|
11 | 11 | require "async" |
12 | | -require "async/notification" |
13 | 12 | require "async/semaphore" |
14 | 13 |
|
| 14 | +require "thread" |
| 15 | + |
15 | 16 | module Async |
16 | 17 | module Pool |
17 | 18 | # A resource pool controller. |
@@ -47,7 +48,8 @@ def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, |
47 | 48 | @available = [] |
48 | 49 |
|
49 | 50 | # Used to signal when a resource has been released: |
50 | | - @notification = Async::Notification.new |
| 51 | + @mutex = Thread::Mutex.new |
| 52 | + @condition = Thread::ConditionVariable.new |
51 | 53 | end |
52 | 54 |
|
53 | 55 | # @attribute [Proc] The constructor used to create new resources. |
@@ -124,8 +126,23 @@ def available? |
124 | 126 | end |
125 | 127 |
|
126 | 128 | # Wait until a pool resource has been freed. |
| 129 | + # @deprecated Use {wait_until_free} instead. |
127 | 130 | def wait |
128 | | - @notification.wait |
| 131 | + @mutex.synchronize do |
| 132 | + @condition.wait(@mutex) |
| 133 | + end |
| 134 | + end |
| 135 | + |
| 136 | + # Wait until the pool is not busy (no resources in use). |
| 137 | + def wait_until_free |
| 138 | + @mutex.synchronize do |
| 139 | + if busy? |
| 140 | + yield self if block_given? |
| 141 | + |
| 142 | + # Wait until the pool is not busy: |
| 143 | + @condition.wait(@mutex) while busy? |
| 144 | + end |
| 145 | + end |
129 | 146 | end |
130 | 147 |
|
131 | 148 | # Whether the pool is empty. |
@@ -221,7 +238,7 @@ def retire(resource) |
221 | 238 |
|
222 | 239 | resource.close |
223 | 240 |
|
224 | | - @notification.signal |
| 241 | + @mutex.synchronize {@condition.broadcast} |
225 | 242 |
|
226 | 243 | return true |
227 | 244 | end |
@@ -285,19 +302,17 @@ def reuse(resource) |
285 | 302 |
|
286 | 303 | @resources[resource] = usage - 1 |
287 | 304 |
|
288 | | - @notification.signal |
| 305 | + @mutex.synchronize {@condition.broadcast} |
289 | 306 |
|
290 | 307 | return true |
291 | 308 | end |
292 | 309 |
|
293 | 310 | def wait_for_resource |
294 | 311 | # If we fail to create a resource (below), we will end up waiting for one to become resources. |
295 | 312 | until resource = available_resource |
296 | | - @notification.wait |
| 313 | + @mutex.synchronize {@condition.wait(@mutex)} |
297 | 314 | end |
298 | | - |
299 | 315 | # Be careful not to context switch or fail here. |
300 | | - |
301 | 316 | return resource |
302 | 317 | end |
303 | 318 |
|
@@ -345,10 +360,8 @@ def acquire_existing_resource |
345 | 360 | return resource |
346 | 361 | end |
347 | 362 | end |
348 | | - |
349 | | - @notification.wait |
| 363 | + @mutex.synchronize {@condition.wait(@mutex)} |
350 | 364 | end |
351 | | - |
352 | 365 | # Only when the pool has been completely drained, return nil: |
353 | 366 | return nil |
354 | 367 | end |
|
0 commit comments