Skip to content

Commit 415215c

Browse files
committed
dataconnect: Use MutableStateFlow.update() instead of MutableStateFlow.compareAndSet() directly for improved readability and less potential for bugs
1 parent 633e68f commit 415215c

File tree

5 files changed

+71
-94
lines changed

5 files changed

+71
-94
lines changed

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt

Lines changed: 27 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ import kotlinx.coroutines.ensureActive
4747
import kotlinx.coroutines.flow.MutableStateFlow
4848
import kotlinx.coroutines.flow.filter
4949
import kotlinx.coroutines.flow.first
50+
import kotlinx.coroutines.flow.getAndUpdate
51+
import kotlinx.coroutines.flow.update
52+
import kotlinx.coroutines.flow.updateAndGet
5053
import kotlinx.coroutines.launch
5154

5255
/** Base class that shares logic for managing the Auth token and AppCheck token. */
@@ -177,20 +180,9 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
177180

178181
// This function must ONLY be called from close().
179182
private fun setClosedState() {
180-
while (true) {
181-
val oldState = state.value
182-
val provider: T? =
183-
when (oldState) {
184-
is State.Closed -> return
185-
is State.New -> null
186-
is State.Idle -> oldState.provider
187-
is State.Active -> oldState.provider
188-
}
189-
190-
if (state.compareAndSet(oldState, State.Closed)) {
191-
provider?.let { removeTokenListener(it) }
192-
break
193-
}
183+
val oldState = state.getAndUpdate { State.Closed }
184+
if (oldState is State.StateWithProvider<T>) {
185+
removeTokenListener(oldState.provider)
194186
}
195187
}
196188

@@ -201,9 +193,8 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
201193
*/
202194
fun forceRefresh() {
203195
logger.debug { "forceRefresh()" }
204-
while (true) {
205-
val oldState = state.value
206-
val newState: State.StateWithForceTokenRefresh<T> =
196+
val newState =
197+
state.updateAndGet { oldState ->
207198
when (oldState) {
208199
is State.Closed -> return
209200
is State.New -> oldState.copy(forceTokenRefresh = true)
@@ -214,13 +205,10 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
214205
State.Idle(oldState.provider, forceTokenRefresh = true)
215206
}
216207
}
217-
218-
check(newState.forceTokenRefresh) {
219-
"newState.forceTokenRefresh should be true (error code gnvr2wx7nz)"
220-
}
221-
if (state.compareAndSet(oldState, newState)) {
222-
break
223208
}
209+
210+
check(newState is State.StateWithForceTokenRefresh<T> && newState.forceTokenRefresh) {
211+
"newState.forceTokenRefresh should be true: $newState (error code gnvr2wx7nz)"
224212
}
225213
}
226214

@@ -350,30 +338,24 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
350338
logger.debug { "onProviderAvailable(newProvider=$newProvider)" }
351339
addTokenListener(newProvider)
352340

353-
while (true) {
354-
val oldState = state.value
355-
val newState =
356-
when (oldState) {
357-
is State.Closed -> {
358-
logger.debug {
359-
"onProviderAvailable(newProvider=$newProvider)" +
360-
" unregistering token listener that was just added"
361-
}
362-
removeTokenListener(newProvider)
363-
break
364-
}
365-
is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh)
366-
is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh)
367-
is State.Active -> {
368-
val newProviderClassName = newProvider::class.qualifiedName
369-
val message = "a new provider $newProviderClassName is available (symhxtmazy)"
370-
oldState.job.cancel(message, NewProvider(message))
371-
State.Idle(newProvider, forceTokenRefresh = false)
341+
state.update { oldState ->
342+
when (oldState) {
343+
is State.Closed -> {
344+
logger.debug {
345+
"onProviderAvailable(newProvider=$newProvider)" +
346+
" unregistering token listener that was just added"
372347
}
348+
removeTokenListener(newProvider)
349+
oldState
350+
}
351+
is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh)
352+
is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh)
353+
is State.Active -> {
354+
val newProviderClassName = newProvider::class.qualifiedName
355+
val message = "a new provider $newProviderClassName is available (symhxtmazy)"
356+
oldState.job.cancel(message, NewProvider(message))
357+
State.Idle(newProvider, forceTokenRefresh = false)
373358
}
374-
375-
if (state.compareAndSet(oldState, newState)) {
376-
break
377359
}
378360
}
379361
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import kotlinx.coroutines.async
5454
import kotlinx.coroutines.cancel
5555
import kotlinx.coroutines.flow.MutableStateFlow
5656
import kotlinx.coroutines.flow.collect
57+
import kotlinx.coroutines.flow.updateAndGet
5758
import kotlinx.coroutines.runBlocking
5859
import kotlinx.coroutines.sync.Mutex
5960
import kotlinx.coroutines.sync.withLock
@@ -407,34 +408,37 @@ internal class FirebaseDataConnectImpl(
407408
dataConnectAppCheck.close()
408409

409410
// Start the job to asynchronously close the gRPC client.
410-
while (true) {
411-
val oldCloseJob = closeJob.value
412-
413-
oldCloseJob.ref?.let {
414-
if (!it.isCancelled) {
415-
return it
411+
val newCloseJobRef =
412+
closeJob.updateAndGet { oldCloseJob ->
413+
oldCloseJob.ref?.let {
414+
if (!it.isCancelled) {
415+
return it
416+
}
416417
}
417-
}
418418

419-
@OptIn(DelicateCoroutinesApi::class)
420-
val newCloseJob =
421-
GlobalScope.async<Unit>(start = CoroutineStart.LAZY) {
422-
lazyGrpcRPCs.initializedValueOrNull?.close()
423-
}
419+
@OptIn(DelicateCoroutinesApi::class)
420+
val newCloseJob =
421+
GlobalScope.async<Unit>(start = CoroutineStart.LAZY) {
422+
lazyGrpcRPCs.initializedValueOrNull?.close()
423+
}
424424

425-
newCloseJob.invokeOnCompletion { exception ->
426-
if (exception === null) {
427-
logger.debug { "close() completed successfully" }
428-
} else {
429-
logger.warn(exception) { "close() failed" }
425+
newCloseJob.invokeOnCompletion { exception ->
426+
if (exception === null) {
427+
logger.debug { "close() completed successfully" }
428+
} else {
429+
logger.warn(exception) { "close() failed" }
430+
}
430431
}
432+
433+
NullableReference(newCloseJob)
431434
}
432435

433-
if (closeJob.compareAndSet(oldCloseJob, NullableReference(newCloseJob))) {
434-
newCloseJob.start()
435-
return newCloseJob
436+
val newCloseJob =
437+
checkNotNull(newCloseJobRef.ref) {
438+
"newCloseJobRef.ref should not be null (error code j3gbhd6e4j)"
436439
}
437-
}
440+
newCloseJob.start()
441+
return newCloseJob
438442
}
439443

440444
// The generated SDK relies on equals() and hashCode() using object identity.

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import kotlinx.coroutines.cancelAndJoin
2727
import kotlinx.coroutines.flow.Flow
2828
import kotlinx.coroutines.flow.MutableStateFlow
2929
import kotlinx.coroutines.flow.channelFlow
30+
import kotlinx.coroutines.flow.update
3031
import kotlinx.coroutines.launch
3132

3233
internal class QuerySubscriptionImpl<Data, Variables>(query: QueryRefImpl<Data, Variables>) :
@@ -80,22 +81,17 @@ internal class QuerySubscriptionImpl<Data, Variables>(query: QueryRefImpl<Data,
8081
}
8182

8283
private fun updateLastResult(prospectiveLastResult: QuerySubscriptionResultImpl) {
83-
// Update the last result in a compare-and-swap loop so that there is no possibility of
84-
// clobbering a newer result with an older result, compared using their sequence numbers.
8584
// TODO: Fix this so that results from an old query do not clobber results from a new query,
8685
// as set by a call to update()
87-
while (true) {
88-
val currentLastResult = _lastResult.value
89-
if (currentLastResult.ref != null) {
90-
val currentSequenceNumber = currentLastResult.ref.sequencedResult.sequenceNumber
91-
val prospectiveSequenceNumber = prospectiveLastResult.sequencedResult.sequenceNumber
92-
if (currentSequenceNumber >= prospectiveSequenceNumber) {
93-
return
94-
}
95-
}
96-
97-
if (_lastResult.compareAndSet(currentLastResult, NullableReference(prospectiveLastResult))) {
98-
return
86+
_lastResult.update { currentLastResult ->
87+
if (
88+
currentLastResult.ref != null &&
89+
currentLastResult.ref.sequencedResult.sequenceNumber >=
90+
prospectiveLastResult.sequencedResult.sequenceNumber
91+
) {
92+
currentLastResult
93+
} else {
94+
NullableReference(prospectiveLastResult)
9995
}
10096
}
10197
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RegisteredDataDeserialzer.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import kotlinx.coroutines.channels.BufferOverflow
3131
import kotlinx.coroutines.flow.MutableSharedFlow
3232
import kotlinx.coroutines.flow.MutableStateFlow
3333
import kotlinx.coroutines.flow.onSubscription
34+
import kotlinx.coroutines.flow.update
3435
import kotlinx.coroutines.withContext
3536
import kotlinx.serialization.DeserializationStrategy
3637
import kotlinx.serialization.modules.SerializersModule
@@ -84,17 +85,14 @@ internal class RegisteredDataDeserializer<T>(
8485
lazyDeserialize(requestId, sequencedResult)
8586
)
8687

87-
// Use a compare-and-swap ("CAS") loop to ensure that an old update never clobbers a newer one.
88-
while (true) {
89-
val currentUpdate = latestUpdate.value
88+
latestUpdate.update { currentUpdate ->
9089
if (
9190
currentUpdate.ref !== null &&
9291
currentUpdate.ref.sequenceNumber > sequencedResult.sequenceNumber
9392
) {
94-
break // don't clobber a newer update with an older one
95-
}
96-
if (latestUpdate.compareAndSet(currentUpdate, NullableReference(newUpdate))) {
97-
break
93+
currentUpdate // don't clobber a newer update with an older one
94+
} else {
95+
NullableReference(newUpdate)
9896
}
9997
}
10098

firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package com.google.firebase.dataconnect.testutil
1919
import kotlinx.coroutines.flow.MutableStateFlow
2020
import kotlinx.coroutines.flow.filter
2121
import kotlinx.coroutines.flow.first
22+
import kotlinx.coroutines.flow.update
2223

2324
/**
2425
* An implementation of [java.util.concurrent.CountDownLatch] that suspends instead of blocking.
@@ -60,14 +61,10 @@ class SuspendingCountDownLatch(count: Int) {
6061
* @throws IllegalStateException if called when the count has already reached zero.
6162
*/
6263
fun countDown(): SuspendingCountDownLatch {
63-
while (true) {
64-
val oldValue = _count.value
64+
_count.update { oldValue ->
6565
check(oldValue > 0) { "countDown() called too many times (oldValue=$oldValue)" }
66-
67-
val newValue = oldValue - 1
68-
if (_count.compareAndSet(oldValue, newValue)) {
69-
return this
70-
}
66+
oldValue - 1
7167
}
68+
return this
7269
}
7370
}

0 commit comments

Comments
 (0)