Skip to content

Commit 701a6b6

Browse files
committed
Improvements
- :io is now default executor - Concurrent.future without block returns CompletableFuture - Concurrent.event returns CompletableEvent
1 parent 8fd8bf2 commit 701a6b6

File tree

2 files changed

+111
-54
lines changed

2 files changed

+111
-54
lines changed

lib/concurrent/edge/future.rb

Lines changed: 85 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,36 +10,39 @@ module Edge
1010
module FutureShortcuts
1111
# TODO to construct event to be set later to trigger rest of the tree
1212

13-
def event(default_executor = :fast)
14-
CompletableEvent.new(default_executor)
15-
end
16-
17-
# Constructs new Future which will be completed after block is evaluated on executor. Evaluation begins immediately.
18-
# @return [Future]
19-
# @note TODO allow to pass in variables as Thread.new(args) {|args| _ } does
20-
def future(default_executor = :fast, &task)
21-
ImmediatePromise.new(default_executor).event.chain(&task)
13+
# User is responsible for completing the event once.
14+
# @return [CompletableEvent]
15+
def event(default_executor = :io)
16+
CompletableEventPromise.new(default_executor).future
17+
end
18+
19+
# @overload future(default_executor = :io, &task)
20+
# Constructs new Future which will be completed after block is evaluated on executor. Evaluation begins immediately.
21+
# @return [Future]
22+
# @note FIXME allow to pass in variables as Thread.new(args) {|args| _ } does
23+
# @overload future(default_executor = :io)
24+
# User is responsible for completing the future once.
25+
# @return [CompletableFuture]
26+
def future(default_executor = :io, &task)
27+
if task
28+
ImmediatePromise.new(default_executor).event.chain(&task)
29+
else
30+
CompletableFuturePromise.new(default_executor).future
31+
end
2232
end
2333

2434
alias_method :async, :future
2535

2636
# Constructs new Future which will be completed after block is evaluated on executor. Evaluation is delays until
2737
# requested by {Future#wait} method, {Future#value} and {Future#value!} methods are calling {Future#wait} internally.
2838
# @return [Delay]
29-
def delay(default_executor = :fast, &task)
39+
def delay(default_executor = :io, &task)
3040
Delay.new(default_executor).event.chain(&task)
3141
end
3242

33-
# Constructs {Promise} which helds its {Future} in {AbstractPromise#future} method. Intended for completion by user.
34-
# User is responsible not to complete the Promise twice.
35-
# @return [AbstractPromise] in this case instance of {OuterPromise}
36-
def promise(default_executor = :fast)
37-
CompletablePromise.new(default_executor)
38-
end
39-
4043
# Schedules the block to be executed on executor in given intended_time.
4144
# @return [Future]
42-
def schedule(intended_time, default_executor = :fast, &task)
45+
def schedule(intended_time, default_executor = :io, &task)
4346
ScheduledPromise.new(intended_time, default_executor).future.chain(&task)
4447
end
4548

@@ -82,7 +85,7 @@ class Event < Synchronization::Object
8285
extend FutureShortcuts
8386

8487
# @api private
85-
def initialize(promise, default_executor = :fast)
88+
def initialize(promise, default_executor = :io)
8689
super()
8790
synchronize { ns_initialize(promise, default_executor) }
8891
end
@@ -196,7 +199,7 @@ def with_default_executor(executor = default_executor)
196199

197200
private
198201

199-
def ns_initialize(promise, default_executor = :fast)
202+
def ns_initialize(promise, default_executor = :io)
200203
@promise = promise
201204
@state = :pending
202205
@callbacks = []
@@ -375,13 +378,14 @@ def wait!(timeout = nil)
375378
# @raise [Exception] when #failed? it raises #reason
376379
# @return [Object] see Dereferenceable#deref
377380
def value!(timeout = nil)
381+
touch
378382
synchronize { ns_value! timeout }
379383
end
380384

381-
# @example allows Obligation to be risen
382-
# failed_ivar = Ivar.new.fail
383-
# raise failed_ivar
385+
# @example allows failed Future to be risen
386+
# raise Concurrent.future.fail
384387
def exception(*args)
388+
touch
385389
synchronize { ns_exception(*args) }
386390
end
387391

@@ -447,7 +451,7 @@ def ns_add_callback(method, *args)
447451

448452
private
449453

450-
def ns_initialize(promise, default_executor = :fast)
454+
def ns_initialize(promise, default_executor = :io)
451455
super(promise, default_executor)
452456
@value = nil
453457
@reason = nil
@@ -528,8 +532,6 @@ def ns_complete(success, value, reason, raise = true)
528532
callbacks
529533
end
530534

531-
private
532-
533535
def ns_complete_state(success, value, reason)
534536
if success
535537
@value = value
@@ -573,6 +575,46 @@ def pr_async_callback_on_completion(success, value, reason, executor, callback)
573575
end
574576
end
575577

578+
class CompletableEvent < Event
579+
# Complete the event
580+
# @api public
581+
def complete(raise = true)
582+
super raise
583+
end
584+
end
585+
586+
class CompletableFuture < Future
587+
# Complete the future
588+
# @api public
589+
def complete(success, value, reason, raise = true)
590+
super success, value, reason, raise
591+
end
592+
593+
def success(value)
594+
promise.success(value)
595+
end
596+
597+
def try_success(value)
598+
promise.try_success(value)
599+
end
600+
601+
def fail(reason = StandardError.new)
602+
promise.fail(reason)
603+
end
604+
605+
def try_fail(reason = StandardError.new)
606+
promise.try_fail(reason)
607+
end
608+
609+
def evaluate_to(*args, &block)
610+
promise.evaluate_to(*args, &block)
611+
end
612+
613+
def evaluate_to!(*args, &block)
614+
promise.evaluate_to!(*args, &block)
615+
end
616+
end
617+
576618
# TODO modularize blocked_by and notify blocked
577619

578620
# @abstract
@@ -638,21 +680,21 @@ def pr_evaluate_to(future, *args, &block)
638680
end
639681
end
640682

641-
class CompletableEvent < AbstractPromise
683+
class CompletableEventPromise < AbstractPromise
642684
public :complete
643685

644686
private
645687

646-
def ns_initialize(default_executor = :fast)
647-
super Event.new(self, default_executor)
688+
def ns_initialize(default_executor = :io)
689+
super CompletableEvent.new(self, default_executor)
648690
end
649691
end
650692

651693
# @note Be careful not to fullfill the promise twice
652694
# @example initialization
653695
# Concurrent.promise
654696
# @note TODO consider to allow being blocked_by
655-
class CompletablePromise < CompletableEvent
697+
class CompletableFuturePromise < AbstractPromise
656698
# Set the `IVar` to a value and wake or notify all threads waiting on it.
657699
#
658700
# @param [Object] value the value to store in the `IVar`
@@ -679,6 +721,7 @@ def try_fail(reason = StandardError.new)
679721
!!complete(false, nil, reason, false)
680722
end
681723

724+
public :complete
682725
public :evaluate_to
683726

684727
# @return [Future]
@@ -688,8 +731,8 @@ def evaluate_to!(*args, &block)
688731

689732
private
690733

691-
def ns_initialize(default_executor = :fast)
692-
super Future.new(self, default_executor)
734+
def ns_initialize(default_executor = :io)
735+
super CompletableFuture.new(self, default_executor)
693736
end
694737
end
695738

@@ -765,7 +808,7 @@ def executor
765808

766809
private
767810

768-
def ns_initialize(blocked_by_future, default_executor = :fast, executor = default_executor, &task)
811+
def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task)
769812
raise ArgumentError, 'no block given' unless block_given?
770813
super Future.new(self, default_executor), [blocked_by_future]
771814
@task = task
@@ -796,7 +839,7 @@ def pr_completable(_, _, _, _, _)
796839
class ThenPromise < BlockedTaskPromise
797840
private
798841

799-
def ns_initialize(blocked_by_future, default_executor = :fast, executor = default_executor, &task)
842+
def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task)
800843
blocked_by_future.is_a? Future or
801844
raise ArgumentError, 'only Future can be appended with then'
802845
super(blocked_by_future, default_executor, executor, &task)
@@ -814,7 +857,7 @@ def pr_completable(done_future, _, future, executor, task)
814857
class RescuePromise < BlockedTaskPromise
815858
private
816859

817-
def ns_initialize(blocked_by_future, default_executor = :fast, executor = default_executor, &task)
860+
def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task)
818861
blocked_by_future.is_a? Future or
819862
raise ArgumentError, 'only Future can be rescued'
820863
super(blocked_by_future, default_executor, executor, &task)
@@ -851,7 +894,7 @@ def self.new(*args)
851894

852895
private
853896

854-
def ns_initialize(default_executor = :fast)
897+
def ns_initialize(default_executor = :io)
855898
super Event.new(self, default_executor)
856899
end
857900
end
@@ -877,7 +920,7 @@ def ns_done(future)
877920
super future
878921
end
879922

880-
def ns_initialize(blocked_by_future, levels = 1, default_executor = :fast)
923+
def ns_initialize(blocked_by_future, levels = 1, default_executor = :io)
881924
blocked_by_future.is_a? Future or
882925
raise ArgumentError, 'only Future can be flatten'
883926
super(Future.new(self, default_executor), [blocked_by_future])
@@ -893,8 +936,9 @@ def pr_completable(_, blocked_by, future)
893936
class AllPromise < BlockedPromise
894937
private
895938

896-
def ns_initialize(blocked_by_futures, default_executor = :fast)
939+
def ns_initialize(blocked_by_futures, default_executor = :io)
897940
klass = blocked_by_futures.any? { |f| f.is_a?(Future) } ? Future : Event
941+
# noinspection RubyArgCount
898942
super(klass.new(self, default_executor), blocked_by_futures)
899943
end
900944

@@ -918,7 +962,7 @@ class AnyPromise < BlockedPromise
918962

919963
private
920964

921-
def ns_initialize(blocked_by_futures, default_executor = :fast)
965+
def ns_initialize(blocked_by_futures, default_executor = :io)
922966
blocked_by_futures.all? { |f| f.is_a? Future } or
923967
raise ArgumentError, 'accepts only Futures not Events'
924968
super(Future.new(self, default_executor), blocked_by_futures)
@@ -940,7 +984,7 @@ def touch
940984

941985
private
942986

943-
def ns_initialize(default_executor = :fast)
987+
def ns_initialize(default_executor = :io)
944988
super Event.new(self, default_executor)
945989
end
946990
end
@@ -957,7 +1001,7 @@ def inspect
9571001

9581002
private
9591003

960-
def ns_initialize(intended_time, default_executor = :fast)
1004+
def ns_initialize(intended_time, default_executor = :io)
9611005
super Event.new(self, default_executor)
9621006
in_seconds = begin
9631007
@intended_time = intended_time

spec/concurrent/edge/future_spec.rb

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,28 @@
6565
describe '.event' do
6666
specify do
6767
completable_event = Concurrent.event
68-
one = completable_event.event.chain { 1 }
69-
join = Concurrent.join(completable_event.event).chain { 1 }
68+
one = completable_event.chain { 1 }
69+
join = Concurrent.join(completable_event).chain { 1 }
7070
expect(one.completed?).to be false
7171
completable_event.complete
7272
expect(one.value).to eq 1
7373
expect(join.wait.completed?).to be true
7474
end
7575
end
7676

77+
describe '.future without block' do
78+
specify do
79+
completable_future = Concurrent.future
80+
one = completable_future.then(&:succ)
81+
join = Concurrent.join(completable_future).then { |v| v }
82+
expect(one.completed?).to be false
83+
completable_future.success 0
84+
expect(one.value).to eq 1
85+
expect(join.wait!.completed?).to be true
86+
expect(join.value!).to eq 0
87+
end
88+
end
89+
7790
describe '.any' do
7891
it 'continues on first result' do
7992
queue = Queue.new
@@ -107,11 +120,11 @@
107120

108121
it 'chains' do
109122
future0 = Concurrent.future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR
110-
future1 = future0.then(:io) { raise 'boo' } # executed on IO_EXECUTOR
123+
future1 = future0.then(:fast) { raise 'boo' } # executed on IO_EXECUTOR
111124
future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR
112125
future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR
113126
future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR
114-
future5 = future3.with_default_executor(:io) # connects new future with different executor, the new future is completed when future3 is
127+
future5 = future3.with_default_executor(:fast) # connects new future with different executor, the new future is completed when future3 is
115128
future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5
116129
future7 = Concurrent.join(future0, future3)
117130
future8 = future0.rescue { raise 'never happens' } # future0 succeeds so future8'll have same value as future 0
@@ -127,15 +140,15 @@
127140

128141
expect(table.join("\n")).to eq <<-TABLE.gsub(/^\s+\|/, '').strip
129142
|index success value reason pool d.pool
130-
| 0 true 3 fast fast
131-
| 1 false boo io fast
132-
| 2 false boo fast fast
133-
| 3 true boo fast fast
134-
| 4 true true fast fast
135-
| 5 true boo io
136-
| 6 true Boo io io
137-
| 7 true [3, "boo"] fast
138-
| 8 true 3 fast fast
143+
| 0 true 3 io io
144+
| 1 false boo fast io
145+
| 2 false boo io io
146+
| 3 true boo io io
147+
| 4 true true io io
148+
| 5 true boo fast
149+
| 6 true Boo fast fast
150+
| 7 true [3, "boo"] io
151+
| 8 true 3 io io
139152
TABLE
140153
end
141154

0 commit comments

Comments
 (0)