@@ -31,6 +31,7 @@ import io.mockk.spyk
3131import io.mockk.verify
3232import kotlinx.coroutines.delay
3333import kotlinx.coroutines.launch
34+ import kotlinx.coroutines.runBlocking
3435import kotlinx.coroutines.withTimeout
3536import kotlinx.coroutines.withTimeoutOrNull
3637import org.json.JSONArray
@@ -72,6 +73,7 @@ private class Mocks {
7273 Time (),
7374 getNewRecordState(configModelStore),
7475 ),
76+ recordPrivateCalls = true ,
7577 )
7678 }
7779}
@@ -108,9 +110,9 @@ class OperationRepoTests : FunSpec({
108110
109111 cachedOperation.id = UUID .randomUUID().toString()
110112 newOperation.id = UUID .randomUUID().toString()
113+ val storeWaiter = Waiter ()
111114 every { operationModelStore.create(any()) } answers {
112- // simulate a prolonged loading from cache
113- Thread .sleep(1000)
115+ runBlocking { storeWaiter.waitForWake() }
114116 cachedOperation
115117 }
116118
@@ -131,16 +133,8 @@ class OperationRepoTests : FunSpec({
131133
132134 // Then
133135 // insertion from the main thread is done without blocking
134- mainThread.join(500)
135- mainThread.state shouldBe Thread .State .TERMINATED
136-
137- // Wait for the async enqueue to complete (give it more time)
138- var attempts = 0
139- while (operationRepo.queue.size == 0 && attempts < 50) {
140- Thread .sleep(10)
141- attempts++
142- }
143- operationRepo.queue.size shouldBe 1
136+ mainThread.join()
137+ storeWaiter.wake()
144138
145139 // after loading is completed, the cached operation should be at the beginning of the queue
146140 backgroundThread.join()
@@ -150,7 +144,8 @@ class OperationRepoTests : FunSpec({
150144
151145 test("containsInstanceOf") {
152146 // Given
153- val operationRepo = Mocks ().operationRepo
147+ val mocks = Mocks ()
148+ val operationRepo = mocks.operationRepo
154149
155150 open class MyOperation : Operation ("MyOp ") {
156151 override val createComparisonKey = " "
@@ -165,13 +160,7 @@ class OperationRepoTests : FunSpec({
165160 // When
166161 operationRepo.start()
167162 operationRepo.enqueue(MyOperation ())
168-
169- // Wait for the async enqueue to complete
170- var attempts = 0
171- while (!operationRepo.containsInstanceOf<MyOperation >() && attempts < 50) {
172- Thread .sleep(10)
173- attempts++
174- }
163+ mocks.waitForInternalEnqueue()
175164
176165 // Then
177166 operationRepo.containsInstanceOf<MyOperation >() shouldBe true
@@ -182,13 +171,16 @@ class OperationRepoTests : FunSpec({
182171 test("ensure processQueueForever suspends when queue is empty") {
183172 // Given
184173 val mocks = Mocks ()
174+ mocks.configModelStore.model.opRepoExecutionInterval = 10
185175
186176 // When
187177 mocks.operationRepo.start()
188178 val response = mocks.operationRepo.enqueueAndWait(mockOperation())
189- // Must wait for background logic to spin to see how many times it
190- // will call getNextOps()
191- delay(1_000)
179+ // KEEP: This delay must be kept as the implementation can change
180+ // and this is the most reliable way to ensure waiting is happening.
181+ // If this test as written in another way it could be fragile and/or
182+ // pass when it shouldn't.
183+ delay(500)
192184
193185 // Then
194186 response shouldBe true
@@ -271,12 +263,15 @@ class OperationRepoTests : FunSpec({
271263 val opRepo = mocks.operationRepo
272264 coEvery {
273265 mocks.executor.execute(any())
274- } returns ExecutionResponse (ExecutionResult .FAIL_RETRY , retryAfterSeconds = 1) andThen ExecutionResponse (ExecutionResult .SUCCESS )
266+ } returns
267+ ExecutionResponse (ExecutionResult .FAIL_RETRY , retryAfterSeconds = 1) andThen
268+ ExecutionResponse (ExecutionResult .SUCCESS )
275269
276270 // When
277271 opRepo.start()
278272 opRepo.enqueue(mockOperation())
279- Thread .sleep(200) // Give time for the operation to be processed and retry delay to be set
273+ mocks.waitForInternalEnqueue()
274+
280275 val response1 =
281276 withTimeoutOrNull(500) {
282277 opRepo.enqueueAndWait(mockOperation())
@@ -504,8 +499,7 @@ class OperationRepoTests : FunSpec({
504499 mocks.operationRepo.enqueue(mockOperation())
505500 val response =
506501 withTimeoutOrNull(100) {
507- val value = mocks.operationRepo.enqueueAndWait(mockOperation(), flush = true)
508- value
502+ mocks.operationRepo.enqueueAndWait(mockOperation(), flush = true)
509503 }
510504 response shouldBe true
511505 }
@@ -571,24 +565,17 @@ class OperationRepoTests : FunSpec({
571565 test("starting OperationModelStore should be processed, following normal delay rules") {
572566 // Given
573567 val mocks = Mocks ()
574- mocks.configModelStore.model.opRepoExecutionInterval = 200
575- every { mocks.operationModelStore.list() } returns listOf(mockOperation())
576- val executeOperationsCall = mockExecuteOperations(mocks.operationRepo)
568+ val operations = listOf(mockOperation())
569+ every { mocks.operationModelStore.list() } returns operations
577570
578571 // When
579572 mocks.operationRepo.start()
580- val immediateResult =
581- withTimeoutOrNull(200) {
582- executeOperationsCall.waitForWake()
583- }
584- val delayedResult =
585- withTimeoutOrNull(200) {
586- executeOperationsCall.waitForWake()
587- }
573+ mocks.operationRepo.enqueueAndWait(mockOperationNonGroupable())
588574
589- // Then - with parallel execution, timing may vary, so we just verify the operation eventually executes
590- val result = immediateResult ? : delayedResult
591- result shouldBe true
575+ coVerifyOrder {
576+ mocks.operationRepo[" waitForNewOperationAndExecutionInterval" ]()
577+ mocks.executor.execute(operations)
578+ }
592579 }
593580
594581 test("ensure results from executeOperations are added to beginning of the queue") {
@@ -630,7 +617,6 @@ class OperationRepoTests : FunSpec({
630617 test("execution of an operation with translation IDs delays follow up operations") {
631618 // Given
632619 val mocks = Mocks ()
633- mocks.configModelStore.model.opRepoPostCreateDelay = 100
634620 val operation1 = mockOperation(groupComparisonType = GroupComparisonType .NONE )
635621 operation1.id = " local-id1"
636622 val operation2 = mockOperation(groupComparisonType = GroupComparisonType .NONE , applyToRecordId = "local-id1")
@@ -684,7 +670,6 @@ class OperationRepoTests : FunSpec({
684670 test("execution of an operation with translation IDs removes the operation from queue before delay") {
685671 // Given
686672 val mocks = Mocks ()
687- mocks.configModelStore.model.opRepoPostCreateDelay = 100
688673 val operation = mockOperation(groupComparisonType = GroupComparisonType .NONE )
689674 val opId = operation.id
690675 val idTranslation = mapOf("local-id1" to "id1")
@@ -751,14 +736,8 @@ class OperationRepoTests : FunSpec({
751736 val op = mockOperation()
752737 mocks.operationRepo.enqueue(op)
753738
754- // Wait for the async enqueue to complete
755- var attempts = 0
756- while (mocks.operationRepo.queue.size == 0 && attempts < 50) {
757- Thread .sleep(10)
758- attempts++
759- }
760-
761739 // When
740+ mocks.waitForInternalEnqueue()
762741 mocks.operationRepo.loadSavedOperations()
763742
764743 // Then
@@ -797,7 +776,7 @@ class OperationRepoTests : FunSpec({
797776 // When
798777 opRepo.start()
799778 opRepo.enqueue(mockOperation())
800- Thread .sleep(100) // Give time for the operation to be processed and retry delay to be set
779+ mocks.waitForInternalEnqueue()
801780 val response1 =
802781 withTimeoutOrNull(999) {
803782 opRepo.enqueueAndWait(mockOperation())
@@ -820,7 +799,6 @@ class OperationRepoTests : FunSpec({
820799 test("translation IDs are applied before operations are grouped with correct execution order") {
821800 // Given
822801 val mocks = Mocks ()
823- mocks.configModelStore.model.opRepoPostCreateDelay = 100
824802
825803 // Track execution order using a list
826804 val executionOrder = mutableListOf<String >()
@@ -833,13 +811,11 @@ class OperationRepoTests : FunSpec({
833811 // Mock the translateIds call to track when translation happens
834812 every { groupableOp1.translateIds(any()) } answers {
835813 executionOrder.add("translate-groupable-1")
836- Unit
837814 }
838815
839816 // Mock groupableOp2 to ensure it doesn't get translated
840817 every { groupableOp2.translateIds(any()) } answers {
841818 executionOrder.add("translate-groupable-2-unexpected")
842- Unit
843819 }
844820
845821 // Mock all execution calls and track them
@@ -876,23 +852,6 @@ class OperationRepoTests : FunSpec({
876852 mocks.operationRepo.enqueue(groupableOp1) // This needs translation
877853 mocks.operationRepo.enqueueAndWait(groupableOp2) // This doesn't need translation but should be grouped
878854
879- // Wait for all critical async operations to complete
880- // We need: execute-translation-source, translate-groupable-1, execute-grouped-operations
881- var attempts = 0
882- val maxAttempts = 200 // Increased timeout for CI/CD environments (200 * 20ms = 4 seconds)
883- while (attempts < maxAttempts) {
884- val hasTranslationSource = executionOrder.contains("execute-translation-source")
885- val hasTranslation = executionOrder.contains("translate-groupable-1")
886- val hasGroupedExecution = executionOrder.contains("execute-grouped-operations")
887-
888- if (hasTranslationSource && hasTranslation && hasGroupedExecution) {
889- break // All critical events have occurred
890- }
891-
892- Thread .sleep(20)
893- attempts++
894- }
895-
896855 // Then verify the critical execution order
897856 executionOrder.size shouldBeGreaterThan 2 // At minimum: Translation source + translation + grouped execution (>= 3)
898857
@@ -964,10 +923,17 @@ class OperationRepoTests : FunSpec({
964923 val executeWaiter = WaiterWithValue <Boolean >()
965924 coEvery { opRepo.executeOperations(any()) } coAnswers {
966925 executeWaiter.wake(true )
967- delay(10 )
968926 firstArg<List <OperationRepo .OperationQueueItem >>().forEach { it.waiter?.wake(true ) }
969927 }
970928 return executeWaiter
971929 }
972930 }
973931}
932+
933+ private fun Mocks.waitForInternalEnqueue () {
934+ verify(timeout = 100 ) {
935+ operationRepo[" internalEnqueue" ](
936+ any<OperationQueueItem >(), any<Boolean >(), any<Boolean >(), any<Int >()
937+ )
938+ }
939+ }
0 commit comments