Skip to content

Commit 3686738

Browse files
committed
Better API for future zipping
1 parent 7cb2dc6 commit 3686738

File tree

2 files changed

+81
-45
lines changed

2 files changed

+81
-45
lines changed

lib/concurrent/edge/future.rb

Lines changed: 45 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,23 @@ def schedule(intended_time, default_executor = :io, &task)
7676
ScheduledPromise.new(default_executor, intended_time).future.then(&task)
7777
end
7878

79-
# Constructs new {Future} which is completed after all futures are complete. Its value is array
80-
# of dependent future values. If there is an error it fails with the first one.
81-
# @param [Event] futures
79+
# Constructs new {Future} which is completed after all futures_and_or_events are complete. Its value is array
80+
# of dependent future values. If there is an error it fails with the first one. Event does not
81+
# have a value so it's represented by nil in the array of values.
82+
# @param [Event] futures_and_or_events
8283
# @return [Future]
83-
def zip(*futures)
84-
ZipPromise.new(futures, :io).future
84+
def zip_futures(*futures_and_or_events)
85+
ZipFuturesPromise.new(futures_and_or_events, :io).future
86+
end
87+
88+
alias_method :zip, :zip_futures
89+
90+
# Constructs new {Event} which is completed after all futures_and_or_events are complete
91+
# (Future is completed when Success or Failed).
92+
# @param [Event] futures_and_or_events
93+
# @return [Event]
94+
def zip_events(*futures_and_or_events)
95+
ZipEventsPromise.new(futures_and_or_events, :io).future
8596
end
8697

8798
# Constructs new {Future} which is completed after first of the futures is complete.
@@ -670,7 +681,7 @@ def schedule(intended_time)
670681
# Zips with selected value form the suplied channels
671682
# @return [Future]
672683
def then_select(*channels)
673-
ZipPromise.new([self, Concurrent.select(*channels)], @DefaultExecutor).future
684+
ZipFuturesPromise.new([self, Concurrent.select(*channels)], @DefaultExecutor).future
674685
end
675686

676687
# Changes default executor for rest of the chain
@@ -1256,62 +1267,54 @@ def on_completable(done_future)
12561267
end
12571268

12581269
# @!visibility private
1259-
class ZipPromise < BlockedPromise
1270+
class ZipFuturesPromise < BlockedPromise
12601271

12611272
private
12621273

12631274
def initialize(blocked_by_futures, default_executor)
1264-
klass = Event
1265-
blocked_by_futures.each do |f|
1266-
if f.is_a?(Future)
1267-
if klass == Event
1268-
klass = Future
1269-
break
1270-
end
1271-
end
1272-
end
1273-
1274-
# noinspection RubyArgCount
1275-
super(klass.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
1275+
super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
12761276

1277-
if blocked_by_futures.empty?
1278-
on_completable nil
1279-
end
1277+
on_completable nil if blocked_by_futures.empty?
12801278
end
12811279

12821280
def on_completable(done_future)
12831281
all_success = true
1284-
values = []
1285-
reasons = []
1286-
1287-
blocked_by.each do |future|
1288-
next unless future.is_a?(Future)
1289-
success, value, reason = future.result
1282+
values = Array.new(blocked_by.size)
1283+
reasons = Array.new(blocked_by.size)
12901284

1291-
unless success
1292-
all_success = false
1285+
blocked_by.each_with_index do |future, i|
1286+
if future.is_a?(Future)
1287+
success, values[i], reasons[i] = future.result
1288+
all_success &&= success
1289+
else
1290+
values[i] = reasons[i] = nil
12931291
end
1294-
1295-
values << value
1296-
reasons << reason
12971292
end
12981293

12991294
if all_success
1300-
if values.empty?
1301-
complete_with Event::COMPLETED
1302-
else
1303-
if values.size == 1
1304-
complete_with Future::Success.new(values.first)
1305-
else
1306-
complete_with Future::SuccessArray.new(values)
1307-
end
1308-
end
1295+
complete_with Future::SuccessArray.new(values)
13091296
else
13101297
complete_with Future::PartiallyFailed.new(values, reasons)
13111298
end
13121299
end
13131300
end
13141301

1302+
# @!visibility private
1303+
class ZipEventsPromise < BlockedPromise
1304+
1305+
private
1306+
1307+
def initialize(blocked_by_futures, default_executor)
1308+
super(Event.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
1309+
1310+
on_completable nil if blocked_by_futures.empty?
1311+
end
1312+
1313+
def on_completable(done_future)
1314+
complete_with Event::COMPLETED
1315+
end
1316+
end
1317+
13151318
# @!visibility private
13161319
class AnyPromise < BlockedPromise
13171320

spec/concurrent/edge/future_spec.rb

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@
109109
specify do
110110
completable_future = Concurrent.future
111111
one = completable_future.then(&:succ)
112-
join = Concurrent.zip(completable_future).then { |v| v }
112+
join = Concurrent.zip_futures(completable_future).then { |v| v }
113113
expect(one.completed?).to be false
114114
completable_future.success 0
115115
expect(one.value!).to eq 1
@@ -138,25 +138,43 @@
138138
end
139139

140140
describe '.zip' do
141-
it 'continues on first result' do
141+
it 'waits for all results' do
142142
a = Concurrent.future { 1 }
143143
b = Concurrent.future { 2 }
144144
c = Concurrent.future { 3 }
145145

146146
z1 = a & b
147147
z2 = Concurrent.zip a, b, c
148+
z3 = Concurrent.zip a
149+
z4 = Concurrent.zip
148150

149151
expect(z1.value!).to eq [1, 2]
150152
expect(z2.value!).to eq [1, 2, 3]
153+
expect(z3.value!).to eq [1]
154+
expect(z4.value!).to eq []
151155

152156
q = Queue.new
153157
z1.then { |*args| q << args }
154158
expect(q.pop).to eq [1, 2]
159+
155160
z1.then { |a, b, c| q << [a, b, c] }
156161
expect(q.pop).to eq [1, 2, nil]
162+
157163
z2.then { |a, b, c| q << [a, b, c] }
158164
expect(q.pop).to eq [1, 2, 3]
159165

166+
z3.then { |a| q << a }
167+
expect(q.pop).to eq 1
168+
169+
z3.then { |*a| q << a }
170+
expect(q.pop).to eq [1]
171+
172+
z4.then { |a| q << a }
173+
expect(q.pop).to eq nil
174+
175+
z4.then { |*a| q << a }
176+
expect(q.pop).to eq []
177+
160178
expect(z1.then { |a, b| a+b }.value!).to eq 3
161179
expect(z1.then { |a, b| a+b }.value!).to eq 3
162180
expect(z1.then(&:+).value!).to eq 3
@@ -188,7 +206,22 @@
188206
end
189207

190208
end
209+
end
210+
211+
describe '.zip_events' do
212+
it 'waits for all and returns event' do
213+
a = Concurrent.succeeded_future 1
214+
b = Concurrent.failed_future :any
215+
c = Concurrent.event.complete
191216

217+
z2 = Concurrent.zip_events a, b, c
218+
z3 = Concurrent.zip_events a
219+
z4 = Concurrent.zip_events
220+
221+
expect(z2.completed?).to be_truthy
222+
expect(z3.completed?).to be_truthy
223+
expect(z4.completed?).to be_truthy
224+
end
192225
end
193226

194227
describe 'Future' do
@@ -340,7 +373,7 @@
340373
f.wait 1
341374
expect(f).to be_completed
342375
expect(f).to be_failed
343-
expect{ f.value! }.to raise_error(Exception, 'fail')
376+
expect { f.value! }.to raise_error(Exception, 'fail')
344377
end
345378
end
346379

0 commit comments

Comments
 (0)