@@ -41,7 +41,6 @@ import kotlinx.coroutines.FlowPreview
4141import kotlinx.coroutines.TimeoutCancellationException
4242import kotlinx.coroutines.async
4343import kotlinx.coroutines.flow.catch
44- import kotlinx.coroutines.flow.collect
4544import kotlinx.coroutines.flow.distinctUntilChanged
4645import kotlinx.coroutines.flow.first
4746import kotlinx.coroutines.flow.flowOn
@@ -446,8 +445,18 @@ class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {
446445 schema.getPerson(personId).withDataDeserializer(serializer<GetPersonDataNoName2 >())
447446
448447 turbineScope {
449- val noName1Flow = noName1Query.subscribe().flow.distinctUntilChanged().testIn(backgroundScope)
450- val noName2Flow = noName2Query.subscribe().flow.distinctUntilChanged().testIn(backgroundScope)
448+ val noName1Flow =
449+ noName1Query
450+ .subscribe()
451+ .flow
452+ .distinctUntilChanged(::areEquivalentQuerySubscriptionResults)
453+ .testIn(backgroundScope)
454+ val noName2Flow =
455+ noName2Query
456+ .subscribe()
457+ .flow
458+ .distinctUntilChanged(::areEquivalentQuerySubscriptionResults)
459+ .testIn(backgroundScope)
451460 withClue(" noName1Flow-0" ) { noName1Flow.awaitPersonWithName(" Name0" ) }
452461 withClue(" noName2Flow-0" ) { noName2Flow.awaitPersonWithName(" Name0" ) }
453462
@@ -636,5 +645,37 @@ class QuerySubscriptionIntegrationTest : DataConnectIntegrationTestBase() {
636645 val person = withClue(" data.person" ) { data.person.shouldNotBeNull() }
637646 withClue(" person.name" ) { person.name shouldBe name }
638647 }
648+
649+ /* *
650+ * Returns `true` if, and only if, the receiver is a non-null instance of
651+ * [DataConnectOperationException] that indicates that the failure is due to decoding of the
652+ * server response failed.
653+ */
654+ fun Throwable?.isDecodingServerResponseFailed (): Boolean =
655+ this is DataConnectOperationException && this .response.errors.isEmpty()
656+
657+ /* *
658+ * Returns `true` if, and only if, the receiver's result is a failure that indicates that
659+ * decoding of the server response failed.
660+ */
661+ fun QuerySubscriptionResult <* , * >.isDecodingServerResponseFailed (): Boolean =
662+ result.exceptionOrNull().isDecodingServerResponseFailed()
663+
664+ /* *
665+ * Checks if two [QuerySubscriptionResult] instances are "equivalent"; that is, they are both
666+ * equal when compared using the `==` operator, or they are both failures due to decoding of the
667+ * server response failed.
668+ *
669+ * This is useful when testing flows because the same decoding failure can happen more than once
670+ * in a row based on other asynchronous operations but testing for "distinctness" will consider
671+ * those two failures as "distinct" when the test wants them to be treated as "equal".
672+ *
673+ * Googlers see b/399380932 for full details and history of the flaky test that this fixes.
674+ */
675+ fun areEquivalentQuerySubscriptionResults (
676+ old : QuerySubscriptionResult <* , * >,
677+ new : QuerySubscriptionResult <* , * >
678+ ): Boolean =
679+ (old == new) || (old.isDecodingServerResponseFailed() && new.isDecodingServerResponseFailed())
639680 }
640681}
0 commit comments