@@ -15,36 +15,28 @@ import dev.gitlive.firebase.encode
15
15
import kotlinx.coroutines.CompletableDeferred
16
16
import kotlinx.coroutines.channels.awaitClose
17
17
import kotlinx.coroutines.channels.trySendBlocking
18
- import kotlinx.coroutines.coroutineScope
19
- import kotlinx.coroutines.flow.Flow
20
- import kotlinx.coroutines.flow.callbackFlow
21
- import kotlinx.coroutines.flow.filter
22
- import kotlinx.coroutines.flow.produceIn
23
- import kotlinx.coroutines.selects.select
24
- import kotlinx.coroutines.tasks.asDeferred
18
+ import kotlinx.coroutines.flow.*
25
19
import kotlinx.coroutines.tasks.await
26
20
import kotlinx.serialization.DeserializationStrategy
27
21
import kotlinx.serialization.KSerializer
28
22
import kotlinx.serialization.SerializationStrategy
29
23
import java.util.*
30
-
31
- suspend fun <T > Task<T>.awaitWhileOnline (): T = coroutineScope {
32
-
33
- val notConnected = Firebase .database
34
- .reference(" .info/connected" )
35
- .valueEvents
36
- .filter { ! it.value<Boolean >() }
37
- .produceIn(this )
38
-
39
- try {
40
- select<T > {
41
- asDeferred().onAwait { it }
42
- notConnected.onReceive { throw DatabaseException (" Database not connected" , null ) }
24
+ import kotlin.time.Duration.Companion.seconds
25
+
26
+ suspend fun <T > Task<T>.awaitWhileOnline (): T =
27
+ merge(
28
+ flow { emit(await()) },
29
+ flow<T > {
30
+ Firebase .database
31
+ .reference(" .info/connected" )
32
+ .valueEvents
33
+ .debounce(2 .seconds)
34
+ .first { ! it.value<Boolean >() }
35
+ throw DatabaseException (" Database not connected" , null )
43
36
}
44
- } finally {
45
- notConnected.cancel()
46
- }
47
- }
37
+ )
38
+ .first()
39
+
48
40
49
41
actual val Firebase .database
50
42
by lazy { FirebaseDatabase (com.google.firebase.database.FirebaseDatabase .getInstance()) }
0 commit comments