From e0feba939275c0ac3a388d22a648647d2b419827 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Fri, 4 Apr 2025 06:34:08 +0000 Subject: [PATCH 01/15] DataConnectCredentialsTokenManager.kt: use MutableStateFlow instead of AtomicReference --- .../core/DataConnectCredentialsTokenManager.kt | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt index e6f4049c359..cf127d571e7 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt @@ -30,7 +30,6 @@ import com.google.firebase.inject.Provider import com.google.firebase.internal.api.FirebaseNoSignedInUserException import com.google.firebase.util.nextAlphanumericString import java.lang.ref.WeakReference -import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.coroutineContext import kotlin.random.Random import kotlinx.coroutines.CancellationException @@ -129,7 +128,7 @@ internal sealed class DataConnectCredentialsTokenManager( * coroutines to run on the thread. */ private val state = - AtomicReference>(State.Idle(provider = null, forceTokenRefresh = false)) + MutableStateFlow>(State.Idle(provider = null, forceTokenRefresh = false)) /** * Adds the token listener to the given provider. @@ -171,7 +170,7 @@ internal sealed class DataConnectCredentialsTokenManager( // This function must ONLY be called from close(). private fun setClosedState() { while (true) { - val oldState = state.get() + val oldState = state.value val providerProvider: ProviderProvider = when (oldState) { is State.Closed -> return @@ -194,7 +193,7 @@ internal sealed class DataConnectCredentialsTokenManager( suspend fun forceRefresh() { logger.debug { "forceRefresh()" } while (true) { - val oldState = state.get() + val oldState = state.value val oldStateProviderProvider = when (oldState) { is State.Closed -> return @@ -246,7 +245,7 @@ internal sealed class DataConnectCredentialsTokenManager( logger.debug { "$invocationId getToken(requestId=$requestId)" } while (true) { val attemptSequenceNumber = nextSequenceNumber() - val oldState = state.get() + val oldState = state.value val newState: State.Active = when (oldState) { @@ -342,7 +341,7 @@ internal sealed class DataConnectCredentialsTokenManager( addTokenListener(newProvider) while (true) { - val oldState = state.get() + val oldState = state.value val newState = when (oldState) { is State.Closed -> { From 2adbdfe7f2282e81cc302d64f17f89d1e92c950d Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Fri, 4 Apr 2025 07:09:30 +0000 Subject: [PATCH 02/15] DataConnectCredentialsTokenManager.kt: clean up State sealed interface --- .../DataConnectCredentialsTokenManager.kt | 84 +++++++++---------- .../dataconnect/core/DataConnectGrpcClient.kt | 2 +- 2 files changed, 39 insertions(+), 47 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt index cf127d571e7..de9638d82c3 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt @@ -48,7 +48,6 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.launch -import kotlinx.coroutines.yield /** Base class that shares logic for managing the Auth token and AppCheck token. */ internal sealed class DataConnectCredentialsTokenManager( @@ -86,49 +85,39 @@ internal sealed class DataConnectCredentialsTokenManager( } } - private interface ProviderProvider { - val provider: T? - } - private sealed interface State { /** State indicating that [close] has been invoked. */ object Closed : State - /** State indicating that there is no outstanding "get token" request. */ - class Idle( - - /** - * The [InternalAuthProvider] or [InteropAppCheckTokenProvider]; may be null if the deferred - * has not yet given us a provider. - */ - override val provider: T?, - + sealed interface StateWithForceTokenRefresh : State { /** The value to specify for `forceRefresh` on the next invocation of [getToken]. */ val forceTokenRefresh: Boolean - ) : State, ProviderProvider + } - /** State indicating that there _is_ an outstanding "get token" request. */ - class Active( + /** State indicating that the token provider is not (yet?) available. */ + data class New(override val forceTokenRefresh: Boolean) : StateWithForceTokenRefresh + + sealed interface StateWithProvider : State { + /** The token provider, [InternalAuthProvider] or [InteropAppCheckTokenProvider] */ + val provider: T + } + + /** State indicating that there is no outstanding "get token" request. */ + data class Idle(override val provider: T, override val forceTokenRefresh: Boolean) : + StateWithProvider, StateWithForceTokenRefresh - /** - * The [InternalAuthProvider] or [InteropAppCheckTokenProvider] that is performing the "get - * token" request. - */ + /** State indicating that there _is_ an outstanding "get token" request. */ + data class Active( override val provider: T, /** The job that is performing the "get token" request. */ val job: Deferred>> - ) : State, ProviderProvider + ) : StateWithProvider } - /** - * The current state of this object. The value should only be changed in a compare-and-swap loop - * in order to be thread-safe. Such a loop should call `yield()` on each iteration to allow other - * coroutines to run on the thread. - */ - private val state = - MutableStateFlow>(State.Idle(provider = null, forceTokenRefresh = false)) + /** The current state of this object. */ + private val state = MutableStateFlow>(State.New(forceTokenRefresh = false)) /** * Adds the token listener to the given provider. @@ -171,15 +160,16 @@ internal sealed class DataConnectCredentialsTokenManager( private fun setClosedState() { while (true) { val oldState = state.value - val providerProvider: ProviderProvider = + val provider: T? = when (oldState) { is State.Closed -> return - is State.Idle -> oldState - is State.Active -> oldState + is State.New -> null + is State.Idle -> oldState.provider + is State.Active -> oldState.provider } if (state.compareAndSet(oldState, State.Closed)) { - providerProvider.provider?.let { removeTokenListener(it) } + provider?.let { removeTokenListener(it) } break } } @@ -190,27 +180,28 @@ internal sealed class DataConnectCredentialsTokenManager( * * If [close] has been called, this method does nothing. */ - suspend fun forceRefresh() { + fun forceRefresh() { logger.debug { "forceRefresh()" } while (true) { val oldState = state.value - val oldStateProviderProvider = + val newState: State.StateWithForceTokenRefresh = when (oldState) { is State.Closed -> return - is State.Idle -> oldState + is State.New -> oldState.copy(forceTokenRefresh = true) + is State.Idle -> oldState.copy(forceTokenRefresh = true) is State.Active -> { val message = "needs token refresh (wgrwbrvjxt)" oldState.job.cancel(message, ForceRefresh(message)) - oldState + State.Idle(oldState.provider, forceTokenRefresh = true) } } - val newState = State.Idle(oldStateProviderProvider.provider, forceTokenRefresh = true) + check(newState.forceTokenRefresh) { + "newState.forceTokenRefresh should be true (error code gnvr2wx7nz)" + } if (state.compareAndSet(oldState, newState)) { break } - - yield() } } @@ -256,13 +247,13 @@ internal sealed class DataConnectCredentialsTokenManager( } throw CredentialsTokenManagerClosedException(this) } - is State.Idle -> { - if (oldState.provider === null) { - logger.debug { - "$invocationId getToken() returns null (token provider is not (yet?) available)" - } - return null + is State.New -> { + logger.debug { + "$invocationId getToken() returns null (token provider is not (yet?) available)" } + return null + } + is State.Idle -> { newActiveState(invocationId, oldState.provider, oldState.forceTokenRefresh) } is State.Active -> { @@ -352,6 +343,7 @@ internal sealed class DataConnectCredentialsTokenManager( removeTokenListener(newProvider) break } + is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh) is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh) is State.Active -> { val newProviderClassName = newProvider::class.qualifiedName diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt index 26e9ce49c51..d0ebe3f84c3 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt @@ -102,7 +102,7 @@ internal class DataConnectGrpcClient( ) } - private suspend inline fun T.retryOnGrpcUnauthenticatedError( + private inline fun T.retryOnGrpcUnauthenticatedError( requestId: String, kotlinMethodName: String, block: T.() -> R From ee8fbf3642dd980859aefe07ed0fd3ed2376fddf Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Fri, 4 Apr 2025 07:21:51 +0000 Subject: [PATCH 03/15] DataConnectCredentialsTokenManager.kt: awaitTokenProvider() added --- .../DataConnectCredentialsTokenManager.kt | 31 ++++++++++++++----- .../core/FirebaseDataConnectImpl.kt | 29 ++--------------- 2 files changed, 26 insertions(+), 34 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt index de9638d82c3..9458a678bff 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt @@ -45,8 +45,8 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch /** Base class that shares logic for managing the Auth token and AppCheck token. */ @@ -59,9 +59,6 @@ internal sealed class DataConnectCredentialsTokenManager( val instanceId: String get() = logger.nameWithId - private val _providerAvailable = MutableStateFlow(false) - val providerAvailable: StateFlow = _providerAvailable.asStateFlow() - @Suppress("LeakingThis") private val weakThis = WeakReference(this) private val coroutineScope = @@ -156,6 +153,28 @@ internal sealed class DataConnectCredentialsTokenManager( setClosedState() } + /** + * Suspends until the token provider becomes available to this object. + * + * If [close] has been invoked, or is invoked _before_ a token provider becomes available, then + * this method returns normally, as if a token provider _had_ become available. + */ + suspend fun awaitTokenProvider() { + logger.debug { "awaitTokenProvider() start" } + val currentState = + state + .filter { + when (it) { + State.Closed -> true + is State.New -> false + is State.Idle -> true + is State.Active -> true + } + } + .first() + logger.debug { "awaitTokenProvider() done: currentState=$currentState" } + } + // This function must ONLY be called from close(). private fun setClosedState() { while (true) { @@ -357,8 +376,6 @@ internal sealed class DataConnectCredentialsTokenManager( break } } - - _providerAvailable.value = true } /** diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index d9bee50b89d..f49afda964f 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -54,8 +54,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -120,9 +118,6 @@ internal class FirebaseDataConnectImpl( } ) - private val authProviderAvailable = MutableStateFlow(false) - private val appCheckProviderAvailable = MutableStateFlow(false) - // Protects `closed`, `grpcClient`, `emulatorSettings`, and `queryManager`. private val mutex = Mutex() @@ -141,17 +136,7 @@ internal class FirebaseDataConnectImpl( ) override suspend fun awaitAuthReady() { - authProviderAvailable.first { it } - } - - init { - val name = CoroutineName("DataConnectAuth isProviderAvailable pipe for $instanceId") - coroutineScope.launch(name) { - dataConnectAuth.providerAvailable.collect { isProviderAvailable -> - logger.debug { "authProviderAvailable=$isProviderAvailable" } - authProviderAvailable.value = isProviderAvailable - } - } + dataConnectAuth.awaitTokenProvider() } private val dataConnectAppCheck: DataConnectAppCheck = @@ -163,17 +148,7 @@ internal class FirebaseDataConnectImpl( ) override suspend fun awaitAppCheckReady() { - appCheckProviderAvailable.first { it } - } - - init { - val name = CoroutineName("DataConnectAppCheck isProviderAvailable pipe for $instanceId") - coroutineScope.launch(name) { - dataConnectAppCheck.providerAvailable.collect { isProviderAvailable -> - logger.debug { "appCheckProviderAvailable=$isProviderAvailable" } - appCheckProviderAvailable.value = isProviderAvailable - } - } + dataConnectAppCheck.awaitTokenProvider() } private val lazyGrpcRPCs = From 68f3adab8f247e6e3654501825b62ef648d56963 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Fri, 4 Apr 2025 07:30:51 +0000 Subject: [PATCH 04/15] FirebaseDataConnectInternalExts.kt: add extension functions: FirebaseDataConnect.awaitAuthReady() and FirebaseDataConnect.awaitAppCheckReady() --- .../dataconnect/AuthIntegrationTest.kt | 4 +-- .../GrpcMetadataIntegrationTest.kt | 23 ++++++++-------- .../FirebaseDataConnectInternalExts.kt | 26 +++++++++++++++++++ 3 files changed, 40 insertions(+), 13 deletions(-) create mode 100644 firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt index ba314d0f9db..5f240960b7e 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt @@ -17,10 +17,10 @@ package com.google.firebase.dataconnect import com.google.firebase.auth.FirebaseAuth -import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal import com.google.firebase.dataconnect.testutil.DataConnectBackend import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcServer +import com.google.firebase.dataconnect.testutil.awaitAuthReady import com.google.firebase.dataconnect.testutil.newInstance import com.google.firebase.dataconnect.testutil.property.arbitrary.dataConnect import com.google.firebase.dataconnect.testutil.schemas.PersonSchema @@ -202,7 +202,7 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() { } private suspend fun signIn() { - (personSchema.dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + personSchema.dataConnect.awaitAuthReady() val authResult = auth.run { signInAnonymously().await() } withClue("authResult.user returned from signInAnonymously()") { authResult.user.shouldNotBeNull() diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt index 8ff7715ddb9..394c2195332 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt @@ -23,7 +23,6 @@ import com.google.android.gms.tasks.Tasks import com.google.firebase.appcheck.AppCheckProvider import com.google.firebase.appcheck.AppCheckProviderFactory import com.google.firebase.appcheck.FirebaseAppCheck -import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal import com.google.firebase.dataconnect.generated.GeneratedConnector import com.google.firebase.dataconnect.generated.GeneratedMutation import com.google.firebase.dataconnect.generated.GeneratedQuery @@ -32,6 +31,8 @@ import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase import com.google.firebase.dataconnect.testutil.DataConnectTestAppCheckToken import com.google.firebase.dataconnect.testutil.FirebaseAuthBackend import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcServer +import com.google.firebase.dataconnect.testutil.awaitAppCheckReady +import com.google.firebase.dataconnect.testutil.awaitAuthReady import com.google.firebase.dataconnect.testutil.getFirebaseAppIdFromStrings import com.google.firebase.dataconnect.testutil.newInstance import com.google.firebase.dataconnect.util.SuspendingLazy @@ -138,7 +139,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeQueryShouldNotSendAuthMetadataWhenNotLoggedIn() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val queryRef = dataConnect.query("qryfyk7yfppfe", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -151,7 +152,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeMutationShouldNotSendAuthMetadataWhenNotLoggedIn() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val mutationRef = dataConnect.mutation("mutckjpte9v9j", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -165,7 +166,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeQueryShouldSendAuthMetadataWhenLoggedIn() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } firebaseAuthSignIn(dataConnect) @@ -179,7 +180,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeMutationShouldSendAuthMetadataWhenLoggedIn() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val mutationRef = dataConnect.mutation("mutayn7as5k7d", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -194,7 +195,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeQueryShouldNotSendAuthMetadataAfterLogout() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer(), serializer()) val metadatasJob1 = async { grpcServer.metadatas.first() } val metadatasJob2 = async { grpcServer.metadatas.take(2).last() } @@ -212,7 +213,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeMutationShouldNotSendAuthMetadataAfterLogout() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val mutationRef = dataConnect.mutation("mutvw945ag3vv", Unit, serializer(), serializer()) val metadatasJob1 = async { grpcServer.metadatas.first() } @@ -233,7 +234,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { // appcheck token is sent at all. val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady() + dataConnect.awaitAppCheckReady() val queryRef = dataConnect.query("qrybbeekpkkck", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -248,7 +249,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { // appcheck token is sent at all. val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady() + dataConnect.awaitAppCheckReady() val mutationRef = dataConnect.mutation("mutbs7hhxk39c", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -262,7 +263,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeQueryShouldSendAppCheckMetadataWhenAppCheckIsEnabled() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady() + dataConnect.awaitAppCheckReady() val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } val appCheck = FirebaseAppCheck.getInstance(dataConnect.app) @@ -277,7 +278,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeMutationShouldSendAppCheckMetadataWhenAppCheckIsEnabled() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady() + dataConnect.awaitAppCheckReady() val mutationRef = dataConnect.mutation("mutz4hzqzpgb4", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt new file mode 100644 index 00000000000..f73f1d8b50d --- /dev/null +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.firebase.dataconnect.testutil + +import com.google.firebase.dataconnect.FirebaseDataConnect +import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal + +suspend fun FirebaseDataConnect.awaitAuthReady() = + (this as FirebaseDataConnectInternal).awaitAuthReady() + +suspend fun FirebaseDataConnect.awaitAppCheckReady() = + (this as FirebaseDataConnectInternal).awaitAppCheckReady() From 6010e498d6460c2f31d917ce9931aac6413c9abc Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Fri, 4 Apr 2025 12:42:23 -0400 Subject: [PATCH 05/15] empty commit to re-trigger github actions From 415215c1fa6fe4d91f356b00bf346e6d346c0e58 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Fri, 4 Apr 2025 19:53:29 +0000 Subject: [PATCH 06/15] dataconnect: Use MutableStateFlow.update() instead of MutableStateFlow.compareAndSet() directly for improved readability and less potential for bugs --- .../DataConnectCredentialsTokenManager.kt | 72 +++++++------------ .../core/FirebaseDataConnectImpl.kt | 46 ++++++------ .../dataconnect/core/QuerySubscriptionImpl.kt | 24 +++---- .../querymgr/RegisteredDataDeserialzer.kt | 12 ++-- .../testutil/SuspendingCountDownLatch.kt | 11 ++- 5 files changed, 71 insertions(+), 94 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt index 9458a678bff..df99cca553a 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt @@ -47,6 +47,9 @@ import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.getAndUpdate +import kotlinx.coroutines.flow.update +import kotlinx.coroutines.flow.updateAndGet import kotlinx.coroutines.launch /** Base class that shares logic for managing the Auth token and AppCheck token. */ @@ -177,20 +180,9 @@ internal sealed class DataConnectCredentialsTokenManager( // This function must ONLY be called from close(). private fun setClosedState() { - while (true) { - val oldState = state.value - val provider: T? = - when (oldState) { - is State.Closed -> return - is State.New -> null - is State.Idle -> oldState.provider - is State.Active -> oldState.provider - } - - if (state.compareAndSet(oldState, State.Closed)) { - provider?.let { removeTokenListener(it) } - break - } + val oldState = state.getAndUpdate { State.Closed } + if (oldState is State.StateWithProvider) { + removeTokenListener(oldState.provider) } } @@ -201,9 +193,8 @@ internal sealed class DataConnectCredentialsTokenManager( */ fun forceRefresh() { logger.debug { "forceRefresh()" } - while (true) { - val oldState = state.value - val newState: State.StateWithForceTokenRefresh = + val newState = + state.updateAndGet { oldState -> when (oldState) { is State.Closed -> return is State.New -> oldState.copy(forceTokenRefresh = true) @@ -214,13 +205,10 @@ internal sealed class DataConnectCredentialsTokenManager( State.Idle(oldState.provider, forceTokenRefresh = true) } } - - check(newState.forceTokenRefresh) { - "newState.forceTokenRefresh should be true (error code gnvr2wx7nz)" - } - if (state.compareAndSet(oldState, newState)) { - break } + + check(newState is State.StateWithForceTokenRefresh && newState.forceTokenRefresh) { + "newState.forceTokenRefresh should be true: $newState (error code gnvr2wx7nz)" } } @@ -350,30 +338,24 @@ internal sealed class DataConnectCredentialsTokenManager( logger.debug { "onProviderAvailable(newProvider=$newProvider)" } addTokenListener(newProvider) - while (true) { - val oldState = state.value - val newState = - when (oldState) { - is State.Closed -> { - logger.debug { - "onProviderAvailable(newProvider=$newProvider)" + - " unregistering token listener that was just added" - } - removeTokenListener(newProvider) - break - } - is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh) - is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh) - is State.Active -> { - val newProviderClassName = newProvider::class.qualifiedName - val message = "a new provider $newProviderClassName is available (symhxtmazy)" - oldState.job.cancel(message, NewProvider(message)) - State.Idle(newProvider, forceTokenRefresh = false) + state.update { oldState -> + when (oldState) { + is State.Closed -> { + logger.debug { + "onProviderAvailable(newProvider=$newProvider)" + + " unregistering token listener that was just added" } + removeTokenListener(newProvider) + oldState + } + is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh) + is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh) + is State.Active -> { + val newProviderClassName = newProvider::class.qualifiedName + val message = "a new provider $newProviderClassName is available (symhxtmazy)" + oldState.job.cancel(message, NewProvider(message)) + State.Idle(newProvider, forceTokenRefresh = false) } - - if (state.compareAndSet(oldState, newState)) { - break } } } diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index f49afda964f..3a7569967de 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -54,6 +54,7 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.updateAndGet import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -407,34 +408,37 @@ internal class FirebaseDataConnectImpl( dataConnectAppCheck.close() // Start the job to asynchronously close the gRPC client. - while (true) { - val oldCloseJob = closeJob.value - - oldCloseJob.ref?.let { - if (!it.isCancelled) { - return it + val newCloseJobRef = + closeJob.updateAndGet { oldCloseJob -> + oldCloseJob.ref?.let { + if (!it.isCancelled) { + return it + } } - } - @OptIn(DelicateCoroutinesApi::class) - val newCloseJob = - GlobalScope.async(start = CoroutineStart.LAZY) { - lazyGrpcRPCs.initializedValueOrNull?.close() - } + @OptIn(DelicateCoroutinesApi::class) + val newCloseJob = + GlobalScope.async(start = CoroutineStart.LAZY) { + lazyGrpcRPCs.initializedValueOrNull?.close() + } - newCloseJob.invokeOnCompletion { exception -> - if (exception === null) { - logger.debug { "close() completed successfully" } - } else { - logger.warn(exception) { "close() failed" } + newCloseJob.invokeOnCompletion { exception -> + if (exception === null) { + logger.debug { "close() completed successfully" } + } else { + logger.warn(exception) { "close() failed" } + } } + + NullableReference(newCloseJob) } - if (closeJob.compareAndSet(oldCloseJob, NullableReference(newCloseJob))) { - newCloseJob.start() - return newCloseJob + val newCloseJob = + checkNotNull(newCloseJobRef.ref) { + "newCloseJobRef.ref should not be null (error code j3gbhd6e4j)" } - } + newCloseJob.start() + return newCloseJob } // The generated SDK relies on equals() and hashCode() using object identity. diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt index ceeb861cab8..2ca9aea6771 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch internal class QuerySubscriptionImpl(query: QueryRefImpl) : @@ -80,22 +81,17 @@ internal class QuerySubscriptionImpl(query: QueryRefImpl= prospectiveSequenceNumber) { - return - } - } - - if (_lastResult.compareAndSet(currentLastResult, NullableReference(prospectiveLastResult))) { - return + _lastResult.update { currentLastResult -> + if ( + currentLastResult.ref != null && + currentLastResult.ref.sequencedResult.sequenceNumber >= + prospectiveLastResult.sequencedResult.sequenceNumber + ) { + currentLastResult + } else { + NullableReference(prospectiveLastResult) } } } diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RegisteredDataDeserialzer.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RegisteredDataDeserialzer.kt index 3f94a7f95a0..1fa6d94eae4 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RegisteredDataDeserialzer.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RegisteredDataDeserialzer.kt @@ -31,6 +31,7 @@ import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.onSubscription +import kotlinx.coroutines.flow.update import kotlinx.coroutines.withContext import kotlinx.serialization.DeserializationStrategy import kotlinx.serialization.modules.SerializersModule @@ -84,17 +85,14 @@ internal class RegisteredDataDeserializer( lazyDeserialize(requestId, sequencedResult) ) - // Use a compare-and-swap ("CAS") loop to ensure that an old update never clobbers a newer one. - while (true) { - val currentUpdate = latestUpdate.value + latestUpdate.update { currentUpdate -> if ( currentUpdate.ref !== null && currentUpdate.ref.sequenceNumber > sequencedResult.sequenceNumber ) { - break // don't clobber a newer update with an older one - } - if (latestUpdate.compareAndSet(currentUpdate, NullableReference(newUpdate))) { - break + currentUpdate // don't clobber a newer update with an older one + } else { + NullableReference(newUpdate) } } diff --git a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt index 7098a390886..a4ff85d04f0 100644 --- a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt +++ b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt @@ -19,6 +19,7 @@ package com.google.firebase.dataconnect.testutil import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.update /** * An implementation of [java.util.concurrent.CountDownLatch] that suspends instead of blocking. @@ -60,14 +61,10 @@ class SuspendingCountDownLatch(count: Int) { * @throws IllegalStateException if called when the count has already reached zero. */ fun countDown(): SuspendingCountDownLatch { - while (true) { - val oldValue = _count.value + _count.update { oldValue -> check(oldValue > 0) { "countDown() called too many times (oldValue=$oldValue)" } - - val newValue = oldValue - 1 - if (_count.compareAndSet(oldValue, newValue)) { - return this - } + oldValue - 1 } + return this } } From c9d70a3e1be9c149dbe56c6dba2c90c5d26f6bdf Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Wed, 9 Apr 2025 19:52:33 +0000 Subject: [PATCH 07/15] FirebaseDataConnectImpl.kt: fix minor typo in comment --- .../google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index 38cdcdcf12c..c96c990b7c0 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -432,7 +432,7 @@ internal class FirebaseDataConnectImpl( } } - // If the update "close job" was the one that we created, then start it! + // If the updated "close job" was the one that we created, then start it! if (updatedCloseJob.ref === newCloseJob) { newCloseJob.start() } From 9658ae466a113134c1f4ff6febd1b9324b1ce505 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 10 Apr 2025 03:15:47 -0400 Subject: [PATCH 08/15] FirebaseDataConnectImpl.kt: use MutableStateFlow to store state, rather than mutexes --- .../core/FirebaseDataConnectImpl.kt | 361 ++++++++++-------- .../dataconnect/core/MutationRefImpl.kt | 3 +- .../firebase/dataconnect/core/QueryRefImpl.kt | 2 +- .../dataconnect/core/QuerySubscriptionImpl.kt | 4 +- .../core/MutationRefImplUnitTest.kt | 23 +- .../dataconnect/core/QueryRefImplUnitTest.kt | 9 +- 6 files changed, 220 insertions(+), 182 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index c96c990b7c0..9f7838638e9 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -33,8 +33,6 @@ import com.google.firebase.dataconnect.querymgr.LiveQueries import com.google.firebase.dataconnect.querymgr.LiveQuery import com.google.firebase.dataconnect.querymgr.QueryManager import com.google.firebase.dataconnect.querymgr.RegisteredDataDeserializer -import com.google.firebase.dataconnect.util.NullableReference -import com.google.firebase.dataconnect.util.SuspendingLazy import com.google.firebase.util.nextAlphanumericString import com.google.protobuf.Struct import java.util.concurrent.Executor @@ -54,10 +52,9 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.update import kotlinx.coroutines.flow.updateAndGet import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlinx.serialization.DeserializationStrategy import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.modules.SerializersModule @@ -71,8 +68,8 @@ internal interface FirebaseDataConnectInternal : FirebaseDataConnect { val nonBlockingExecutor: Executor val nonBlockingDispatcher: CoroutineDispatcher - val lazyGrpcClient: SuspendingLazy - val lazyQueryManager: SuspendingLazy + val grpcClient: DataConnectGrpcClient + val queryManager: QueryManager suspend fun awaitAuthReady() suspend fun awaitAppCheckReady() @@ -119,15 +116,6 @@ internal class FirebaseDataConnectImpl( } ) - // Protects `closed`, `grpcClient`, `emulatorSettings`, and `queryManager`. - private val mutex = Mutex() - - // All accesses to this variable _must_ have locked `mutex`. - private var emulatorSettings: EmulatedServiceSettings? = null - - // All accesses to this variable _must_ have locked `mutex`. - private var closed = false - private val dataConnectAuth: DataConnectAuth = DataConnectAuth( deferredAuthProvider = deferredAuthProvider, @@ -152,127 +140,171 @@ internal class FirebaseDataConnectImpl( dataConnectAppCheck.awaitTokenProvider() } - private val lazyGrpcRPCs = - SuspendingLazy(mutex) { - if (closed) throw IllegalStateException("FirebaseDataConnect instance has been closed") + private sealed interface State { + data class New(val emulatorSettings: EmulatedServiceSettings?) : State { + constructor() : this(null) + } + data class Initialized( + val grpcRPCs: DataConnectGrpcRPCs, + val grpcClient: DataConnectGrpcClient, + val queryManager: QueryManager + ) : State + data class Closing(val grpcRPCs: DataConnectGrpcRPCs, val closeJob: Deferred) : State + object Closed : State + } - data class DataConnectBackendInfo( - val host: String, - val sslEnabled: Boolean, - val isEmulator: Boolean - ) - val backendInfoFromSettings = - DataConnectBackendInfo( - host = settings.host, - sslEnabled = settings.sslEnabled, - isEmulator = false - ) - val backendInfoFromEmulatorSettings = - emulatorSettings?.run { - DataConnectBackendInfo(host = "$host:$port", sslEnabled = false, isEmulator = true) - } - val backendInfo = - if (backendInfoFromEmulatorSettings == null) { - backendInfoFromSettings - } else { - if (!settings.isDefaultHost()) { - logger.warn( - "Host has been set in DataConnectSettings and useEmulator, " + - "emulator host will be used." - ) + private val state = MutableStateFlow(State.New()) + + override val grpcClient: DataConnectGrpcClient + get() = initialize().grpcClient + override val queryManager: QueryManager + get() = initialize().queryManager + + private fun initialize(): State.Initialized { + val newState = + state.updateAndGet { oldState -> + when (oldState) { + is State.New -> { + val grpcRPCs = createDataConnectGrpcRPCs(oldState.emulatorSettings) + val grpcClient = createDataConnectGrpcClient(grpcRPCs) + val queryManager = createQueryManager(grpcClient) + State.Initialized(grpcRPCs, grpcClient, queryManager) } - backendInfoFromEmulatorSettings + is State.Initialized -> oldState + is State.Closing -> oldState + is State.Closed -> oldState } + } - logger.debug { "connecting to Data Connect backend: $backendInfo" } - val grpcMetadata = - DataConnectGrpcMetadata.forSystemVersions( - firebaseApp = app, - dataConnectAuth = dataConnectAuth, - dataConnectAppCheck = dataConnectAppCheck, - connectorLocation = config.location, - parentLogger = logger, - ) - val dataConnectGrpcRPCs = - DataConnectGrpcRPCs( - context = context, - host = backendInfo.host, - sslEnabled = backendInfo.sslEnabled, - blockingCoroutineDispatcher = blockingDispatcher, - grpcMetadata = grpcMetadata, - parentLogger = logger, + return when (newState) { + is State.New -> + throw IllegalStateException( + "newState should be Initialized, but got New (error code sh2rf4wwjx)" ) + is State.Initialized -> newState + is State.Closing, + State.Closed -> throw IllegalStateException("FirebaseDataConnect instance has been closed") + } + } - if (backendInfo.isEmulator) { - logEmulatorVersion(dataConnectGrpcRPCs) - streamEmulatorErrors(dataConnectGrpcRPCs) + private fun createDataConnectGrpcRPCs( + emulatorSettings: EmulatedServiceSettings? + ): DataConnectGrpcRPCs { + data class DataConnectBackendInfo( + val host: String, + val sslEnabled: Boolean, + val isEmulator: Boolean + ) + val backendInfoFromSettings = + DataConnectBackendInfo( + host = settings.host, + sslEnabled = settings.sslEnabled, + isEmulator = false + ) + val backendInfoFromEmulatorSettings = + emulatorSettings?.run { + DataConnectBackendInfo(host = "$host:$port", sslEnabled = false, isEmulator = true) + } + val backendInfo = + if (backendInfoFromEmulatorSettings == null) { + backendInfoFromSettings + } else { + if (!settings.isDefaultHost()) { + logger.warn( + "Host has been set in DataConnectSettings and useEmulator, " + + "emulator host will be used." + ) + } + backendInfoFromEmulatorSettings } - dataConnectGrpcRPCs - } - - override val lazyGrpcClient = - SuspendingLazy(mutex) { - DataConnectGrpcClient( - projectId = projectId, - connector = config, - grpcRPCs = lazyGrpcRPCs.getLocked(), + logger.debug { "connecting to Data Connect backend: $backendInfo" } + val grpcMetadata = + DataConnectGrpcMetadata.forSystemVersions( + firebaseApp = app, dataConnectAuth = dataConnectAuth, dataConnectAppCheck = dataConnectAppCheck, - logger = Logger("DataConnectGrpcClient").apply { debug { "created by $instanceId" } }, + connectorLocation = config.location, + parentLogger = logger, + ) + val dataConnectGrpcRPCs = + DataConnectGrpcRPCs( + context = context, + host = backendInfo.host, + sslEnabled = backendInfo.sslEnabled, + blockingCoroutineDispatcher = blockingDispatcher, + grpcMetadata = grpcMetadata, + parentLogger = logger, ) - } - override val lazyQueryManager = - SuspendingLazy(mutex) { - if (closed) throw IllegalStateException("FirebaseDataConnect instance has been closed") - val grpcClient = lazyGrpcClient.getLocked() - - val registeredDataDeserializerFactory = - object : LiveQuery.RegisteredDataDeserializerFactory { - override fun newInstance( - dataDeserializer: DeserializationStrategy, - dataSerializersModule: SerializersModule?, - parentLogger: Logger - ) = - RegisteredDataDeserializer( - dataDeserializer = dataDeserializer, - dataSerializersModule = dataSerializersModule, - blockingCoroutineDispatcher = blockingDispatcher, - parentLogger = parentLogger, - ) - } - val liveQueryFactory = - object : LiveQueries.LiveQueryFactory { - override fun newLiveQuery( - key: LiveQuery.Key, - operationName: String, - variables: Struct, - parentLogger: Logger - ) = - LiveQuery( - key = key, - operationName = operationName, - variables = variables, - parentCoroutineScope = coroutineScope, - nonBlockingCoroutineDispatcher = nonBlockingDispatcher, - grpcClient = grpcClient, - registeredDataDeserializerFactory = registeredDataDeserializerFactory, - parentLogger = parentLogger, - ) - } - val liveQueries = LiveQueries(liveQueryFactory, blockingDispatcher, parentLogger = logger) - QueryManager(liveQueries) + if (backendInfo.isEmulator) { + logEmulatorVersion(dataConnectGrpcRPCs) + streamEmulatorErrors(dataConnectGrpcRPCs) } + return dataConnectGrpcRPCs + } + + private fun createDataConnectGrpcClient(grpcRPCs: DataConnectGrpcRPCs): DataConnectGrpcClient = + DataConnectGrpcClient( + projectId = projectId, + connector = config, + grpcRPCs = grpcRPCs, + dataConnectAuth = dataConnectAuth, + dataConnectAppCheck = dataConnectAppCheck, + logger = Logger("DataConnectGrpcClient").apply { debug { "created by $instanceId" } }, + ) + + private fun createQueryManager(grpcClient: DataConnectGrpcClient): QueryManager { + val registeredDataDeserializerFactory = + object : LiveQuery.RegisteredDataDeserializerFactory { + override fun newInstance( + dataDeserializer: DeserializationStrategy, + dataSerializersModule: SerializersModule?, + parentLogger: Logger + ) = + RegisteredDataDeserializer( + dataDeserializer = dataDeserializer, + dataSerializersModule = dataSerializersModule, + blockingCoroutineDispatcher = blockingDispatcher, + parentLogger = parentLogger, + ) + } + val liveQueryFactory = + object : LiveQueries.LiveQueryFactory { + override fun newLiveQuery( + key: LiveQuery.Key, + operationName: String, + variables: Struct, + parentLogger: Logger + ) = + LiveQuery( + key = key, + operationName = operationName, + variables = variables, + parentCoroutineScope = coroutineScope, + nonBlockingCoroutineDispatcher = nonBlockingDispatcher, + grpcClient = grpcClient, + registeredDataDeserializerFactory = registeredDataDeserializerFactory, + parentLogger = parentLogger, + ) + } + val liveQueries = LiveQueries(liveQueryFactory, blockingDispatcher, parentLogger = logger) + return QueryManager(liveQueries) + } + override fun useEmulator(host: String, port: Int): Unit = runBlocking { - mutex.withLock { - if (lazyGrpcClient.initializedValueOrNull != null) { - throw IllegalStateException( - "Cannot call useEmulator() after instance has already been initialized." - ) + state.update { oldState -> + when (oldState) { + is State.New -> + oldState.copy(emulatorSettings = EmulatedServiceSettings(host = host, port = port)) + is State.Initialized -> + throw IllegalStateException( + "Cannot call useEmulator() after instance has already been initialized." + ) + is State.Closing -> oldState + is State.Closed -> oldState } - emulatorSettings = EmulatedServiceSettings(host = host, port = port) } } @@ -380,19 +412,9 @@ internal class FirebaseDataConnectImpl( ) } - private val closeJob = MutableStateFlow(NullableReference>(null)) - override fun close() { logger.debug { "close() called" } - @Suppress("DeferredResultUnused") runBlocking { nonBlockingClose() } - } - override suspend fun suspendingClose() { - logger.debug { "suspendingClose() called" } - nonBlockingClose().await() - } - - private suspend fun nonBlockingClose(): Deferred { coroutineScope.cancel() // Remove the reference to this `FirebaseDataConnect` instance from the @@ -400,47 +422,70 @@ internal class FirebaseDataConnectImpl( // called with the same arguments that a new instance of `FirebaseDataConnect` will be created. creator.remove(this) - mutex.withLock { closed = true } - // Close Auth and AppCheck synchronously to avoid race conditions with auth callbacks. // Since close() is re-entrant, this is safe even if they have already been closed. dataConnectAuth.close() dataConnectAppCheck.close() - // Create the "close job" to asynchronously close the gRPC client. - @OptIn(DelicateCoroutinesApi::class) - val newCloseJob = - GlobalScope.async(start = CoroutineStart.LAZY) { - lazyGrpcRPCs.initializedValueOrNull?.close() - } - newCloseJob.invokeOnCompletion { exception -> - if (exception === null) { - logger.debug { "close() completed successfully" } - } else { - logger.warn(exception) { "close() failed" } + fun createCloseJob(grpcRPCs: DataConnectGrpcRPCs): Deferred { + @OptIn(DelicateCoroutinesApi::class) + val closeJob = GlobalScope.async(start = CoroutineStart.LAZY) { grpcRPCs.close() } + closeJob.invokeOnCompletion { exception -> + if (exception !== null) { + logger.warn(exception) { "close() failed" } + } else { + logger.debug { "close() completed successfully" } + state.update { oldState -> + check(oldState is State.Closing) { + "oldState is ${oldState}, but expected Closed (error code hsee7gfxvz)" + } + check(oldState.closeJob === closeJob) { + "oldState.closeJob is ${oldState.closeJob}, but expected $closeJob (error code n3x86pr6qn)" + } + State.Closed + } + } } + return closeJob } - // Register the new "close job", unless there is a "close job" already in progress or one that - // completed successfully. - val updatedCloseJob = - closeJob.updateAndGet { oldCloseJob -> - if (oldCloseJob.ref !== null && !oldCloseJob.ref.isCancelled) { - oldCloseJob - } else { - NullableReference(newCloseJob) + val newState = + state.updateAndGet { oldState -> + when (oldState) { + is State.New -> State.Closed + is State.Initialized -> + State.Closing(oldState.grpcRPCs, createCloseJob(oldState.grpcRPCs)) + is State.Closing -> + if (oldState.closeJob.isCancelled) { + oldState.copy(closeJob = createCloseJob(oldState.grpcRPCs)) + } else { + oldState + } + is State.Closed -> oldState } } - // If the updated "close job" was the one that we created, then start it! - if (updatedCloseJob.ref === newCloseJob) { - newCloseJob.start() + if (newState is State.Closing) { + newState.closeJob.start() } + } + + override suspend fun suspendingClose() { + logger.debug { "suspendingClose() called" } - // Return the job "close job" that is active or already completed so that the caller can await - // its result. - return checkNotNull(updatedCloseJob.ref) { - "updatedCloseJob.ref should not have been null (error code y5fk4ntdnd)" + close() + + when (val state = state.value) { + is State.Initialized -> + throw IllegalStateException( + "state.value should be Closed or Closing, but got Initialized (error code n3x86pr6qn)" + ) + is State.New -> + throw IllegalStateException( + "state.value should be Closed or Closing, but got New (error code mr6vccmvcf)" + ) + is State.Closing -> state.closeJob.await() + State.Closed -> {} } } diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/MutationRefImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/MutationRefImpl.kt index edd978a2ec9..8e5684c2fe6 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/MutationRefImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/MutationRefImpl.kt @@ -60,8 +60,7 @@ internal class MutationRefImpl( override suspend fun execute(): MutationResultImpl { val requestId = "mut" + Random.nextAlphanumericString(length = 10) - return dataConnect.lazyGrpcClient - .get() + return dataConnect.grpcClient .executeMutation( requestId = requestId, operationName = operationName, diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QueryRefImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QueryRefImpl.kt index 53e247e3d14..1b630a12cfd 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QueryRefImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QueryRefImpl.kt @@ -49,7 +49,7 @@ internal class QueryRefImpl( variablesSerializersModule = variablesSerializersModule, ) { override suspend fun execute(): QueryResultImpl = - dataConnect.lazyQueryManager.get().execute(this).let { QueryResultImpl(it.ref.getOrThrow()) } + dataConnect.queryManager.execute(this).let { QueryResultImpl(it.ref.getOrThrow()) } override fun subscribe(): QuerySubscription = QuerySubscriptionImpl(this) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt index 2ca9aea6771..7ec870e93ce 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt @@ -58,7 +58,7 @@ internal class QuerySubscriptionImpl(query: QueryRefImpl val querySubscriptionResult = QuerySubscriptionResultImpl(query, sequencedResult) send(querySubscriptionResult) @@ -70,7 +70,7 @@ internal class QuerySubscriptionImpl(query: QueryRefImpl(relaxed = true) { every { blockingDispatcher } returns UnconfinedTestDispatcher(testScheduler) - every { lazyGrpcClient } returns - SuspendingLazy { - mockk { - coEvery { - executeMutation( - capture(requestIdSlot), - capture(operationNameSlot), - capture(variablesSlot), - capture(callerSdkTypeSlot), - ) - } returns result.getOrThrow() - } + every { grpcClient } returns + mockk { + coEvery { + executeMutation( + capture(requestIdSlot), + capture(operationNameSlot), + capture(variablesSlot), + capture(callerSdkTypeSlot), + ) + } returns result.getOrThrow() } } } diff --git a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/QueryRefImplUnitTest.kt b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/QueryRefImplUnitTest.kt index fc6baffe601..904779cb4ce 100644 --- a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/QueryRefImplUnitTest.kt +++ b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/QueryRefImplUnitTest.kt @@ -29,7 +29,6 @@ import com.google.firebase.dataconnect.testutil.property.arbitrary.queryRefImpl import com.google.firebase.dataconnect.testutil.property.arbitrary.shouldHavePropertiesEqualTo import com.google.firebase.dataconnect.testutil.shouldContainWithNonAbuttingText import com.google.firebase.dataconnect.util.SequencedReference -import com.google.firebase.dataconnect.util.SuspendingLazy import io.kotest.assertions.assertSoftly import io.kotest.assertions.throwables.shouldThrow import io.kotest.assertions.withClue @@ -577,11 +576,9 @@ class QueryRefImplUnitTest { querySlot: CapturingSlot> ): FirebaseDataConnectInternal = mockk(relaxed = true) { - every { lazyQueryManager } returns - SuspendingLazy { - mockk { - coEvery { execute(capture(querySlot)) } returns SequencedReference(123, result) - } + every { queryManager } returns + mockk { + coEvery { execute(capture(querySlot)) } returns SequencedReference(123, result) } } } From 3aef735192af10bcb52490f9722e9c02e55fbb04 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Tue, 15 Apr 2025 14:06:08 -0400 Subject: [PATCH 09/15] minor cleanup --- .../core/FirebaseDataConnectImpl.kt | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index c96c990b7c0..d05ca685db2 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -421,27 +421,28 @@ internal class FirebaseDataConnectImpl( } } - // Register the new "close job", unless there is a "close job" already in progress or one that - // completed successfully. - val updatedCloseJob = - closeJob.updateAndGet { oldCloseJob -> - if (oldCloseJob.ref !== null && !oldCloseJob.ref.isCancelled) { - oldCloseJob + // Register the new "close job". Do not overwrite a close job that is already in progress (to + // avoid having more than one close job in progress at a time) or a close job that completed + // successfully (since there is nothing to do if a previous close job was successful). + val updatedCloseJobRef = + closeJob.updateAndGet { oldCloseJobRef: NullableReference> -> + if (oldCloseJobRef.ref !== null && !oldCloseJobRef.ref.isCancelled) { + oldCloseJobRef } else { NullableReference(newCloseJob) } } - // If the updated "close job" was the one that we created, then start it! - if (updatedCloseJob.ref === newCloseJob) { - newCloseJob.start() - } + // Start the updated "close job" (if it was already started then start() is a no-op). + val updatedCloseJob = + checkNotNull(updatedCloseJobRef.ref) { + "internal error: closeJob.updateAndGet() returned a NullableReference whose 'ref' " + + "property was null; however it should NOT have been null (error code y5fk4ntdnd)" + } + updatedCloseJob.start() - // Return the job "close job" that is active or already completed so that the caller can await - // its result. - return checkNotNull(updatedCloseJob.ref) { - "updatedCloseJob.ref should not have been null (error code y5fk4ntdnd)" - } + // Return the "close job", which _may_ already be completed, so the caller can await it. + return updatedCloseJob } // The generated SDK relies on equals() and hashCode() using object identity. From c762afd83d98297664649f3422f2850955f16d6d Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Tue, 15 Apr 2025 14:20:06 -0400 Subject: [PATCH 10/15] minor cleanup --- .../core/FirebaseDataConnectImpl.kt | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index 9f7838638e9..2ba55f22d00 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -414,7 +414,15 @@ internal class FirebaseDataConnectImpl( override fun close() { logger.debug { "close() called" } + @Suppress("DeferredResultUnused") closeInternal() + } + + override suspend fun suspendingClose() { + logger.debug { "suspendingClose() called" } + closeInternal()?.await() + } + private fun closeInternal(): Deferred? { coroutineScope.cancel() // Remove the reference to this `FirebaseDataConnect` instance from the @@ -437,10 +445,11 @@ internal class FirebaseDataConnectImpl( logger.debug { "close() completed successfully" } state.update { oldState -> check(oldState is State.Closing) { - "oldState is ${oldState}, but expected Closed (error code hsee7gfxvz)" + "oldState is ${oldState}, but expected Closing (error code hsee7gfxvz)" } check(oldState.closeJob === closeJob) { - "oldState.closeJob is ${oldState.closeJob}, but expected $closeJob (error code n3x86pr6qn)" + "oldState.closeJob is ${oldState.closeJob}, but expected $closeJob " + + "(error code n3x86pr6qn)" } State.Closed } @@ -461,31 +470,19 @@ internal class FirebaseDataConnectImpl( } else { oldState } - is State.Closed -> oldState + is State.Closed -> State.Closed } } - if (newState is State.Closing) { - newState.closeJob.start() - } - } - - override suspend fun suspendingClose() { - logger.debug { "suspendingClose() called" } - - close() - - when (val state = state.value) { - is State.Initialized -> - throw IllegalStateException( - "state.value should be Closed or Closing, but got Initialized (error code n3x86pr6qn)" - ) + return when (newState) { + is State.Initialized, is State.New -> throw IllegalStateException( - "state.value should be Closed or Closing, but got New (error code mr6vccmvcf)" + "internal error: newState is $newState, but expected Closing or Closed " + + "(error code n3x86pr6qn)" ) - is State.Closing -> state.closeJob.await() - State.Closed -> {} + is State.Closing -> newState.closeJob.apply { start() } + is State.Closed -> null } } From 9f3b7942b05a51a73132c495f9c4a203d57eb5a0 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Wed, 16 Apr 2025 13:53:41 -0400 Subject: [PATCH 11/15] Rename "old" state variables to "current" state, as suggested by Copilot: https://github.com/firebase/firebase-android-sdk/pull/6840#pullrequestreview-2773350309 --- .../core/DataConnectCredentialsTokenManager.kt | 18 +++++++++--------- .../core/FirebaseDataConnectImpl.kt | 6 +++--- .../testutil/SuspendingCountDownLatch.kt | 6 +++--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt index 10d9a1ce4f9..15b3a729821 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt @@ -193,12 +193,12 @@ internal sealed class DataConnectCredentialsTokenManager( fun forceRefresh() { logger.debug { "forceRefresh()" } val oldState = - state.getAndUpdate { oldState -> - when (oldState) { + state.getAndUpdate { currentState -> + when (currentState) { is State.Closed -> State.Closed - is State.New -> oldState.copy(forceTokenRefresh = true) - is State.Idle -> oldState.copy(forceTokenRefresh = true) - is State.Active -> State.Idle(oldState.provider, forceTokenRefresh = true) + is State.New -> currentState.copy(forceTokenRefresh = true) + is State.Idle -> currentState.copy(forceTokenRefresh = true) + is State.Active -> State.Idle(currentState.provider, forceTokenRefresh = true) } } @@ -340,11 +340,11 @@ internal sealed class DataConnectCredentialsTokenManager( addTokenListener(newProvider) val oldState = - state.getAndUpdate { oldState -> - when (oldState) { + state.getAndUpdate { currentState -> + when (currentState) { is State.Closed -> State.Closed - is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh) - is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh) + is State.New -> State.Idle(newProvider, currentState.forceTokenRefresh) + is State.Idle -> State.Idle(newProvider, currentState.forceTokenRefresh) is State.Active -> State.Idle(newProvider, forceTokenRefresh = false) } } diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index d05ca685db2..9ad64cc6054 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -425,9 +425,9 @@ internal class FirebaseDataConnectImpl( // avoid having more than one close job in progress at a time) or a close job that completed // successfully (since there is nothing to do if a previous close job was successful). val updatedCloseJobRef = - closeJob.updateAndGet { oldCloseJobRef: NullableReference> -> - if (oldCloseJobRef.ref !== null && !oldCloseJobRef.ref.isCancelled) { - oldCloseJobRef + closeJob.updateAndGet { currentCloseJobRef: NullableReference> -> + if (currentCloseJobRef.ref !== null && !currentCloseJobRef.ref.isCancelled) { + currentCloseJobRef } else { NullableReference(newCloseJob) } diff --git a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt index a4ff85d04f0..b7828e0c36a 100644 --- a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt +++ b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt @@ -61,9 +61,9 @@ class SuspendingCountDownLatch(count: Int) { * @throws IllegalStateException if called when the count has already reached zero. */ fun countDown(): SuspendingCountDownLatch { - _count.update { oldValue -> - check(oldValue > 0) { "countDown() called too many times (oldValue=$oldValue)" } - oldValue - 1 + _count.update { currentValue -> + check(currentValue > 0) { "countDown() called too many times (currentValue=$currentValue)" } + currentValue - 1 } return this } From 7e482199260167bc39d7922884a68cbc6b5b9d43 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Wed, 16 Apr 2025 13:53:41 -0400 Subject: [PATCH 12/15] Rename "old" state variables to "current" state, as suggested by Copilot: https://github.com/firebase/firebase-android-sdk/pull/6840#pullrequestreview-2773350309 --- .../DataConnectCredentialsTokenManager.kt | 18 ++++---- .../core/FirebaseDataConnectImpl.kt | 44 +++++++++---------- .../testutil/SuspendingCountDownLatch.kt | 6 +-- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt index 10d9a1ce4f9..15b3a729821 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt @@ -193,12 +193,12 @@ internal sealed class DataConnectCredentialsTokenManager( fun forceRefresh() { logger.debug { "forceRefresh()" } val oldState = - state.getAndUpdate { oldState -> - when (oldState) { + state.getAndUpdate { currentState -> + when (currentState) { is State.Closed -> State.Closed - is State.New -> oldState.copy(forceTokenRefresh = true) - is State.Idle -> oldState.copy(forceTokenRefresh = true) - is State.Active -> State.Idle(oldState.provider, forceTokenRefresh = true) + is State.New -> currentState.copy(forceTokenRefresh = true) + is State.Idle -> currentState.copy(forceTokenRefresh = true) + is State.Active -> State.Idle(currentState.provider, forceTokenRefresh = true) } } @@ -340,11 +340,11 @@ internal sealed class DataConnectCredentialsTokenManager( addTokenListener(newProvider) val oldState = - state.getAndUpdate { oldState -> - when (oldState) { + state.getAndUpdate { currentState -> + when (currentState) { is State.Closed -> State.Closed - is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh) - is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh) + is State.New -> State.Idle(newProvider, currentState.forceTokenRefresh) + is State.Idle -> State.Idle(newProvider, currentState.forceTokenRefresh) is State.Active -> State.Idle(newProvider, forceTokenRefresh = false) } } diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index 2ba55f22d00..accd230aff4 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -162,17 +162,17 @@ internal class FirebaseDataConnectImpl( private fun initialize(): State.Initialized { val newState = - state.updateAndGet { oldState -> - when (oldState) { + state.updateAndGet { currentState -> + when (currentState) { is State.New -> { - val grpcRPCs = createDataConnectGrpcRPCs(oldState.emulatorSettings) + val grpcRPCs = createDataConnectGrpcRPCs(currentState.emulatorSettings) val grpcClient = createDataConnectGrpcClient(grpcRPCs) val queryManager = createQueryManager(grpcClient) State.Initialized(grpcRPCs, grpcClient, queryManager) } - is State.Initialized -> oldState - is State.Closing -> oldState - is State.Closed -> oldState + is State.Initialized -> currentState + is State.Closing -> currentState + is State.Closed -> currentState } } @@ -294,16 +294,16 @@ internal class FirebaseDataConnectImpl( } override fun useEmulator(host: String, port: Int): Unit = runBlocking { - state.update { oldState -> - when (oldState) { + state.update { currentState -> + when (currentState) { is State.New -> - oldState.copy(emulatorSettings = EmulatedServiceSettings(host = host, port = port)) + currentState.copy(emulatorSettings = EmulatedServiceSettings(host = host, port = port)) is State.Initialized -> throw IllegalStateException( "Cannot call useEmulator() after instance has already been initialized." ) - is State.Closing -> oldState - is State.Closed -> oldState + is State.Closing -> currentState + is State.Closed -> currentState } } } @@ -443,12 +443,12 @@ internal class FirebaseDataConnectImpl( logger.warn(exception) { "close() failed" } } else { logger.debug { "close() completed successfully" } - state.update { oldState -> - check(oldState is State.Closing) { - "oldState is ${oldState}, but expected Closing (error code hsee7gfxvz)" + state.update { currentState -> + check(currentState is State.Closing) { + "currentState is ${currentState}, but expected Closing (error code hsee7gfxvz)" } - check(oldState.closeJob === closeJob) { - "oldState.closeJob is ${oldState.closeJob}, but expected $closeJob " + + check(currentState.closeJob === closeJob) { + "currentState.closeJob is ${currentState.closeJob}, but expected $closeJob " + "(error code n3x86pr6qn)" } State.Closed @@ -459,16 +459,16 @@ internal class FirebaseDataConnectImpl( } val newState = - state.updateAndGet { oldState -> - when (oldState) { + state.updateAndGet { currentState -> + when (currentState) { is State.New -> State.Closed is State.Initialized -> - State.Closing(oldState.grpcRPCs, createCloseJob(oldState.grpcRPCs)) + State.Closing(currentState.grpcRPCs, createCloseJob(currentState.grpcRPCs)) is State.Closing -> - if (oldState.closeJob.isCancelled) { - oldState.copy(closeJob = createCloseJob(oldState.grpcRPCs)) + if (currentState.closeJob.isCancelled) { + currentState.copy(closeJob = createCloseJob(currentState.grpcRPCs)) } else { - oldState + currentState } is State.Closed -> State.Closed } diff --git a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt index a4ff85d04f0..b7828e0c36a 100644 --- a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt +++ b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt @@ -61,9 +61,9 @@ class SuspendingCountDownLatch(count: Int) { * @throws IllegalStateException if called when the count has already reached zero. */ fun countDown(): SuspendingCountDownLatch { - _count.update { oldValue -> - check(oldValue > 0) { "countDown() called too many times (oldValue=$oldValue)" } - oldValue - 1 + _count.update { currentValue -> + check(currentValue > 0) { "countDown() called too many times (currentValue=$currentValue)" } + currentValue - 1 } return this } From b1f01b0530ea88f93a4c351975422270485d9435 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Wed, 16 Apr 2025 14:48:35 -0400 Subject: [PATCH 13/15] DataConnectCredentialsTokenManager.kt: verify the new state in forceRefresh() to avoid unintentional internal state corruption. This was suggested by Copilot: https://github.com/firebase/firebase-android-sdk/pull/6840#pullrequestreview-2773477393 --- .../DataConnectCredentialsTokenManager.kt | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt index 15b3a729821..d96e544d6af 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt @@ -194,12 +194,23 @@ internal sealed class DataConnectCredentialsTokenManager( logger.debug { "forceRefresh()" } val oldState = state.getAndUpdate { currentState -> - when (currentState) { - is State.Closed -> State.Closed - is State.New -> currentState.copy(forceTokenRefresh = true) - is State.Idle -> currentState.copy(forceTokenRefresh = true) - is State.Active -> State.Idle(currentState.provider, forceTokenRefresh = true) + val newState = + when (currentState) { + is State.Closed -> State.Closed + is State.New -> currentState.copy(forceTokenRefresh = true) + is State.Idle -> currentState.copy(forceTokenRefresh = true) + is State.Active -> State.Idle(currentState.provider, forceTokenRefresh = true) + } + + check(newState is State.Closed || newState is State.StateWithForceTokenRefresh) { + "internal error gbazc7qr66: newState should have been Closed or " + + "StateWithForceTokenRefresh, but got: $newState" + } + check((newState as? State.StateWithForceTokenRefresh)?.forceTokenRefresh !== false) { + "internal error fnzwyrsez2: newState.forceTokenRefresh should have been true" } + + newState } when (oldState) { From 7946f5a764004ea3210dc7acb39aef4f7aa0e6c6 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Wed, 16 Apr 2025 15:47:50 -0400 Subject: [PATCH 14/15] fix build due to bad merge --- .../core/FirebaseDataConnectImpl.kt | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index eeeb9a3c869..accd230aff4 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -484,29 +484,6 @@ internal class FirebaseDataConnectImpl( is State.Closing -> newState.closeJob.apply { start() } is State.Closed -> null } - - // Register the new "close job". Do not overwrite a close job that is already in progress (to - // avoid having more than one close job in progress at a time) or a close job that completed - // successfully (since there is nothing to do if a previous close job was successful). - val updatedCloseJobRef = - closeJob.updateAndGet { currentCloseJobRef: NullableReference> -> - if (currentCloseJobRef.ref !== null && !currentCloseJobRef.ref.isCancelled) { - currentCloseJobRef - } else { - NullableReference(newCloseJob) - } - } - - // Start the updated "close job" (if it was already started then start() is a no-op). - val updatedCloseJob = - checkNotNull(updatedCloseJobRef.ref) { - "internal error: closeJob.updateAndGet() returned a NullableReference whose 'ref' " + - "property was null; however it should NOT have been null (error code y5fk4ntdnd)" - } - updatedCloseJob.start() - - // Return the "close job", which _may_ already be completed, so the caller can await it. - return updatedCloseJob } // The generated SDK relies on equals() and hashCode() using object identity. From b8f0fc2c44c77060d6017200d8a8faea2fd67e26 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Tue, 22 Apr 2025 11:48:40 -0400 Subject: [PATCH 15/15] CHANGELOG.md entry added --- firebase-dataconnect/CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/firebase-dataconnect/CHANGELOG.md b/firebase-dataconnect/CHANGELOG.md index ef07e21a09d..3fb6b3e9e8f 100644 --- a/firebase-dataconnect/CHANGELOG.md +++ b/firebase-dataconnect/CHANGELOG.md @@ -1,5 +1,7 @@ # Unreleased - +* [changed] Code robustness improvements related to state management in + `FirebaseDataConnect` objects. + ([#6861](https://github.com/firebase/firebase-android-sdk/pull/6861)) # 16.0.0 * [changed] DataConnectOperationException added, enabling support for partial