Skip to content

Commit f2ad157

Browse files
bruno-ioquatix
andcommitted
Enable enqueuing multiple items to Async::Queue (#81)
Co-authored-by: Samuel Williams <[email protected]>
1 parent 1a20746 commit f2ad157

File tree

2 files changed

+74
-6
lines changed

2 files changed

+74
-6
lines changed

lib/async/queue.rb

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,16 @@ def empty?
4242
@items.empty?
4343
end
4444

45-
def enqueue(item)
46-
@items.push(item)
45+
def <<(item)
46+
@items << item
4747

4848
self.signal unless self.empty?
4949
end
5050

51-
def <<(item)
52-
enqueue(item)
51+
def enqueue(*items)
52+
@items.concat(items)
53+
54+
self.signal unless self.empty?
5355
end
5456

5557
def dequeue
@@ -89,14 +91,27 @@ def limited?
8991
@items.size >= @limit
9092
end
9193

92-
def enqueue item
94+
def <<(item)
9395
while limited?
9496
@full.wait
9597
end
9698

9799
super
98100
end
99101

102+
def enqueue *items
103+
while !items.empty?
104+
while limited?
105+
@full.wait
106+
end
107+
108+
available = @limit - @items.size
109+
@items.concat(items.shift(available))
110+
111+
self.signal unless self.empty?
112+
end
113+
end
114+
100115
def dequeue
101116
item = super
102117

spec/async/queue_spec.rb

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@
4242
end
4343
end
4444

45+
it 'can enqueue multiple items' do
46+
items = Array.new(10) { rand(10) }
47+
48+
reactor.async do |task|
49+
subject.enqueue(*items)
50+
end
51+
52+
items.each do |item|
53+
expect(subject.dequeue).to be == item
54+
end
55+
end
56+
4557
it 'can dequeue items asynchronously' do
4658
reactor.async do |task|
4759
subject << 1
@@ -53,6 +65,14 @@
5365
end
5466
end
5567

68+
describe '#<<' do
69+
it 'adds an item to the queue' do
70+
subject << :item
71+
expect(subject.size).to be == 1
72+
expect(subject.dequeue).to be == :item
73+
end
74+
end
75+
5676
describe '#size' do
5777
it 'returns queue size' do
5878
reactor.async do |task|
@@ -120,7 +140,17 @@
120140
subject.enqueue(10)
121141
expect(subject).to be_limited
122142
end
123-
143+
144+
it 'enqueues items up to a limit' do
145+
items = Array.new(2) { rand(10) }
146+
reactor.async do
147+
subject.enqueue(*items)
148+
end
149+
150+
expect(subject.size).to be 1
151+
expect(subject.dequeue).to be == items.first
152+
end
153+
124154
it 'should resume waiting tasks in order' do
125155
total_resumed = 0
126156
total_dequeued = 0
@@ -141,4 +171,27 @@
141171
expect(total_resumed).to be == total_dequeued
142172
end
143173
end
174+
175+
describe '#<<' do
176+
context 'when queue is limited' do
177+
before do
178+
subject << :item1
179+
expect(subject.size).to be == 1
180+
expect(subject).to be_limited
181+
end
182+
183+
it 'waits until a queue is dequeued' do
184+
reactor.async do
185+
subject << :item2
186+
end
187+
188+
reactor.async do |task|
189+
task.sleep 0.01
190+
expect(subject.items).to contain_exactly :item1
191+
subject.dequeue
192+
expect(subject.items).to contain_exactly :item2
193+
end
194+
end
195+
end
196+
end
144197
end

0 commit comments

Comments
 (0)