Skip to content

Commit 8a4a8e1

Browse files
committed
Guide to reactive streams with coroutines
1 parent 9d6dfbe commit 8a4a8e1

21 files changed

+1809
-67
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Modules that provide coroutine dispatchers for various single-threaded UI librar
2626

2727
* [Guide to kotlinx.coroutines by example](coroutines-guide.md) (**read it first**)
2828
* [Guide to UI programming with coroutines](ui/coroutines-guide-ui.md)
29+
* [Guide to reactive streams with coroutines](reactive/coroutines-guide-reactive.md)
2930
* [Change log for kotlinx.coroutines](CHANGES.md)
3031
* [Coroutines design document (KEEP)](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md)
3132
* [Full kotlinx.coroutines API reference](http://kotlin.github.io/kotlinx.coroutines)

coroutines-guide.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2153,6 +2153,7 @@ Channel was closed
21532153
## Further reading
21542154

21552155
* [Guide to UI programming with coroutines](ui/coroutines-guide-ui.md)
2156+
* [Guide to reactive streams with coroutines](reactive/coroutines-guide-reactive.md)
21562157
* [Coroutines design document (KEEP)](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md)
21572158
* [Full kotlinx.coroutines API reference](http://kotlin.github.io/kotlinx.coroutines)
21582159

knit/src/Knit.kt

Lines changed: 75 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,13 @@ fun main(args: Array<String>) {
6060
fun knit(markdownFileName: String) {
6161
println("*** Reading $markdownFileName")
6262
val markdownFile = File(markdownFileName)
63-
val toc = arrayListOf<String>()
63+
val tocLines = arrayListOf<String>()
6464
var knitRegex: Regex? = null
6565
val includes = arrayListOf<Include>()
66-
val code = arrayListOf<String>()
67-
val test = arrayListOf<String>()
68-
var testOut: PrintWriter? = null
66+
val codeLines = arrayListOf<String>()
67+
val testLines = arrayListOf<String>()
68+
var testOut: String? = null
69+
val testOutLines = arrayListOf<String>()
6970
var lastPgk: String? = null
7071
val files = mutableSetOf<File>()
7172
val allApiRefs = arrayListOf<ApiRef>()
@@ -98,46 +99,47 @@ fun knit(markdownFileName: String) {
9899
continue@mainLoop
99100
}
100101
INCLUDE_DIRECTIVE -> {
101-
require(!directive.param.isEmpty()) { "$INCLUDE_DIRECTIVE directive must include regex parameter" }
102-
val include = Include(Regex(directive.param))
103-
if (directive.singleLine) {
104-
include.lines += code
105-
code.clear()
102+
if (directive.param.isEmpty()) {
103+
require(!directive.singleLine) { "$INCLUDE_DIRECTIVE directive without parameters must not be single line" }
104+
readUntilTo(DIRECTIVE_END, codeLines)
106105
} else {
107-
readUntilTo(DIRECTIVE_END, include.lines)
106+
val include = Include(Regex(directive.param))
107+
if (directive.singleLine) {
108+
include.lines += codeLines
109+
codeLines.clear()
110+
} else {
111+
readUntilTo(DIRECTIVE_END, include.lines)
112+
}
113+
includes += include
108114
}
109-
includes += include
110115
continue@mainLoop
111116
}
112117
CLEAR_DIRECTIVE -> {
113118
requireSingleLine(directive)
114119
require(directive.param.isEmpty()) { "$CLEAR_DIRECTIVE directive must not have parameters" }
115-
code.clear()
120+
codeLines.clear()
116121
continue@mainLoop
117122
}
118123
TEST_OUT_DIRECTIVE -> {
119124
require(!directive.param.isEmpty()) { "$TEST_OUT_DIRECTIVE directive must include file name parameter" }
120-
val file = File(directive.param)
121-
file.parentFile?.mkdirs()
122-
closeTestOut(testOut)
123-
println("Writing tests to ${directive.param}")
124-
testOut = PrintWriter(file)
125-
readUntil(DIRECTIVE_END).forEach { testOut!!.println(it) }
125+
flushTestOut(markdownFile.parentFile, testOut, testOutLines)
126+
testOut = directive.param
127+
readUntil(DIRECTIVE_END).forEach { testOutLines += it }
126128
}
127129
TEST_DIRECTIVE -> {
128130
require(lastPgk != null) { "'$PACKAGE_PREFIX' prefix was not found in emitted code"}
129131
require(testOut != null) { "$TEST_OUT_DIRECTIVE directive was not specified" }
130132
var predicate = directive.param
131-
if (test.isEmpty()) {
133+
if (testLines.isEmpty()) {
132134
if (directive.singleLine) {
133135
require(!predicate.isEmpty()) { "$TEST_OUT_DIRECTIVE must be preceded by $TEST_START block or contain test predicate"}
134136
} else
135-
test += readUntil(DIRECTIVE_END)
137+
testLines += readUntil(DIRECTIVE_END)
136138
} else {
137139
requireSingleLine(directive)
138140
}
139-
writeTest(testOut!!, lastPgk!!, test, predicate)
140-
test.clear()
141+
makeTest(testOutLines, lastPgk!!, testLines, predicate)
142+
testLines.clear()
141143
}
142144
SITE_ROOT_DIRECTIVE -> {
143145
requireSingleLine(directive)
@@ -166,21 +168,21 @@ fun knit(markdownFileName: String) {
166168
}
167169
}
168170
if (inLine.startsWith(CODE_START)) {
169-
require(test.isEmpty()) { "Previous test was not emitted with $TEST_DIRECTIVE" }
170-
code += ""
171-
readUntilTo(CODE_END, code)
171+
require(testLines.isEmpty()) { "Previous test was not emitted with $TEST_DIRECTIVE" }
172+
codeLines += ""
173+
readUntilTo(CODE_END, codeLines)
172174
continue@mainLoop
173175
}
174176
if (inLine.startsWith(TEST_START)) {
175-
require(test.isEmpty()) { "Previous test was not emitted with $TEST_DIRECTIVE" }
176-
readUntilTo(TEST_END, test)
177+
require(testLines.isEmpty()) { "Previous test was not emitted with $TEST_DIRECTIVE" }
178+
readUntilTo(TEST_END, testLines)
177179
continue@mainLoop
178180
}
179181
if (inLine.startsWith(SECTION_START) && markdownPart == MarkdownPart.POST_TOC) {
180182
val i = inLine.indexOf(' ')
181183
require(i >= 2) { "Invalid section start" }
182184
val name = inLine.substring(i + 1).trim()
183-
toc += " ".repeat(i - 2) + "* [$name](#${makeSectionRef(name)})"
185+
tocLines += " ".repeat(i - 2) + "* [$name](#${makeSectionRef(name)})"
184186
continue@mainLoop
185187
}
186188
for (match in API_REF_REGEX.findAll(inLine)) {
@@ -203,21 +205,18 @@ fun knit(markdownFileName: String) {
203205
outLines += line
204206
}
205207
}
206-
outLines += code
207-
code.clear()
208-
val oldLines = try { file.readLines() } catch (e: IOException) { emptyList<String>() }
209-
if (outLines != oldLines) writeLines(file, outLines)
208+
outLines += codeLines
209+
codeLines.clear()
210+
writeLinesIfNeeded(file, outLines)
210211
}
211212
}
212213
}
213-
// close test output
214-
closeTestOut(testOut)
215214
// update markdown file with toc
216215
val newLines = buildList<String> {
217216
addAll(markdown.preTocText)
218-
if (!toc.isEmpty()) {
217+
if (!tocLines.isEmpty()) {
219218
add("")
220-
addAll(toc)
219+
addAll(tocLines)
221220
add("")
222221
}
223222
addAll(markdown.postTocText)
@@ -229,9 +228,11 @@ fun knit(markdownFileName: String) {
229228
println("WARNING: $markdownFile: ${apiRef.line}: Broken reference to [${apiRef.name}]")
230229
}
231230
}
231+
// write test output
232+
flushTestOut(markdownFile.parentFile, testOut, testOutLines)
232233
}
233234

234-
fun writeTest(testOut: PrintWriter, pgk: String, test: List<String>, predicate: String) {
235+
fun makeTest(testOutLines: MutableList<String>, pgk: String, test: List<String>, predicate: String) {
235236
val funName = buildString {
236237
var cap = true
237238
for (c in pgk) {
@@ -243,37 +244,35 @@ fun writeTest(testOut: PrintWriter, pgk: String, test: List<String>, predicate:
243244
}
244245
}
245246
}
246-
with (testOut) {
247-
println()
248-
println(" @Test")
249-
println(" fun test$funName() {")
250-
print (" test { $pgk.main(emptyArray()) }")
251-
when (predicate) {
252-
"" -> writeTestLines("verifyLines", test)
253-
STARTS_WITH_PREDICATE -> writeTestLines("verifyLinesStartWith", test)
254-
ARBITRARY_TIME_PREDICATE -> writeTestLines("verifyLinesArbitraryTime", test)
255-
FLEXIBLE_TIME_PREDICATE -> writeTestLines("verifyLinesFlexibleTime", test)
256-
FLEXIBLE_THREAD_PREDICATE -> writeTestLines("verifyLinesFlexibleThread", test)
257-
LINES_START_UNORDERED_PREDICATE -> writeTestLines("verifyLinesStartUnordered", test)
258-
LINES_START_PREDICATE -> writeTestLines("verifyLinesStart", test)
259-
else -> {
260-
println(".also { lines ->")
261-
println(" check($predicate)")
262-
println(" }")
263-
}
247+
testOutLines += ""
248+
testOutLines += " @Test"
249+
testOutLines += " fun test$funName() {"
250+
val prefix = " test { $pgk.main(emptyArray()) }"
251+
when (predicate) {
252+
"" -> makeTestLines(testOutLines, prefix, "verifyLines", test)
253+
STARTS_WITH_PREDICATE -> makeTestLines(testOutLines, prefix, "verifyLinesStartWith", test)
254+
ARBITRARY_TIME_PREDICATE -> makeTestLines(testOutLines, prefix, "verifyLinesArbitraryTime", test)
255+
FLEXIBLE_TIME_PREDICATE -> makeTestLines(testOutLines, prefix, "verifyLinesFlexibleTime", test)
256+
FLEXIBLE_THREAD_PREDICATE -> makeTestLines(testOutLines, prefix, "verifyLinesFlexibleThread", test)
257+
LINES_START_UNORDERED_PREDICATE -> makeTestLines(testOutLines, prefix, "verifyLinesStartUnordered", test)
258+
LINES_START_PREDICATE -> makeTestLines(testOutLines, prefix, "verifyLinesStart", test)
259+
else -> {
260+
testOutLines += prefix + ".also { lines ->"
261+
testOutLines += " check($predicate)"
262+
testOutLines += " }"
264263
}
265-
println(" }")
266264
}
265+
testOutLines += " }"
267266
}
268267

269-
private fun PrintWriter.writeTestLines(method: String, test: List<String>) {
270-
println(".$method(")
268+
private fun makeTestLines(testOutLines: MutableList<String>, prefix: String, method: String, test: List<String>) {
269+
testOutLines += "$prefix.$method("
271270
for ((index, testLine) in test.withIndex()) {
272271
val commaOpt = if (index < test.size - 1) "," else ""
273272
val escapedLine = testLine.replace("\"", "\\\"")
274-
println(" \"$escapedLine\"$commaOpt")
273+
testOutLines += " \"$escapedLine\"$commaOpt"
275274
}
276-
println(" )")
275+
testOutLines += " )"
277276
}
278277

279278
private fun makeReplacements(line: String, match: MatchResult): String {
@@ -285,11 +284,12 @@ private fun makeReplacements(line: String, match: MatchResult): String {
285284
return result
286285
}
287286

288-
private fun closeTestOut(testOut: PrintWriter?) {
289-
if (testOut != null) {
290-
testOut.println("}")
291-
testOut.close()
292-
}
287+
private fun flushTestOut(parentDir: File?, testOut: String?, testOutLines: MutableList<String>) {
288+
if (testOut == null) return
289+
val file = File(parentDir, testOut)
290+
testOutLines += "}"
291+
writeLinesIfNeeded(file, testOutLines)
292+
testOutLines.clear()
293293
}
294294

295295
private fun MarkdownTextReader.readUntil(marker: String): List<String> =
@@ -375,6 +375,15 @@ fun <T : LineNumberReader> File.withLineNumberReader(factory: (Reader) -> T, blo
375375
fun File.withMarkdownTextReader(block: MarkdownTextReader.() -> Unit): MarkdownTextReader =
376376
withLineNumberReader<MarkdownTextReader>(::MarkdownTextReader, block)
377377

378+
fun writeLinesIfNeeded(file: File, outLines: List<String>) {
379+
val oldLines = try {
380+
file.readLines()
381+
} catch (e: IOException) {
382+
emptyList<String>()
383+
}
384+
if (outLines != oldLines) writeLines(file, outLines)
385+
}
386+
378387
fun writeLines(file: File, lines: List<String>) {
379388
println(" Writing $file ...")
380389
file.parentFile?.mkdirs()
@@ -431,7 +440,7 @@ fun processApiIndex(
431440
): List<String> {
432441
val key = ApiIndexKey(docsRoot, pkg)
433442
val map = apiIndexCache.getOrPut(key, {
434-
print("Parsing API docs at $docsRoot: ")
443+
print("Parsing API docs at $docsRoot/$pkg: ")
435444
val result = loadApiIndex(docsRoot, pkg, pkg)
436445
println("${result.size} definitions")
437446
result

kotlinx-coroutines-core/src/test/kotlin/guide/test/TestUtil.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private fun sanitize(s: String, mode: SanitizeMode): String {
8383
res = res.replace(Regex("ForkJoinPool\\.commonPool-worker-[0-9]+"), "CommonPool")
8484
res = res.replace(Regex("ForkJoinPool-[0-9]+-worker-[0-9]+"), "CommonPool")
8585
res = res.replace(Regex("CommonPool-worker-[0-9]+"), "CommonPool")
86-
86+
res = res.replace(Regex("RxComputationThreadPool-[0-9]+"), "RxComputationThreadPool")
8787
}
8888
SanitizeMode.NONE -> {}
8989
}

0 commit comments

Comments
 (0)