2525package com .bakdata .kafka .integration ;
2626
2727
28- import static com .bakdata .kafka .integration .StreamsRunnerTest .configureApp ;
2928import static org .mockito .Mockito .verify ;
3029import static org .mockito .Mockito .verifyNoMoreInteractions ;
3130
5756import io .confluent .kafka .schemaregistry .client .rest .exceptions .RestClientException ;
5857import io .confluent .kafka .serializers .KafkaAvroSerializer ;
5958import java .io .IOException ;
59+ import java .nio .file .Path ;
6060import java .util .List ;
6161import java .util .regex .Pattern ;
6262import java .util .stream .Collectors ;
7373import org .assertj .core .api .junit .jupiter .SoftAssertionsExtension ;
7474import org .junit .jupiter .api .Test ;
7575import org .junit .jupiter .api .extension .ExtendWith ;
76+ import org .junit .jupiter .api .io .TempDir ;
7677import org .mockito .Mock ;
7778import org .mockito .junit .jupiter .MockitoExtension ;
7879import org .mockito .junit .jupiter .MockitoSettings ;
@@ -86,39 +87,13 @@ class StreamsCleanUpRunnerTest extends KafkaTest {
8687 private SoftAssertions softly ;
8788 @ Mock
8889 private TopicHook topicHook ;
90+ @ TempDir
91+ private Path stateDir ;
8992
9093 static <K , V > KeyValue <K , V > toKeyValue (final ConsumerRecord <K , V > consumerRecord ) {
9194 return new KeyValue <>(consumerRecord .key (), consumerRecord .value ());
9295 }
9396
94- private static ConfiguredStreamsApp <StreamsApp > createWordCountPatternApplication () {
95- return configureApp (new WordCountPattern (), StreamsTopicConfig .builder ()
96- .inputPattern (Pattern .compile (".*_topic" ))
97- .outputTopic ("word_output" )
98- .build ());
99- }
100-
101- private static ConfiguredStreamsApp <StreamsApp > createWordCountApplication () {
102- return configureApp (new WordCount (), StreamsTopicConfig .builder ()
103- .inputTopics (List .of ("word_input" ))
104- .outputTopic ("word_output" )
105- .build ());
106- }
107-
108- private static ConfiguredStreamsApp <StreamsApp > createMirrorValueApplication () {
109- return configureApp (new MirrorValueWithAvro (), StreamsTopicConfig .builder ()
110- .inputTopics (List .of ("input" ))
111- .outputTopic ("output" )
112- .build ());
113- }
114-
115- private static ConfiguredStreamsApp <StreamsApp > createMirrorKeyApplication () {
116- return configureApp (new MirrorKeyWithAvro (), StreamsTopicConfig .builder ()
117- .inputTopics (List .of ("input" ))
118- .outputTopic ("output" )
119- .build ());
120- }
121-
12297 private static void reset (final ExecutableApp <?, StreamsCleanUpRunner , ?> app ) {
12398 try (final StreamsCleanUpRunner cleanUpRunner = app .createCleanUpRunner ()) {
12499 cleanUpRunner .reset ();
@@ -131,9 +106,13 @@ private static void clean(final ExecutableApp<?, ? extends CleanUpRunner, ?> app
131106 }
132107 }
133108
109+ ConfiguredStreamsApp <StreamsApp > configureApp (final StreamsApp app , final StreamsTopicConfig topics ) {
110+ return StreamsRunnerTest .configureApp (app , topics , this .stateDir );
111+ }
112+
134113 @ Test
135114 void shouldDeleteTopic () {
136- try (final ConfiguredStreamsApp <StreamsApp > app = createWordCountApplication ();
115+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createWordCountApplication ();
137116 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (
138117 this .createEndpointWithoutSchemaRegistry ())) {
139118 final KafkaTestClient testClient = this .newTestClient ();
@@ -171,7 +150,7 @@ void shouldDeleteTopic() {
171150
172151 @ Test
173152 void shouldDeleteConsumerGroup () {
174- try (final ConfiguredStreamsApp <StreamsApp > app = createWordCountApplication ();
153+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createWordCountApplication ();
175154 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (
176155 this .createEndpointWithoutSchemaRegistry ())) {
177156 final KafkaTestClient testClient = this .newTestClient ();
@@ -216,7 +195,7 @@ void shouldDeleteConsumerGroup() {
216195
217196 @ Test
218197 void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist () {
219- try (final ConfiguredStreamsApp <StreamsApp > app = createWordCountApplication ();
198+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createWordCountApplication ();
220199 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (
221200 this .createEndpointWithoutSchemaRegistry ())) {
222201 final KafkaTestClient testClient = this .newTestClient ();
@@ -353,7 +332,7 @@ void shouldDeleteIntermediateTopics() {
353332
354333 @ Test
355334 void shouldDeleteState () {
356- try (final ConfiguredStreamsApp <StreamsApp > app = createWordCountApplication ();
335+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createWordCountApplication ();
357336 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (
358337 this .createEndpointWithoutSchemaRegistry ())) {
359338 final KafkaTestClient testClient = this .newTestClient ();
@@ -391,7 +370,7 @@ void shouldDeleteState() {
391370
392371 @ Test
393372 void shouldReprocessAlreadySeenRecords () {
394- try (final ConfiguredStreamsApp <StreamsApp > app = createWordCountApplication ();
373+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createWordCountApplication ();
395374 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (
396375 this .createEndpointWithoutSchemaRegistry ())) {
397376 final KafkaTestClient testClient = this .newTestClient ();
@@ -422,7 +401,7 @@ void shouldReprocessAlreadySeenRecords() {
422401 @ Test
423402 void shouldDeleteValueSchema ()
424403 throws IOException , RestClientException {
425- try (final ConfiguredStreamsApp <StreamsApp > app = createMirrorValueApplication ();
404+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createMirrorValueApplication ();
426405 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (this .createEndpoint ());
427406 final SchemaRegistryClient client = this .getSchemaRegistryClient ()) {
428407 final TestRecord testRecord = TestRecord .newBuilder ().setContent ("key 1" ).build ();
@@ -454,7 +433,7 @@ void shouldDeleteValueSchema()
454433 @ Test
455434 void shouldDeleteKeySchema ()
456435 throws IOException , RestClientException {
457- try (final ConfiguredStreamsApp <StreamsApp > app = createMirrorKeyApplication ();
436+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createMirrorKeyApplication ();
458437 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (this .createEndpoint ());
459438 final SchemaRegistryClient client = this .getSchemaRegistryClient ()) {
460439 final TestRecord testRecord = TestRecord .newBuilder ().setContent ("key 1" ).build ();
@@ -585,15 +564,15 @@ void shouldCallCleanUpHookForAllTopics() {
585564
586565 @ Test
587566 void shouldNotThrowExceptionOnMissingInputTopic () {
588- try (final ConfiguredStreamsApp <StreamsApp > app = createMirrorKeyApplication ();
567+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createMirrorKeyApplication ();
589568 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (this .createEndpoint ())) {
590569 this .softly .assertThatCode (() -> clean (executableApp )).doesNotThrowAnyException ();
591570 }
592571 }
593572
594573 @ Test
595574 void shouldThrowExceptionOnResetterError () {
596- try (final ConfiguredStreamsApp <StreamsApp > app = createMirrorKeyApplication ();
575+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createMirrorKeyApplication ();
597576 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (this .createEndpoint ());
598577 final StreamsRunner runner = executableApp .createRunner ()) {
599578 final KafkaTestClient testClient = this .newTestClient ();
@@ -610,7 +589,7 @@ void shouldThrowExceptionOnResetterError() {
610589
611590 @ Test
612591 void shouldReprocessAlreadySeenRecordsWithPattern () {
613- try (final ConfiguredStreamsApp <StreamsApp > app = createWordCountPatternApplication ();
592+ try (final ConfiguredStreamsApp <StreamsApp > app = this . createWordCountPatternApplication ();
614593 final ExecutableStreamsApp <StreamsApp > executableApp = app .withEndpoint (
615594 this .createEndpointWithoutSchemaRegistry ())) {
616595 final KafkaTestClient testClient = this .newTestClient ();
@@ -643,6 +622,34 @@ void shouldReprocessAlreadySeenRecordsWithPattern() {
643622 }
644623 }
645624
625+ private ConfiguredStreamsApp <StreamsApp > createWordCountPatternApplication () {
626+ return this .configureApp (new WordCountPattern (), StreamsTopicConfig .builder ()
627+ .inputPattern (Pattern .compile (".*_topic" ))
628+ .outputTopic ("word_output" )
629+ .build ());
630+ }
631+
632+ private ConfiguredStreamsApp <StreamsApp > createWordCountApplication () {
633+ return this .configureApp (new WordCount (), StreamsTopicConfig .builder ()
634+ .inputTopics (List .of ("word_input" ))
635+ .outputTopic ("word_output" )
636+ .build ());
637+ }
638+
639+ private ConfiguredStreamsApp <StreamsApp > createMirrorValueApplication () {
640+ return this .configureApp (new MirrorValueWithAvro (), StreamsTopicConfig .builder ()
641+ .inputTopics (List .of ("input" ))
642+ .outputTopic ("output" )
643+ .build ());
644+ }
645+
646+ private ConfiguredStreamsApp <StreamsApp > createMirrorKeyApplication () {
647+ return this .configureApp (new MirrorKeyWithAvro (), StreamsTopicConfig .builder ()
648+ .inputTopics (List .of ("input" ))
649+ .outputTopic ("output" )
650+ .build ());
651+ }
652+
646653 private void run (final ExecutableStreamsApp <?> app ) {
647654 try (final StreamsRunner runner = app .createRunner ()) {
648655 StreamsRunnerTest .run (runner );
@@ -653,15 +660,15 @@ private void run(final ExecutableStreamsApp<?> app) {
653660
654661 private ConfiguredStreamsApp <StreamsApp > createComplexApplication () {
655662 this .newTestClient ().createTopic (ComplexTopologyApplication .THROUGH_TOPIC );
656- return configureApp (new ComplexTopologyApplication (), StreamsTopicConfig .builder ()
663+ return this . configureApp (new ComplexTopologyApplication (), StreamsTopicConfig .builder ()
657664 .inputTopics (List .of ("input" ))
658665 .outputTopic ("output" )
659666 .build ());
660667 }
661668
662669 private ConfiguredStreamsApp <StreamsApp > createComplexCleanUpHookApplication () {
663670 this .newTestClient ().createTopic (ComplexTopologyApplication .THROUGH_TOPIC );
664- return configureApp (new ComplexTopologyApplication () {
671+ return this . configureApp (new ComplexTopologyApplication () {
665672 @ Override
666673 public StreamsCleanUpConfiguration setupCleanUp (
667674 final EffectiveAppConfiguration <StreamsTopicConfig > configuration ) {
0 commit comments