Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package com.google.firebase.ai.common.util
import android.media.AudioRecord
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.yield

/**
Expand All @@ -35,20 +37,18 @@ internal val AudioRecord.minBufferSize: Int
*
* Will yield when this instance is not recording.
*/
internal fun AudioRecord.readAsFlow() = flow {
internal fun AudioRecord.readAsFlow() = callbackFlow {
val buffer = ByteArray(minBufferSize)

while (true) {
while (isActive) {
if (recordingState != AudioRecord.RECORDSTATE_RECORDING) {
// TODO(vguthal): Investigate if both yield and delay are required.
delay(10.milliseconds)
yield()
delay(0)
continue
}
val bytesRead = read(buffer, 0, buffer.size)
if (bytesRead > 0) {
emit(buffer.copyOf(bytesRead))
send(buffer.copyOf(bytesRead))
}
yield()
delay(0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession
import io.ktor.websocket.Frame
import io.ktor.websocket.close
import io.ktor.websocket.readBytes
import kotlinx.coroutines.CoroutineName
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
Expand All @@ -49,7 +51,6 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
Expand Down Expand Up @@ -120,7 +121,6 @@ internal constructor(
functionCallHandler: ((FunctionCallPart) -> FunctionResponsePart)? = null,
enableInterruptions: Boolean = false,
) {

val context = firebaseApp.applicationContext
if (
ContextCompat.checkSelfPermission(context, RECORD_AUDIO) != PackageManager.PERMISSION_GRANTED
Expand All @@ -137,8 +137,8 @@ internal constructor(
)
return@catchAsync
}

scope = CoroutineScope(blockingDispatcher + childJob())
// TODO: maybe it should be THREAD_PRIORITY_AUDIO anyways for playback and recording (not network though)
scope = CoroutineScope(blockingDispatcher + childJob() + CoroutineName("LiveSession Scope"))
audioHelper = AudioHelper.build()

recordUserAudio()
Expand Down Expand Up @@ -201,7 +201,7 @@ internal constructor(
)
}
?.let { emit(it.toPublic()) }
yield()
delay(0)
}
}
.onCompletion { stopAudioConversation() }
Expand Down Expand Up @@ -326,7 +326,10 @@ internal constructor(
?.listenToRecording()
?.buffer(UNLIMITED)
?.accumulateUntil(MIN_BUFFER_SIZE)
?.onEach { sendMediaStream(listOf(MediaData(it, "audio/pcm"))) }
?.onEach {
sendMediaStream(listOf(MediaData(it, "audio/pcm")))
delay(0)
}
?.catch { throw FirebaseAIException.from(it) }
?.launchIn(scope)
}
Expand Down Expand Up @@ -374,7 +377,7 @@ internal constructor(
if (it.interrupted) {
playBackQueue.clear()
} else {
println("Sending audio parts")
println("Queuing audio parts from model")
val audioParts = it.content?.parts?.filterIsInstance<InlineDataPart>().orEmpty()
for (part in audioParts) {
playBackQueue.add(part.inlineData)
Expand All @@ -390,7 +393,7 @@ internal constructor(
}
}
}
.launchIn(CoroutineScope(Dispatchers.IO))
.launchIn(scope)
}

/**
Expand All @@ -401,7 +404,7 @@ internal constructor(
* Launched asynchronously on [scope].
*/
private fun listenForModelPlayback(enableInterruptions: Boolean = false) {
CoroutineScope(Dispatchers.IO).launch {
scope.launch {
while (isActive) {
val playbackData = playBackQueue.poll()
if (playbackData == null) {
Expand All @@ -410,8 +413,9 @@ internal constructor(
if (!enableInterruptions) {
audioHelper?.resumeRecording()
}
yield()
delay(0)
} else {
println("Playing audio data")
/**
* We pause the recording while the model is speaking to avoid interrupting it because of
* no echo cancellation
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ benchmarkMacro = "1.3.4"
browser = "1.3.0"
cardview = "1.0.0"
constraintlayout = "2.1.4"
coroutines = "1.9.0"
coroutines = "1.10.2"
dagger = "2.51" # Don't bump above 2.51 as it causes a bug in AppDistro FeedbackSender JPEG code
datastore = "1.1.7"
dexmaker = "2.28.1"
Expand Down
Loading