2121import org .apache .kafka .streams .KeyValue ;
2222import org .apache .kafka .streams .KeyValueTimestamp ;
2323import org .apache .kafka .streams .StreamsBuilder ;
24+ import org .apache .kafka .streams .StreamsConfig ;
2425import org .apache .kafka .streams .TestInputTopic ;
2526import org .apache .kafka .streams .TestOutputTopic ;
2627import org .apache .kafka .streams .Topology ;
28+ import org .apache .kafka .streams .TopologyConfig ;
2729import org .apache .kafka .streams .TopologyTestDriver ;
2830import org .apache .kafka .streams .TopologyTestDriverWrapper ;
2931import org .apache .kafka .streams .TopologyWrapper ;
3537import org .apache .kafka .streams .processor .api .MockProcessorContext ;
3638import org .apache .kafka .streams .processor .api .Processor ;
3739import org .apache .kafka .streams .processor .api .Record ;
40+ import org .apache .kafka .streams .state .BuiltInDslStoreSuppliers ;
3841import org .apache .kafka .streams .state .Stores ;
3942import org .apache .kafka .streams .test .TestRecord ;
4043import org .apache .kafka .test .MockApiProcessor ;
@@ -70,9 +73,14 @@ public class KTableKTableLeftJoinTest {
7073 private final Consumed <Integer , String > consumed = Consumed .with (Serdes .Integer (), Serdes .String ());
7174 private final Properties props = StreamsTestUtils .getStreamsConfig (Serdes .Integer (), Serdes .String ());
7275
76+ private StreamsBuilder createStreamBuilderInMemory () {
77+ props .put (StreamsConfig .DSL_STORE_SUPPLIERS_CLASS_CONFIG , BuiltInDslStoreSuppliers .InMemoryDslStoreSuppliers .class .getName ());
78+ return new StreamsBuilder (new TopologyConfig (new StreamsConfig (props )));
79+ }
80+
7381 @ Test
7482 public void testJoin () {
75- final StreamsBuilder builder = new StreamsBuilder ();
83+ final StreamsBuilder builder = createStreamBuilderInMemory ();
7684
7785 final int [] expectedKeys = new int [] {0 , 1 , 2 , 3 };
7886
@@ -193,7 +201,7 @@ public void testJoin() {
193201
194202 @ Test
195203 public void testNotSendingOldValue () {
196- final StreamsBuilder builder = new StreamsBuilder ();
204+ final StreamsBuilder builder = createStreamBuilderInMemory ();
197205
198206 final int [] expectedKeys = new int [] {0 , 1 , 2 , 3 };
199207
@@ -309,7 +317,7 @@ public void testNotSendingOldValue() {
309317
310318 @ Test
311319 public void testSendingOldValue () {
312- final StreamsBuilder builder = new StreamsBuilder ();
320+ final StreamsBuilder builder = createStreamBuilderInMemory ();
313321
314322 final int [] expectedKeys = new int [] {0 , 1 , 2 , 3 };
315323
@@ -443,7 +451,7 @@ public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
443451 final String tableSix = "tableSix" ;
444452 final String [] inputs = {agg , tableOne , tableTwo , tableThree , tableFour , tableFive , tableSix };
445453
446- final StreamsBuilder builder = new StreamsBuilder ();
454+ final StreamsBuilder builder = createStreamBuilderInMemory ();
447455 final Consumed <Long , String > consumed = Consumed .with (Serdes .Long (), Serdes .String ());
448456 final KTable <Long , String > aggTable = builder
449457 .table (agg , consumed , Materialized .as (Stores .inMemoryKeyValueStore ("agg-base-store" )))
@@ -453,30 +461,12 @@ public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
453461 MockReducer .STRING_ADDER ,
454462 Materialized .as (Stores .inMemoryKeyValueStore ("agg-store" )));
455463
456- final KTable <Long , String > one = builder .table (
457- tableOne ,
458- consumed ,
459- Materialized .as (Stores .inMemoryKeyValueStore ("tableOne-base-store" )));
460- final KTable <Long , String > two = builder .table (
461- tableTwo ,
462- consumed ,
463- Materialized .as (Stores .inMemoryKeyValueStore ("tableTwo-base-store" )));
464- final KTable <Long , String > three = builder .table (
465- tableThree ,
466- consumed ,
467- Materialized .as (Stores .inMemoryKeyValueStore ("tableThree-base-store" )));
468- final KTable <Long , String > four = builder .table (
469- tableFour ,
470- consumed ,
471- Materialized .as (Stores .inMemoryKeyValueStore ("tableFour-base-store" )));
472- final KTable <Long , String > five = builder .table (
473- tableFive ,
474- consumed ,
475- Materialized .as (Stores .inMemoryKeyValueStore ("tableFive-base-store" )));
476- final KTable <Long , String > six = builder .table (
477- tableSix ,
478- consumed ,
479- Materialized .as (Stores .inMemoryKeyValueStore ("tableSix-base-store" )));
464+ final KTable <Long , String > one = builder .table (tableOne , consumed );
465+ final KTable <Long , String > two = builder .table (tableTwo , consumed );
466+ final KTable <Long , String > three = builder .table (tableThree , consumed );
467+ final KTable <Long , String > four = builder .table (tableFour , consumed );
468+ final KTable <Long , String > five = builder .table (tableFive , consumed );
469+ final KTable <Long , String > six = builder .table (tableSix , consumed );
480470
481471 final ValueMapper <String , String > mapper = value -> value .toUpperCase (Locale .ROOT );
482472
@@ -515,7 +505,7 @@ public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
515505
516506 @ Test
517507 public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey () {
518- final StreamsBuilder builder = new StreamsBuilder ();
508+ final StreamsBuilder builder = createStreamBuilderInMemory ();
519509
520510 @ SuppressWarnings ("unchecked" )
521511 final Processor <String , Change <String >, String , Change <Object >> join = new KTableKTableLeftJoin <>(
0 commit comments