Skip to content

Commit bd41ac6

Browse files
committed
Introduced MessageReceiptReporter, a new class responsible for observing the local database for delivery receipts and reporting them to the backend.
This reporter collects receipts in batches and sends them periodically to reduce network traffic. After a successful API call, the reported receipts are removed from the local database. New tests are included to verify this functionality.
1 parent 173df4b commit bd41ac6

File tree

2 files changed

+259
-0
lines changed

2 files changed

+259
-0
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.getstream.chat.android.client.receipts
18+
19+
import io.getstream.chat.android.client.ChatClient
20+
import io.getstream.chat.android.client.persistance.repository.MessageReceiptRepository
21+
import io.getstream.chat.android.models.Message
22+
import io.getstream.chat.android.models.MessageReceipt
23+
import io.getstream.log.taggedLogger
24+
import io.getstream.result.onSuccessSuspend
25+
import kotlinx.coroutines.CoroutineScope
26+
import kotlinx.coroutines.FlowPreview
27+
import kotlinx.coroutines.flow.filterNot
28+
import kotlinx.coroutines.flow.launchIn
29+
import kotlinx.coroutines.flow.map
30+
import kotlinx.coroutines.flow.onEach
31+
import kotlinx.coroutines.flow.sample
32+
33+
/**
34+
* Reports message delivery receipts to the server in batches of [MAX_BATCH_SIZE]
35+
* every [REPORT_INTERVAL_IN_MS] milliseconds.
36+
*/
37+
internal class MessageReceiptReporter(
38+
private val scope: CoroutineScope,
39+
private val chatClient: ChatClient,
40+
private val messageReceiptRepository: MessageReceiptRepository,
41+
) {
42+
43+
private val logger by taggedLogger("MessageReceiptReporter")
44+
45+
@OptIn(FlowPreview::class)
46+
fun init() {
47+
messageReceiptRepository.getAllByType(type = MessageReceipt.TYPE_DELIVERY, limit = MAX_BATCH_SIZE)
48+
.sample(REPORT_INTERVAL_IN_MS)
49+
.filterNot(List<MessageReceipt>::isEmpty)
50+
.map { receipts ->
51+
receipts.map { receipt ->
52+
Message(
53+
id = receipt.messageId,
54+
cid = receipt.cid,
55+
)
56+
}
57+
}
58+
.onEach { messages ->
59+
logger.d { "[init] Reporting delivery receipts for ${messages.size} messages…" }
60+
chatClient.markMessagesAsDelivered(messages)
61+
.execute()
62+
.onSuccessSuspend {
63+
val deliveredMessageIds = messages.map(Message::id)
64+
messageReceiptRepository.deleteByMessageIds(deliveredMessageIds)
65+
}
66+
}
67+
.launchIn(scope)
68+
}
69+
}
70+
71+
private const val REPORT_INTERVAL_IN_MS = 1000L
72+
private const val MAX_BATCH_SIZE = 100
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.getstream.chat.android.client.receipts
18+
19+
import io.getstream.chat.android.client.ChatClient
20+
import io.getstream.chat.android.client.persistance.repository.MessageReceiptRepository
21+
import io.getstream.chat.android.models.Message
22+
import io.getstream.chat.android.models.MessageReceipt
23+
import io.getstream.chat.android.randomMessageReceipt
24+
import io.getstream.chat.android.test.asCall
25+
import io.getstream.result.Error
26+
import kotlinx.coroutines.CoroutineScope
27+
import kotlinx.coroutines.ExperimentalCoroutinesApi
28+
import kotlinx.coroutines.cancel
29+
import kotlinx.coroutines.flow.MutableStateFlow
30+
import kotlinx.coroutines.test.advanceTimeBy
31+
import kotlinx.coroutines.test.runTest
32+
import org.junit.Test
33+
import org.mockito.kotlin.any
34+
import org.mockito.kotlin.doReturn
35+
import org.mockito.kotlin.mock
36+
import org.mockito.kotlin.never
37+
import org.mockito.kotlin.times
38+
import org.mockito.kotlin.verify
39+
import org.mockito.kotlin.verifyBlocking
40+
import org.mockito.kotlin.whenever
41+
import org.mockito.verification.VerificationMode
42+
43+
@OptIn(ExperimentalCoroutinesApi::class)
44+
internal class MessageReceiptReporterTest {
45+
46+
@Test
47+
fun `should fetch and send delivery receipts successfully`() = runTest {
48+
val receipts = listOf(
49+
randomMessageReceipt(),
50+
randomMessageReceipt(),
51+
)
52+
val messages = receipts.map { receipt ->
53+
Message(
54+
id = receipt.messageId,
55+
cid = receipt.cid,
56+
)
57+
}
58+
val fixture = Fixture()
59+
.givenMessageReceipts(receipts)
60+
.givenMarkMessagesAsDelivered(messages)
61+
val sut = fixture.get(backgroundScope)
62+
63+
sut.init()
64+
advanceTimeBy(1100) // Advance time to after the interval window
65+
66+
fixture.verifyMarkMessagesAsDeliveredCalled(messages = messages)
67+
val messageIds = messages.map(Message::id)
68+
fixture.verifyDeleteByMessageIdsCalled(messageIds = messageIds)
69+
}
70+
71+
@Test
72+
fun `should not delete receipts when marking messages as delivered fails`() = runTest {
73+
val fixture = Fixture()
74+
.givenMessageReceipts(listOf(randomMessageReceipt()))
75+
.givenMarkMessagesAsDelivered(error = mock())
76+
val sut = fixture.get(backgroundScope)
77+
78+
sut.init()
79+
advanceTimeBy(1100) // Allow initial execution
80+
81+
fixture.verifyDeleteByMessageIdsCalled(never())
82+
83+
// Keep processing subsequent success emissions
84+
fixture.givenMessageReceipts(listOf(randomMessageReceipt()))
85+
fixture.givenMarkMessagesAsDelivered()
86+
87+
advanceTimeBy(1100)
88+
89+
fixture.verifyMarkMessagesAsDeliveredCalled(times(2))
90+
fixture.verifyDeleteByMessageIdsCalled()
91+
}
92+
93+
@Test
94+
fun `should handle empty receipt list`() = runTest {
95+
val fixture = Fixture()
96+
.givenMessageReceipts(receipts = emptyList())
97+
val sut = fixture.get(backgroundScope)
98+
99+
sut.init()
100+
advanceTimeBy(1100)
101+
102+
fixture.verifyMarkMessagesAsDeliveredCalled(never())
103+
fixture.verifyDeleteByMessageIdsCalled(never())
104+
}
105+
106+
@Test
107+
fun `should execute periodically with correct delay`() = runTest {
108+
val fixture = Fixture()
109+
.givenMessageReceipts(listOf(randomMessageReceipt()))
110+
.givenMarkMessagesAsDelivered()
111+
val sut = fixture.get(backgroundScope)
112+
113+
sut.init()
114+
115+
// Trigger multiple emissions
116+
117+
advanceTimeBy(1100) // Collecting the first emission
118+
119+
// Trigger a new list
120+
fixture.givenMessageReceipts(listOf(randomMessageReceipt()))
121+
advanceTimeBy(1000) // Wait for delay
122+
123+
// Trigger a new list
124+
fixture.givenMessageReceipts(listOf(randomMessageReceipt()))
125+
advanceTimeBy(100) // Collecting the second emission
126+
127+
// Trigger a new list
128+
fixture.givenMessageReceipts(listOf(randomMessageReceipt()))
129+
advanceTimeBy(1000) // Wait for delay
130+
advanceTimeBy(100) // Collecting the third emission
131+
132+
fixture.verifyMarkMessagesAsDeliveredCalled(times(3))
133+
}
134+
135+
@Test
136+
fun `should stop execution when coroutine scope is cancelled`() = runTest {
137+
val fixture = Fixture()
138+
.givenMessageReceipts(listOf(randomMessageReceipt()))
139+
.givenMarkMessagesAsDelivered()
140+
val sut = fixture.get(backgroundScope)
141+
142+
sut.init()
143+
advanceTimeBy(1100) // Allow initial execution
144+
145+
backgroundScope.cancel()
146+
147+
// Trigger a new list
148+
fixture.givenMessageReceipts(listOf(randomMessageReceipt()))
149+
150+
advanceTimeBy(2000) // Try to advance time after cancellation
151+
152+
fixture.verifyMarkMessagesAsDeliveredCalled(times(1))
153+
}
154+
155+
private class Fixture {
156+
private val mockChatClient = mock<ChatClient>()
157+
158+
private val receiptsStateFlow = MutableStateFlow<List<MessageReceipt>>(emptyList())
159+
160+
private val mockMessageReceiptRepository = mock<MessageReceiptRepository> {
161+
onBlocking { getAllByType(type = MessageReceipt.TYPE_DELIVERY, limit = 100) } doReturn receiptsStateFlow
162+
}
163+
164+
fun givenMessageReceipts(receipts: List<MessageReceipt>) = apply {
165+
receiptsStateFlow.value = receipts
166+
}
167+
168+
fun givenMarkMessagesAsDelivered(messages: List<Message>? = null, error: Error? = null) = apply {
169+
whenever(mockChatClient.markMessagesAsDelivered(messages ?: any())) doReturn
170+
(error?.asCall() ?: Unit.asCall())
171+
}
172+
173+
fun verifyMarkMessagesAsDeliveredCalled(mode: VerificationMode = times(1), messages: List<Message>? = null) {
174+
verify(mockChatClient, mode).markMessagesAsDelivered(messages ?: any())
175+
}
176+
177+
fun verifyDeleteByMessageIdsCalled(mode: VerificationMode = times(1), messageIds: List<String>? = null) {
178+
verifyBlocking(mockMessageReceiptRepository, mode) { deleteByMessageIds(messageIds ?: any()) }
179+
}
180+
181+
fun get(scope: CoroutineScope) = MessageReceiptReporter(
182+
scope = scope,
183+
chatClient = mockChatClient,
184+
messageReceiptRepository = mockMessageReceiptRepository,
185+
)
186+
}
187+
}

0 commit comments

Comments
 (0)