43
43
import org .apache .kafka .streams .KeyValue ;
44
44
import org .apache .kafka .streams .StreamsBuilder ;
45
45
import org .apache .kafka .streams .StreamsConfig ;
46
- import org .apache .kafka .streams .StreamsConfig .InternalConfig ;
47
46
import org .apache .kafka .streams .Topology ;
48
47
import org .apache .kafka .streams .integration .utils .EmbeddedKafkaCluster ;
49
48
import org .apache .kafka .streams .integration .utils .IntegrationTestUtils ;
77
76
import org .junit .jupiter .api .TestInfo ;
78
77
import org .junit .jupiter .api .Timeout ;
79
78
import org .junit .jupiter .params .ParameterizedTest ;
80
- import org .junit .jupiter .params .provider .CsvSource ;
81
79
import org .junit .jupiter .params .provider .ValueSource ;
82
80
import org .slf4j .Logger ;
83
81
import org .slf4j .LoggerFactory ;
@@ -161,8 +159,8 @@ public void createTopics(final TestInfo testInfo) throws InterruptedException {
161
159
CLUSTER .createTopic (inputStream , 2 , 1 );
162
160
}
163
161
164
- private Properties props (final boolean stateUpdaterEnabled ) {
165
- return props (mkObjectProperties (mkMap (mkEntry ( InternalConfig . STATE_UPDATER_ENABLED , stateUpdaterEnabled ) )));
162
+ private Properties props () {
163
+ return props (mkObjectProperties (mkMap ()));
166
164
}
167
165
168
166
private Properties props (final Properties extraProperties ) {
@@ -267,17 +265,12 @@ public void shouldRestoreNullRecord(final boolean useNewProtocol) throws Excepti
267
265
}
268
266
269
267
@ ParameterizedTest
270
- @ CsvSource ({
271
- "true,true" ,
272
- "true,false" ,
273
- "false,true" ,
274
- "false,false"
275
- })
276
- public void shouldRestoreStateFromSourceTopicForReadOnlyStore (final boolean stateUpdaterEnabled , final boolean useNewProtocol ) throws Exception {
268
+ @ ValueSource (booleans = {true , false })
269
+ public void shouldRestoreStateFromSourceTopicForReadOnlyStore (final boolean useNewProtocol ) throws Exception {
277
270
final AtomicInteger numReceived = new AtomicInteger (0 );
278
271
final Topology topology = new Topology ();
279
272
280
- final Properties props = props (stateUpdaterEnabled );
273
+ final Properties props = props ();
281
274
if (useNewProtocol ) {
282
275
props .put (StreamsConfig .GROUP_PROTOCOL_CONFIG , GroupProtocol .STREAMS .name ());
283
276
}
@@ -338,17 +331,12 @@ public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean stat
338
331
}
339
332
340
333
@ ParameterizedTest
341
- @ CsvSource ({
342
- "true,true" ,
343
- "true,false" ,
344
- "false,true" ,
345
- "false,false"
346
- })
347
- public void shouldRestoreStateFromSourceTopicForGlobalTable (final boolean stateUpdaterEnabled , final boolean useNewProtocol ) throws Exception {
334
+ @ ValueSource (booleans = {true , false })
335
+ public void shouldRestoreStateFromSourceTopicForGlobalTable (final boolean useNewProtocol ) throws Exception {
348
336
final AtomicInteger numReceived = new AtomicInteger (0 );
349
337
final StreamsBuilder builder = new StreamsBuilder ();
350
338
351
- final Properties props = props (stateUpdaterEnabled );
339
+ final Properties props = props ();
352
340
props .put (StreamsConfig .TOPOLOGY_OPTIMIZATION_CONFIG , StreamsConfig .OPTIMIZE );
353
341
if (useNewProtocol ) {
354
342
props .put (StreamsConfig .GROUP_PROTOCOL_CONFIG , GroupProtocol .STREAMS .name ());
@@ -413,20 +401,15 @@ public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean stateU
413
401
}
414
402
415
403
@ ParameterizedTest
416
- @ CsvSource ({
417
- "true,true" ,
418
- "true,false" ,
419
- "false,true" ,
420
- "false,false"
421
- })
422
- public void shouldRestoreStateFromChangelogTopic (final boolean stateUpdaterEnabled , final boolean useNewProtocol ) throws Exception {
404
+ @ ValueSource (booleans = {true , false })
405
+ public void shouldRestoreStateFromChangelogTopic (final boolean useNewProtocol ) throws Exception {
423
406
final String changelog = appId + "-store-changelog" ;
424
407
CLUSTER .createTopic (changelog , 2 , 1 );
425
408
426
409
final AtomicInteger numReceived = new AtomicInteger (0 );
427
410
final StreamsBuilder builder = new StreamsBuilder ();
428
411
429
- final Properties props = props (stateUpdaterEnabled );
412
+ final Properties props = props ();
430
413
431
414
if (useNewProtocol ) {
432
415
props .put (StreamsConfig .GROUP_PROTOCOL_CONFIG , GroupProtocol .STREAMS .name ());
@@ -474,13 +457,8 @@ public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabl
474
457
}
475
458
476
459
@ ParameterizedTest
477
- @ CsvSource ({
478
- "true,true" ,
479
- "true,false" ,
480
- "false,true" ,
481
- "false,false"
482
- })
483
- public void shouldSuccessfullyStartWhenLoggingDisabled (final boolean stateUpdaterEnabled , final boolean useNewProtocol ) throws InterruptedException {
460
+ @ ValueSource (booleans = {true , false })
461
+ public void shouldSuccessfullyStartWhenLoggingDisabled (final boolean useNewProtocol ) throws InterruptedException {
484
462
final StreamsBuilder builder = new StreamsBuilder ();
485
463
486
464
final KStream <Integer , Integer > stream = builder .stream (inputStream );
@@ -490,7 +468,7 @@ public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdate
490
468
Integer ::sum ,
491
469
Materialized .<Integer , Integer , KeyValueStore <Bytes , byte []>>as ("reduce-store" ).withLoggingDisabled ()
492
470
);
493
- final Properties props = props (stateUpdaterEnabled );
471
+ final Properties props = props ();
494
472
if (useNewProtocol ) {
495
473
props .put (StreamsConfig .GROUP_PROTOCOL_CONFIG , GroupProtocol .STREAMS .name ());
496
474
}
@@ -503,13 +481,8 @@ public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdate
503
481
}
504
482
505
483
@ ParameterizedTest
506
- @ CsvSource ({
507
- "true,true" ,
508
- "true,false" ,
509
- "false,true" ,
510
- "false,false"
511
- })
512
- public void shouldProcessDataFromStoresWithLoggingDisabled (final boolean stateUpdaterEnabled , final boolean useNewProtocol ) throws InterruptedException {
484
+ @ ValueSource (booleans = {true , false })
485
+ public void shouldProcessDataFromStoresWithLoggingDisabled (final boolean useNewProtocol ) throws InterruptedException {
513
486
IntegrationTestUtils .produceKeyValuesSynchronously (inputStream ,
514
487
asList (KeyValue .pair (1 , 1 ),
515
488
KeyValue .pair (2 , 2 ),
@@ -537,7 +510,7 @@ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUp
537
510
538
511
final Topology topology = streamsBuilder .build ();
539
512
540
- final Properties props = props (stateUpdaterEnabled );
513
+ final Properties props = props ();
541
514
542
515
if (useNewProtocol ) {
543
516
props .put (StreamsConfig .GROUP_PROTOCOL_CONFIG , GroupProtocol .STREAMS .name ());
@@ -558,13 +531,8 @@ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUp
558
531
}
559
532
560
533
@ ParameterizedTest
561
- @ CsvSource ({
562
- "true,true" ,
563
- "true,false" ,
564
- "false,true" ,
565
- "false,false"
566
- })
567
- public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore (final boolean stateUpdaterEnabled , final boolean useNewProtocol ) throws Exception {
534
+ @ ValueSource (booleans = {true , false })
535
+ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore (final boolean useNewProtocol ) throws Exception {
568
536
final StreamsBuilder builder = new StreamsBuilder ();
569
537
builder .table (
570
538
inputStream ,
@@ -576,7 +544,7 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f
576
544
CLUSTER .setGroupStandbyReplicas (appId , 1 );
577
545
}
578
546
579
- final Properties props1 = props (stateUpdaterEnabled );
547
+ final Properties props1 = props ();
580
548
props1 .put (StreamsConfig .NUM_STANDBY_REPLICAS_CONFIG , 1 );
581
549
props1 .put (StreamsConfig .STATE_DIR_CONFIG , TestUtils .tempDirectory (appId + "-1" ).getPath ());
582
550
if (useNewProtocol ) {
@@ -585,7 +553,7 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f
585
553
purgeLocalStreamsState (props1 );
586
554
final KafkaStreams streams1 = new KafkaStreams (builder .build (), props1 );
587
555
588
- final Properties props2 = props (stateUpdaterEnabled );
556
+ final Properties props2 = props ();
589
557
props2 .put (StreamsConfig .NUM_STANDBY_REPLICAS_CONFIG , 1 );
590
558
props2 .put (StreamsConfig .STATE_DIR_CONFIG , TestUtils .tempDirectory (appId + "-2" ).getPath ());
591
559
if (useNewProtocol ) {
0 commit comments