Review notes (free-form preamble; `git apply` ignores text before the first `diff --git` line):
- readAsFlow keeps a real `delay(10.milliseconds)` while idle and `yield()` once per iteration.
  `delay(0)` returns immediately without suspending, which turned the idle branch into a busy loop.
- LiveSession.kt: the `CoroutineName` import is added in sorted position, next to `CoroutineScope`.
- `index` lines are omitted because the post-image blob ids recorded earlier no longer match.
diff --git a/firebase-ai/src/main/kotlin/com/google/firebase/ai/common/util/android.kt b/firebase-ai/src/main/kotlin/com/google/firebase/ai/common/util/android.kt
--- a/firebase-ai/src/main/kotlin/com/google/firebase/ai/common/util/android.kt
+++ b/firebase-ai/src/main/kotlin/com/google/firebase/ai/common/util/android.kt
@@ -19,7 +19,9 @@ package com.google.firebase.ai.common.util
 import android.media.AudioRecord
 import kotlin.time.Duration.Companion.milliseconds
 import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.callbackFlow
 import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.isActive
 import kotlinx.coroutines.yield
 
 /**
@@ -35,20 +37,19 @@ internal val AudioRecord.minBufferSize: Int
  *
  * Will yield when this instance is not recording.
  */
-internal fun AudioRecord.readAsFlow() = flow {
+internal fun AudioRecord.readAsFlow() = callbackFlow {
   val buffer = ByteArray(minBufferSize)
 
-  while (true) {
+  while (isActive) {
     if (recordingState != AudioRecord.RECORDSTATE_RECORDING) {
-      // TODO(vguthal): Investigate if both yield and delay are required.
+      // Not recording: actually suspend; delay(0) returns immediately and would busy-spin.
       delay(10.milliseconds)
-      yield()
       continue
     }
     val bytesRead = read(buffer, 0, buffer.size)
     if (bytesRead > 0) {
-      emit(buffer.copyOf(bytesRead))
+      send(buffer.copyOf(bytesRead))
     }
     yield()
   }
 }
diff --git a/firebase-ai/src/main/kotlin/com/google/firebase/ai/type/LiveSession.kt b/firebase-ai/src/main/kotlin/com/google/firebase/ai/type/LiveSession.kt
--- a/firebase-ai/src/main/kotlin/com/google/firebase/ai/type/LiveSession.kt
+++ b/firebase-ai/src/main/kotlin/com/google/firebase/ai/type/LiveSession.kt
@@ -36,10 +36,12 @@ import io.ktor.websocket.readBytes
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicBoolean
 import kotlin.coroutines.CoroutineContext
+import kotlinx.coroutines.CoroutineName
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.cancel
 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.buffer
 import kotlinx.coroutines.flow.catch
@@ -49,7 +51,6 @@ import kotlinx.coroutines.flow.onCompletion
 import kotlinx.coroutines.flow.onEach
 import kotlinx.coroutines.isActive
 import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
 import kotlinx.serialization.ExperimentalSerializationApi
 import kotlinx.serialization.Serializable
 import kotlinx.serialization.encodeToString
@@ -120,7 +121,6 @@ internal constructor(
     functionCallHandler: ((FunctionCallPart) -> FunctionResponsePart)?
= null, enableInterruptions: Boolean = false, ) { - val context = firebaseApp.applicationContext if ( ContextCompat.checkSelfPermission(context, RECORD_AUDIO) != PackageManager.PERMISSION_GRANTED @@ -137,8 +137,8 @@ internal constructor( ) return@catchAsync } - - scope = CoroutineScope(blockingDispatcher + childJob()) + // TODO: maybe it should be THREAD_PRIORITY_AUDIO anyways for playback and recording (not network though) + scope = CoroutineScope(blockingDispatcher + childJob() + CoroutineName("LiveSession Scope")) audioHelper = AudioHelper.build() recordUserAudio() @@ -201,7 +201,7 @@ internal constructor( ) } ?.let { emit(it.toPublic()) } - yield() + delay(0) } } .onCompletion { stopAudioConversation() } @@ -326,7 +326,10 @@ internal constructor( ?.listenToRecording() ?.buffer(UNLIMITED) ?.accumulateUntil(MIN_BUFFER_SIZE) - ?.onEach { sendMediaStream(listOf(MediaData(it, "audio/pcm"))) } + ?.onEach { + sendMediaStream(listOf(MediaData(it, "audio/pcm"))) + delay(0) + } ?.catch { throw FirebaseAIException.from(it) } ?.launchIn(scope) } @@ -374,7 +377,7 @@ internal constructor( if (it.interrupted) { playBackQueue.clear() } else { - println("Sending audio parts") + println("Queuing audio parts from model") val audioParts = it.content?.parts?.filterIsInstance().orEmpty() for (part in audioParts) { playBackQueue.add(part.inlineData) @@ -390,7 +393,7 @@ internal constructor( } } } - .launchIn(CoroutineScope(Dispatchers.IO)) + .launchIn(scope) } /** @@ -401,7 +404,7 @@ internal constructor( * Launched asynchronously on [scope]. 
*/ private fun listenForModelPlayback(enableInterruptions: Boolean = false) { - CoroutineScope(Dispatchers.IO).launch { + scope.launch { while (isActive) { val playbackData = playBackQueue.poll() if (playbackData == null) { @@ -410,8 +413,9 @@ internal constructor( if (!enableInterruptions) { audioHelper?.resumeRecording() } - yield() + delay(0) } else { + println("Playing audio data") /** * We pause the recording while the model is speaking to avoid interrupting it because of * no echo cancellation diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 74be10aa2ad..9f760b20104 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,7 +16,7 @@ benchmarkMacro = "1.3.4" browser = "1.3.0" cardview = "1.0.0" constraintlayout = "2.1.4" -coroutines = "1.9.0" +coroutines = "1.10.2" dagger = "2.51" # Don't bump above 2.51 as it causes a bug in AppDistro FeedbackSender JPEG code datastore = "1.1.7" dexmaker = "2.28.1"