diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt index ba314d0f9db..5f240960b7e 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/AuthIntegrationTest.kt @@ -17,10 +17,10 @@ package com.google.firebase.dataconnect import com.google.firebase.auth.FirebaseAuth -import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal import com.google.firebase.dataconnect.testutil.DataConnectBackend import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcServer +import com.google.firebase.dataconnect.testutil.awaitAuthReady import com.google.firebase.dataconnect.testutil.newInstance import com.google.firebase.dataconnect.testutil.property.arbitrary.dataConnect import com.google.firebase.dataconnect.testutil.schemas.PersonSchema @@ -202,7 +202,7 @@ class AuthIntegrationTest : DataConnectIntegrationTestBase() { } private suspend fun signIn() { - (personSchema.dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + personSchema.dataConnect.awaitAuthReady() val authResult = auth.run { signInAnonymously().await() } withClue("authResult.user returned from signInAnonymously()") { authResult.user.shouldNotBeNull() diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt index 8ff7715ddb9..394c2195332 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/GrpcMetadataIntegrationTest.kt @@ -23,7 +23,6 @@ import com.google.android.gms.tasks.Tasks import com.google.firebase.appcheck.AppCheckProvider import com.google.firebase.appcheck.AppCheckProviderFactory import com.google.firebase.appcheck.FirebaseAppCheck -import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal import com.google.firebase.dataconnect.generated.GeneratedConnector import com.google.firebase.dataconnect.generated.GeneratedMutation import com.google.firebase.dataconnect.generated.GeneratedQuery @@ -32,6 +31,8 @@ import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase import com.google.firebase.dataconnect.testutil.DataConnectTestAppCheckToken import com.google.firebase.dataconnect.testutil.FirebaseAuthBackend import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcServer +import com.google.firebase.dataconnect.testutil.awaitAppCheckReady +import com.google.firebase.dataconnect.testutil.awaitAuthReady import com.google.firebase.dataconnect.testutil.getFirebaseAppIdFromStrings import com.google.firebase.dataconnect.testutil.newInstance import com.google.firebase.dataconnect.util.SuspendingLazy @@ -138,7 +139,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeQueryShouldNotSendAuthMetadataWhenNotLoggedIn() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val queryRef = dataConnect.query("qryfyk7yfppfe", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -151,7 +152,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeMutationShouldNotSendAuthMetadataWhenNotLoggedIn() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val mutationRef = dataConnect.mutation("mutckjpte9v9j", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -165,7 +166,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeQueryShouldSendAuthMetadataWhenLoggedIn() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } firebaseAuthSignIn(dataConnect) @@ -179,7 +180,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeMutationShouldSendAuthMetadataWhenLoggedIn() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val mutationRef = dataConnect.mutation("mutayn7as5k7d", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -194,7 +195,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeQueryShouldNotSendAuthMetadataAfterLogout() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer(), serializer()) val metadatasJob1 = async { grpcServer.metadatas.first() } val metadatasJob2 = async { grpcServer.metadatas.take(2).last() } @@ -212,7 +213,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeMutationShouldNotSendAuthMetadataAfterLogout() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAuthReady() + dataConnect.awaitAuthReady() val mutationRef = dataConnect.mutation("mutvw945ag3vv", Unit, serializer(), serializer()) val metadatasJob1 = async { grpcServer.metadatas.first() } @@ -233,7 +234,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { // appcheck token is sent at all. val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady() + dataConnect.awaitAppCheckReady() val queryRef = dataConnect.query("qrybbeekpkkck", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -248,7 +249,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { // appcheck token is sent at all. val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady() + dataConnect.awaitAppCheckReady() val mutationRef = dataConnect.mutation("mutbs7hhxk39c", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } @@ -262,7 +263,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeQueryShouldSendAppCheckMetadataWhenAppCheckIsEnabled() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady() + dataConnect.awaitAppCheckReady() val queryRef = dataConnect.query("qryyarwrxe2fv", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } val appCheck = FirebaseAppCheck.getInstance(dataConnect.app) @@ -277,7 +278,7 @@ class GrpcMetadataIntegrationTest : DataConnectIntegrationTestBase() { fun executeMutationShouldSendAppCheckMetadataWhenAppCheckIsEnabled() = runTest { val grpcServer = inProcessDataConnectGrpcServer.newInstance() val dataConnect = dataConnectFactory.newInstance(grpcServer) - (dataConnect as FirebaseDataConnectInternal).awaitAppCheckReady() + dataConnect.awaitAppCheckReady() val mutationRef = dataConnect.mutation("mutz4hzqzpgb4", Unit, serializer(), serializer()) val metadatasJob = async { grpcServer.metadatas.first() } diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt new file mode 100644 index 00000000000..f73f1d8b50d --- /dev/null +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.firebase.dataconnect.testutil + +import com.google.firebase.dataconnect.FirebaseDataConnect +import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal + +suspend fun FirebaseDataConnect.awaitAuthReady() = + (this as FirebaseDataConnectInternal).awaitAuthReady() + +suspend fun FirebaseDataConnect.awaitAppCheckReady() = + (this as FirebaseDataConnectInternal).awaitAppCheckReady() diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt index e6f4049c359..9458a678bff 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt @@ -30,7 +30,6 @@ import com.google.firebase.inject.Provider import com.google.firebase.internal.api.FirebaseNoSignedInUserException import com.google.firebase.util.nextAlphanumericString import java.lang.ref.WeakReference -import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.coroutineContext import kotlin.random.Random import kotlinx.coroutines.CancellationException @@ -46,10 +45,9 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch -import kotlinx.coroutines.yield /** Base class that shares logic for managing the Auth token and AppCheck token. */ internal sealed class DataConnectCredentialsTokenManager( @@ -61,9 +59,6 @@ internal sealed class DataConnectCredentialsTokenManager( val instanceId: String get() = logger.nameWithId - private val _providerAvailable = MutableStateFlow(false) - val providerAvailable: StateFlow = _providerAvailable.asStateFlow() - @Suppress("LeakingThis") private val weakThis = WeakReference(this) private val coroutineScope = @@ -87,49 +82,39 @@ internal sealed class DataConnectCredentialsTokenManager( } } - private interface ProviderProvider { - val provider: T? - } - private sealed interface State { /** State indicating that [close] has been invoked. */ object Closed : State - /** State indicating that there is no outstanding "get token" request. */ - class Idle( - - /** - * The [InternalAuthProvider] or [InteropAppCheckTokenProvider]; may be null if the deferred - * has not yet given us a provider. - */ - override val provider: T?, - + sealed interface StateWithForceTokenRefresh : State { /** The value to specify for `forceRefresh` on the next invocation of [getToken]. */ val forceTokenRefresh: Boolean - ) : State, ProviderProvider + } - /** State indicating that there _is_ an outstanding "get token" request. */ - class Active( + /** State indicating that the token provider is not (yet?) available. */ + data class New(override val forceTokenRefresh: Boolean) : StateWithForceTokenRefresh - /** - * The [InternalAuthProvider] or [InteropAppCheckTokenProvider] that is performing the "get - * token" request. - */ + sealed interface StateWithProvider : State { + /** The token provider, [InternalAuthProvider] or [InteropAppCheckTokenProvider] */ + val provider: T + } + + /** State indicating that there is no outstanding "get token" request. */ + data class Idle(override val provider: T, override val forceTokenRefresh: Boolean) : + StateWithProvider, StateWithForceTokenRefresh + + /** State indicating that there _is_ an outstanding "get token" request. */ + data class Active( override val provider: T, /** The job that is performing the "get token" request. */ val job: Deferred>> - ) : State, ProviderProvider + ) : StateWithProvider } - /** - * The current state of this object. The value should only be changed in a compare-and-swap loop - * in order to be thread-safe. Such a loop should call `yield()` on each iteration to allow other - * coroutines to run on the thread. - */ - private val state = - AtomicReference>(State.Idle(provider = null, forceTokenRefresh = false)) + /** The current state of this object. */ + private val state = MutableStateFlow>(State.New(forceTokenRefresh = false)) /** * Adds the token listener to the given provider. @@ -168,19 +153,42 @@ internal sealed class DataConnectCredentialsTokenManager( setClosedState() } + /** + * Suspends until the token provider becomes available to this object. + * + * If [close] has been invoked, or is invoked _before_ a token provider becomes available, then + * this method returns normally, as if a token provider _had_ become available. + */ + suspend fun awaitTokenProvider() { + logger.debug { "awaitTokenProvider() start" } + val currentState = + state + .filter { + when (it) { + State.Closed -> true + is State.New -> false + is State.Idle -> true + is State.Active -> true + } + } + .first() + logger.debug { "awaitTokenProvider() done: currentState=$currentState" } + } + // This function must ONLY be called from close(). private fun setClosedState() { while (true) { - val oldState = state.get() - val providerProvider: ProviderProvider = + val oldState = state.value + val provider: T? = when (oldState) { is State.Closed -> return - is State.Idle -> oldState - is State.Active -> oldState + is State.New -> null + is State.Idle -> oldState.provider + is State.Active -> oldState.provider } if (state.compareAndSet(oldState, State.Closed)) { - providerProvider.provider?.let { removeTokenListener(it) } + provider?.let { removeTokenListener(it) } break } } @@ -191,27 +199,28 @@ internal sealed class DataConnectCredentialsTokenManager( * * If [close] has been called, this method does nothing. */ - suspend fun forceRefresh() { + fun forceRefresh() { logger.debug { "forceRefresh()" } while (true) { - val oldState = state.get() - val oldStateProviderProvider = + val oldState = state.value + val newState: State.StateWithForceTokenRefresh = when (oldState) { is State.Closed -> return - is State.Idle -> oldState + is State.New -> oldState.copy(forceTokenRefresh = true) + is State.Idle -> oldState.copy(forceTokenRefresh = true) is State.Active -> { val message = "needs token refresh (wgrwbrvjxt)" oldState.job.cancel(message, ForceRefresh(message)) - oldState + State.Idle(oldState.provider, forceTokenRefresh = true) } } - val newState = State.Idle(oldStateProviderProvider.provider, forceTokenRefresh = true) + check(newState.forceTokenRefresh) { + "newState.forceTokenRefresh should be true (error code gnvr2wx7nz)" + } if (state.compareAndSet(oldState, newState)) { break } - - yield() } } @@ -246,7 +255,7 @@ internal sealed class DataConnectCredentialsTokenManager( logger.debug { "$invocationId getToken(requestId=$requestId)" } while (true) { val attemptSequenceNumber = nextSequenceNumber() - val oldState = state.get() + val oldState = state.value val newState: State.Active = when (oldState) { @@ -257,13 +266,13 @@ internal sealed class DataConnectCredentialsTokenManager( } throw CredentialsTokenManagerClosedException(this) } - is State.Idle -> { - if (oldState.provider === null) { - logger.debug { - "$invocationId getToken() returns null (token provider is not (yet?) available)" - } - return null + is State.New -> { + logger.debug { + "$invocationId getToken() returns null (token provider is not (yet?) available)" } + return null + } + is State.Idle -> { newActiveState(invocationId, oldState.provider, oldState.forceTokenRefresh) } is State.Active -> { @@ -342,7 +351,7 @@ internal sealed class DataConnectCredentialsTokenManager( addTokenListener(newProvider) while (true) { - val oldState = state.get() + val oldState = state.value val newState = when (oldState) { is State.Closed -> { @@ -353,6 +362,7 @@ internal sealed class DataConnectCredentialsTokenManager( removeTokenListener(newProvider) break } + is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh) is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh) is State.Active -> { val newProviderClassName = newProvider::class.qualifiedName @@ -366,8 +376,6 @@ internal sealed class DataConnectCredentialsTokenManager( break } } - - _providerAvailable.value = true } /** diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt index 26e9ce49c51..d0ebe3f84c3 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt @@ -102,7 +102,7 @@ internal class DataConnectGrpcClient( ) } - private suspend inline fun T.retryOnGrpcUnauthenticatedError( + private inline fun T.retryOnGrpcUnauthenticatedError( requestId: String, kotlinMethodName: String, block: T.() -> R diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index d9bee50b89d..f49afda964f 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -54,8 +54,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -120,9 +118,6 @@ internal class FirebaseDataConnectImpl( } ) - private val authProviderAvailable = MutableStateFlow(false) - private val appCheckProviderAvailable = MutableStateFlow(false) - // Protects `closed`, `grpcClient`, `emulatorSettings`, and `queryManager`. private val mutex = Mutex() @@ -141,17 +136,7 @@ internal class FirebaseDataConnectImpl( ) override suspend fun awaitAuthReady() { - authProviderAvailable.first { it } - } - - init { - val name = CoroutineName("DataConnectAuth isProviderAvailable pipe for $instanceId") - coroutineScope.launch(name) { - dataConnectAuth.providerAvailable.collect { isProviderAvailable -> - logger.debug { "authProviderAvailable=$isProviderAvailable" } - authProviderAvailable.value = isProviderAvailable - } - } + dataConnectAuth.awaitTokenProvider() } private val dataConnectAppCheck: DataConnectAppCheck = @@ -163,17 +148,7 @@ internal class FirebaseDataConnectImpl( ) override suspend fun awaitAppCheckReady() { - appCheckProviderAvailable.first { it } - } - - init { - val name = CoroutineName("DataConnectAppCheck isProviderAvailable pipe for $instanceId") - coroutineScope.launch(name) { - dataConnectAppCheck.providerAvailable.collect { isProviderAvailable -> - logger.debug { "appCheckProviderAvailable=$isProviderAvailable" } - appCheckProviderAvailable.value = isProviderAvailable - } - } + dataConnectAppCheck.awaitTokenProvider() } private val lazyGrpcRPCs =