Skip to content

Commit a2fa02f

Browse files
committed
working
1 parent b63b807 commit a2fa02f

File tree

1 file changed

+12
-9
lines changed
  • firebase-vertexai/src/main/kotlin/com/google/firebase/vertexai/type

1 file changed

+12
-9
lines changed

firebase-vertexai/src/main/kotlin/com/google/firebase/vertexai/type/LiveSession.kt

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import java.util.concurrent.ConcurrentLinkedQueue
1010
import kotlinx.coroutines.CoroutineScope
1111
import kotlinx.coroutines.Dispatchers
1212
import kotlinx.coroutines.cancel
13+
import kotlinx.coroutines.channels.Channel
1314
import kotlinx.coroutines.delay
1415
import kotlinx.coroutines.flow.Flow
1516
import kotlinx.coroutines.flow.flow
17+
import kotlinx.coroutines.flow.receiveAsFlow
1618
import kotlinx.coroutines.launch
1719
import kotlinx.coroutines.runBlocking
1820
import kotlinx.serialization.SerialName
@@ -29,8 +31,8 @@ internal constructor(
2931

3032
private val audioQueue = ConcurrentLinkedQueue<ByteArray>()
3133
private val playBackQueue = ConcurrentLinkedQueue<ByteArray>()
32-
private var stopReceiving = false
3334
private var startedReceiving = false
35+
private var receiveChannel: Channel<Frame> = Channel()
3436

3537
@Serializable
3638
internal data class ClientContent(
@@ -143,10 +145,10 @@ internal constructor(
143145

144146
public fun stopReceiving() {
145147
if(!startedReceiving) {
146-
stopReceiving = false
147148
return
148149
}
149-
stopReceiving = true
150+
receiveChannel.cancel()
151+
receiveChannel = Channel()
150152
startedReceiving = false
151153
}
152154

@@ -159,15 +161,16 @@ internal constructor(
159161
throw SessionAlreadyReceivingException()
160162
}
161163

164+
val flowReceive = session!!.incoming.receiveAsFlow()
165+
CoroutineScope(Dispatchers.IO).launch {
166+
flowReceive.collect {
167+
receiveChannel.send(it)
168+
}
169+
}
162170
return flow {
163171
startedReceiving = true
164172
while (true) {
165-
println(stopReceiving)
166-
if(stopReceiving) {
167-
stopReceiving = false
168-
break
169-
}
170-
val message = session!!.incoming.receive()
173+
val message = receiveChannel.receive()
171174
val receivedBytes = (message as Frame.Binary).readBytes()
172175
val receivedJson = receivedBytes.toString(Charsets.UTF_8)
173176
if (receivedJson.contains("interrupted")) {

0 commit comments

Comments
 (0)