19
19
import static com .github .stefanbirkner .systemlambda .SystemLambda .tapSystemErrAndOutNormalized ;
20
20
import static org .assertj .core .api .Assertions .assertThat ;
21
21
import static org .assertj .core .api .Assertions .assertThatThrownBy ;
22
- import static org .assertj .core .api .Assertions .catchThrowableOfType ;
22
+ import static org .assertj .core .api .Assertions .catchThrowable ;
23
23
import static org .assertj .core .api .Assertions .entry ;
24
24
import static org .mockito .Mockito .doThrow ;
25
25
import static org .mockito .Mockito .inOrder ;
29
29
import static org .mockito .Mockito .verifyNoInteractions ;
30
30
import static org .mockito .Mockito .when ;
31
31
32
+ import java .util .Map ;
33
+
32
34
import org .apache .pulsar .client .admin .PulsarAdmin ;
33
35
import org .apache .pulsar .client .admin .PulsarAdminException ;
34
36
import org .apache .pulsar .client .api .PulsarClientException ;
37
+ import org .assertj .core .api .InstanceOfAssertFactories ;
35
38
import org .junit .jupiter .api .BeforeEach ;
36
39
import org .junit .jupiter .api .Nested ;
37
40
import org .junit .jupiter .api .Test ;
@@ -92,6 +95,15 @@ void setupSharedMocks() throws PulsarClientException, PulsarAdminException {
92
95
when (source1 .type ()).thenReturn (FunctionType .SOURCE );
93
96
}
94
97
98
+ @ SafeVarargs
99
+ @ SuppressWarnings ("varargs" )
100
+ private void assertThatPulsarFunctionExceptionFailedWith (Throwable thrown ,
101
+ Map .Entry <? extends PulsarFunctionOperations <?>, ? extends Exception >... expectedFunctionAndErrors ) {
102
+ assertThat (thrown ).isInstanceOf (PulsarFunctionException .class )
103
+ .extracting ("failures" , InstanceOfAssertFactories .MAP )
104
+ .containsExactly (expectedFunctionAndErrors );
105
+ }
106
+
95
107
@ Nested
96
108
class ProperCreateUpdateApiCalled {
97
109
@@ -228,22 +240,21 @@ class WithFailFast {
228
240
void firstProcessedFunctionFails () throws PulsarAdminException {
229
241
var ex = new PulsarAdminException ("BOOM" );
230
242
when (function1 .functionExists (pulsarAdmin )).thenThrow (ex );
231
- var thrown = catchThrowableOfType (() -> functionAdmin .createOrUpdateUserDefinedFunctions (),
232
- PulsarFunctionException .class );
233
- assertThat (thrown .getFailures ()).containsExactly (entry (function1 , ex ));
243
+ var thrown = catchThrowable (() -> functionAdmin .createOrUpdateUserDefinedFunctions ());
244
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (function1 , ex ));
234
245
verify (function1 , never ()).create (pulsarAdmin );
235
246
verify (function1 , never ()).update (pulsarAdmin );
236
247
verifyNoInteractions (sink1 , source1 );
237
248
assertThat (functionAdmin .getProcessedFunctions ()).isEmpty ();
238
249
}
239
250
251
+ // PulsarFunctionOperations<?>, Exception
240
252
@ Test
241
253
void middleProcessedFunctionFails () throws PulsarAdminException {
242
254
var ex = new PulsarAdminException ("BOOM" );
243
255
when (sink1 .functionExists (pulsarAdmin )).thenThrow (ex );
244
- var thrown = catchThrowableOfType (() -> functionAdmin .createOrUpdateUserDefinedFunctions (),
245
- PulsarFunctionException .class );
246
- assertThat (thrown .getFailures ()).containsExactly (entry (sink1 , ex ));
256
+ var thrown = catchThrowable (() -> functionAdmin .createOrUpdateUserDefinedFunctions ());
257
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (sink1 , ex ));
247
258
verify (function1 ).create (pulsarAdmin );
248
259
verify (sink1 , never ()).create (pulsarAdmin );
249
260
verify (sink1 , never ()).update (pulsarAdmin );
@@ -255,9 +266,8 @@ void middleProcessedFunctionFails() throws PulsarAdminException {
255
266
void lastProcessedFunctionFails () throws PulsarAdminException {
256
267
var ex = new PulsarAdminException ("BOOM" );
257
268
when (source1 .functionExists (pulsarAdmin )).thenThrow (ex );
258
- var thrown = catchThrowableOfType (() -> functionAdmin .createOrUpdateUserDefinedFunctions (),
259
- PulsarFunctionException .class );
260
- assertThat (thrown .getFailures ()).containsExactly (entry (source1 , ex ));
269
+ var thrown = catchThrowable (() -> functionAdmin .createOrUpdateUserDefinedFunctions ());
270
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (source1 , ex ));
261
271
verify (function1 ).create (pulsarAdmin );
262
272
verify (sink1 ).create (pulsarAdmin );
263
273
verify (source1 , never ()).create (pulsarAdmin );
@@ -282,9 +292,8 @@ void disableFailFastOnFunctionAdmin() {
282
292
void firstProcessedFunctionFails () throws PulsarAdminException {
283
293
var ex = new PulsarAdminException ("BOOM" );
284
294
when (function1 .functionExists (pulsarAdmin )).thenThrow (ex );
285
- var thrown = catchThrowableOfType (() -> functionAdmin .createOrUpdateUserDefinedFunctions (),
286
- PulsarFunctionException .class );
287
- assertThat (thrown .getFailures ()).containsExactly (entry (function1 , ex ));
295
+ var thrown = catchThrowable (() -> functionAdmin .createOrUpdateUserDefinedFunctions ());
296
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (function1 , ex ));
288
297
verify (function1 , never ()).create (pulsarAdmin );
289
298
verify (function1 , never ()).update (pulsarAdmin );
290
299
verify (sink1 ).create (pulsarAdmin );
@@ -296,9 +305,8 @@ void firstProcessedFunctionFails() throws PulsarAdminException {
296
305
void middleProcessedFunctionFails () throws PulsarAdminException {
297
306
var ex = new PulsarAdminException ("BOOM" );
298
307
when (sink1 .functionExists (pulsarAdmin )).thenThrow (ex );
299
- var thrown = catchThrowableOfType (() -> functionAdmin .createOrUpdateUserDefinedFunctions (),
300
- PulsarFunctionException .class );
301
- assertThat (thrown .getFailures ()).containsExactly (entry (sink1 , ex ));
308
+ var thrown = catchThrowable (() -> functionAdmin .createOrUpdateUserDefinedFunctions ());
309
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (sink1 , ex ));
302
310
verify (function1 ).create (pulsarAdmin );
303
311
verify (sink1 , never ()).create (pulsarAdmin );
304
312
verify (sink1 , never ()).update (pulsarAdmin );
@@ -310,9 +318,8 @@ void middleProcessedFunctionFails() throws PulsarAdminException {
310
318
void lastProcessedFunctionFails () throws PulsarAdminException {
311
319
var ex = new PulsarAdminException ("BOOM" );
312
320
when (source1 .functionExists (pulsarAdmin )).thenThrow (ex );
313
- var thrown = catchThrowableOfType (() -> functionAdmin .createOrUpdateUserDefinedFunctions (),
314
- PulsarFunctionException .class );
315
- assertThat (thrown .getFailures ()).containsExactly (entry (source1 , ex ));
321
+ var thrown = catchThrowable (() -> functionAdmin .createOrUpdateUserDefinedFunctions ());
322
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (source1 , ex ));
316
323
verify (function1 ).create (pulsarAdmin );
317
324
verify (sink1 ).create (pulsarAdmin );
318
325
verify (source1 , never ()).create (pulsarAdmin );
@@ -328,9 +335,8 @@ void allProcessedFunctionsFail() throws PulsarAdminException {
328
335
when (function1 .functionExists (pulsarAdmin )).thenThrow (ex1 );
329
336
when (sink1 .functionExists (pulsarAdmin )).thenThrow (ex2 );
330
337
when (source1 .functionExists (pulsarAdmin )).thenThrow (ex3 );
331
- var thrown = catchThrowableOfType (() -> functionAdmin .createOrUpdateUserDefinedFunctions (),
332
- PulsarFunctionException .class );
333
- assertThat (thrown .getFailures ()).containsExactly (entry (function1 , ex1 ), entry (sink1 , ex2 ),
338
+ var thrown = catchThrowable (() -> functionAdmin .createOrUpdateUserDefinedFunctions ());
339
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (function1 , ex1 ), entry (sink1 , ex2 ),
334
340
entry (source1 , ex3 ));
335
341
verify (function1 , never ()).create (pulsarAdmin );
336
342
verify (function1 , never ()).update (pulsarAdmin );
@@ -474,9 +480,8 @@ void createAdminClientFails() throws PulsarClientException {
474
480
void firstProcessedFunctionFails () {
475
481
var ex = new PulsarException ("BOOM" );
476
482
doThrow (ex ).when (source1 ).stop (pulsarAdmin );
477
- var thrown = catchThrowableOfType (() -> functionAdmin .enforceStopPolicyOnUserDefinedFunctions (),
478
- PulsarFunctionException .class );
479
- assertThat (thrown .getFailures ()).containsExactly (entry (source1 , ex ));
483
+ var thrown = catchThrowable (() -> functionAdmin .enforceStopPolicyOnUserDefinedFunctions ());
484
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (source1 , ex ));
480
485
verify (sink1 ).stop (pulsarAdmin );
481
486
verify (function1 ).stop (pulsarAdmin );
482
487
}
@@ -485,9 +490,8 @@ void firstProcessedFunctionFails() {
485
490
void middleProcessedFunctionFails () {
486
491
var ex = new PulsarException ("BOOM" );
487
492
doThrow (ex ).when (sink1 ).stop (pulsarAdmin );
488
- var thrown = catchThrowableOfType (() -> functionAdmin .enforceStopPolicyOnUserDefinedFunctions (),
489
- PulsarFunctionException .class );
490
- assertThat (thrown .getFailures ()).containsExactly (entry (sink1 , ex ));
493
+ var thrown = catchThrowable (() -> functionAdmin .enforceStopPolicyOnUserDefinedFunctions ());
494
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (sink1 , ex ));
491
495
verify (source1 ).stop (pulsarAdmin );
492
496
verify (function1 ).stop (pulsarAdmin );
493
497
}
@@ -496,9 +500,8 @@ void middleProcessedFunctionFails() {
496
500
void lastProcessedFunctionFails () {
497
501
var ex = new PulsarException ("BOOM" );
498
502
doThrow (ex ).when (function1 ).stop (pulsarAdmin );
499
- var thrown = catchThrowableOfType (() -> functionAdmin .enforceStopPolicyOnUserDefinedFunctions (),
500
- PulsarFunctionException .class );
501
- assertThat (thrown .getFailures ()).containsExactly (entry (function1 , ex ));
503
+ var thrown = catchThrowable (() -> functionAdmin .enforceStopPolicyOnUserDefinedFunctions ());
504
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (function1 , ex ));
502
505
verify (source1 ).stop (pulsarAdmin );
503
506
verify (sink1 ).stop (pulsarAdmin );
504
507
}
@@ -511,9 +514,8 @@ void allProcessedFunctionsFail() {
511
514
doThrow (ex1 ).when (source1 ).stop (pulsarAdmin );
512
515
doThrow (ex2 ).when (sink1 ).stop (pulsarAdmin );
513
516
doThrow (ex3 ).when (function1 ).stop (pulsarAdmin );
514
- var thrown = catchThrowableOfType (() -> functionAdmin .enforceStopPolicyOnUserDefinedFunctions (),
515
- PulsarFunctionException .class );
516
- assertThat (thrown .getFailures ()).containsExactly (entry (source1 , ex1 ), entry (sink1 , ex2 ),
517
+ var thrown = catchThrowable (() -> functionAdmin .enforceStopPolicyOnUserDefinedFunctions ());
518
+ assertThatPulsarFunctionExceptionFailedWith (thrown , entry (source1 , ex1 ), entry (sink1 , ex2 ),
517
519
entry (function1 , ex3 ));
518
520
}
519
521
0 commit comments