@@ -131,6 +131,8 @@ def post_on(executor, *args, &job)
131
131
# Represents an event which will happen in future (will be completed). It has to always happen.
132
132
class Event < Synchronization ::LockableObject
133
133
safe_initialization!
134
+ private *attr_volatile_with_cas ( :internal_state )
135
+ public :internal_state
134
136
include Concern ::Deprecation
135
137
136
138
# @!visibility private
@@ -180,17 +182,17 @@ def initialize(promise, default_executor)
180
182
# TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem
181
183
# TODO (pitr 12-Sep-2015): look at java.util.concurrent solution
182
184
@Waiters = LockFreeStack . new
183
- @State = AtomicReference . new PENDING
185
+ self . internal_state = PENDING
184
186
end
185
187
186
188
# @return [:pending, :completed]
187
189
def state
188
- @State . get . to_sym
190
+ internal_state . to_sym
189
191
end
190
192
191
193
# Is Event/Future pending?
192
194
# @return [Boolean]
193
- def pending? ( state = @State . get )
195
+ def pending? ( state = internal_state )
194
196
!state . completed?
195
197
end
196
198
@@ -202,7 +204,7 @@ def unscheduled?
202
204
203
205
# Has the Event been completed?
204
206
# @return [Boolean]
205
- def completed? ( state = @State . get )
207
+ def completed? ( state = internal_state )
206
208
state . completed?
207
209
end
208
210
@@ -313,7 +315,7 @@ def set(*args, &block)
313
315
314
316
# @!visibility private
315
317
def complete_with ( state , raise_on_reassign = true )
316
- if @State . compare_and_set ( PENDING , state )
318
+ if compare_and_set_internal_state ( PENDING , state )
317
319
#(state)
318
320
# go to synchronized block only if there were waiting threads
319
321
synchronize { ns_broadcast } if @Waiters . clear
@@ -369,11 +371,6 @@ def waiting_threads
369
371
@Waiters . each . to_a
370
372
end
371
373
372
- # @!visibility private
373
- def internal_state
374
- @State . get
375
- end
376
-
377
374
private
378
375
379
376
# @return [true, false]
@@ -536,7 +533,7 @@ def apply(block)
536
533
537
534
# Has Future been success?
538
535
# @return [Boolean]
539
- def success? ( state = @State . get )
536
+ def success? ( state = internal_state )
540
537
state . completed? && state . success?
541
538
end
542
539
@@ -547,7 +544,7 @@ def fulfilled?
547
544
548
545
# Has Future been failed?
549
546
# @return [Boolean]
550
- def failed? ( state = @State . get )
547
+ def failed? ( state = internal_state )
551
548
state . completed? && !state . success?
552
549
end
553
550
@@ -563,23 +560,23 @@ def rejected?
563
560
# @!macro edge.periodical_wait
564
561
def value ( timeout = nil )
565
562
touch
566
- @State . get . value if wait_until_complete timeout
563
+ internal_state . value if wait_until_complete timeout
567
564
end
568
565
569
566
# @return [Exception, nil] the reason of the Future's failure
570
567
# @!macro edge.timeout_nil
571
568
# @!macro edge.periodical_wait
572
569
def reason ( timeout = nil )
573
570
touch
574
- @State . get . reason if wait_until_complete timeout
571
+ internal_state . reason if wait_until_complete timeout
575
572
end
576
573
577
574
# @return [Array(Boolean, Object, Exception), nil] triplet of success, value, reason
578
575
# @!macro edge.timeout_nil
579
576
# @!macro edge.periodical_wait
580
577
def result ( timeout = nil )
581
578
touch
582
- @State . get . result if wait_until_complete timeout
579
+ internal_state . result if wait_until_complete timeout
583
580
end
584
581
585
582
# Wait until Future is #complete?
@@ -601,14 +598,14 @@ def wait!(timeout = nil)
601
598
# @!macro edge.periodical_wait
602
599
def value! ( timeout = nil )
603
600
touch
604
- @State . get . value if wait_until_complete! timeout
601
+ internal_state . value if wait_until_complete! timeout
605
602
end
606
603
607
604
# @example allows failed Future to be risen
608
605
# raise Concurrent.future.fail
609
606
def exception ( *args )
610
607
raise 'obligation is not failed' unless failed?
611
- reason = @State . get . reason
608
+ reason = internal_state . reason
612
609
if reason . is_a? ( ::Array )
613
610
reason . each { |e | log ERROR , 'Edge::Future' , e }
614
611
Concurrent ::Error . new 'multiple exceptions, inspect log'
@@ -727,7 +724,7 @@ def on_failure!(&callback)
727
724
728
725
# @!visibility private
729
726
def complete_with ( state , raise_on_reassign = true )
730
- if @State . compare_and_set ( PENDING , state )
727
+ if compare_and_set_internal_state ( PENDING , state )
731
728
@Waiters . clear
732
729
synchronize { ns_broadcast }
733
730
call_callbacks state
@@ -745,12 +742,12 @@ def complete_with(state, raise_on_reassign = true)
745
742
746
743
# @!visibility private
747
744
def add_callback ( method , *args )
748
- state = @State . get
745
+ state = internal_state
749
746
if completed? ( state )
750
747
call_callback method , state , *args
751
748
else
752
749
@Callbacks . push [ method , *args ]
753
- state = @State . get
750
+ state = internal_state
754
751
# take back if it was completed in the meanwhile
755
752
call_callbacks state if completed? ( state )
756
753
end
@@ -759,7 +756,7 @@ def add_callback(method, *args)
759
756
760
757
# @!visibility private
761
758
def apply ( block )
762
- @State . get . apply block
759
+ internal_state . apply block
763
760
end
764
761
765
762
private
0 commit comments