Skip to content

Commit 24dfb49

Browse files
authored
Merge pull request #2419 from OneSignal/fix-flaky-operationrepo-execution-order
fix: Rare User and Subscription creates and updates processing out of order (introduced in 5.4.0)
2 parents fb0fc32 + cbaa874 commit 24dfb49

File tree

1 file changed

+16
-5
lines changed
  • OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl

1 file changed

+16
-5
lines changed

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.onesignal.core.internal.operations.impl
22

33
import com.onesignal.common.threading.WaiterWithValue
4-
import com.onesignal.common.threading.suspendifyOnIO
54
import com.onesignal.core.internal.config.ConfigModelStore
65
import com.onesignal.core.internal.operations.ExecutionResult
76
import com.onesignal.core.internal.operations.GroupComparisonType
@@ -14,7 +13,10 @@ import com.onesignal.debug.LogLevel
1413
import com.onesignal.debug.internal.logging.Logging
1514
import com.onesignal.user.internal.operations.impl.states.NewRecordsState
1615
import kotlinx.coroutines.CompletableDeferred
16+
import kotlinx.coroutines.CoroutineScope
1717
import kotlinx.coroutines.delay
18+
import kotlinx.coroutines.launch
19+
import kotlinx.coroutines.newSingleThreadContext
1820
import kotlinx.coroutines.withTimeoutOrNull
1921
import java.util.UUID
2022
import kotlin.math.max
@@ -43,6 +45,14 @@ internal class OperationRepo(
4345
val previousWaitedTime: Long = 0,
4446
)
4547

48+
// The order of operation execution is critical to this OperationRepo
49+
// logic, all processing must be done on same thread to ensure this.
50+
// - This result of not following this is flaky tests, which inturn could
51+
// result in bugs in production.
52+
private val scope by lazy {
53+
CoroutineScope(newSingleThreadContext(name = "OSOperationRepoScope"))
54+
}
55+
4656
private val executorsMap: Map<String, IOperationExecutor>
4757
internal val queue = mutableListOf<OperationQueueItem>()
4858
private val waiter = WaiterWithValue<LoopWaiterMessage>()
@@ -92,7 +102,7 @@ internal class OperationRepo(
92102

93103
override fun start() {
94104
paused = false
95-
suspendifyOnIO {
105+
scope.launch {
96106
// load saved operations first then start processing the queue to ensure correct operation order
97107
loadSavedOperations()
98108
processQueueForever()
@@ -113,8 +123,7 @@ internal class OperationRepo(
113123
Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)")
114124

115125
operation.id = UUID.randomUUID().toString()
116-
// Use suspendifyOnIO to ensure non-blocking behavior for main thread
117-
suspendifyOnIO {
126+
scope.launch {
118127
internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true)
119128
}
120129
}
@@ -127,7 +136,9 @@ internal class OperationRepo(
127136

128137
operation.id = UUID.randomUUID().toString()
129138
val waiter = WaiterWithValue<Boolean>()
130-
internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true)
139+
scope.launch {
140+
internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true)
141+
}
131142
return waiter.waitForWake()
132143
}
133144

0 commit comments

Comments
 (0)