1
+ // Copyright 2018, Oracle Corporation and/or its affiliates. All rights reserved.
2
+ // Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
3
+
4
+ package oracle .kubernetes .operator .calls ;
5
+
6
+ import io .kubernetes .client .ApiCallback ;
7
+ import io .kubernetes .client .ApiClient ;
8
+ import io .kubernetes .client .ApiException ;
9
+ import oracle .kubernetes .TestUtils ;
10
+ import oracle .kubernetes .operator .helpers .ClientPool ;
11
+ import oracle .kubernetes .operator .helpers .ResponseStep ;
12
+ import oracle .kubernetes .operator .work .Engine ;
13
+ import oracle .kubernetes .operator .work .Fiber ;
14
+ import oracle .kubernetes .operator .work .NextAction ;
15
+ import oracle .kubernetes .operator .work .Packet ;
16
+
17
+ import com .meterware .simplestub .Memento ;
18
+
19
+ import java .net .HttpURLConnection ;
20
+ import java .util .ArrayDeque ;
21
+ import java .util .ArrayList ;
22
+ import java .util .Collections ;
23
+ import java .util .Iterator ;
24
+ import java .util .List ;
25
+ import java .util .Map ;
26
+ import java .util .Queue ;
27
+ import java .util .SortedSet ;
28
+ import java .util .TreeSet ;
29
+ import java .util .concurrent .ScheduledExecutorService ;
30
+ import java .util .concurrent .ScheduledFuture ;
31
+ import java .util .concurrent .TimeUnit ;
32
+
33
+ import javax .annotation .Nonnull ;
34
+ import javax .annotation .Nullable ;
35
+
36
+ import org .junit .After ;
37
+ import org .junit .Before ;
38
+ import org .junit .Test ;
39
+
40
+ import static com .meterware .simplestub .Stub .createStrictStub ;
41
+ import static com .meterware .simplestub .Stub .createStub ;
42
+ import static oracle .kubernetes .operator .calls .AsyncRequestStep .RESPONSE_COMPONENT_NAME ;
43
+ import static org .hamcrest .MatcherAssert .assertThat ;
44
+ import static org .hamcrest .Matchers .equalTo ;
45
+ import static org .hamcrest .Matchers .hasKey ;
46
+ import static org .hamcrest .Matchers .not ;
47
+ import static org .hamcrest .Matchers .notNullValue ;
48
+ import static org .junit .Assert .assertTrue ;
49
+
50
+ public class AsyncRequestStepTest {
51
+
52
+ private static final int TIMEOUT_SECONDS = 10 ;
53
+ private static final int MAX_RETRY_COUNT = 2 ;
54
+ private Packet packet = new Packet ();
55
+ private Schedule schedule = createStrictStub (Schedule .class );
56
+ private Engine engine = new Engine (schedule );
57
+ private Fiber fiber = engine .createFiber ();
58
+ private RequestParams requestParams = new RequestParams ("testcall" , "junit" , "testName" , "body" );
59
+ private CallFactoryStub callFactory = new CallFactoryStub ();
60
+ private CompletionCallbackStub completionCallback = new CompletionCallbackStub ();
61
+ private TestStep nextStep = new TestStep ();
62
+ private ClientPool helper = ClientPool .getInstance ();
63
+ private List <Memento > mementos = new ArrayList <>();
64
+
65
+ private final AsyncRequestStep <Integer > asyncRequestStep
66
+ = new AsyncRequestStep <>(nextStep , requestParams , callFactory , helper , TIMEOUT_SECONDS , MAX_RETRY_COUNT , null , null , null );
67
+
68
+ @ Before
69
+ public void setUp () throws Exception {
70
+ mementos .add (TestUtils .silenceOperatorLogger ());
71
+
72
+ fiber .start (asyncRequestStep , packet , completionCallback );
73
+ }
74
+
75
+ @ After
76
+ public void tearDown () throws Exception {
77
+ for (Memento memento : mementos ) memento .revert ();
78
+ }
79
+
80
+ @ Test
81
+ public void afterFiberStarted_requestSent () throws Exception {
82
+ assertTrue (callFactory .invokedWith (requestParams ));
83
+ }
84
+
85
+ @ Test
86
+ public void afterFiber_timeoutStepScheduled () throws Exception {
87
+ assertTrue (schedule .containsStepAt (TIMEOUT_SECONDS , TimeUnit .SECONDS ));
88
+ }
89
+
90
+ @ Test
91
+ public void afterSuccessfulCallback_nextStepAppliedWithValue () throws Exception {
92
+ callFactory .sendSuccessfulCallback (17 );
93
+
94
+ assertThat (nextStep .result , equalTo (17 ));
95
+ }
96
+
97
+ @ Test
98
+ public void afterSuccessfulCallback_packetDoesNotContainsResponse () throws Exception {
99
+ schedule .execute (() -> callFactory .sendSuccessfulCallback (17 ));
100
+
101
+ assertThat (packet .getComponents (), not (hasKey (RESPONSE_COMPONENT_NAME )));
102
+ }
103
+
104
+ @ Test
105
+ public void afterFailedCallback_packetContainsRetryStrategy () throws Exception {
106
+ schedule .execute (() ->
107
+ callFactory .sendFailedCallback (new ApiException ("test failure" ), HttpURLConnection .HTTP_UNAVAILABLE ));
108
+
109
+ assertThat (packet .getComponents ().get (RESPONSE_COMPONENT_NAME ).getSPI (RetryStrategy .class ), notNullValue ());
110
+ }
111
+
112
+ // todo tests
113
+ // after timeout, packet contains retry strategy
114
+ // after either failure, setting time to before the timeout causes new request
115
+ // after new request, success leads to invocation
116
+
117
+
118
+ static class TestStep extends ResponseStep <Integer > {
119
+ private Integer result ;
120
+
121
+ TestStep () {
122
+ super (null );
123
+ }
124
+
125
+ @ Override
126
+ public NextAction onSuccess (Packet packet , Integer result , int statusCode , Map <String , List <String >> responseHeaders ) {
127
+ this .result = result ;
128
+ return null ;
129
+ }
130
+ }
131
+
132
+ @ SuppressWarnings ("SameParameterValue" )
133
+ static class CallFactoryStub implements CallFactory <Integer > {
134
+
135
+ private RequestParams requestParams ;
136
+ private ApiCallback <Integer > callback ;
137
+
138
+ private boolean invokedWith (RequestParams requestParams ) {
139
+ return requestParams == this .requestParams ;
140
+ }
141
+
142
+ private void sendSuccessfulCallback (Integer callbackValue ) {
143
+ callback .onSuccess (callbackValue , HttpURLConnection .HTTP_OK , Collections .emptyMap ());
144
+ }
145
+
146
+ private void sendFailedCallback (ApiException exception , int statusCode ) {
147
+ callback .onFailure (exception , statusCode , Collections .emptyMap ());
148
+ }
149
+
150
+ @ Override
151
+ public CancelableCall generate (RequestParams requestParams , ApiClient client , String cont , ApiCallback <Integer > callback ) throws ApiException {
152
+ this .requestParams = requestParams ;
153
+ this .callback = callback ;
154
+
155
+ return new CancelableCallStub ();
156
+ }
157
+ }
158
+
159
+ static class CancelableCallStub implements CancelableCall {
160
+ private boolean canceled ;
161
+
162
+ @ Override
163
+ public void cancel () {
164
+ canceled = true ;
165
+ }
166
+ }
167
+
168
+ static class CompletionCallbackStub implements Fiber .CompletionCallback {
169
+ private Packet packet ;
170
+ private Throwable throwable ;
171
+
172
+ @ Override
173
+ public void onCompletion (Packet packet ) {
174
+ this .packet = packet ;
175
+ }
176
+
177
+ @ Override
178
+ public void onThrowable (Packet packet , Throwable throwable ) {
179
+ this .packet = packet ;
180
+ this .throwable = throwable ;
181
+ }
182
+ }
183
+
184
+ static class ScheduledItem implements Comparable <ScheduledItem > {
185
+ private long atTime ;
186
+ private Runnable runnable ;
187
+
188
+ ScheduledItem (long atTime , Runnable runnable ) {
189
+ this .atTime = atTime ;
190
+ this .runnable = runnable ;
191
+ }
192
+
193
+ @ Override
194
+ public int compareTo (@ Nonnull ScheduledItem o ) {
195
+ return Long .compare (atTime , o .atTime );
196
+ }
197
+ }
198
+
199
+ static abstract class Schedule implements ScheduledExecutorService {
200
+ /** current time in milliseconds. */
201
+ private long currentTime = 0 ;
202
+
203
+ private SortedSet <ScheduledItem > scheduledItems = new TreeSet <>();
204
+ private Queue <Runnable > queue = new ArrayDeque <>();
205
+ private Runnable current ;
206
+
207
+ @ Override
208
+ @ Nonnull public ScheduledFuture <?> schedule (@ Nonnull Runnable command , long delay , @ Nonnull TimeUnit unit ) {
209
+ scheduledItems .add (new ScheduledItem (unit .toMillis (delay ), command ));
210
+ return createStub (ScheduledFuture .class );
211
+ }
212
+
213
+ @ Override
214
+ public void execute (@ Nullable Runnable command ) {
215
+ queue .add (command );
216
+ if (current == null )
217
+ runNextRunnable ();
218
+ }
219
+
220
+ private void runNextRunnable () {
221
+ while (queue .peek () != null ) {
222
+ current = queue .poll ();
223
+ current .run ();
224
+ current = null ;
225
+ }
226
+
227
+ }
228
+
229
+ void setTime (long time , TimeUnit unit ) {
230
+ long newTime = unit .toMillis (time );
231
+ if (newTime < currentTime )
232
+ throw new IllegalStateException ("Attempt to move clock backwards from " + currentTime + " to " + newTime );
233
+
234
+ for (Iterator <ScheduledItem > it = scheduledItems .iterator (); it .hasNext ();) {
235
+ ScheduledItem item = it .next ();
236
+ if (item .atTime > newTime ) break ;
237
+ it .remove ();
238
+ execute (item .runnable );
239
+ }
240
+
241
+ currentTime = newTime ;
242
+ }
243
+
244
+ boolean containsStepAt (int timeoutSeconds , TimeUnit unit ) {
245
+ for (ScheduledItem scheduledItem : scheduledItems )
246
+ if (scheduledItem .atTime == unit .toMillis (timeoutSeconds )) return true ;
247
+ return false ;
248
+ }
249
+ }
250
+ }
0 commit comments