Skip to content

Commit ea11c59

Browse files
author
Petr Chalupa
committed
Merge pull request #496 from pitr-ch/futures
Better API for future zipping
2 parents 4ed77d3 + 778f6fa commit ea11c59

File tree

2 files changed

+96
-55
lines changed

2 files changed

+96
-55
lines changed

lib/concurrent/edge/future.rb

Lines changed: 60 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,23 @@ def schedule(intended_time, default_executor = :io, &task)
7676
ScheduledPromise.new(default_executor, intended_time).future.then(&task)
7777
end
7878

79-
# Constructs new {Future} which is completed after all futures are complete. Its value is array
80-
# of dependent future values. If there is an error it fails with the first one.
81-
# @param [Event] futures
79+
# Constructs new {Future} which is completed after all futures_and_or_events are complete. Its value is array
80+
# of dependent future values. If there is an error it fails with the first one. Event does not
81+
# have a value so it's represented by nil in the array of values.
82+
# @param [Event] futures_and_or_events
8283
# @return [Future]
83-
def zip(*futures)
84-
ZipPromise.new(futures, :io).future
84+
def zip_futures(*futures_and_or_events)
85+
ZipFuturesPromise.new(futures_and_or_events, :io).future
86+
end
87+
88+
alias_method :zip, :zip_futures
89+
90+
# Constructs new {Event} which is completed after all futures_and_or_events are complete
91+
# (Future is completed when Success or Failed).
92+
# @param [Event] futures_and_or_events
93+
# @return [Event]
94+
def zip_events(*futures_and_or_events)
95+
ZipEventsPromise.new(futures_and_or_events, :io).future
8596
end
8697

8798
# Constructs new {Future} which is completed after first of the futures is complete.
@@ -95,6 +106,7 @@ def any(*futures)
95106
# @return [Future]
96107
def select(*channels)
97108
future do
109+
# noinspection RubyArgCount
98110
Channel.select do |s|
99111
channels.each do |ch|
100112
s.take(ch) { |value| [value, ch] }
@@ -503,9 +515,9 @@ def apply(block)
503515
# @!visibility private
504516
class PartiallyFailed < CompletedWithResult
505517
def initialize(value, reason)
518+
super()
506519
@Value = value
507520
@Reason = reason
508-
super()
509521
end
510522

511523
def success?
@@ -670,7 +682,7 @@ def schedule(intended_time)
670682
# Zips with selected value form the suplied channels
671683
# @return [Future]
672684
def then_select(*channels)
673-
ZipPromise.new([self, Concurrent.select(*channels)], @DefaultExecutor).future
685+
ZipFuturesPromise.new([self, Concurrent.select(*channels)], @DefaultExecutor).future
674686
end
675687

676688
# Changes default executor for rest of the chain
@@ -987,12 +999,16 @@ class InnerPromise < AbstractPromise
987999
# @abstract
9881000
# @!visibility private
9891001
class BlockedPromise < InnerPromise
1002+
def self.new(*args, &block)
1003+
promise = super(*args, &block)
1004+
promise.blocked_by.each { |f| f.add_callback :pr_callback_notify_blocked, promise }
1005+
promise
1006+
end
1007+
9901008
def initialize(future, blocked_by_futures, countdown)
1009+
super(future)
9911010
initialize_blocked_by(blocked_by_futures)
9921011
@Countdown = AtomicFixnum.new countdown
993-
994-
super(future)
995-
@BlockedBy.each { |f| f.add_callback :pr_callback_notify_blocked, self }
9961012
end
9971013

9981014
# @api private
@@ -1053,9 +1069,9 @@ def on_completable(done_future)
10531069
class BlockedTaskPromise < BlockedPromise
10541070
def initialize(blocked_by_future, default_executor, executor, &task)
10551071
raise ArgumentError, 'no block given' unless block_given?
1072+
super Future.new(self, default_executor), blocked_by_future, 1
10561073
@Executor = executor
10571074
@Task = task
1058-
super Future.new(self, default_executor), blocked_by_future, 1
10591075
end
10601076

10611077
def executor
@@ -1203,8 +1219,8 @@ def on_completable(done_future)
12031219
# @!visibility private
12041220
class ZipFutureEventPromise < BlockedPromise
12051221
def initialize(future, event, default_executor)
1206-
@FutureResult = future
12071222
super Future.new(self, default_executor), [future, event], 2
1223+
@FutureResult = future
12081224
end
12091225

12101226
def on_completable(done_future)
@@ -1215,9 +1231,9 @@ def on_completable(done_future)
12151231
# @!visibility private
12161232
class ZipFutureFuturePromise < BlockedPromise
12171233
def initialize(future1, future2, default_executor)
1234+
super Future.new(self, default_executor), [future1, future2], 2
12181235
@Future1Result = future1
12191236
@Future2Result = future2
1220-
super Future.new(self, default_executor), [future1, future2], 2
12211237
end
12221238

12231239
def on_completable(done_future)
@@ -1256,62 +1272,54 @@ def on_completable(done_future)
12561272
end
12571273

12581274
# @!visibility private
1259-
class ZipPromise < BlockedPromise
1275+
class ZipFuturesPromise < BlockedPromise
12601276

12611277
private
12621278

12631279
def initialize(blocked_by_futures, default_executor)
1264-
klass = Event
1265-
blocked_by_futures.each do |f|
1266-
if f.is_a?(Future)
1267-
if klass == Event
1268-
klass = Future
1269-
break
1270-
end
1271-
end
1272-
end
1273-
1274-
# noinspection RubyArgCount
1275-
super(klass.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
1280+
super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
12761281

1277-
if blocked_by_futures.empty?
1278-
on_completable nil
1279-
end
1282+
on_completable nil if blocked_by_futures.empty?
12801283
end
12811284

12821285
def on_completable(done_future)
12831286
all_success = true
1284-
values = []
1285-
reasons = []
1286-
1287-
blocked_by.each do |future|
1288-
next unless future.is_a?(Future)
1289-
success, value, reason = future.result
1287+
values = Array.new(blocked_by.size)
1288+
reasons = Array.new(blocked_by.size)
12901289

1291-
unless success
1292-
all_success = false
1290+
blocked_by.each_with_index do |future, i|
1291+
if future.is_a?(Future)
1292+
success, values[i], reasons[i] = future.result
1293+
all_success &&= success
1294+
else
1295+
values[i] = reasons[i] = nil
12931296
end
1294-
1295-
values << value
1296-
reasons << reason
12971297
end
12981298

12991299
if all_success
1300-
if values.empty?
1301-
complete_with Event::COMPLETED
1302-
else
1303-
if values.size == 1
1304-
complete_with Future::Success.new(values.first)
1305-
else
1306-
complete_with Future::SuccessArray.new(values)
1307-
end
1308-
end
1300+
complete_with Future::SuccessArray.new(values)
13091301
else
13101302
complete_with Future::PartiallyFailed.new(values, reasons)
13111303
end
13121304
end
13131305
end
13141306

1307+
# @!visibility private
1308+
class ZipEventsPromise < BlockedPromise
1309+
1310+
private
1311+
1312+
def initialize(blocked_by_futures, default_executor)
1313+
super(Event.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
1314+
1315+
on_completable nil if blocked_by_futures.empty?
1316+
end
1317+
1318+
def on_completable(done_future)
1319+
complete_with Event::COMPLETED
1320+
end
1321+
end
1322+
13151323
# @!visibility private
13161324
class AnyPromise < BlockedPromise
13171325

@@ -1354,8 +1362,8 @@ def touch
13541362
private
13551363

13561364
def initialize(default_executor, value)
1357-
@Value = value
13581365
super Future.new(self, default_executor)
1366+
@Value = value
13591367
end
13601368
end
13611369

@@ -1373,6 +1381,8 @@ def inspect
13731381
private
13741382

13751383
def initialize(default_executor, intended_time)
1384+
super Event.new(self, default_executor)
1385+
13761386
@IntendedTime = intended_time
13771387

13781388
in_seconds = begin
@@ -1385,8 +1395,6 @@ def initialize(default_executor, intended_time)
13851395
[0, schedule_time.to_f - now.to_f].max
13861396
end
13871397

1388-
super Event.new(self, default_executor)
1389-
13901398
Concurrent.global_timer_set.post(in_seconds) do
13911399
@Future.complete_with Event::COMPLETED
13921400
end

spec/concurrent/edge/future_spec.rb

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@
109109
specify do
110110
completable_future = Concurrent.future
111111
one = completable_future.then(&:succ)
112-
join = Concurrent.zip(completable_future).then { |v| v }
112+
join = Concurrent.zip_futures(completable_future).then { |v| v }
113113
expect(one.completed?).to be false
114114
completable_future.success 0
115115
expect(one.value!).to eq 1
@@ -138,25 +138,43 @@
138138
end
139139

140140
describe '.zip' do
141-
it 'continues on first result' do
141+
it 'waits for all results' do
142142
a = Concurrent.future { 1 }
143143
b = Concurrent.future { 2 }
144144
c = Concurrent.future { 3 }
145145

146146
z1 = a & b
147147
z2 = Concurrent.zip a, b, c
148+
z3 = Concurrent.zip a
149+
z4 = Concurrent.zip
148150

149151
expect(z1.value!).to eq [1, 2]
150152
expect(z2.value!).to eq [1, 2, 3]
153+
expect(z3.value!).to eq [1]
154+
expect(z4.value!).to eq []
151155

152156
q = Queue.new
153157
z1.then { |*args| q << args }
154158
expect(q.pop).to eq [1, 2]
159+
155160
z1.then { |a, b, c| q << [a, b, c] }
156161
expect(q.pop).to eq [1, 2, nil]
162+
157163
z2.then { |a, b, c| q << [a, b, c] }
158164
expect(q.pop).to eq [1, 2, 3]
159165

166+
z3.then { |a| q << a }
167+
expect(q.pop).to eq 1
168+
169+
z3.then { |*a| q << a }
170+
expect(q.pop).to eq [1]
171+
172+
z4.then { |a| q << a }
173+
expect(q.pop).to eq nil
174+
175+
z4.then { |*a| q << a }
176+
expect(q.pop).to eq []
177+
160178
expect(z1.then { |a, b| a+b }.value!).to eq 3
161179
expect(z1.then { |a, b| a+b }.value!).to eq 3
162180
expect(z1.then(&:+).value!).to eq 3
@@ -188,7 +206,22 @@
188206
end
189207

190208
end
209+
end
210+
211+
describe '.zip_events' do
212+
it 'waits for all and returns event' do
213+
a = Concurrent.succeeded_future 1
214+
b = Concurrent.failed_future :any
215+
c = Concurrent.event.complete
191216

217+
z2 = Concurrent.zip_events a, b, c
218+
z3 = Concurrent.zip_events a
219+
z4 = Concurrent.zip_events
220+
221+
expect(z2.completed?).to be_truthy
222+
expect(z3.completed?).to be_truthy
223+
expect(z4.completed?).to be_truthy
224+
end
192225
end
193226

194227
describe 'Future' do
@@ -340,7 +373,7 @@
340373
f.wait 1
341374
expect(f).to be_completed
342375
expect(f).to be_failed
343-
expect{ f.value! }.to raise_error(Exception, 'fail')
376+
expect { f.value! }.to raise_error(Exception, 'fail')
344377
end
345378
end
346379

0 commit comments

Comments
 (0)