Skip to content

Commit 4462b92

Browse files
authored
Propagate converter exceptions instead of hanging indefinitely (#728)
Signed-off-by: Matt Ramotar <matt.ramotar@uber.com>
1 parent cfb4ae0 commit 4462b92

File tree

3 files changed

+290
-19
lines changed

3 files changed

+290
-19
lines changed

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/FetcherController.kt

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.mobilenativefoundation.store.store5.impl
1717

18+
import kotlinx.coroutines.CancellationException
1819
import kotlinx.coroutines.CoroutineScope
1920
import kotlinx.coroutines.NonCancellable
2021
import kotlinx.coroutines.async
@@ -78,10 +79,22 @@ internal class FetcherController<Key : Any, Network : Any, Output : Any, Local :
7879
flow { emitAll(realFetcher(key)) }.map {
7980
when (it) {
8081
is FetcherResult.Data -> {
81-
StoreReadResponse.Data(
82-
it.value,
83-
origin = StoreReadResponseOrigin.Fetcher(it.origin),
84-
) as StoreReadResponse<Network>
82+
try {
83+
val network = it.value
84+
val local = converter.fromNetworkToLocal(network)
85+
sourceOfTruth?.write(key, local)
86+
StoreReadResponse.Data(
87+
network,
88+
origin = StoreReadResponseOrigin.Fetcher(it.origin),
89+
) as StoreReadResponse<Network>
90+
} catch (exception: CancellationException) {
91+
throw exception
92+
} catch (exception: Throwable) {
93+
StoreReadResponse.Error.Exception(
94+
exception,
95+
origin = StoreReadResponseOrigin.Fetcher(it.origin),
96+
)
97+
}
8598
}
8699

87100
is FetcherResult.Error.Message ->
@@ -106,17 +119,15 @@ internal class FetcherController<Key : Any, Network : Any, Output : Any, Local :
106119
StoreReadResponseOrigin.Fetcher()
107120
emit(StoreReadResponse.NoNewData(origin))
108121
},
109-
/**
110-
* When enabled, downstream collectors are never closed, instead, they are kept active to
111-
* receive values dispatched by fetchers created after them. This makes [FetcherController]
112-
* act like a [SourceOfTruth] in the lack of a [SourceOfTruth] provided by the developer.
113-
*/
122+
// When enabled, downstream collectors are never closed.
123+
// Instead, they are kept active to receive values dispatched by fetchers created after them.
124+
// This makes FetcherController act like a SourceOfTruth in the lack of a SourceOfTruth provided by the developer.
114125
piggybackingDownstream = true,
115-
onEach = { response ->
116-
response.dataOrNull()?.let { network: Network ->
117-
val local: Local = converter.fromNetworkToLocal(network)
118-
sourceOfTruth?.write(key, local)
119-
}
126+
onEach = { _ ->
127+
// Exceptions thrown here propagate to the actor and close downstream channels silently.
128+
// This caused store.stream() and store.get() to hang indefinitely (see #660).
129+
// Consequently, we are intentionally performing no work here.
130+
// Conversion and SOT writes now happen in the source flow above.
120131
},
121132
)
122133
},

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
223223
}
224224

225225
val requestKeyToFetcherName: MutableMap<Key, String?> = mutableMapOf()
226+
// Track if network errored AND this is a fresh request where fallback behavior matters
227+
var networkErrorWithNoFallback = false
226228
// we use a merge implementation that gives the source of the flow so that we can decide
227229
// based on that.
228230
return networkFlow.merge(diskFlow).transform {
@@ -233,13 +235,21 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
233235
val responseOrigin = it.value.origin as StoreReadResponseOrigin.Fetcher
234236
requestKeyToFetcherName[request.key] = responseOrigin.name
235237

236-
val fallBackToSourceOfTruth =
237-
it.value is StoreReadResponse.Error && request.fallBackToSourceOfTruth
238+
// Track if network errored and fallback to disk is disabled for fresh requests
239+
if (it.value is StoreReadResponse.Error && skipDiskCache && !request.fallBackToSourceOfTruth) {
240+
networkErrorWithNoFallback = true
241+
} else if (it.value is StoreReadResponse.Data || it.value is StoreReadResponse.NoNewData) {
242+
// Reset on success so subsequent SOT emissions aren't incorrectly filtered
243+
networkErrorWithNoFallback = false
244+
}
238245

239-
if (it.value is StoreReadResponse.Data || it.value is StoreReadResponse.NoNewData || fallBackToSourceOfTruth) {
240-
// Unlocking disk only if network sent data or reported no new data
246+
if (it.value is StoreReadResponse.Data ||
247+
it.value is StoreReadResponse.NoNewData ||
248+
it.value is StoreReadResponse.Error
249+
) {
250+
// Unlocking disk only if network sent data, reported no new data, or returned an error
241251
// so that fresh data request never receives new fetcher data after
242-
// cached disk data.
252+
// cached disk data, and so that the flow can properly complete on errors.
243253
// This means that if the user asked for fresh data but the network returned
244254
// no new data we will still unblock disk.
245255
diskLock.complete(Unit)
@@ -254,6 +264,12 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
254264
// right, that is data from disk
255265
when (val diskData = it.value) {
256266
is StoreReadResponse.Data -> {
267+
// Skip disk data (SOT origin) if this was a fresh request that errored with fallback disabled.
268+
// But always emit fresh network data (Fetcher origin) even after prior errors.
269+
if (networkErrorWithNoFallback && diskData.origin !is StoreReadResponseOrigin.Fetcher) {
270+
return@transform
271+
}
272+
257273
val responseOriginWithFetcherName =
258274
diskData.origin.let { origin ->
259275
if (origin is StoreReadResponseOrigin.Fetcher) {

store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/FlowStoreTests.kt

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.mobilenativefoundation.store.store5.util.asSourceOfTruth
4343
import kotlin.test.Test
4444
import kotlin.test.assertContains
4545
import kotlin.test.assertEquals
46+
import kotlin.test.assertIs
4647

4748
@FlowPreview
4849
@ExperimentalCoroutinesApi
@@ -1094,4 +1095,247 @@ class FlowStoreTests {
10941095
)
10951096

10961097
private fun <Key : Any, Output : Any> StoreBuilder<Key, Output>.buildWithTestScope() = scope(testScope).build()
1098+
1099+
@Test
1100+
fun stream_givenConverterThrows_thenEmitsError() =
1101+
testScope.runTest {
1102+
// Given
1103+
val exception = IllegalStateException("Converter failed")
1104+
val persister = InMemoryPersister<Int, String>()
1105+
1106+
val pipeline =
1107+
StoreBuilder.from(
1108+
fetcher = Fetcher.of { _: Int -> "network" },
1109+
sourceOfTruth = persister.asSourceOfTruth(),
1110+
converter =
1111+
object : Converter<String, String, String> {
1112+
override fun fromNetworkToLocal(network: String): String {
1113+
throw exception
1114+
}
1115+
1116+
override fun fromOutputToLocal(output: String): String = output
1117+
},
1118+
).buildWithTestScope()
1119+
1120+
// When + Then
1121+
pipeline.stream(StoreReadRequest.fresh(1)).test {
1122+
assertEquals(
1123+
Loading(
1124+
origin = StoreReadResponseOrigin.Fetcher(),
1125+
),
1126+
awaitItem(),
1127+
)
1128+
1129+
val errorResponse = awaitItem()
1130+
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
1131+
assertEquals(exception.message, errorResponse.error.message)
1132+
}
1133+
}
1134+
1135+
@Test
1136+
fun stream_givenNamedFetcherAndConverterThrows_thenErrorContainsFetcherName() =
1137+
testScope.runTest {
1138+
// Given
1139+
val fetcherName = "TestFetcher"
1140+
val exception = IllegalStateException("Converter failed")
1141+
val persister = InMemoryPersister<Int, String>()
1142+
1143+
val pipeline =
1144+
StoreBuilder.from(
1145+
fetcher = Fetcher.of(name = fetcherName) { _: Int -> "network" },
1146+
sourceOfTruth = persister.asSourceOfTruth(),
1147+
converter =
1148+
object : Converter<String, String, String> {
1149+
override fun fromNetworkToLocal(network: String): String {
1150+
throw exception
1151+
}
1152+
1153+
override fun fromOutputToLocal(output: String): String = output
1154+
},
1155+
).buildWithTestScope()
1156+
1157+
// When + Then
1158+
pipeline.stream(StoreReadRequest.fresh(1)).test {
1159+
assertEquals(
1160+
Loading(
1161+
origin = StoreReadResponseOrigin.Fetcher(),
1162+
),
1163+
awaitItem(),
1164+
)
1165+
1166+
val errorResponse = awaitItem()
1167+
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
1168+
val origin = errorResponse.origin
1169+
assertIs<StoreReadResponseOrigin.Fetcher>(origin)
1170+
assertEquals(fetcherName, origin.name)
1171+
}
1172+
}
1173+
1174+
@Test
1175+
fun stream_givenConverterThrowsWithFreshRequest_thenFlowCompletes() =
1176+
testScope.runTest {
1177+
// Given: fresh() request skips disk cache and fallBackToSourceOfTruth defaults to false
1178+
val exception = IllegalStateException("Converter failed")
1179+
val persister = InMemoryPersister<Int, String>()
1180+
1181+
val pipeline =
1182+
StoreBuilder.from(
1183+
fetcher = Fetcher.of { _: Int -> "network" },
1184+
sourceOfTruth = persister.asSourceOfTruth(),
1185+
converter =
1186+
object : Converter<String, String, String> {
1187+
override fun fromNetworkToLocal(network: String): String {
1188+
throw exception
1189+
}
1190+
1191+
override fun fromOutputToLocal(output: String): String = output
1192+
},
1193+
).buildWithTestScope()
1194+
1195+
// When + Then: Flow should complete, not hang indefinitely
1196+
pipeline.stream(StoreReadRequest.fresh(1)).test {
1197+
assertEquals(
1198+
Loading(
1199+
origin = StoreReadResponseOrigin.Fetcher(),
1200+
),
1201+
awaitItem(),
1202+
)
1203+
1204+
val errorResponse = awaitItem()
1205+
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
1206+
assertEquals(exception.message, errorResponse.error.message)
1207+
cancelAndIgnoreRemainingEvents()
1208+
}
1209+
}
1210+
1211+
@Test
1212+
fun stream_givenConverterThrowsWithFallbackDisabled_thenDiskDataNotEmitted() =
1213+
testScope.runTest {
1214+
// Given: Pre-populate disk with data, then request fresh with fallBackToSourceOfTruth=false
1215+
val exception = IllegalStateException("Converter failed")
1216+
val persister = InMemoryPersister<Int, String>()
1217+
persister.write(1, "cached value")
1218+
1219+
val pipeline =
1220+
StoreBuilder.from(
1221+
fetcher = Fetcher.of { _: Int -> "network" },
1222+
sourceOfTruth = persister.asSourceOfTruth(),
1223+
converter =
1224+
object : Converter<String, String, String> {
1225+
override fun fromNetworkToLocal(network: String): String {
1226+
throw exception
1227+
}
1228+
1229+
override fun fromOutputToLocal(output: String): String = output
1230+
},
1231+
).buildWithTestScope()
1232+
1233+
// When: Request with fallBackToSourceOfTruth=false
1234+
pipeline.stream(StoreReadRequest.fresh(1, fallBackToSourceOfTruth = false)).test {
1235+
assertEquals(
1236+
Loading(origin = StoreReadResponseOrigin.Fetcher()),
1237+
awaitItem(),
1238+
)
1239+
1240+
// Then: Only error is emitted, no disk data (since fallback is disabled)
1241+
val errorResponse = awaitItem()
1242+
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
1243+
assertEquals(exception.message, errorResponse.error.message)
1244+
cancelAndIgnoreRemainingEvents()
1245+
}
1246+
}
1247+
1248+
@Test
1249+
fun stream_givenConverterThrowsWithFallbackEnabled_thenDiskDataEmitted() =
1250+
testScope.runTest {
1251+
// Given: Pre-populate disk with data, then request with fallBackToSourceOfTruth=true
1252+
val exception = IllegalStateException("Converter failed")
1253+
val persister = InMemoryPersister<Int, String>()
1254+
persister.write(1, "cached value")
1255+
1256+
val pipeline =
1257+
StoreBuilder.from(
1258+
fetcher = Fetcher.of { _: Int -> "network" },
1259+
sourceOfTruth = persister.asSourceOfTruth(),
1260+
converter =
1261+
object : Converter<String, String, String> {
1262+
override fun fromNetworkToLocal(network: String): String {
1263+
throw exception
1264+
}
1265+
1266+
override fun fromOutputToLocal(output: String): String = output
1267+
},
1268+
).buildWithTestScope()
1269+
1270+
// When: Request with fallBackToSourceOfTruth=true
1271+
pipeline.stream(StoreReadRequest.fresh(1, fallBackToSourceOfTruth = true)).test {
1272+
assertEquals(
1273+
Loading(origin = StoreReadResponseOrigin.Fetcher()),
1274+
awaitItem(),
1275+
)
1276+
1277+
// Then: Error is emitted
1278+
val errorResponse = awaitItem()
1279+
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
1280+
1281+
// And: Disk data is also emitted (since fallback is enabled)
1282+
val diskData = awaitItem()
1283+
assertIs<StoreReadResponse.Data<String>>(diskData)
1284+
assertEquals("cached value", diskData.value)
1285+
cancelAndIgnoreRemainingEvents()
1286+
}
1287+
}
1288+
1289+
@Test
1290+
fun stream_givenConverterFailsThenSucceeds_thenSecondRequestEmitsData() =
1291+
testScope.runTest {
1292+
// Given: Converter that fails on first attempt and succeeds on second
1293+
var attempts = 0
1294+
val exception = IllegalStateException("Converter failed")
1295+
val persister = InMemoryPersister<Int, String>()
1296+
1297+
val pipeline =
1298+
StoreBuilder.from(
1299+
fetcher = Fetcher.of { _: Int -> "network value" },
1300+
sourceOfTruth = persister.asSourceOfTruth(),
1301+
converter =
1302+
object : Converter<String, String, String> {
1303+
override fun fromNetworkToLocal(network: String): String {
1304+
attempts++
1305+
if (attempts == 1) {
1306+
throw exception
1307+
}
1308+
return network
1309+
}
1310+
1311+
override fun fromOutputToLocal(output: String): String = output
1312+
},
1313+
).buildWithTestScope()
1314+
1315+
// First request: fresh with fallback disabled (should error)
1316+
pipeline.stream(StoreReadRequest.fresh(1, fallBackToSourceOfTruth = false)).test {
1317+
assertEquals(
1318+
Loading(origin = StoreReadResponseOrigin.Fetcher()),
1319+
awaitItem(),
1320+
)
1321+
val errorResponse = awaitItem()
1322+
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
1323+
assertEquals(exception.message, errorResponse.error.message)
1324+
cancelAndIgnoreRemainingEvents()
1325+
}
1326+
1327+
// Second request: fresh again (should succeed and emit data)
1328+
pipeline.stream(StoreReadRequest.fresh(1, fallBackToSourceOfTruth = false)).test {
1329+
assertEquals(
1330+
Loading(origin = StoreReadResponseOrigin.Fetcher()),
1331+
awaitItem(),
1332+
)
1333+
1334+
// Should receive data (Fetcher origin from SOT), not be skipped
1335+
val dataResponse = awaitItem()
1336+
assertIs<StoreReadResponse.Data<String>>(dataResponse)
1337+
assertEquals("network value", dataResponse.value)
1338+
cancelAndIgnoreRemainingEvents()
1339+
}
1340+
}
10971341
}

0 commit comments

Comments
 (0)