8
8
import io .fabric8 .kubernetes .client .CustomResource ;
9
9
import io .fabric8 .kubernetes .client .Watcher ;
10
10
import org .assertj .core .api .Condition ;
11
- import org .junit .jupiter .api .Disabled ;
12
11
import org .junit .jupiter .api .Test ;
13
12
import org .mockito .invocation .InvocationOnMock ;
14
13
15
14
import java .time .LocalDateTime ;
16
15
import java .util .ArrayList ;
17
16
import java .util .Collections ;
18
17
import java .util .List ;
19
- import java .util .concurrent .CompletableFuture ;
20
18
21
- import static com .github .containersolutions .operator .processing .retry .GenericRetry .* ;
19
+ import static com .github .containersolutions .operator .processing .retry .GenericRetry .DEFAULT_INITIAL_INTERVAL ;
22
20
import static org .assertj .core .api .Assertions .assertThat ;
23
21
import static org .assertj .core .api .Assertions .atIndex ;
24
22
import static org .mockito .Mockito .*;
25
23
26
24
class EventSchedulerTest {
27
25
28
26
public static final int INVOCATION_DURATION = 80 ;
27
+ public static final int MAX_RETRY_ATTEMPTS = 3 ;
29
28
@ SuppressWarnings ("unchecked" )
30
29
private EventDispatcher eventDispatcher = mock (EventDispatcher .class );
31
30
32
- private EventScheduler eventScheduler = new EventScheduler (eventDispatcher , GenericRetry . defaultLimitedExponentialRetry ());
31
+ private EventScheduler eventScheduler = new EventScheduler (eventDispatcher , new GenericRetry (). setMaxAttempts ( MAX_RETRY_ATTEMPTS ). withLinearRetry ());
33
32
34
33
private List <EventProcessingDetail > eventProcessingList = Collections .synchronizedList (new ArrayList <>());
35
34
@@ -53,9 +52,8 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() throws Interrupted
53
52
CustomResource resource2 = sampleResource ();
54
53
resource2 .getMetadata ().setResourceVersion ("2" );
55
54
56
- CompletableFuture .runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 ));
57
- Thread .sleep (50 );
58
- CompletableFuture .runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 ));
55
+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 );
56
+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 );
59
57
60
58
waitTimeForExecution (2 );
61
59
assertThat (eventProcessingList ).hasSize (2 )
@@ -85,44 +83,41 @@ public void retriesEventsWithErrors() {
85
83
.has (new Condition <>(e -> e .getException () == null , "" ), atIndex (1 ));
86
84
}
87
85
88
- @ Disabled ( "Todo change according to new scheduling" )
86
+
89
87
@ Test
90
- public void schedulesEventIfOlderVersionIsAlreadyUnderProcessing () {
91
- normalDispatcherExecution ();
88
+ public void processesNewEventIfItIsReceivedAfterExecutionInError () {
92
89
CustomResource resource1 = sampleResource ();
93
90
CustomResource resource2 = sampleResource ();
94
91
resource2 .getMetadata ().setResourceVersion ("2" );
95
92
96
- doAnswer (invocation -> {
97
- Object [] args = invocation .getArguments ();
98
- LocalDateTime start = LocalDateTime .now ();
99
- CompletableFuture .runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 ));
100
- Thread .sleep (INVOCATION_DURATION );
101
- LocalDateTime end = LocalDateTime .now ();
102
- eventProcessingList .add (new EventProcessingDetail ((Watcher .Action ) args [0 ], start , end , (CustomResource ) args [1 ]));
103
- return null ;
104
- }).doAnswer (this ::normalExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), any (CustomResource .class ));
93
+ doAnswer (this ::exceptionInExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), eq (resource1 ));
94
+ doAnswer (this ::normalExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), eq (resource2 ));
105
95
106
- CompletableFuture .runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 ));
96
+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource1 );
97
+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , resource2 );
107
98
108
99
waitTimeForExecution (2 );
100
+
109
101
assertThat (eventProcessingList ).hasSize (2 )
110
102
.matches (list -> eventProcessingList .get (0 ).getCustomResource ().getMetadata ().getResourceVersion ().equals ("1" ) &&
111
103
eventProcessingList .get (1 ).getCustomResource ().getMetadata ().getResourceVersion ().equals ("2" ),
112
104
"Events processed in correct order" )
113
105
.matches (list ->
114
106
eventProcessingList .get (0 ).getEndTime ().isBefore (eventProcessingList .get (1 ).startTime ),
115
107
"Start time of event 2 is after end time of event 1" );
108
+
109
+ assertThat (eventProcessingList .get (0 ).getException ()).isNotNull ();
110
+ assertThat (eventProcessingList .get (1 ).getException ()).isNull ();
116
111
}
117
112
118
113
@ Test
119
114
public void numberOfRetriesIsLimited () {
120
115
doAnswer (this ::exceptionInExecution ).when (eventDispatcher ).handleEvent (any (Watcher .Action .class ), any (CustomResource .class ));
121
116
122
- CompletableFuture . runAsync (() -> eventScheduler .eventReceived (Watcher .Action .MODIFIED , sampleResource () ));
117
+ eventScheduler .eventReceived (Watcher .Action .MODIFIED , sampleResource ());
123
118
124
- waitTimeForExecution (1 , DEFAULT_MAX_ATTEMPTS + 2 );
125
- assertThat (eventProcessingList ).hasSize (DEFAULT_MAX_ATTEMPTS );
119
+ waitTimeForExecution (1 , MAX_RETRY_ATTEMPTS + 2 );
120
+ assertThat (eventProcessingList ).hasSize (MAX_RETRY_ATTEMPTS );
126
121
}
127
122
128
123
public void normalDispatcherExecution () {
@@ -167,8 +162,8 @@ private void waitTimeForExecution(int numberOfEvents) {
167
162
168
163
private void waitTimeForExecution (int numberOfEvents , int retries ) {
169
164
try {
170
- Thread .sleep (( long ) ( 200 + ((INVOCATION_DURATION + 30 ) * numberOfEvents ) + (retries * (INVOCATION_DURATION + 100 )) +
171
- ( Math . pow ( DEFAULT_MULTIPLIER , retries ) * (DEFAULT_INITIAL_INTERVAL + 100 )) ));
165
+ Thread .sleep (200 + ((INVOCATION_DURATION + 30 ) * numberOfEvents ) + (retries * (INVOCATION_DURATION + 100 )) +
166
+ retries * (DEFAULT_INITIAL_INTERVAL + 100 ));
172
167
} catch (InterruptedException e ) {
173
168
throw new IllegalStateException (e );
174
169
}
0 commit comments