Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import org.json.JSONArray
Expand Down Expand Up @@ -72,6 +73,7 @@ private class Mocks {
Time(),
getNewRecordState(configModelStore),
),
recordPrivateCalls = true,
)
}
}
Expand Down Expand Up @@ -108,9 +110,9 @@ class OperationRepoTests : FunSpec({

cachedOperation.id = UUID.randomUUID().toString()
newOperation.id = UUID.randomUUID().toString()
val storeWaiter = Waiter()
every { operationModelStore.create(any()) } answers {
// simulate a prolonged loading from cache
Thread.sleep(1000)
runBlocking { storeWaiter.waitForWake() }
cachedOperation
}

Expand All @@ -131,16 +133,8 @@ class OperationRepoTests : FunSpec({

// Then
// insertion from the main thread is done without blocking
mainThread.join(500)
mainThread.state shouldBe Thread.State.TERMINATED

// Wait for the async enqueue to complete (give it more time)
var attempts = 0
while (operationRepo.queue.size == 0 && attempts < 50) {
Thread.sleep(10)
attempts++
}
operationRepo.queue.size shouldBe 1
mainThread.join()
storeWaiter.wake()

// after loading is completed, the cached operation should be at the beginning of the queue
backgroundThread.join()
Expand All @@ -150,7 +144,8 @@ class OperationRepoTests : FunSpec({

test("containsInstanceOf") {
// Given
val operationRepo = Mocks().operationRepo
val mocks = Mocks()
val operationRepo = mocks.operationRepo

open class MyOperation : Operation("MyOp") {
override val createComparisonKey = ""
Expand All @@ -165,13 +160,7 @@ class OperationRepoTests : FunSpec({
// When
operationRepo.start()
operationRepo.enqueue(MyOperation())

// Wait for the async enqueue to complete
var attempts = 0
while (!operationRepo.containsInstanceOf<MyOperation>() && attempts < 50) {
Thread.sleep(10)
attempts++
}
mocks.waitForInternalEnqueue()

// Then
operationRepo.containsInstanceOf<MyOperation>() shouldBe true
Expand All @@ -182,13 +171,16 @@ class OperationRepoTests : FunSpec({
test("ensure processQueueForever suspends when queue is empty") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoExecutionInterval = 10

// When
mocks.operationRepo.start()
val response = mocks.operationRepo.enqueueAndWait(mockOperation())
// Must wait for background logic to spin to see how many times it
// will call getNextOps()
delay(1_000)
// KEEP: This delay must be kept as the implementation can change
// and this is the most reliable way to ensure waiting is happening.
// If this test as written in another way it could be fragile and/or
// pass when it shouldn't.
delay(500)

// Then
response shouldBe true
Expand Down Expand Up @@ -271,12 +263,15 @@ class OperationRepoTests : FunSpec({
val opRepo = mocks.operationRepo
coEvery {
mocks.executor.execute(any())
} returns ExecutionResponse(ExecutionResult.FAIL_RETRY, retryAfterSeconds = 1) andThen ExecutionResponse(ExecutionResult.SUCCESS)
} returns
ExecutionResponse(ExecutionResult.FAIL_RETRY, retryAfterSeconds = 1) andThen
ExecutionResponse(ExecutionResult.SUCCESS)

// When
opRepo.start()
opRepo.enqueue(mockOperation())
Thread.sleep(200) // Give time for the operation to be processed and retry delay to be set
mocks.waitForInternalEnqueue()

val response1 =
withTimeoutOrNull(500) {
opRepo.enqueueAndWait(mockOperation())
Expand Down Expand Up @@ -504,8 +499,7 @@ class OperationRepoTests : FunSpec({
mocks.operationRepo.enqueue(mockOperation())
val response =
withTimeoutOrNull(100) {
val value = mocks.operationRepo.enqueueAndWait(mockOperation(), flush = true)
value
mocks.operationRepo.enqueueAndWait(mockOperation(), flush = true)
}
response shouldBe true
}
Expand Down Expand Up @@ -571,24 +565,17 @@ class OperationRepoTests : FunSpec({
test("starting OperationModelStore should be processed, following normal delay rules") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoExecutionInterval = 200
every { mocks.operationModelStore.list() } returns listOf(mockOperation())
val executeOperationsCall = mockExecuteOperations(mocks.operationRepo)
val operations = listOf(mockOperation())
every { mocks.operationModelStore.list() } returns operations

// When
mocks.operationRepo.start()
val immediateResult =
withTimeoutOrNull(200) {
executeOperationsCall.waitForWake()
}
val delayedResult =
withTimeoutOrNull(200) {
executeOperationsCall.waitForWake()
}
mocks.operationRepo.enqueueAndWait(mockOperationNonGroupable())

// Then - with parallel execution, timing may vary, so we just verify the operation eventually executes
val result = immediateResult ?: delayedResult
result shouldBe true
coVerifyOrder {
mocks.operationRepo["waitForNewOperationAndExecutionInterval"]()
mocks.executor.execute(operations)
}
}

test("ensure results from executeOperations are added to beginning of the queue") {
Expand Down Expand Up @@ -630,7 +617,6 @@ class OperationRepoTests : FunSpec({
test("execution of an operation with translation IDs delays follow up operations") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoPostCreateDelay = 100
val operation1 = mockOperation(groupComparisonType = GroupComparisonType.NONE)
operation1.id = "local-id1"
val operation2 = mockOperation(groupComparisonType = GroupComparisonType.NONE, applyToRecordId = "local-id1")
Expand Down Expand Up @@ -684,7 +670,6 @@ class OperationRepoTests : FunSpec({
test("execution of an operation with translation IDs removes the operation from queue before delay") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoPostCreateDelay = 100
val operation = mockOperation(groupComparisonType = GroupComparisonType.NONE)
val opId = operation.id
val idTranslation = mapOf("local-id1" to "id1")
Expand Down Expand Up @@ -751,14 +736,8 @@ class OperationRepoTests : FunSpec({
val op = mockOperation()
mocks.operationRepo.enqueue(op)

// Wait for the async enqueue to complete
var attempts = 0
while (mocks.operationRepo.queue.size == 0 && attempts < 50) {
Thread.sleep(10)
attempts++
}

// When
mocks.waitForInternalEnqueue()
mocks.operationRepo.loadSavedOperations()

// Then
Expand Down Expand Up @@ -797,7 +776,7 @@ class OperationRepoTests : FunSpec({
// When
opRepo.start()
opRepo.enqueue(mockOperation())
Thread.sleep(100) // Give time for the operation to be processed and retry delay to be set
mocks.waitForInternalEnqueue()
val response1 =
withTimeoutOrNull(999) {
opRepo.enqueueAndWait(mockOperation())
Expand All @@ -820,7 +799,6 @@ class OperationRepoTests : FunSpec({
test("translation IDs are applied before operations are grouped with correct execution order") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoPostCreateDelay = 100

// Track execution order using a list
val executionOrder = mutableListOf<String>()
Expand All @@ -833,13 +811,11 @@ class OperationRepoTests : FunSpec({
// Mock the translateIds call to track when translation happens
every { groupableOp1.translateIds(any()) } answers {
executionOrder.add("translate-groupable-1")
Unit
}

// Mock groupableOp2 to ensure it doesn't get translated
every { groupableOp2.translateIds(any()) } answers {
executionOrder.add("translate-groupable-2-unexpected")
Unit
}

// Mock all execution calls and track them
Expand Down Expand Up @@ -876,23 +852,6 @@ class OperationRepoTests : FunSpec({
mocks.operationRepo.enqueue(groupableOp1) // This needs translation
mocks.operationRepo.enqueueAndWait(groupableOp2) // This doesn't need translation but should be grouped

// Wait for all critical async operations to complete
// We need: execute-translation-source, translate-groupable-1, execute-grouped-operations
var attempts = 0
val maxAttempts = 200 // Increased timeout for CI/CD environments (200 * 20ms = 4 seconds)
while (attempts < maxAttempts) {
val hasTranslationSource = executionOrder.contains("execute-translation-source")
val hasTranslation = executionOrder.contains("translate-groupable-1")
val hasGroupedExecution = executionOrder.contains("execute-grouped-operations")

if (hasTranslationSource && hasTranslation && hasGroupedExecution) {
break // All critical events have occurred
}

Thread.sleep(20)
attempts++
}

// Then verify the critical execution order
executionOrder.size shouldBeGreaterThan 2 // At minimum: Translation source + translation + grouped execution (>= 3)

Expand Down Expand Up @@ -964,10 +923,17 @@ class OperationRepoTests : FunSpec({
val executeWaiter = WaiterWithValue<Boolean>()
coEvery { opRepo.executeOperations(any()) } coAnswers {
executeWaiter.wake(true)
delay(10)
firstArg<List<OperationRepo.OperationQueueItem>>().forEach { it.waiter?.wake(true) }
}
return executeWaiter
}
}
}

private fun Mocks.waitForInternalEnqueue() {
verify(timeout = 100) {
operationRepo["internalEnqueue"](
any<OperationQueueItem>(), any<Boolean>(), any<Boolean>(), any<Int>()
)
}
}