Skip to content

Commit 12e96cd

Browse files
authored
Implement ObservableValue<T>.asFlow() (#1789)
* Implement ObservableValue<T>.asFlow() Fixes #1695
1 parent 6862afc commit 12e96cd

File tree

7 files changed

+273
-2
lines changed

7 files changed

+273
-2
lines changed

ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
2+
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
3+
}
4+
15
public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
26
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
37
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.javafx
6+
7+
import javafx.beans.value.ChangeListener
8+
import javafx.beans.value.ObservableValue
9+
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.channels.awaitClose
11+
import kotlinx.coroutines.flow.*
12+
13+
/**
14+
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits
15+
* its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick
16+
* succession, only the last one will be emitted.
17+
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
18+
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
19+
* All the calls to JavaFX API are performed in [Dispatchers.JavaFx].
20+
* This flow emits at least the initial value.
21+
*
22+
* ### Operator fusion
23+
*
24+
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
25+
* [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
26+
*/
27+
@ExperimentalCoroutinesApi
28+
public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
29+
val listener = ChangeListener<T> { _, _, newValue ->
30+
try {
31+
offer(newValue)
32+
} catch (e: CancellationException) {
33+
// In case the event fires after the channel is closed
34+
}
35+
}
36+
addListener(listener)
37+
send(value)
38+
awaitClose {
39+
removeListener(listener)
40+
}
41+
}.flowOn(Dispatchers.JavaFx).conflate()

ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private class PulseTimer : AnimationTimer() {
116116
}
117117
}
118118

119-
/** @return [true] if initialized successfully, and [false] if no display is detected */
119+
/** @return true if initialized successfully, and false if no display is detected */
120120
internal fun initPlatform(): Boolean = PlatformInitializer.success
121121

122122
// Lazily try to initialize JavaFx platform just once

ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt renamed to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import javafx.application.*
88
import kotlinx.coroutines.*
99
import org.junit.*
1010

11-
class JavaFxTest : TestBase() {
11+
class JavaFxDispatcherTest : TestBase() {
1212
@Before
1313
fun setup() {
1414
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package kotlinx.coroutines.javafx
2+
3+
import javafx.beans.property.SimpleIntegerProperty
4+
import kotlinx.coroutines.TestBase
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.flow.*
7+
import org.junit.Before
8+
import org.junit.Test
9+
import kotlin.test.*
10+
11+
12+
class JavaFxObservableAsFlowTest : TestBase() {
13+
14+
@Before
15+
fun setup() {
16+
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
17+
}
18+
19+
@Test
20+
fun testFlowOrder() = runTest {
21+
if (!initPlatform()) {
22+
println("Skipping JavaFxTest in headless environment")
23+
return@runTest // ignore test in headless environments
24+
}
25+
26+
val integerProperty = SimpleIntegerProperty(0)
27+
val n = 1000
28+
val flow = integerProperty.asFlow().takeWhile { j -> j != n }
29+
newSingleThreadContext("setter").use { pool ->
30+
launch(pool) {
31+
for (i in 1..n) {
32+
launch(Dispatchers.JavaFx) {
33+
integerProperty.set(i)
34+
}
35+
}
36+
}
37+
var i = -1
38+
flow.collect { j ->
39+
assertTrue(i < (j as Int), "Elements are neither repeated nor shuffled")
40+
i = j
41+
}
42+
}
43+
}
44+
45+
@Test
46+
fun testConflation() = runTest {
47+
if (!initPlatform()) {
48+
println("Skipping JavaFxTest in headless environment")
49+
return@runTest // ignore test in headless environments
50+
}
51+
52+
withContext(Dispatchers.JavaFx) {
53+
val END_MARKER = -1
54+
val integerProperty = SimpleIntegerProperty(0)
55+
val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
56+
launch {
57+
yield() // to subscribe to [integerProperty]
58+
yield() // send 0
59+
integerProperty.set(1)
60+
expect(3)
61+
yield() // send 1
62+
expect(5)
63+
integerProperty.set(2)
64+
for (i in (-100..-2)) {
65+
integerProperty.set(i) // should be skipped due to conflation
66+
}
67+
integerProperty.set(3)
68+
expect(6)
69+
yield() // send 2 and 3
70+
integerProperty.set(-1)
71+
}
72+
expect(1)
73+
flow.collect { i ->
74+
when (i) {
75+
0 -> expect(2)
76+
1 -> expect(4)
77+
2 -> expect(7)
78+
3 -> expect(8)
79+
else -> fail("i is $i")
80+
}
81+
}
82+
finish(9)
83+
}
84+
}
85+
86+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package kotlinx.coroutines.javafx
2+
3+
import javafx.beans.property.SimpleIntegerProperty
4+
import kotlinx.coroutines.*
5+
import kotlinx.coroutines.flow.first
6+
import org.junit.*
7+
8+
class JavaFxStressTest : TestBase() {
9+
10+
@Before
11+
fun setup() {
12+
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
13+
}
14+
15+
@get:Rule
16+
val pool = ExecutorRule(1)
17+
18+
@Test
19+
fun testCancellationRace() = runTest {
20+
if (!initPlatform()) {
21+
println("Skipping JavaFxTest in headless environment")
22+
return@runTest // ignore test in headless environments
23+
}
24+
25+
val integerProperty = SimpleIntegerProperty(0)
26+
val flow = integerProperty.asFlow()
27+
var i = 1
28+
val n = 1000 * stressTestMultiplier
29+
repeat (n) {
30+
launch(pool) {
31+
flow.first()
32+
}
33+
withContext(Dispatchers.JavaFx) {
34+
integerProperty.set(i)
35+
}
36+
i += 1
37+
}
38+
}
39+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package examples
2+
3+
import javafx.application.Application
4+
import javafx.scene.Scene
5+
import javafx.scene.control.*
6+
import javafx.scene.layout.GridPane
7+
import javafx.stage.Stage
8+
import javafx.beans.property.SimpleStringProperty
9+
import javafx.event.EventHandler
10+
import kotlinx.coroutines.*
11+
import kotlinx.coroutines.flow.*
12+
import kotlinx.coroutines.javafx.*
13+
import kotlin.coroutines.CoroutineContext
14+
15+
fun main(args: Array<String>) {
16+
Application.launch(FxAsFlowApp::class.java, *args)
17+
}
18+
19+
/**
20+
* Adapted from
21+
* https://github.com/ReactiveX/RxJavaFX/blob/a78ca7d15f7d82d201df8fafb6eba732ec17e327/src/test/java/io/reactivex/rxjavafx/RxJavaFXTest.java
22+
*/
23+
class FxAsFlowApp: Application(), CoroutineScope {
24+
25+
private var job = Job()
26+
override val coroutineContext: CoroutineContext
27+
get() = JavaFx + job
28+
29+
private val incrementButton = Button("Increment")
30+
private val incrementLabel = Label("")
31+
private val textInput = TextField()
32+
private val flippedTextLabel = Label()
33+
private val spinner = Spinner<Int>()
34+
private val spinnerChangesLabel = Label()
35+
36+
public override fun start( primaryStage: Stage) {
37+
val gridPane = GridPane()
38+
gridPane.apply {
39+
hgap = 10.0
40+
vgap = 10.0
41+
add(incrementButton, 0, 0)
42+
add(incrementLabel, 1, 0)
43+
add(textInput, 0, 1)
44+
add(flippedTextLabel, 1, 1)
45+
add(spinner, 0, 2)
46+
add(spinnerChangesLabel, 1, 2)
47+
}
48+
val scene = Scene(gridPane)
49+
primaryStage.apply {
50+
width = 275.0
51+
setScene(scene)
52+
show()
53+
}
54+
}
55+
56+
public override fun stop() {
57+
super.stop()
58+
job.cancel()
59+
job = Job()
60+
}
61+
62+
init {
63+
// Initializing the "Increment" button
64+
val stringProperty = SimpleStringProperty()
65+
var i = 0
66+
incrementButton.onAction = EventHandler {
67+
i += 1
68+
stringProperty.set(i.toString())
69+
}
70+
launch {
71+
stringProperty.asFlow().collect {
72+
if (it != null) {
73+
stringProperty.set(it)
74+
}
75+
}
76+
}
77+
incrementLabel.textProperty().bind(stringProperty)
78+
// Initializing the reversed text field
79+
val stringProperty2 = SimpleStringProperty()
80+
launch {
81+
textInput.textProperty().asFlow().collect {
82+
if (it != null) {
83+
stringProperty2.set(it.reversed())
84+
}
85+
}
86+
}
87+
flippedTextLabel.textProperty().bind(stringProperty2)
88+
// Initializing the spinner
89+
spinner.valueFactory = SpinnerValueFactory.IntegerSpinnerValueFactory(0, 100)
90+
spinner.isEditable = true
91+
val stringProperty3 = SimpleStringProperty()
92+
launch {
93+
spinner.valueProperty().asFlow().collect {
94+
if (it != null) {
95+
stringProperty3.set("NEW: $it")
96+
}
97+
}
98+
}
99+
spinnerChangesLabel.textProperty().bind(stringProperty3)
100+
}
101+
}

0 commit comments

Comments
 (0)