16
16
17
17
package kotlinx.coroutines.experimental
18
18
19
+ import kotlinx.atomicfu.atomic
20
+ import kotlinx.atomicfu.loop
19
21
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
20
22
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
21
23
import kotlinx.coroutines.experimental.internal.OpDescriptor
@@ -24,8 +26,6 @@ import kotlinx.coroutines.experimental.selects.SelectBuilder
24
26
import kotlinx.coroutines.experimental.selects.SelectInstance
25
27
import kotlinx.coroutines.experimental.selects.select
26
28
import java.util.concurrent.Future
27
- import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
28
- import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
29
29
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
30
30
import kotlin.coroutines.experimental.Continuation
31
31
import kotlin.coroutines.experimental.CoroutineContext
@@ -430,22 +430,12 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
430
430
FINAL_C (Cancelled) state on cancellation/completion
431
431
*/
432
432
433
- @Volatile
434
- private var _state : Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
433
+ // Note: use shared objects while we have no listeners
434
+ private val _state = atomic< Any ?>( if (active) EmptyActive else EmptyNew )
435
435
436
436
@Volatile
437
437
private var parentHandle: DisposableHandle ? = null
438
438
439
- protected companion object {
440
- private val STATE : AtomicReferenceFieldUpdater <JobSupport , Any ?> =
441
- AtomicReferenceFieldUpdater .newUpdater(JobSupport ::class .java, Any ::class .java, " _state" )
442
-
443
- fun stateToString (state : Any? ): String =
444
- if (state is Incomplete )
445
- if (state.isActive) " Active" else " New"
446
- else " Completed"
447
- }
448
-
449
439
// ------------ initialization ------------
450
440
451
441
/* *
@@ -488,14 +478,13 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
488
478
* Returns current state of this job.
489
479
*/
490
480
protected val state: Any? get() {
491
- while (true ) { // helper loop on state (complete in-progress atomic operations)
492
- val state = _state
481
+ _state .loop { state -> // helper loop on state (complete in-progress atomic operations)
493
482
if (state !is OpDescriptor ) return state
494
483
state.perform(this )
495
484
}
496
485
}
497
486
498
- protected inline fun lockFreeLoopOnState (block : (Any? ) -> Unit ): Nothing {
487
+ protected inline fun loopOnState (block : (Any? ) -> Unit ): Nothing {
499
488
while (true ) {
500
489
block(state)
501
490
}
@@ -545,7 +534,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
545
534
*/
546
535
protected fun tryUpdateState (expect : Any , update : Any? ): Boolean {
547
536
require(expect is Incomplete && update !is Incomplete ) // only incomplete -> completed transition is allowed
548
- if (! STATE .compareAndSet(this , expect, update)) return false
537
+ if (! _state .compareAndSet(expect, update)) return false
549
538
// Unregister from parent job
550
539
parentHandle?.dispose() // volatile read parentHandle _after_ state was updated
551
540
return true // continues in completeUpdateState
@@ -593,7 +582,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
593
582
notifyHandlers<JobCancellationNode <* >>(list, cause)
594
583
595
584
public final override fun start (): Boolean {
596
- lockFreeLoopOnState { state ->
585
+ loopOnState { state ->
597
586
when (startInternal(state)) {
598
587
FALSE -> return false
599
588
TRUE -> return true
@@ -609,13 +598,13 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
609
598
when (state) {
610
599
is Empty -> { // EMPTY_X state -- no completion handlers
611
600
if (state.isActive) return FALSE // already active
612
- if (! STATE .compareAndSet(this , state, EmptyActive )) return RETRY
601
+ if (! _state .compareAndSet(state, EmptyActive )) return RETRY
613
602
onStart()
614
603
return TRUE
615
604
}
616
605
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
617
- if (state.active != 0 ) return FALSE
618
- if (! NodeList . ACTIVE .compareAndSet(state, 0 , 1 )) return RETRY
606
+ if (state._active .value != 0 ) return FALSE
607
+ if (! state. _active .compareAndSet(0 , 1 )) return RETRY
619
608
onStart()
620
609
return TRUE
621
610
}
@@ -663,13 +652,13 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
663
652
664
653
private fun installHandler (handler : CompletionHandler , onCancelling : Boolean ): DisposableHandle {
665
654
var nodeCache: JobNode <* >? = null
666
- lockFreeLoopOnState { state ->
655
+ loopOnState { state ->
667
656
when (state) {
668
657
is Empty -> { // EMPTY_X state -- no completion handlers
669
658
if (state.isActive) {
670
659
// try move to SINGLE state
671
660
val node = nodeCache ? : makeNode(handler, onCancelling).also { nodeCache = it }
672
- if (STATE .compareAndSet(this , state, node)) return node
661
+ if (_state .compareAndSet(state, node)) return node
673
662
} else
674
663
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
675
664
}
@@ -710,7 +699,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
710
699
711
700
private fun promoteEmptyToNodeList (state : Empty ) {
712
701
// try to promote it to list in new state
713
- STATE .compareAndSet(this , state, NodeList (state.isActive))
702
+ _state .compareAndSet(state, NodeList (state.isActive))
714
703
}
715
704
716
705
private fun promoteSingleToNodeList (state : JobNode <* >) {
@@ -719,7 +708,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
719
708
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
720
709
val list = state.next // either NodeList or somebody else won the race, updated state
721
710
// just attempt converting it to list if state is still the same, then we'll continue lock-free loop
722
- STATE .compareAndSet(this , state, list)
711
+ _state .compareAndSet(state, list)
723
712
}
724
713
725
714
final override suspend fun join () {
@@ -728,7 +717,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
728
717
}
729
718
730
719
private fun joinInternal (): Boolean {
731
- lockFreeLoopOnState { state ->
720
+ loopOnState { state ->
732
721
if (state !is Incomplete ) return false // not active anymore (complete) -- no need to wait
733
722
if (startInternal(state) >= 0 ) return true // wait unless need to retry
734
723
}
@@ -740,7 +729,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
740
729
741
730
override fun <R > registerSelectJoin (select : SelectInstance <R >, block : suspend () -> R ) {
742
731
// fast-path -- check state and select/return if needed
743
- lockFreeLoopOnState { state ->
732
+ loopOnState { state ->
744
733
if (select.isSelected) return
745
734
if (state !is Incomplete ) {
746
735
// already complete -- select result
@@ -758,12 +747,12 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
758
747
759
748
internal fun removeNode (node : JobNode <* >) {
760
749
// remove logic depends on the state of the job
761
- lockFreeLoopOnState { state ->
750
+ loopOnState { state ->
762
751
when (state) {
763
752
is JobNode <* > -> { // SINGE/SINGLE+ state -- one completion handler
764
753
if (state != = node) return // a different job node --> we were already removed
765
754
// try remove and revert back to empty state
766
- if (STATE .compareAndSet(this , state, EmptyActive )) return
755
+ if (_state .compareAndSet(state, EmptyActive )) return
767
756
}
768
757
is NodeList , is Cancelling -> { // LIST or CANCELLING -- a list of completion handlers
769
758
// remove node from the list
@@ -789,15 +778,15 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
789
778
790
779
// transitions to Cancelled state
791
780
private fun makeCancelled (cause : Throwable ? ): Boolean {
792
- lockFreeLoopOnState { state ->
781
+ loopOnState { state ->
793
782
if (state !is Incomplete ) return false // quit if already complete
794
783
if (updateStateCancelled(state, cause)) return true
795
784
}
796
785
}
797
786
798
787
// transitions to Cancelling state
799
788
private fun makeCancelling (cause : Throwable ? ): Boolean {
800
- lockFreeLoopOnState { state ->
789
+ loopOnState { state ->
801
790
when (state) {
802
791
is Empty -> { // EMPTY_X state -- no completion handlers
803
792
if (state.isActive) {
@@ -814,7 +803,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
814
803
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
815
804
if (state.isActive) {
816
805
// try make it cancelling on the condition that we're still in this state
817
- if (STATE .compareAndSet(this , state, Cancelling (state, Cancelled (cause)))) {
806
+ if (_state .compareAndSet(state, Cancelling (state, Cancelled (cause)))) {
818
807
notifyCancellation(state, cause)
819
808
onCancellation()
820
809
return true
@@ -876,17 +865,9 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
876
865
private class NodeList (
877
866
active : Boolean
878
867
) : LockFreeLinkedListHead(), Incomplete {
879
- @Volatile
880
- @JvmField
881
- var active: Int = if (active) 1 else 0
868
+ val _active = atomic(if (active) 1 else 0 )
882
869
883
- override val isActive: Boolean get() = active != 0
884
-
885
- companion object {
886
- @JvmField
887
- val ACTIVE : AtomicIntegerFieldUpdater <NodeList > =
888
- AtomicIntegerFieldUpdater .newUpdater(NodeList ::class .java, " active" )
889
- }
870
+ override val isActive: Boolean get() = _active .value != 0
890
871
891
872
override fun toString (): String = buildString {
892
873
append(" List" )
@@ -976,7 +957,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
976
957
977
958
protected fun <R > registerSelectAwaitInternal (select : SelectInstance <R >, block : suspend (Any? ) -> R ) {
978
959
// fast-path -- check state and select/return if needed
979
- lockFreeLoopOnState { state ->
960
+ loopOnState { state ->
980
961
if (select.isSelected) return
981
962
if (state !is Incomplete ) {
982
963
// already complete -- select result
@@ -1006,6 +987,11 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
1006
987
}
1007
988
}
1008
989
990
+ internal fun stateToString (state : Any? ): String =
991
+ if (state is JobSupport .Incomplete )
992
+ if (state.isActive) " Active" else " New"
993
+ else " Completed"
994
+
1009
995
private const val RETRY = - 1
1010
996
private const val FALSE = 0
1011
997
private const val TRUE = 1
@@ -1095,16 +1081,10 @@ private class SelectAwaitOnCompletion<R>(
1095
1081
1096
1082
internal abstract class JobCancellationNode <out J : Job >(job : J ) : JobNode<J>(job) {
1097
1083
// shall be invoked at most once, so here is an additional flag
1098
- @Volatile
1099
- private var invoked: Int = 0
1100
-
1101
- private companion object {
1102
- private val INVOKED : AtomicIntegerFieldUpdater <JobCancellationNode <* >> = AtomicIntegerFieldUpdater
1103
- .newUpdater<JobCancellationNode <* >>(JobCancellationNode ::class .java, " invoked" )
1104
- }
1084
+ private val _invoked = atomic(0 )
1105
1085
1106
1086
final override fun invoke (reason : Throwable ? ) {
1107
- if (INVOKED .compareAndSet(this , 0 , 1 )) invokeOnce(reason)
1087
+ if (_invoked .compareAndSet(0 , 1 )) invokeOnce(reason)
1108
1088
}
1109
1089
1110
1090
abstract fun invokeOnce (reason : Throwable ? )
0 commit comments