Skip to content

Commit 371234d

Browse files
Add comprehensive timeout support to queue implementations. (#420)
- Add `closed?` method to `PriorityQueue` for full queue interface compatibility.
1 parent 083452f commit 371234d

File tree

6 files changed

+162
-29
lines changed

6 files changed

+162
-29
lines changed
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "sus/fixtures/async"
7+
8+
module Async
9+
AQueueWithTimeout = Sus::Shared("a queue with timeout support") do
10+
include Sus::Fixtures::Async::ReactorContext
11+
12+
let(:queue) {subject.new}
13+
14+
with "timeout support" do
15+
it "supports timeout: 0 for non-blocking dequeue" do
16+
# Empty queue should return nil immediately
17+
result = queue.dequeue(timeout: 0)
18+
expect(result).to be_nil
19+
20+
# With item, should return immediately
21+
queue.push("item")
22+
result = queue.dequeue(timeout: 0)
23+
expect(result).to be == "item"
24+
end
25+
26+
it "supports timeout: 0 for non-blocking pop" do
27+
# Empty queue should return nil immediately
28+
result = queue.pop(timeout: 0)
29+
expect(result).to be_nil
30+
31+
# With item, should return immediately
32+
queue.push("item")
33+
result = queue.pop(timeout: 0)
34+
expect(result).to be == "item"
35+
end
36+
37+
it "supports positive timeout values" do
38+
start_time = Time.now
39+
40+
# Should timeout after specified time
41+
result = queue.dequeue(timeout: 0.1)
42+
elapsed = Time.now - start_time
43+
44+
expect(result).to be_nil
45+
expect(elapsed).to be >= 0.1
46+
expect(elapsed).to be < 0.2 # Should not wait much longer
47+
end
48+
49+
it "returns item before timeout expires" do
50+
result = nil
51+
52+
# Start dequeue with timeout in background
53+
task = reactor.async do
54+
result = queue.dequeue(timeout: 1.0)
55+
end
56+
57+
# Add item quickly
58+
reactor.sleep(0.05)
59+
queue.push("quick_item")
60+
61+
task.wait
62+
expect(result).to be == "quick_item"
63+
end
64+
65+
it "handles concurrent timeouts" do
66+
results = []
67+
68+
# Start multiple consumers with different timeouts
69+
task1 = reactor.async do
70+
results << [:task1, queue.dequeue(timeout: 0.1)]
71+
end
72+
73+
task2 = reactor.async do
74+
results << [:task2, queue.dequeue(timeout: 0.2)]
75+
end
76+
77+
task3 = reactor.async do
78+
results << [:task3, queue.dequeue(timeout: 0.3)]
79+
end
80+
81+
# Wait for all to timeout
82+
[task1, task2, task3].each(&:wait)
83+
84+
# All should have timed out
85+
expect(results).to be == [
86+
[:task1, nil],
87+
[:task2, nil],
88+
[:task3, nil]
89+
]
90+
end
91+
92+
it "preserves FIFO order when items arrive before timeout" do
93+
results = []
94+
95+
# Start multiple consumers with same timeout
96+
tasks = 3.times.map do |i|
97+
reactor.async do
98+
results << [i, queue.dequeue(timeout: 1.0)]
99+
end
100+
end
101+
102+
# Add items quickly
103+
reactor.sleep(0.05)
104+
queue.push("item1")
105+
queue.push("item2")
106+
queue.push("item3")
107+
108+
tasks.each(&:wait)
109+
110+
# Should maintain FIFO order
111+
expect(results).to be == [
112+
[0, "item1"],
113+
[1, "item2"],
114+
[2, "item3"]
115+
]
116+
end
117+
end
118+
end
119+
end

lib/async/priority_queue.rb

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ def signal(value)
3939
condition.signal
4040
end
4141

42-
def wait_for_value(mutex)
43-
condition.wait(mutex)
42+
def wait_for_value(mutex, timeout = nil)
43+
condition.wait(mutex, timeout)
4444
return self.value
4545
end
4646

@@ -55,6 +55,8 @@ def valid?
5555
end
5656
end
5757

58+
private_constant :Waiter
59+
5860
# Create a new priority queue.
5961
#
6062
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
@@ -81,6 +83,11 @@ def close
8183
end
8284
end
8385

86+
# @returns [Boolean] Whether the queue is closed.
87+
def closed?
88+
@closed
89+
end
90+
8491
# @attribute [Array] The items in the queue.
8592
attr :items
8693

@@ -153,13 +160,14 @@ def enqueue(*items)
153160

154161
# Remove and return the next item from the queue.
155162
#
156-
# If the queue is empty, this method will block until an item is available.
163+
# If the queue is empty, this method will block until an item is available or timeout expires.
157164
# Fibers are served in priority order, with higher priority fibers receiving
158165
# items first.
159166
#
160167
# @parameter priority [Numeric] The priority of this consumer (higher = served first).
161-
# @returns [Object] The next item in the queue.
162-
def dequeue(priority: 0)
168+
# @parameter timeout [Numeric, nil] Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
169+
# @returns [Object, nil] The next item in the queue, or nil if timeout expires.
170+
def dequeue(priority: 0, timeout: nil)
163171
@mutex.synchronize do
164172
# If queue is closed and empty, return nil immediately:
165173
if @closed && @items.empty?
@@ -174,6 +182,9 @@ def dequeue(priority: 0)
174182
end
175183
end
176184

185+
# Handle immediate timeout (non-blocking)
186+
return nil if timeout == 0
187+
177188
# Need to wait - create our own condition variable and add to waiting queue:
178189
sequence = @sequence
179190
@sequence += 1
@@ -185,7 +196,7 @@ def dequeue(priority: 0)
185196
@waiting.push(waiter)
186197

187198
# Wait for our specific condition variable to be signaled:
188-
return waiter.wait_for_value(@mutex)
199+
return waiter.wait_for_value(@mutex, timeout)
189200
ensure
190201
waiter&.invalidate!
191202
end
@@ -195,8 +206,10 @@ def dequeue(priority: 0)
195206
# Compatibility with {::Queue#pop}.
196207
#
197208
# @parameter priority [Numeric] The priority of this consumer.
198-
def pop(priority: 0)
199-
self.dequeue(priority: priority)
209+
# @parameter timeout [Numeric, nil] Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
210+
# @returns [Object, nil] The dequeued item, or nil if timeout expires.
211+
def pop(priority: 0, timeout: nil)
212+
self.dequeue(priority: priority, timeout: timeout)
200213
end
201214

202215
# Process each item in the queue.

lib/async/queue.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,17 @@ def enqueue(*items)
7373
end
7474

7575
# Remove and return the next item from the queue.
76-
def dequeue
77-
@delegate.pop
76+
# @parameter timeout [Numeric, nil] Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
77+
# @returns [Object, nil] The dequeued item, or nil if timeout expires.
78+
def dequeue(timeout: nil)
79+
@delegate.pop(timeout: timeout)
7880
end
7981

8082
# Compatibility with {::Queue#pop}.
81-
def pop(...)
82-
@delegate.pop(...)
83+
# @parameter timeout [Numeric, nil] Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
84+
# @returns [Object, nil] The dequeued item, or nil if timeout expires.
85+
def pop(timeout: nil)
86+
@delegate.pop(timeout: timeout)
8387
end
8488

8589
# Process each item in the queue.

releases.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Add timeout support to `Async::Queue#dequeue` and `Async::Queue#pop` methods.
6+
- Add timeout support to `Async::PriorityQueue#dequeue` and `Async::PriorityQueue#pop` methods.
7+
- Add `closed?` method to `Async::PriorityQueue` for full queue interface compatibility.
8+
- Support non-blocking operations using `timeout: 0` parameter.
9+
310
## v2.29.0
411

512
This release introduces thread-safety as a core concept of Async. Many core classes now have thread-safe guarantees, allowing them to be used safely across multiple threads.

test/async/priority_queue.rb

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,17 @@
66

77
require "async/priority_queue"
88
require "sus/fixtures/async"
9+
require "async/a_queue"
10+
require "async/a_queue_with_timeout"
911

1012
describe Async::PriorityQueue do
1113
include Sus::Fixtures::Async::ReactorContext
1214

1315
let(:queue) {subject.new}
1416

17+
it_behaves_like Async::AQueue
18+
it_behaves_like Async::AQueueWithTimeout
19+
1520
with "#push" do
1621
it "can push and pop items" do
1722
queue.push("item")
@@ -608,21 +613,4 @@
608613
expect(priorities_served).to be == [2, 0]
609614
end
610615
end
611-
612-
describe Async::PriorityQueue::Waiter do
613-
it "should invalidate correctly" do
614-
condition = ConditionVariable.new
615-
fiber = Fiber.current
616-
waiter = Async::PriorityQueue::Waiter.new(fiber, 1, 1, condition, nil)
617-
618-
expect(waiter).to be(:valid?)
619-
expect(waiter.fiber).to be == fiber
620-
expect(waiter.condition).to be == condition
621-
622-
waiter.invalidate!
623-
624-
expect(waiter).not.to be(:valid?)
625-
expect(waiter.fiber).to be_nil
626-
end
627-
end
628616
end

test/async/queue.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010

1111
require "sus/fixtures/async"
1212
require "async/a_queue"
13+
require "async/a_queue_with_timeout"
1314

1415
describe Async::Queue do
1516
include Sus::Fixtures::Async::ReactorContext
1617

1718
it_behaves_like Async::AQueue
19+
it_behaves_like Async::AQueueWithTimeout
1820
end

0 commit comments

Comments
 (0)