Skip to content

Commit 1e45960

Browse files
committed
Couple more examples for "Shared mutable state and concurrency" section
1 parent 7c864d8 commit 1e45960

File tree

11 files changed

+358
-134
lines changed

11 files changed

+358
-134
lines changed

coroutines-guide.md

Lines changed: 116 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ This is a short guide on core features of `kotlinx.coroutines` with a series of
7878
* [Buffered channels](#buffered-channels)
7979
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
8080
* [The problem](#the-problem)
81+
* [Volatiles are of no help](#volatiles-are-of-no-help)
8182
* [Thread-safe data structures](#thread-safe-data-structures)
82-
* [Thread confinement](#thread-confinement)
83+
* [Thread confinement fine-grained](#thread-confinement-fine-grained)
84+
* [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
8385
* [Mutual exclusion](#mutual-exclusion)
8486
* [Actors](#actors)
8587
* [Select expression](#select-expression)
@@ -1484,49 +1486,52 @@ but others are unique.
14841486

14851487
### The problem
14861488

1487-
Let us launch 100k coroutines all doing the same action. We'll also measure their completion time for
1488-
further comparisons:
1489+
Let us launch a thousand coroutines all doing the same action thousand times (for a total of a million executions).
1490+
We'll also measure their completion time for further comparisons:
14891491

14901492
<!--- INCLUDE .*/example-sync-([0-9]+).kt
1493+
import kotlin.coroutines.experimental.CoroutineContext
14911494
import kotlin.system.measureTimeMillis
14921495
-->
14931496

1494-
<!--- INCLUDE .*/example-sync-02.kt
1497+
<!--- INCLUDE .*/example-sync-03.kt
14951498
import java.util.concurrent.atomic.AtomicInteger
14961499
-->
14971500

1498-
<!--- INCLUDE .*/example-sync-04.kt
1501+
<!--- INCLUDE .*/example-sync-06.kt
14991502
import kotlinx.coroutines.experimental.sync.Mutex
15001503
-->
15011504

1502-
<!--- INCLUDE .*/example-sync-05.kt
1505+
<!--- INCLUDE .*/example-sync-07.kt
15031506
import kotlinx.coroutines.experimental.channels.*
15041507
-->
15051508

15061509
```kotlin
1507-
suspend fun massiveRun(action: suspend () -> Unit) {
1508-
val n = 100_000
1510+
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
1511+
val n = 1000 // number of coroutines to launch
1512+
val k = 1000 // times an action is repeated by each coroutine
15091513
val time = measureTimeMillis {
15101514
val jobs = List(n) {
1511-
launch(CommonPool) {
1512-
action()
1515+
launch(context) {
1516+
repeat(k) { action() }
15131517
}
15141518
}
15151519
jobs.forEach { it.join() }
15161520
}
1517-
println("Completed in $time ms")
1521+
println("Completed ${n * k} actions in $time ms")
15181522
}
15191523
```
15201524

15211525
<!--- INCLUDE .*/example-sync-([0-9]+).kt -->
15221526

1523-
We start with a very simple action that increments a shared mutable variable.
1527+
We start with a very simple action that increments a shared mutable variable using
1528+
multi-threaded [CommonPool] context.
15241529

15251530
```kotlin
15261531
var counter = 0
15271532

15281533
fun main(args: Array<String>) = runBlocking<Unit> {
1529-
massiveRun {
1534+
massiveRun(CommonPool) {
15301535
counter++
15311536
}
15321537
println("Counter = $counter")
@@ -1535,40 +1540,73 @@ fun main(args: Array<String>) = runBlocking<Unit> {
15351540

15361541
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)
15371542
1538-
<!--- TEST lines.size == 2 && lines[1].startsWith("Counter = ") -->
1543+
<!--- TEST LINES_START
1544+
Completed 1000000 actions in
1545+
Counter =
1546+
-->
1547+
1548+
What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
1549+
increment the `counter` concurrently from multiple threads without any synchronization.
1550+
1551+
### Volatiles are of no help
15391552

1540-
What does it print at the end? It is highly unlikely to ever print "100000", because all the
1541-
100k coroutines increment the `counter` concurrently without any synchronization.
1553+
There is common misconception that making a variable `volatile` solves concurrency problem. Let us try it:
1554+
1555+
```kotlin
1556+
@Volatile // in Kotlin `volatile` is an annotation
1557+
var counter = 0
1558+
1559+
fun main(args: Array<String>) = runBlocking<Unit> {
1560+
massiveRun(CommonPool) {
1561+
counter++
1562+
}
1563+
println("Counter = $counter")
1564+
}
1565+
```
1566+
1567+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)
1568+
1569+
<!--- TEST LINES_START
1570+
Completed 1000000 actions in
1571+
Counter =
1572+
-->
1573+
1574+
This code works slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
1575+
linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
1576+
do not provide atomicity of larger actions (increment in our case).
15421577

15431578
### Thread-safe data structures
15441579

15451580
The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
15461581
linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding
15471582
operations that needs to be performed on a shared state.
1548-
In the case of a simple counter we can use `AtomicInteger` class:
1583+
In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations:
15491584

15501585
```kotlin
15511586
var counter = AtomicInteger()
15521587

15531588
fun main(args: Array<String>) = runBlocking<Unit> {
1554-
massiveRun {
1589+
massiveRun(CommonPool) {
15551590
counter.incrementAndGet()
15561591
}
15571592
println("Counter = ${counter.get()}")
15581593
}
15591594
```
15601595

1561-
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)
1596+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)
15621597
1563-
<!--- TEST lines.size == 2 && lines[1] == "Counter = 100000" -->
1598+
<!--- TEST ARBITRARY_TIME
1599+
Completed 1000000 actions in xxx ms
1600+
Counter = 1000000
1601+
-->
15641602

15651603
This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
15661604
standard data structures and basic operations on them. However, it does not easily scale to complex
15671605
state or to complex operations that do not have ready-to-use thread-safe implementations.
15681606

1569-
### Thread confinement
1607+
### Thread confinement fine-grained
15701608

1571-
Thread confinement is an approach to the problem of shared mutable state where all access to the particular shared
1609+
_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
15721610
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
15731611
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
15741612
single-threaded context:
@@ -1578,18 +1616,51 @@ val counterContext = newSingleThreadContext("CounterContext")
15781616
var counter = 0
15791617

15801618
fun main(args: Array<String>) = runBlocking<Unit> {
1581-
massiveRun {
1582-
run(counterContext) {
1619+
massiveRun(CommonPool) { // run each coroutine in CommonPool
1620+
run(counterContext) { // but confine each increment to the single-threaded context
15831621
counter++
15841622
}
15851623
}
15861624
println("Counter = $counter")
15871625
}
15881626
```
15891627

1590-
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)
1628+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)
1629+
1630+
<!--- TEST ARBITRARY_TIME
1631+
Completed 1000000 actions in xxx ms
1632+
Counter = 1000000
1633+
-->
1634+
1635+
This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
1636+
from multi-threaded `CommonPool` context to the single-threaded context using [run] block.
1637+
1638+
### Thread confinement coarse-grained
1639+
1640+
In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
1641+
are confined to the single thread. The following example does it like that, running each coroutine in
1642+
the single-threaded context to start with.
15911643

1592-
<!--- TEST lines.size == 2 && lines[1] == "Counter = 100000" -->
1644+
```kotlin
1645+
val counterContext = newSingleThreadContext("CounterContext")
1646+
var counter = 0
1647+
1648+
fun main(args: Array<String>) = runBlocking<Unit> {
1649+
massiveRun(counterContext) { // run each coroutine in the single-threaded context
1650+
counter++
1651+
}
1652+
println("Counter = $counter")
1653+
}
1654+
```
1655+
1656+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)
1657+
1658+
<!--- TEST ARBITRARY_TIME
1659+
Completed 1000000 actions in xxx ms
1660+
Counter = 1000000
1661+
-->
1662+
1663+
This now works much faster and produces correct result.
15931664

15941665
### Mutual exclusion
15951666

@@ -1603,7 +1674,7 @@ val mutex = Mutex()
16031674
var counter = 0
16041675

16051676
fun main(args: Array<String>) = runBlocking<Unit> {
1606-
massiveRun {
1677+
massiveRun(CommonPool) {
16071678
mutex.lock()
16081679
try { counter++ }
16091680
finally { mutex.unlock() }
@@ -1612,9 +1683,16 @@ fun main(args: Array<String>) = runBlocking<Unit> {
16121683
}
16131684
```
16141685

1615-
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)
1686+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)
1687+
1688+
<!--- TEST ARBITRARY_TIME
1689+
Completed 1000000 actions in xxx ms
1690+
Counter = 1000000
1691+
-->
16161692

1617-
<!--- TEST lines.size == 2 && lines[1] == "Counter = 100000" -->
1693+
The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
1694+
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
1695+
is confined to.
16181696

16191697
### Actors
16201698

@@ -1643,7 +1721,7 @@ fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
16431721
fun main(args: Array<String>) = runBlocking<Unit> {
16441722
val request = Channel<CounterMsg>()
16451723
counterActor(request)
1646-
massiveRun {
1724+
massiveRun(CommonPool) {
16471725
request.send(IncCounter)
16481726
}
16491727
val response = Channel<Int>()
@@ -1652,14 +1730,20 @@ fun main(args: Array<String>) = runBlocking<Unit> {
16521730
}
16531731
```
16541732

1655-
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)
1733+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)
16561734
1657-
<!--- TEST lines.size == 2 && lines[1] == "Counter = 100000" -->
1735+
<!--- TEST ARBITRARY_TIME
1736+
Completed 1000000 actions in xxx ms
1737+
Counter = 1000000
1738+
-->
16581739

16591740
Notice, that it does not matter (for correctness) what context the actor itself is executed in. An actor is
16601741
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
16611742
works as a solution to the problem of shared mutable state.
16621743

1744+
Actor is more efficient than locking under load, because in this case it always has work to do and does not
1745+
have to switch at all.
1746+
16631747
## Select expression
16641748

16651749
Select expression makes it possible to await multiple suspending functions simultaneously and _select_

knit/src/Knit.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const val SECTION_START = "##"
4141

4242
const val PACKAGE_PREFIX = "package "
4343
const val STARTS_WITH_PREDICATE = "STARTS_WITH"
44+
const val ARBITRARY_TIME_PREDICATE = "ARBITRARY_TIME"
4445
const val FLEXIBLE_TIME_PREDICATE = "FLEXIBLE_TIME"
4546
const val FLEXIBLE_THREAD_PREDICATE = "FLEXIBLE_THREAD"
4647
const val LINES_START_UNORDERED_PREDICATE = "LINES_START_UNORDERED"
@@ -250,6 +251,7 @@ fun writeTest(testOut: PrintWriter, pgk: String, test: List<String>, predicate:
250251
when (predicate) {
251252
"" -> writeTestLines("verifyLines", test)
252253
STARTS_WITH_PREDICATE -> writeTestLines("verifyLinesStartWith", test)
254+
ARBITRARY_TIME_PREDICATE -> writeTestLines("verifyLinesArbitraryTime", test)
253255
FLEXIBLE_TIME_PREDICATE -> writeTestLines("verifyLinesFlexibleTime", test)
254256
FLEXIBLE_THREAD_PREDICATE -> writeTestLines("verifyLinesFlexibleThread", test)
255257
LINES_START_UNORDERED_PREDICATE -> writeTestLines("verifyLinesStartUnordered", test)

kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,27 @@
1818
package guide.sync.example01
1919

2020
import kotlinx.coroutines.experimental.*
21+
import kotlin.coroutines.experimental.CoroutineContext
2122
import kotlin.system.measureTimeMillis
2223

23-
suspend fun massiveRun(action: suspend () -> Unit) {
24-
val n = 100_000
24+
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
25+
val n = 1000 // number of coroutines to launch
26+
val k = 1000 // times an action is repeated by each coroutine
2527
val time = measureTimeMillis {
2628
val jobs = List(n) {
27-
launch(CommonPool) {
28-
action()
29+
launch(context) {
30+
repeat(k) { action() }
2931
}
3032
}
3133
jobs.forEach { it.join() }
3234
}
33-
println("Completed in $time ms")
35+
println("Completed ${n * k} actions in $time ms")
3436
}
3537

3638
var counter = 0
3739

3840
fun main(args: Array<String>) = runBlocking<Unit> {
39-
massiveRun {
41+
massiveRun(CommonPool) {
4042
counter++
4143
}
4244
println("Counter = $counter")

kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,29 @@
1818
package guide.sync.example02
1919

2020
import kotlinx.coroutines.experimental.*
21+
import kotlin.coroutines.experimental.CoroutineContext
2122
import kotlin.system.measureTimeMillis
22-
import java.util.concurrent.atomic.AtomicInteger
2323

24-
suspend fun massiveRun(action: suspend () -> Unit) {
25-
val n = 100_000
24+
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
25+
val n = 1000 // number of coroutines to launch
26+
val k = 1000 // times an action is repeated by each coroutine
2627
val time = measureTimeMillis {
2728
val jobs = List(n) {
28-
launch(CommonPool) {
29-
action()
29+
launch(context) {
30+
repeat(k) { action() }
3031
}
3132
}
3233
jobs.forEach { it.join() }
3334
}
34-
println("Completed in $time ms")
35+
println("Completed ${n * k} actions in $time ms")
3536
}
3637

37-
var counter = AtomicInteger()
38+
@Volatile // in Kotlin `volatile` is an annotation
39+
var counter = 0
3840

3941
fun main(args: Array<String>) = runBlocking<Unit> {
40-
massiveRun {
41-
counter.incrementAndGet()
42+
massiveRun(CommonPool) {
43+
counter++
4244
}
43-
println("Counter = ${counter.get()}")
45+
println("Counter = $counter")
4446
}

0 commit comments

Comments
 (0)