Skip to content

Commit eadb12c

Browse files
committed
Allow to pass args to Futures as to Threads
1 parent f2474bf commit eadb12c

File tree

2 files changed

+116
-39
lines changed

2 files changed

+116
-39
lines changed

lib/concurrent/edge/future.rb

Lines changed: 90 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ module Concurrent
2020
module Edge
2121

2222
module FutureShortcuts
23-
# User is responsible for completing the event once.
23+
# User is responsible for completing the event once by {CompletableEvent#complete}
2424
# @return [CompletableEvent]
2525
def event(default_executor = :io)
2626
CompletableEventPromise.new(default_executor).future
@@ -29,42 +29,57 @@ def event(default_executor = :io)
2929
# @overload future(default_executor = :io, &task)
3030
# Constructs new Future which will be completed after block is evaluated on executor. Evaluation begins immediately.
3131
# @return [Future]
32-
# @note FIXME allow to pass in variables as Thread.new(args) {|args| _ } does
3332
# @overload future(default_executor = :io)
34-
# User is responsible for completing the future once.
33+
# User is responsible for completing the future once by {CompletableFuture#success} or {CompletableFuture#fail}
3534
# @return [CompletableFuture]
36-
def future(default_executor = :io, &task)
35+
def future(*args, &task)
36+
future_on :io, *args, &task
37+
end
38+
39+
def future_on(default_executor, *args, &task)
3740
if task
38-
ImmediatePromise.new(default_executor).event.chain(&task)
41+
ImmediatePromise.new(default_executor, *args).future.then(&task)
3942
else
4043
CompletableFuturePromise.new(default_executor).future
4144
end
4245
end
4346

4447
alias_method :async, :future
4548

46-
# Constructs new Future which will be completed after block is evaluated on executor. Evaluation is delayed until
47-
# requested by `#wait`, `#value`, `#value!`, etc.
49+
# Constructs new Future which will evaluate to the block after
50+
# requested by calling `#wait`, `#value`, `#value!`, etc. on it or on any of the chained futures.
4851
# @return [Delay]
49-
def delay(default_executor = :io, &task)
50-
Delay.new(default_executor).event.chain(&task)
52+
def delay(*args, &task)
53+
delay_on :io, *args, &task
54+
end
55+
56+
def delay_on(default_executor, *args, &task)
57+
Delay.new(default_executor, *args).future.then(&task)
5158
end
5259

5360
# Schedules the block to be executed on executor in given intended_time.
5461
# @return [Future]
55-
def schedule(intended_time, default_executor = :io, &task)
56-
ScheduledPromise.new(intended_time, default_executor).future.chain(&task)
62+
def schedule(intended_time, *args, &task)
63+
schedule_on :io, intended_time, *args, &task
64+
end
65+
66+
def schedule_on(default_executor, intended_time, *args, &task)
67+
ScheduledPromise.new(default_executor, intended_time, *args).future.then(&task)
5768
end
5869

59-
# fails on first error
60-
# does not block a thread
70+
# Constructs new {Future} which is completed after all futures are complete. Its value is array
71+
# of dependent future values. If there is an error it fails with the first one.
72+
# @param [Event] futures
6173
# @return [Future]
6274
def zip(*futures)
63-
AllPromise.new(futures).future
75+
AllPromise.new(futures, :io).future
6476
end
6577

78+
# Constructs new {Future} which is completed after first of the futures is complete.
79+
# @param [Event] futures
80+
# @return [Future]
6681
def any(*futures)
67-
AnyPromise.new(futures).future
82+
AnyPromise.new(futures, :io).future
6883
end
6984

7085
def post!(*args, &job)
@@ -89,7 +104,7 @@ def post_on(executor, *args, &job)
89104
class Event < Synchronization::Object
90105
extend FutureShortcuts
91106

92-
def initialize(promise, default_executor = :io)
107+
def initialize(promise, default_executor)
93108
@Promise = promise
94109
@DefaultExecutor = default_executor
95110
@Touched = AtomicBoolean.new(false)
@@ -157,7 +172,7 @@ def delay
157172
end
158173

159174
def schedule(intended_time)
160-
chain { ScheduledPromise.new(intended_time).future.zip(self) }.flat
175+
chain { ScheduledPromise.new(@DefaultExecutor, intended_time).event.zip(self) }.flat
161176
end
162177

163178
# @yield [success, value, reason] executed async on `executor` when completed
@@ -172,7 +187,7 @@ def on_completion!(&callback)
172187
add_callback :pr_callback_on_completion, callback
173188
end
174189

175-
def with_default_executor(executor = @DefaultExecutor)
190+
def with_default_executor(executor)
176191
AllPromise.new([self], executor).future
177192
end
178193

@@ -615,7 +630,7 @@ def evaluate_to(*args, block)
615630
class CompletableEventPromise < AbstractPromise
616631
public :complete
617632

618-
def initialize(default_executor = :io)
633+
def initialize(default_executor)
619634
super CompletableEvent.new(self, default_executor)
620635
end
621636
end
@@ -624,7 +639,7 @@ def initialize(default_executor = :io)
624639
class CompletableFuturePromise < AbstractPromise
625640
# TODO consider to allow being blocked_by
626641

627-
def initialize(default_executor = :io)
642+
def initialize(default_executor)
628643
super CompletableFuture.new(self, default_executor)
629644
end
630645

@@ -732,7 +747,7 @@ def on_completable(done_future)
732747

733748
# @abstract
734749
class BlockedTaskPromise < BlockedPromise
735-
def initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task)
750+
def initialize(blocked_by_future, default_executor, executor, &task)
736751
raise ArgumentError, 'no block given' unless block_given?
737752
@Executor = executor
738753
@Task = task
@@ -747,7 +762,7 @@ def executor
747762
class ThenPromise < BlockedTaskPromise
748763
private
749764

750-
def initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task)
765+
def initialize(blocked_by_future, default_executor, executor, &task)
751766
raise ArgumentError, 'only Future can be appended with then' unless blocked_by_future.is_a? Future
752767
super blocked_by_future, default_executor, executor, &task
753768
end
@@ -766,7 +781,7 @@ def on_completable(done_future)
766781
class RescuePromise < BlockedTaskPromise
767782
private
768783

769-
def initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task)
784+
def initialize(blocked_by_future, default_executor, executor, &task)
770785
raise ArgumentError, 'only Future can be rescued' unless blocked_by_future.is_a? Future
771786
super blocked_by_future, default_executor, executor, &task
772787
end
@@ -794,8 +809,16 @@ def on_completable(done_future)
794809

795810
# will be immediately completed
796811
class ImmediatePromise < InnerPromise
797-
def initialize(default_executor = :io)
798-
super Event.new(self, default_executor).complete
812+
def initialize(default_executor, *args)
813+
# TODO optimize, create completed futures directly
814+
super case args.size
815+
when 0
816+
Event.new(self, default_executor).complete
817+
when 1
818+
Future.new(self, default_executor).complete(true, args[0], nil)
819+
else
820+
ArrayFuture.new(self, default_executor).complete(true, args, nil)
821+
end
799822
end
800823
end
801824

@@ -808,7 +831,7 @@ def blocked_by
808831

809832
def process_on_done(future)
810833
countdown = super(future)
811-
value = future.value
834+
value = future.value!
812835
if countdown.nonzero?
813836
case value
814837
when Future
@@ -824,7 +847,7 @@ def process_on_done(future)
824847
countdown
825848
end
826849

827-
def initialize(blocked_by_future, levels = 1, default_executor = :io)
850+
def initialize(blocked_by_future, levels, default_executor)
828851
raise ArgumentError, 'levels has to be higher than 0' if levels < 1
829852
blocked_by_future.is_a? Future or
830853
raise ArgumentError, 'only Future can be flatten'
@@ -851,7 +874,7 @@ class AllPromise < BlockedPromise
851874

852875
private
853876

854-
def initialize(blocked_by_futures, default_executor = :io)
877+
def initialize(blocked_by_futures, default_executor)
855878
klass = Event
856879
blocked_by_futures.each do |f|
857880
if f.is_a?(Future)
@@ -901,7 +924,7 @@ class AnyPromise < BlockedPromise
901924

902925
private
903926

904-
def initialize(blocked_by_futures, default_executor = :io)
927+
def initialize(blocked_by_futures, default_executor)
905928
blocked_by_futures.all? { |f| f.is_a? Future } or
906929
raise ArgumentError, 'accepts only Futures not Events'
907930
super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
@@ -918,13 +941,28 @@ def on_completable(done_future)
918941

919942
class Delay < InnerPromise
920943
def touch
921-
complete
944+
case @Args.size
945+
when 0
946+
@Future.complete
947+
when 1
948+
@Future.complete(true, @Args[0], nil)
949+
else
950+
@Future.complete(true, @Args, nil)
951+
end
922952
end
923953

924954
private
925955

926-
def initialize(default_executor = :io)
927-
super Event.new(self, default_executor)
956+
def initialize(default_executor, *args)
957+
@Args = args
958+
super case args.size
959+
when 0
960+
Event.new(self, default_executor)
961+
when 1
962+
Future.new(self, default_executor)
963+
else
964+
ArrayFuture.new(self, default_executor)
965+
end
928966
end
929967
end
930968

@@ -940,7 +978,7 @@ def inspect
940978

941979
private
942980

943-
def initialize(intended_time, default_executor = :io)
981+
def initialize(default_executor, intended_time, *args)
944982
@IntendedTime = intended_time
945983

946984
in_seconds = begin
@@ -953,9 +991,26 @@ def initialize(intended_time, default_executor = :io)
953991
[0, schedule_time.to_f - now.to_f].max
954992
end
955993

956-
super Event.new(self, default_executor)
994+
super case args.size
995+
when 0
996+
Event.new(self, default_executor)
997+
when 1
998+
Future.new(self, default_executor)
999+
else
1000+
ArrayFuture.new(self, default_executor)
1001+
end
1002+
1003+
Concurrent.global_timer_set.post(in_seconds, *args) do |*args|
1004+
case args.size
1005+
when 0
1006+
@Future.complete
1007+
when 1
1008+
@Future.complete(true, args[0], nil)
1009+
else
1010+
@Future.complete(true, args, nil)
1011+
end
1012+
end
9571013

958-
Concurrent.global_timer_set.post(in_seconds) { complete }
9591014
end
9601015
end
9611016
end

spec/concurrent/edge/future_spec.rb

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
require 'concurrent'
1+
require 'concurrent-edge'
22
require 'thread'
33

44
describe 'Concurrent::Edge futures' do
@@ -16,16 +16,30 @@
1616

1717
describe '.future' do
1818
it 'executes' do
19-
future = Concurrent.future(:immediate) { 1 + 1 }
20-
expect(future.value).to eq 2
19+
future = Concurrent.future { 1 + 1 }
20+
expect(future.value!).to eq 2
21+
22+
future = Concurrent.future(1) { |v| v + 1 }
23+
expect(future.value!).to eq 2
24+
25+
future = Concurrent.future(1, 1, &:+)
26+
expect(future.value!).to eq 2
2127
end
2228
end
2329

2430
describe '.delay' do
2531
it 'delays execution' do
2632
delay = Concurrent.delay { 1 + 1 }
2733
expect(delay.completed?).to eq false
28-
expect(delay.value).to eq 2
34+
expect(delay.value!).to eq 2
35+
36+
delay = Concurrent.delay(1) { |v| v + 1 }
37+
expect(delay.completed?).to eq false
38+
expect(delay.value!).to eq 2
39+
40+
delay = Concurrent.delay(1, 1, &:+)
41+
expect(delay.completed?).to eq false
42+
expect(delay.value!).to eq 2
2943
end
3044
end
3145

@@ -38,6 +52,14 @@
3852
expect(future.value).to eq queue
3953
expect(queue.pop).to eq 2
4054
expect(queue.pop).to be_between(0.1, 0.2)
55+
56+
start = Time.now.to_f
57+
queue = Queue.new
58+
future = Concurrent.schedule(0.1, 1, 1, &:+).then { |v| queue.push(v); queue.push(Time.now.to_f - start); queue }
59+
60+
expect(future.value).to eq queue
61+
expect(queue.pop).to eq 2
62+
expect(queue.pop).to be_between(0.1, 0.2)
4163
end
4264

4365
it 'scheduled execution in graph' do

0 commit comments

Comments
 (0)