@@ -91,15 +91,31 @@ internal class EvaluationEventsProcessorTest {
9191 private lateinit var fakeData: PrecomputedFlag
9292 private lateinit var fakeDDContext: DDContext
9393
94+ fun createEvaluationEventsProcessor (
95+ writer : EvaluationEventWriter ,
96+ timeProvider : TimeProvider ,
97+ scheduledExecutor : ScheduledExecutorService ,
98+ internalLogger : InternalLogger ,
99+ flushIntervalMs : Long = TEST_FLUSH_INTERVAL_MS ,
100+ maxAggregations : Int = TEST_MAX_AGGREGATIONS
101+ ): EvaluationEventsProcessor {
102+ return EvaluationEventsProcessor (
103+ writer = writer,
104+ timeProvider = timeProvider,
105+ scheduledExecutor = scheduledExecutor,
106+ internalLogger = internalLogger,
107+ flushIntervalMs = flushIntervalMs,
108+ maxAggregations = maxAggregations
109+ )
110+ }
111+
94112 @BeforeEach
95113 fun `set up` (forge : Forge ) {
96- testedProcessor = EvaluationEventsProcessor (
114+ testedProcessor = createEvaluationEventsProcessor (
97115 writer = mockWriter,
98116 timeProvider = mockTimeProvider,
99117 scheduledExecutor = mockScheduledExecutor,
100- internalLogger = mockInternalLogger,
101- flushIntervalMs = TEST_FLUSH_INTERVAL_MS ,
102- maxAggregations = TEST_MAX_AGGREGATIONS
118+ internalLogger = mockInternalLogger
103119 )
104120
105121 fakeContext = EvaluationContext (targetingKey = fakeTargetingKey)
@@ -361,12 +377,12 @@ internal class EvaluationEventsProcessorTest {
361377 }
362378
363379 @Test
364- fun `M create separate aggregations W processEvaluation() { different error codes }` () {
380+ fun `M create separate aggregations W processEvaluation() { different error codes }` (forge : Forge ) {
365381 // Given
366382 val errorCode1 = ErrorCode .FLAG_NOT_FOUND .name
367383 val errorCode2 = ErrorCode .PROVIDER_NOT_READY .name
368- val errorMessage1 = " Flag not found "
369- val errorMessage2 = " Provider not ready "
384+ val errorMessage1 = forge.anAlphabeticalString()
385+ val errorMessage2 = forge.anAlphabeticalString()
370386
371387 // When
372388 testedProcessor.processEvaluation(
@@ -493,7 +509,7 @@ internal class EvaluationEventsProcessorTest {
493509 fun `M flush automatically W processEvaluation() { size limit reached }` () {
494510 // Given
495511 val maxAggregations = 5
496- val testProcessor = EvaluationEventsProcessor (
512+ val testProcessor = createEvaluationEventsProcessor (
497513 writer = mockWriter,
498514 timeProvider = mockTimeProvider,
499515 scheduledExecutor = mockScheduledExecutor,
@@ -551,13 +567,14 @@ internal class EvaluationEventsProcessorTest {
551567 )
552568 testedProcessor.flush()
553569
554- // Then
570+ // Then - should have called writeAll twice, once per flush
555571 val eventCaptor = argumentCaptor<List <BatchedFlagEvaluations .FlagEvaluation >>()
556- verify(mockWriter).writeAll(eventCaptor.capture())
572+ verify(mockWriter, times( 2 ) ).writeAll(eventCaptor.capture())
557573
558- val events = eventCaptor.firstValue
559- assertThat(events).hasSize(2 )
560- assertThat(events.map { it.evaluationCount }).containsOnly(1L )
574+ // Flatten all events from both writeAll calls
575+ val allEvents = eventCaptor.allValues.flatten()
576+ assertThat(allEvents).hasSize(2 )
577+ assertThat(allEvents.map { it.evaluationCount }).containsOnly(1L )
561578 }
562579
563580 @Test
@@ -676,7 +693,7 @@ internal class EvaluationEventsProcessorTest {
676693 testedProcessor.schedulePeriodicFlush() // Initial schedule
677694 assertThat(scheduleCount).isEqualTo(1 )
678695
679- taskRunnable? .run () // Execute the scheduled task
696+ checkNotNull( taskRunnable) .run () // Execute the scheduled task
680697
681698 // Then - should have rescheduled itself
682699 assertThat(scheduleCount).isEqualTo(2 )
@@ -698,7 +715,7 @@ internal class EvaluationEventsProcessorTest {
698715
699716 // When
700717 testedProcessor.schedulePeriodicFlush()
701- taskRunnable? .run () // Execute with no evaluations to flush
718+ checkNotNull( taskRunnable) .run () // Execute with no evaluations to flush
702719
703720 // Then - should not write but should reschedule
704721 verify(mockWriter, never()).writeAll(any())
@@ -825,7 +842,7 @@ internal class EvaluationEventsProcessorTest {
825842 stopThread.start()
826843
827844 // Execute the scheduled task (which will try to reschedule)
828- taskRunnable? .run ()
845+ checkNotNull( taskRunnable) .run ()
829846
830847 stopThread.join()
831848
@@ -1011,16 +1028,13 @@ internal class EvaluationEventsProcessorTest {
10111028 val slowWriteLatch = CountDownLatch (1 )
10121029 val writeStartedLatch = CountDownLatch (1 )
10131030 val slowWriter = object : EvaluationEventWriter {
1014- override fun write (event : BatchedFlagEvaluations .FlagEvaluation ) {
1015- writeAll(listOf (event))
1016- }
10171031 override fun writeAll (events : List <BatchedFlagEvaluations .FlagEvaluation >) {
10181032 writeStartedLatch.countDown()
10191033 slowWriteLatch.await() // Block until released
10201034 }
10211035 }
10221036
1023- val slowProcessor = EvaluationEventsProcessor (
1037+ val slowProcessor = createEvaluationEventsProcessor (
10241038 writer = slowWriter,
10251039 timeProvider = mockTimeProvider,
10261040 scheduledExecutor = mockScheduledExecutor,
@@ -1122,40 +1136,27 @@ internal class EvaluationEventsProcessorTest {
11221136
11231137 @Test
11241138 fun `M continue accepting evaluations W flush() { during flush operation }` () {
1125- // Given - populate initial evaluations
1126- repeat(50 ) { index ->
1127- val context = EvaluationContext (targetingKey = " initial-$index " )
1128- testedProcessor.processEvaluation(
1129- fakeFlagKey,
1130- context,
1131- fakeDDContext,
1132- fakeData.variationKey,
1133- fakeData.allocationKey,
1134- fakeData.reason,
1135- null ,
1136- null
1137- )
1138- }
1139+ // Given - stub scheduler to avoid reschedule interactions
1140+ whenever(mockScheduledExecutor.schedule(any<Runnable >(), any(), any())) doReturn mockScheduledFuture
11391141
11401142 // Create a slow writer to simulate long flush
11411143 val flushInProgress = CountDownLatch (1 )
11421144 val continueFlush = CountDownLatch (1 )
11431145 val writeCount = java.util.concurrent.atomic.AtomicInteger (0 )
1146+ val firstCallFlag = java.util.concurrent.atomic.AtomicBoolean (true )
11441147
11451148 val slowWriter = object : EvaluationEventWriter {
1146- override fun write (event : BatchedFlagEvaluations .FlagEvaluation ) {
1147- writeAll(listOf (event))
1148- }
11491149 override fun writeAll (events : List <BatchedFlagEvaluations .FlagEvaluation >) {
1150+ val isFirstCall = firstCallFlag.compareAndSet(true , false )
11501151 writeCount.addAndGet(events.size)
1151- if (writeCount.get() == 1 ) {
1152+ if (isFirstCall ) {
11521153 flushInProgress.countDown() // Signal first write started
11531154 continueFlush.await() // Block until signaled
11541155 }
11551156 }
11561157 }
11571158
1158- val slowProcessor = EvaluationEventsProcessor (
1159+ val slowProcessor = createEvaluationEventsProcessor (
11591160 writer = slowWriter,
11601161 timeProvider = mockTimeProvider,
11611162 scheduledExecutor = mockScheduledExecutor,
0 commit comments