  */
 package com.mongodb.kafka.connect;
 
+import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPICS_REGEX_CONFIG;
+import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPIC_OVERRIDE_CONFIG;
+import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.COLLECTION_CONFIG;
 import static java.lang.String.format;
 import static java.util.Arrays.asList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
 import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
 class MongoSinkConnectorTest extends MongoKafkaTestCase {
 
     @Test
-    @DisplayName("Ensure simple producer sends data")
-    void testASimpleProducerSmokeTest() {
+    @DisplayName("Ensure sink connect saves data to MongoDB")
+    void testSinkSavesAvroDataToMongoDB() {
         String topicName = getTopicName();
         KAFKA.createTopic(topicName);
+        addSinkConnector(topicName);
 
-        Properties props = new Properties();
-        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.ACKS_CONFIG, "all");
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
-
-        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
-            producer.initTransactions();
-            producer.beginTransaction();
-
-            IntStream.range(0, 10).forEach(i -> {
-                producer.send(new ProducerRecord<>(topicName, i, "Hello, World!"));
-            });
-            producer.commitTransaction();
-
-            assertProduced(10, topicName);
-        }
+        assertProducesMessages(topicName, getCollectionName());
     }
 
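     // Note: the next test drives two topics through one connector using topics.regex,
     // routing each topic to its own collection via per-topic overrides (see the expanded
     // sketch after the class for the assumed property keys).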
     @Test
-    @DisplayName("Ensure sink connect saves data to MongoDB")
-    void testSinkSavesAvroDataToMongoDB() {
-        Stream<TweetMsg> tweets = IntStream.range(0, 100).mapToObj(i ->
-                TweetMsg.newBuilder().setId$1(i)
-                        .setText(format("test tweet %s end2end testing apache kafka <-> mongodb sink connector is fun!", i))
-                        .setHashtags(asList(format("t%s", i), "kafka", "mongodb", "testing"))
-                        .build()
-        );
+    @DisplayName("Ensure sink connect saves data to MongoDB when using regex")
+    void testSinkSavesAvroDataToMongoDBWhenUsingRegex() {
+        String topicName1 = "topic-regex-101";
+        String topicName2 = "topic-regex-202";
 
-        String topicName = getTopicName();
-        KAFKA.createTopic(topicName);
-        addSinkConnector(topicName);
+        String collectionName1 = "regexColl1";
+        String collectionName2 = "regexColl2";
 
-        Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
-        producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, KAFKA.schemaRegistryUrl());
+        KAFKA.createTopic(topicName1);
+        KAFKA.createTopic(topicName2);
 
-        try (KafkaProducer<String, TweetMsg> producer = new KafkaProducer<>(producerProps)) {
-            producer.initTransactions();
-            producer.beginTransaction();
-            tweets.forEach(tweet -> producer.send(new ProducerRecord<>(topicName, tweet)));
-            producer.commitTransaction();
+        Properties sinkProperties = new Properties();
+        sinkProperties.put(TOPICS_REGEX_CONFIG, "topic\\-regex\\-(.*)");
+        sinkProperties.put(format(TOPIC_OVERRIDE_CONFIG, topicName1, COLLECTION_CONFIG), collectionName1);
+        sinkProperties.put(format(TOPIC_OVERRIDE_CONFIG, topicName2, COLLECTION_CONFIG), collectionName2);
+        addSinkConnector(sinkProperties);
 
-            assertProduced(100, topicName);
-            assertEquals(100, getCollection().countDocuments());
-        }
+        assertProducesMessages(topicName1, collectionName1);
+        assertProducesMessages(topicName2, collectionName2);
     }
 
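     // Note: the restart test reuses assertProducesMessages(..., true) below: two
     // transactional batches of 50 records with a connector restart in between, so the
     // final count of 100 documents shows no writes were lost across the restart.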
     @Test
     @DisplayName("Ensure sink can survive a restart")
     void testSinkSurvivesARestart() {
+        String topicName = getTopicName();
+        KAFKA.createTopic(topicName);
+        addSinkConnector(topicName);
+        assertProducesMessages(topicName, getCollectionName(), true);
+    }
+
+    private void assertProducesMessages(final String topicName, final String collectionName) {
+        assertProducesMessages(topicName, collectionName, false);
+    }
+
+    private void assertProducesMessages(final String topicName, final String collectionName, final boolean restartConnector) {
+
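         // Note: builds 100 Avro TweetMsg records; ids 0-49 are committed in one producer
         // transaction, then (optionally after a connector restart) ids 50-99 in a second.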
         List<TweetMsg> tweets = IntStream.range(0, 100).mapToObj(i ->
                 TweetMsg.newBuilder().setId$1(i)
                         .setText(format("test tweet %s end2end testing apache kafka <-> mongodb sink connector is fun!", i))
                         .setHashtags(asList(format("t%s", i), "kafka", "mongodb", "testing"))
                         .build()
         ).collect(Collectors.toList());
 
-        String topicName = getTopicName();
-        KAFKA.createTopic(topicName);
-        addSinkConnector(topicName);
-
         Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
@@ -127,16 +108,18 @@ void testSinkSurvivesARestart() {
             producer.commitTransaction();
 
             assertProduced(50, topicName);
-            assertEquals(50, getCollection().countDocuments());
+            assertEquals(50, getCollection(collectionName).countDocuments(), collectionName);
 
-            restartSinkConnector(topicName);
+            if (restartConnector) {
+                restartSinkConnector(topicName);
+            }
 
             producer.beginTransaction();
             tweets.stream().filter(t -> t.getId$1() >= 50).forEach(tweet -> producer.send(new ProducerRecord<>(topicName, tweet)));
             producer.commitTransaction();
 
             assertProduced(100, topicName);
-            assertEquals(100, getCollection().countDocuments());
+            assertEquals(100, getCollection(collectionName).countDocuments());
         }
     }
 }
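
Note: a minimal sketch of what the regex test's connector config resolves to, assuming
TOPICS_REGEX_CONFIG is the standard Kafka Connect "topics.regex" key and TOPIC_OVERRIDE_CONFIG
follows a "topic.override.%s.%s" format string; the expanded keys shown are illustrative
assumptions, not verified against MongoSinkConfig:

    // Subscribe the sink to every topic matching the regex instead of a fixed topic list.
    Properties sinkProperties = new Properties();
    sinkProperties.put("topics.regex", "topic\\-regex\\-(.*)");
    // Hypothetical per-topic overrides routing each matched topic to its own collection.
    sinkProperties.put("topic.override.topic-regex-101.collection", "regexColl1");
    sinkProperties.put("topic.override.topic-regex-202.collection", "regexColl2");
    addSinkConnector(sinkProperties);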