Skip to content

Commit 17b4e78

Browse files
Add Async::PriorityQueue for Consumer-Priority Resource Allocation. (#412)
1 parent 6cece07 commit 17b4e78

File tree

4 files changed

+981
-0
lines changed

4 files changed

+981
-0
lines changed

fixtures/async/a_priority_queue.rb

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
# Copyright, 2025, by Shopify Inc.
6+
7+
require "async"
8+
require "async/priority_queue"
9+
require "sus/fixtures/async"
10+
11+
require "async/chainable_async"
12+
13+
module Async
14+
APriorityQueue = Sus::Shared("a priority queue") do
15+
let(:queue) {subject.new}
16+
17+
# Include all basic queue behaviors
18+
it_behaves_like Async::AQueue
19+
20+
with "priority ordering" do
21+
it "serves consumers in priority order" do
22+
results = []
23+
24+
# Create consumers with different priorities
25+
low = reactor.async do
26+
results << queue.dequeue(priority: 1)
27+
end
28+
29+
high = reactor.async do
30+
results << queue.dequeue(priority: 10)
31+
end
32+
33+
reactor.yield
34+
35+
# Add items
36+
queue.push(:first_item)
37+
queue.push(:second_item)
38+
39+
[low, high].each(&:wait)
40+
41+
# High priority should get first item
42+
expect(results).to be == [:first_item, :second_item]
43+
# Verify high priority got the first item by checking task completion order
44+
end
45+
46+
it "maintains FIFO within same priority" do
47+
results = []
48+
49+
first = reactor.async do
50+
results << [:first, queue.dequeue(priority: 5)]
51+
end
52+
53+
second = reactor.async do
54+
results << [:second, queue.dequeue(priority: 5)]
55+
end
56+
57+
reactor.yield
58+
59+
queue.push(:item1)
60+
queue.push(:item2)
61+
62+
[first, second].each(&:wait)
63+
64+
expect(results).to be == [
65+
[:first, :item1],
66+
[:second, :item2]
67+
]
68+
end
69+
70+
it "allows priority-based queue jumping" do
71+
results = []
72+
73+
# Start low priority consumer first
74+
low = reactor.async do
75+
results << [:low, queue.dequeue(priority: 1)]
76+
end
77+
78+
reactor.yield
79+
80+
# Start high priority consumer after low is waiting
81+
high = reactor.async do
82+
results << [:high, queue.dequeue(priority: 10)]
83+
end
84+
85+
reactor.yield
86+
87+
# High priority should get the first item despite arriving later
88+
queue.push(:item1)
89+
high.wait
90+
91+
queue.push(:item2)
92+
low.wait
93+
94+
expect(results).to be == [
95+
[:high, :item1],
96+
[:low, :item2]
97+
]
98+
end
99+
end
100+
101+
with "priority methods" do
102+
it "supports priority parameter in dequeue" do
103+
queue.push(:item)
104+
expect(queue.dequeue(priority: 5)).to be == :item
105+
end
106+
107+
it "supports priority parameter in pop" do
108+
queue.push(:item)
109+
expect(queue.pop(priority: 5)).to be == :item
110+
end
111+
112+
it "supports priority parameter in wait" do
113+
reactor.async {queue.push(:item)}
114+
expect(queue.wait(priority: 5)).to be == :item
115+
end
116+
117+
it "supports priority parameter in each" do
118+
items = [:item1, :item2]
119+
reactor.async do
120+
items.each {|item| queue.push(item)}
121+
queue.push(nil)
122+
end
123+
124+
results = []
125+
queue.each(priority: 5) do |item|
126+
results << item
127+
end
128+
129+
expect(results).to be == items
130+
end
131+
132+
it "supports priority parameter in async" do
133+
reactor.async do
134+
queue.push(:item)
135+
queue.push(nil)
136+
end
137+
138+
results = []
139+
queue.async(priority: 5) do |task, item|
140+
results << item
141+
end
142+
143+
expect(results).to be == [:item]
144+
end
145+
end
146+
147+
with "#waiting" do
148+
it "tracks number of waiting consumers" do
149+
expect(queue.waiting).to be == 0
150+
151+
task1 = reactor.async {queue.dequeue}
152+
reactor.yield
153+
expect(queue.waiting).to be == 1
154+
155+
task2 = reactor.async {queue.dequeue}
156+
reactor.yield
157+
expect(queue.waiting).to be == 2
158+
159+
queue.push(:item)
160+
task1.wait
161+
expect(queue.waiting).to be == 1
162+
163+
queue.push(:item)
164+
task2.wait
165+
expect(queue.waiting).to be == 0
166+
end
167+
end
168+
169+
with "edge cases" do
170+
it "handles immediate dequeue when items available" do
171+
queue.push(:item)
172+
expect(queue.dequeue(priority: 1)).to be == :item
173+
expect(queue.dequeue(priority: 10)).to be_nil if queue.empty?
174+
end
175+
176+
it "respects priority when items available but waiters exist" do
177+
queue.push(:available)
178+
179+
# Start low priority waiter
180+
low_task = reactor.async do
181+
queue.dequeue(priority: 1)
182+
end
183+
reactor.yield
184+
185+
# High priority should get available item
186+
result = queue.dequeue(priority: 10)
187+
expect(result).to be == :available
188+
189+
# Clean up
190+
queue.push(:cleanup)
191+
low_task.wait
192+
end
193+
end
194+
end
195+
end

0 commit comments

Comments
 (0)