11package com.onesignal.core.internal.operations.impl
22
33import com.onesignal.common.threading.WaiterWithValue
4- import com.onesignal.common.threading.suspendifyOnIO
54import com.onesignal.core.internal.config.ConfigModelStore
65import com.onesignal.core.internal.operations.ExecutionResult
76import com.onesignal.core.internal.operations.GroupComparisonType
@@ -14,7 +13,10 @@ import com.onesignal.debug.LogLevel
1413import com.onesignal.debug.internal.logging.Logging
1514import com.onesignal.user.internal.operations.impl.states.NewRecordsState
1615import kotlinx.coroutines.CompletableDeferred
16+ import kotlinx.coroutines.CoroutineScope
1717import kotlinx.coroutines.delay
18+ import kotlinx.coroutines.launch
19+ import kotlinx.coroutines.newSingleThreadContext
1820import kotlinx.coroutines.withTimeoutOrNull
1921import java.util.UUID
2022import kotlin.math.max
@@ -43,6 +45,14 @@ internal class OperationRepo(
4345 val previousWaitedTime : Long = 0 ,
4446 )
4547
48+ // The order of operation execution is critical to this OperationRepo
49+ // logic, all processing must be done on same thread to ensure this.
50+ // - This result of not following this is flaky tests, which inturn could
51+ // result in bugs in production.
52+ private val scope by lazy {
53+ CoroutineScope (newSingleThreadContext(name = " OSOperationRepoScope" ))
54+ }
55+
4656 private val executorsMap: Map <String , IOperationExecutor >
4757 internal val queue = mutableListOf<OperationQueueItem >()
4858 private val waiter = WaiterWithValue <LoopWaiterMessage >()
@@ -92,7 +102,7 @@ internal class OperationRepo(
92102
93103 override fun start () {
94104 paused = false
95- suspendifyOnIO {
105+ scope.launch {
96106 // load saved operations first then start processing the queue to ensure correct operation order
97107 loadSavedOperations()
98108 processQueueForever()
@@ -113,8 +123,7 @@ internal class OperationRepo(
113123 Logging .log(LogLevel .DEBUG , " OperationRepo.enqueue(operation: $operation , flush: $flush )" )
114124
115125 operation.id = UUID .randomUUID().toString()
116- // Use suspendifyOnIO to ensure non-blocking behavior for main thread
117- suspendifyOnIO {
126+ scope.launch {
118127 internalEnqueue(OperationQueueItem (operation, bucket = enqueueIntoBucket), flush, true )
119128 }
120129 }
@@ -127,7 +136,9 @@ internal class OperationRepo(
127136
128137 operation.id = UUID .randomUUID().toString()
129138 val waiter = WaiterWithValue <Boolean >()
130- internalEnqueue(OperationQueueItem (operation, waiter, bucket = enqueueIntoBucket), flush, true )
139+ scope.launch {
140+ internalEnqueue(OperationQueueItem (operation, waiter, bucket = enqueueIntoBucket), flush, true )
141+ }
131142 return waiter.waitForWake()
132143 }
133144
0 commit comments