Skip to content

Commit e9b91a7

Browse files
committed
Merge remote-tracking branch 'origin/main' into MutableStateFlowUseUpdateInsteadOfCompareAndSet
2 parents 2d092f8 + dbf5d01 commit e9b91a7

File tree

5 files changed

+86
-74
lines changed

5 files changed

+86
-74
lines changed

encoders/protoc-gen-firebase-encoders/src/main/kotlin/com/google/firebase/encoders/proto/codegen/Types.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ data class ProtoField(
190190

191191
val lowerCamelCaseName: String
192192
get() {
193-
return SNAKE_CASE_REGEX.replace(name) { it.value.replace("_", "").toUpperCase() }
193+
return SNAKE_CASE_REGEX.replace(name) { it.value.replace("_", "").uppercase() }
194194
}
195195

196196
val camelCaseName: String

firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.firebase.dataconnect
1818

1919
import com.google.firebase.auth.FirebaseAuth
20+
import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal
2021
import com.google.firebase.dataconnect.testutil.DataConnectBackend
2122
import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase
2223
import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcServer
@@ -127,6 +128,7 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {
127128
grpcServer.metadatas.map { it.get(firebaseAuthTokenHeader) }.toCollection(authTokens)
128129
}
129130
val dataConnect = dataConnectFactory.newInstance(auth.app, grpcServer)
131+
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
130132
val operationName = Arb.dataConnect.operationName().next(rs)
131133
val queryRef =
132134
dataConnect.query(operationName, Unit, serializer<TestData>(), serializer<Unit>())
@@ -155,6 +157,7 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() {
155157
grpcServer.metadatas.map { it.get(firebaseAuthTokenHeader) }.toCollection(authTokens)
156158
}
157159
val dataConnect = dataConnectFactory.newInstance(auth.app, grpcServer)
160+
(dataConnect as FirebaseDataConnectInternal).awaitAuthReady()
158161
val operationName = Arb.dataConnect.operationName().next(rs)
159162
val mutationRef =
160163
dataConnect.mutation(operationName, Unit, serializer<TestData>(), serializer<Unit>())

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ import kotlinx.coroutines.flow.MutableStateFlow
4848
import kotlinx.coroutines.flow.filter
4949
import kotlinx.coroutines.flow.first
5050
import kotlinx.coroutines.flow.getAndUpdate
51-
import kotlinx.coroutines.flow.update
52-
import kotlinx.coroutines.flow.updateAndGet
5351
import kotlinx.coroutines.launch
5452

5553
/** Base class that shares logic for managing the Auth token and AppCheck token. */
@@ -151,9 +149,18 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
151149
*/
152150
fun close() {
153151
logger.debug { "close()" }
152+
154153
weakThis.clear()
155154
coroutineScope.cancel()
156-
setClosedState()
155+
156+
val oldState = state.getAndUpdate { State.Closed }
157+
when (oldState) {
158+
is State.Closed -> {}
159+
is State.New -> {}
160+
is State.StateWithProvider -> {
161+
removeTokenListener(oldState.provider)
162+
}
163+
}
157164
}
158165

159166
/**
@@ -178,37 +185,31 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
178185
logger.debug { "awaitTokenProvider() done: currentState=$currentState" }
179186
}
180187

181-
// This function must ONLY be called from close().
182-
private fun setClosedState() {
183-
val oldState = state.getAndUpdate { State.Closed }
184-
if (oldState is State.StateWithProvider<T>) {
185-
removeTokenListener(oldState.provider)
186-
}
187-
}
188-
189188
/**
190189
* Sets a flag to force-refresh the token upon the next call to [getToken].
191190
*
192191
* If [close] has been called, this method does nothing.
193192
*/
194193
fun forceRefresh() {
195194
logger.debug { "forceRefresh()" }
196-
val newState =
197-
state.updateAndGet { oldState ->
195+
val oldState =
196+
state.getAndUpdate { oldState ->
198197
when (oldState) {
199-
is State.Closed -> return
198+
is State.Closed -> State.Closed
200199
is State.New -> oldState.copy(forceTokenRefresh = true)
201200
is State.Idle -> oldState.copy(forceTokenRefresh = true)
202-
is State.Active -> {
203-
val message = "needs token refresh (wgrwbrvjxt)"
204-
oldState.job.cancel(message, ForceRefresh(message))
205-
State.Idle(oldState.provider, forceTokenRefresh = true)
206-
}
201+
is State.Active -> State.Idle(oldState.provider, forceTokenRefresh = true)
207202
}
208203
}
209204

210-
check(newState is State.StateWithForceTokenRefresh<T> && newState.forceTokenRefresh) {
211-
"newState.forceTokenRefresh should be true: $newState (error code gnvr2wx7nz)"
205+
when (oldState) {
206+
is State.Closed -> {}
207+
is State.New -> {}
208+
is State.Idle -> {}
209+
is State.Active -> {
210+
val message = "needs token refresh (wgrwbrvjxt)"
211+
oldState.job.cancel(message, ForceRefresh(message))
212+
}
212213
}
213214
}
214215

@@ -338,24 +339,30 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
338339
logger.debug { "onProviderAvailable(newProvider=$newProvider)" }
339340
addTokenListener(newProvider)
340341

341-
state.update { oldState ->
342-
when (oldState) {
343-
is State.Closed -> {
344-
logger.debug {
345-
"onProviderAvailable(newProvider=$newProvider)" +
346-
" unregistering token listener that was just added"
347-
}
348-
removeTokenListener(newProvider)
349-
oldState
342+
val oldState =
343+
state.getAndUpdate { oldState ->
344+
when (oldState) {
345+
is State.Closed -> State.Closed
346+
is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh)
347+
is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh)
348+
is State.Active -> State.Idle(newProvider, forceTokenRefresh = false)
350349
}
351-
is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh)
352-
is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh)
353-
is State.Active -> {
354-
val newProviderClassName = newProvider::class.qualifiedName
355-
val message = "a new provider $newProviderClassName is available (symhxtmazy)"
356-
oldState.job.cancel(message, NewProvider(message))
357-
State.Idle(newProvider, forceTokenRefresh = false)
350+
}
351+
352+
when (oldState) {
353+
is State.Closed -> {
354+
logger.debug {
355+
"onProviderAvailable(newProvider=$newProvider)" +
356+
" unregistering token listener that was just added"
358357
}
358+
removeTokenListener(newProvider)
359+
}
360+
is State.New -> {}
361+
is State.Idle -> {}
362+
is State.Active -> {
363+
val newProviderClassName = newProvider::class.qualifiedName
364+
val message = "a new provider $newProviderClassName is available (symhxtmazy)"
365+
oldState.job.cancel(message, NewProvider(message))
359366
}
360367
}
361368
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -407,38 +407,41 @@ internal class FirebaseDataConnectImpl(
407407
dataConnectAuth.close()
408408
dataConnectAppCheck.close()
409409

410-
// Start the job to asynchronously close the gRPC client.
411-
val newCloseJobRef =
412-
closeJob.updateAndGet { oldCloseJob ->
413-
oldCloseJob.ref?.let {
414-
if (!it.isCancelled) {
415-
return it
416-
}
417-
}
418-
419-
@OptIn(DelicateCoroutinesApi::class)
420-
val newCloseJob =
421-
GlobalScope.async<Unit>(start = CoroutineStart.LAZY) {
422-
lazyGrpcRPCs.initializedValueOrNull?.close()
423-
}
410+
// Create the "close job" to asynchronously close the gRPC client.
411+
@OptIn(DelicateCoroutinesApi::class)
412+
val newCloseJob =
413+
GlobalScope.async<Unit>(start = CoroutineStart.LAZY) {
414+
lazyGrpcRPCs.initializedValueOrNull?.close()
415+
}
416+
newCloseJob.invokeOnCompletion { exception ->
417+
if (exception === null) {
418+
logger.debug { "close() completed successfully" }
419+
} else {
420+
logger.warn(exception) { "close() failed" }
421+
}
422+
}
424423

425-
newCloseJob.invokeOnCompletion { exception ->
426-
if (exception === null) {
427-
logger.debug { "close() completed successfully" }
428-
} else {
429-
logger.warn(exception) { "close() failed" }
430-
}
424+
// Register the new "close job", unless there is a "close job" already in progress or one that
425+
// completed successfully.
426+
val updatedCloseJob =
427+
closeJob.updateAndGet { oldCloseJob ->
428+
if (oldCloseJob.ref !== null && !oldCloseJob.ref.isCancelled) {
429+
oldCloseJob
430+
} else {
431+
NullableReference(newCloseJob)
431432
}
432-
433-
NullableReference(newCloseJob)
434433
}
435434

436-
val newCloseJob =
437-
checkNotNull(newCloseJobRef.ref) {
438-
"newCloseJobRef.ref should not be null (error code j3gbhd6e4j)"
439-
}
440-
newCloseJob.start()
441-
return newCloseJob
435+
// If the update "close job" was the one that we created, then start it!
436+
if (updatedCloseJob.ref === newCloseJob) {
437+
newCloseJob.start()
438+
}
439+
440+
// Return the job "close job" that is active or already completed so that the caller can await
441+
// its result.
442+
return checkNotNull(updatedCloseJob.ref) {
443+
"updatedCloseJob.ref should not have been null (error code y5fk4ntdnd)"
444+
}
442445
}
443446

444447
// The generated SDK relies on equals() and hashCode() using object identity.

firebase-vertexai/src/main/kotlin/com/google/firebase/vertexai/type/LiveSession.kt

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import io.ktor.client.plugins.websocket.ClientWebSocketSession
2424
import io.ktor.websocket.Frame
2525
import io.ktor.websocket.close
2626
import io.ktor.websocket.readBytes
27+
import java.io.ByteArrayOutputStream
2728
import java.util.concurrent.ConcurrentLinkedQueue
2829
import kotlin.coroutines.CoroutineContext
2930
import kotlinx.coroutines.CoroutineScope
@@ -141,16 +142,14 @@ internal constructor(
141142
}
142143

143144
private suspend fun sendAudioDataToServer() {
144-
var offset = 0
145-
val audioBuffer = ByteArray(MIN_BUFFER_SIZE * 2)
145+
146+
val audioBufferStream = ByteArrayOutputStream()
146147
while (isRecording) {
147148
val receivedAudio = audioQueue.poll() ?: continue
148-
receivedAudio.copyInto(audioBuffer, offset)
149-
offset += receivedAudio.size
150-
if (offset >= MIN_BUFFER_SIZE) {
151-
sendMediaStream(listOf(MediaData(audioBuffer, "audio/pcm")))
152-
audioBuffer.fill(0)
153-
offset = 0
149+
audioBufferStream.write(receivedAudio)
150+
if (audioBufferStream.size() >= MIN_BUFFER_SIZE) {
151+
sendMediaStream(listOf(MediaData(audioBufferStream.toByteArray(), "audio/pcm")))
152+
audioBufferStream.reset()
154153
}
155154
}
156155
}

0 commit comments

Comments
 (0)