From cbaa874a5000c521ff05284afa2dfdf35205c567 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Fri, 21 Nov 2025 14:30:30 -0500 Subject: [PATCH] fix: ensure OperationRepo is executed in order Made a new internal coroutine scope with a single thread to the OperationRepo to ensure operations are alway executed in the order they are queued in. This mostly fixes the flaky OperationRepoTest class. Release 5.4.0 may have introduced bugs related to this, however they would have been rare and very hard to reproduce. --- .../internal/operations/impl/OperationRepo.kt | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index 10d3b4dfa5..4439b688e3 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -1,7 +1,6 @@ package com.onesignal.core.internal.operations.impl import com.onesignal.common.threading.WaiterWithValue -import com.onesignal.common.threading.suspendifyOnIO import com.onesignal.core.internal.config.ConfigModelStore import com.onesignal.core.internal.operations.ExecutionResult import com.onesignal.core.internal.operations.GroupComparisonType @@ -14,7 +13,10 @@ import com.onesignal.debug.LogLevel import com.onesignal.debug.internal.logging.Logging import com.onesignal.user.internal.operations.impl.states.NewRecordsState import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.withTimeoutOrNull import java.util.UUID import kotlin.math.max @@ -43,6 +45,14 @@ internal class OperationRepo( val previousWaitedTime: Long = 0, ) + // The order of operation execution is critical to this OperationRepo + // logic, all processing must be done on same thread to ensure this. + // - This result of not following this is flaky tests, which inturn could + // result in bugs in production. + private val scope by lazy { + CoroutineScope(newSingleThreadContext(name = "OSOperationRepoScope")) + } + private val executorsMap: Map internal val queue = mutableListOf() private val waiter = WaiterWithValue() @@ -92,7 +102,7 @@ internal class OperationRepo( override fun start() { paused = false - suspendifyOnIO { + scope.launch { // load saved operations first then start processing the queue to ensure correct operation order loadSavedOperations() processQueueForever() @@ -113,8 +123,7 @@ internal class OperationRepo( Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)") operation.id = UUID.randomUUID().toString() - // Use suspendifyOnIO to ensure non-blocking behavior for main thread - suspendifyOnIO { + scope.launch { internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true) } } @@ -127,7 +136,9 @@ internal class OperationRepo( operation.id = UUID.randomUUID().toString() val waiter = WaiterWithValue() - internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true) + scope.launch { + internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true) + } return waiter.waitForWake() }