Skip to content

Commit 426c42e

Browse files
bruno-ioquatix
andauthored
Enable enqueuing multiple items to Async::Queue (#81)
Co-authored-by: Samuel Williams <[email protected]>
1 parent 5fa36dc commit 426c42e

File tree

2 files changed

+74
-6
lines changed

2 files changed

+74
-6
lines changed

lib/async/queue.rb

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,16 @@ def empty?
4343
@items.empty?
4444
end
4545

46-
def enqueue(item)
47-
@items.push(item)
46+
def <<(item)
47+
@items << item
4848

4949
self.signal unless self.empty?
5050
end
5151

52-
def <<(item)
53-
enqueue(item)
52+
def enqueue(*items)
53+
@items.concat(items)
54+
55+
self.signal unless self.empty?
5456
end
5557

5658
def dequeue
@@ -91,14 +93,27 @@ def limited?
9193
@items.size >= @limit
9294
end
9395

94-
def enqueue item
96+
def <<(item)
9597
while limited?
9698
@full.wait
9799
end
98100

99101
super
100102
end
101103

104+
def enqueue *items
105+
while !items.empty?
106+
while limited?
107+
@full.wait
108+
end
109+
110+
available = @limit - @items.size
111+
@items.concat(items.shift(available))
112+
113+
self.signal unless self.empty?
114+
end
115+
end
116+
102117
def dequeue
103118
item = super
104119

spec/async/queue_spec.rb

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@
4242
end
4343
end
4444

45+
it 'can enqueue multiple items' do
46+
items = Array.new(10) { rand(10) }
47+
48+
reactor.async do |task|
49+
subject.enqueue(*items)
50+
end
51+
52+
items.each do |item|
53+
expect(subject.dequeue).to be == item
54+
end
55+
end
56+
4557
it 'can dequeue items asynchronously' do
4658
reactor.async do |task|
4759
subject << 1
@@ -53,6 +65,14 @@
5365
end
5466
end
5567

68+
describe '#<<' do
69+
it 'adds an item to the queue' do
70+
subject << :item
71+
expect(subject.size).to be == 1
72+
expect(subject.dequeue).to be == :item
73+
end
74+
end
75+
5676
describe '#size' do
5777
it 'returns queue size' do
5878
expect(subject.size).to be == 0
@@ -117,7 +137,17 @@
117137
subject.enqueue(10)
118138
expect(subject).to be_limited
119139
end
120-
140+
141+
it 'enqueues items up to a limit' do
142+
items = Array.new(2) { rand(10) }
143+
reactor.async do
144+
subject.enqueue(*items)
145+
end
146+
147+
expect(subject.size).to be 1
148+
expect(subject.dequeue).to be == items.first
149+
end
150+
121151
it 'should resume waiting tasks in order' do
122152
total_resumed = 0
123153
total_dequeued = 0
@@ -138,4 +168,27 @@
138168
expect(total_resumed).to be == total_dequeued
139169
end
140170
end
171+
172+
describe '#<<' do
173+
context 'when queue is limited' do
174+
before do
175+
subject << :item1
176+
expect(subject.size).to be == 1
177+
expect(subject).to be_limited
178+
end
179+
180+
it 'waits until a queue is dequeued' do
181+
reactor.async do
182+
subject << :item2
183+
end
184+
185+
reactor.async do |task|
186+
task.sleep 0.01
187+
expect(subject.items).to contain_exactly :item1
188+
subject.dequeue
189+
expect(subject.items).to contain_exactly :item2
190+
end
191+
end
192+
end
193+
end
141194
end

0 commit comments

Comments
 (0)