3838class KafkaConnectServiceTests extends AbstractIntegrationTest {
3939 private final String connectName = "kafka-connect" ;
4040 private final String connectorName = UUID .randomUUID ().toString ();
41+ private final String topicName = "test-topic" ;
4142 private final Map <String , Object > config = Map .of (
4243 "name" , connectorName ,
4344 "connector.class" , "org.apache.kafka.connect.file.FileStreamSinkConnector" ,
4445 "tasks.max" , "1" ,
45- "topics" , "output-topic" ,
46+ "topics" , topicName ,
4647 "file" , "/tmp/test" ,
4748 "test.password" , "******"
4849 );
49- private final String topicName = "test-topic" ;
5050
5151 @ Autowired
5252 private WebTestClient webTestClient ;
@@ -84,20 +84,6 @@ void setUp() {
8484 "test.password" , "test-credentials" )))
8585 .exchange ()
8686 .expectStatus ().isOk ();
87-
88- webTestClient .post ()
89- .uri ("/api/clusters/{clusterName}/connects/{connectName}/connectors" , LOCAL , connectName )
90- .bodyValue (new NewConnectorDTO ()
91- .name (connectorName + "source" )
92- .config (Map .of (
93- "connector.class" , "org.apache.kafka.connect.file.FileStreamSourceConnector" ,
94- "tasks.max" , "1" ,
95- "topic" , topicName ,
96- "file" , "/tmp/test" ,
97- "test.password" , "test-credentials" )))
98- .exchange ()
99- .expectStatus ().isOk ();
100-
10187 // Force cache refresh
10288 statisticsService .updateCache (kafkaCluster ).block ();
10389 }
@@ -109,11 +95,6 @@ void tearDown() {
10995 connectName , connectorName )
11096 .exchange ()
11197 .expectStatus ().isOk ();
112- webTestClient .delete ()
113- .uri ("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}" , LOCAL ,
114- connectName , connectorName + "source" )
115- .exchange ()
116- .expectStatus ().isOk ();
11798 }
11899
119100 @ Test
@@ -241,7 +222,7 @@ void shouldUpdateConfig() {
241222 .bodyValue (Map .of (
242223 "connector.class" , "org.apache.kafka.connect.file.FileStreamSinkConnector" ,
243224 "tasks.max" , "1" ,
244- "topics" , "another-topic" ,
225+ "topics" , topicName ,
245226 "file" , "/tmp/new"
246227 )
247228 )
@@ -258,7 +239,7 @@ void shouldUpdateConfig() {
258239 .isEqualTo (Map .of (
259240 "connector.class" , "org.apache.kafka.connect.file.FileStreamSinkConnector" ,
260241 "tasks.max" , "1" ,
261- "topics" , "another-topic" ,
242+ "topics" , topicName ,
262243 "file" , "/tmp/new" ,
263244 "name" , connectorName
264245 ));
@@ -348,7 +329,7 @@ void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
348329 .isEqualTo (Map .of (
349330 "connector.class" , "org.apache.kafka.connect.file.FileStreamSinkConnector" ,
350331 "tasks.max" , "1" ,
351- "topics" , "output-topic" ,
332+ "topics" , topicName ,
352333 "file" , "/tmp/test" ,
353334 "name" , connectorName ,
354335 "test.password" , "******"
@@ -377,7 +358,7 @@ void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() {
377358 .isEqualTo (Map .of (
378359 "connector.class" , "org.apache.kafka.connect.file.FileStreamSinkConnector" ,
379360 "tasks.max" , "1" ,
380- "topics" , "output-topic" ,
361+ "topics" , topicName ,
381362 "file" , "/tmp/test" ,
382363 "test.password" , "******" ,
383364 "name" , connectorName
0 commit comments