2424import static org .junit .jupiter .api .Assertions .assertTrue ;
2525import static org .junit .jupiter .api .Assumptions .assumeTrue ;
2626
27+ import java .util .ArrayList ;
2728import java .util .HashMap ;
2829import java .util .List ;
2930import java .util .Map ;
31+ import java .util .Optional ;
3032import java .util .stream .Collectors ;
3133
3234import org .apache .kafka .common .config .Config ;
@@ -71,6 +73,7 @@ static void done() {
7173 @ AfterEach
7274 void tearDown () {
7375 dropUserAndRoles ();
76+ dropDatabases ();
7477 }
7578
7679 @ Test
@@ -82,9 +85,12 @@ void testSinkConfigValidation() {
8285 @ Test
8386 @ DisplayName ("Ensure sink configuration validation handles invalid connections" )
8487 void testSinkConfigValidationInvalidConnection () {
85- assertInvalidSink (createSinkProperties ("mongodb://192.0.2.0:27017/?connectTimeoutMS=1000" ));
8688 assertInvalidSink (
87- createSinkRegexProperties ("mongodb://192.0.2.0:27017/?connectTimeoutMS=1000" ));
89+ createSinkProperties ("mongodb://192.0.2.0:27017/?connectTimeoutMS=1000" ),
90+ MongoSinkConfig .CONNECTION_URI_CONFIG );
91+ assertInvalidSink (
92+ createSinkRegexProperties ("mongodb://192.0.2.0:27017/?connectTimeoutMS=1000" ),
93+ MongoSinkConfig .CONNECTION_URI_CONFIG );
8894 }
8995
9096 @ Test
@@ -94,30 +100,40 @@ void testSinkConfigValidationInvalidUser() {
94100 createSinkProperties (
95101 format (
96102 "mongodb://fakeUser:fakePass@%s/" ,
97- String .join ("," , getConnectionString ().getHosts ()))));
103+ String .join ("," , getConnectionString ().getHosts ()))),
104+ MongoSinkConfig .CONNECTION_URI_CONFIG );
98105 assertInvalidSink (
99106 createSinkRegexProperties (
100107 format (
101108 "mongodb://fakeUser:fakePass@%s/" ,
102- String .join ("," , getConnectionString ().getHosts ()))));
109+ String .join ("," , getConnectionString ().getHosts ()))),
110+ MongoSinkConfig .CONNECTION_URI_CONFIG );
103111 }
104112
105113 @ Test
106114 @ DisplayName ("Ensure sink validation fails with read user" )
107115 void testSinkConfigValidationReadUser () {
108116 assumeTrue (isAuthEnabled ());
109117 createUser ("read" );
110- assertInvalidSink (createSinkProperties (getConnectionStringForCustomUser ()));
111- assertInvalidSink (createSinkRegexProperties (getConnectionStringForCustomUser ()));
118+ assertInvalidSink (
119+ createSinkProperties (getConnectionStringForCustomUser ()),
120+ MongoSinkConfig .CONNECTION_URI_CONFIG );
121+ assertInvalidSink (
122+ createSinkRegexProperties (getConnectionStringForCustomUser ()),
123+ MongoSinkConfig .CONNECTION_URI_CONFIG );
112124 }
113125
114126 @ Test
115127 @ DisplayName ("Ensure sink validation passes with readWrite user" )
116128 void testSinkConfigValidationReadWriteUser () {
117129 assumeTrue (isAuthEnabled ());
118130 createUser ("readWrite" );
119- assertValidSink (createSinkProperties (getConnectionStringForCustomUser ()));
120- assertValidSink (createSinkRegexProperties (getConnectionStringForCustomUser ()));
131+ assertValidSink (
132+ createSinkProperties (getConnectionStringForCustomUser ()),
133+ MongoSinkConfig .CONNECTION_URI_CONFIG );
134+ assertValidSink (
135+ createSinkRegexProperties (getConnectionStringForCustomUser ()),
136+ MongoSinkConfig .CONNECTION_URI_CONFIG );
121137 }
122138
123139 @ Test
@@ -129,19 +145,19 @@ void testSinkConfigValidationReadWriteOnSpecificDatabase() {
129145 Map <String , String > properties = createSinkProperties (getConnectionStringForCustomUser ());
130146
131147 // Different database than has permissions for
132- assertInvalidSink (properties );
148+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
133149
134150 properties .put (MongoSinkTopicConfig .DATABASE_CONFIG , CUSTOM_DATABASE );
135- assertValidSink (properties );
151+ assertValidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
136152
137153 // Regex tests
138154 properties = createSinkRegexProperties (getConnectionStringForCustomUser ());
139155
140156 // Different database than has permissions for
141- assertInvalidSink (properties );
157+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
142158
143159 properties .put (MongoSinkTopicConfig .DATABASE_CONFIG , CUSTOM_DATABASE );
144- assertValidSink (properties );
160+ assertValidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
145161 }
146162
147163 @ Test
@@ -158,29 +174,29 @@ void testSinkConfigValidationCollectionBasedPrivileges() {
158174 Map <String , String > properties = createSinkProperties (getConnectionStringForCustomUser ());
159175
160176 // Different database than has permissions for
161- assertInvalidSink (properties );
177+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
162178
163179 // Different collection than has permissions for
164180 properties .put (MongoSinkTopicConfig .DATABASE_CONFIG , CUSTOM_DATABASE );
165- assertInvalidSink (properties );
181+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
166182
167- // Different collection than has permissions for
183+ // Same collection than has permissions for
168184 properties .put (MongoSinkTopicConfig .COLLECTION_CONFIG , CUSTOM_COLLECTION );
169- assertValidSink (properties );
185+ assertValidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
170186
171187 // Regex tests
172188 properties = createSinkRegexProperties (getConnectionStringForCustomUser ());
173189
174190 // Different database than has permissions for
175- assertInvalidSink (properties );
191+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
176192
177193 // Different collection than has permissions for
178194 properties .put (MongoSinkTopicConfig .DATABASE_CONFIG , CUSTOM_DATABASE );
179- assertInvalidSink (properties );
195+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
180196
181- // Different collection than has permissions for
197+ // Same collection than has permissions for
182198 properties .put (MongoSinkTopicConfig .COLLECTION_CONFIG , CUSTOM_COLLECTION );
183- assertValidSink (properties );
199+ assertValidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
184200 }
185201
186202 @ Test
@@ -199,29 +215,29 @@ void testSinkConfigValidationCollectionBasedDifferentAuthPrivileges() {
199215 createSinkProperties (getConnectionStringForCustomUser (CUSTOM_DATABASE ));
200216
201217 // Different database than has permissions for
202- assertInvalidSink (properties );
218+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
203219
204220 // Different collection than has permissions for
205221 properties .put (MongoSinkTopicConfig .DATABASE_CONFIG , CUSTOM_DATABASE );
206- assertInvalidSink (properties );
222+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
207223
208224 // Same collection than has permissions for
209225 properties .put (MongoSinkTopicConfig .COLLECTION_CONFIG , CUSTOM_COLLECTION );
210- assertValidSink (properties );
226+ assertValidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
211227
212228 // Regex tests
213229 properties = createSinkRegexProperties (getConnectionStringForCustomUser (CUSTOM_DATABASE ));
214230
215231 // Different database than has permissions for
216- assertInvalidSink (properties );
232+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
217233
218234 // Different collection than has permissions for
219235 properties .put (MongoSinkTopicConfig .DATABASE_CONFIG , CUSTOM_DATABASE );
220- assertInvalidSink (properties );
236+ assertInvalidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
221237
222238 // Same collection than has permissions for
223239 properties .put (MongoSinkTopicConfig .COLLECTION_CONFIG , CUSTOM_COLLECTION );
224- assertValidSink (properties );
240+ assertValidSink (properties , MongoSinkConfig . CONNECTION_URI_CONFIG );
225241 }
226242
227243 @ Test
@@ -296,32 +312,76 @@ void testSourceConfigValidationCollectionBasedPrivileges() {
296312
297313 // Helper methods
298314 private void assertInvalidSource (final Map <String , String > properties ) {
299- Config config = new MongoSourceConnector ().validate (properties );
300- List <String > errorMessages =
301- getConfigValue (config , MongoSourceConfig .CONNECTION_URI_CONFIG ).errorMessages ();
302- assertFalse (errorMessages .isEmpty (), "ErrorMessages shouldn't be empty" );
315+ assertFalse (getSourceErrors (properties ).isEmpty (), "Source had valid configuration" );
303316 }
304317
305318 private void assertValidSource (final Map <String , String > properties ) {
306319 assumeTrue (isReplicaSetOrSharded ());
320+ List <ConfigValue > sourceErrors = getSourceErrors (properties );
321+ assertTrue (
322+ sourceErrors .isEmpty (),
323+ "Sink had invalid configuration: "
324+ + sourceErrors .stream ()
325+ .map (cv -> format ("'%s': %s" , cv .name (), cv .errorMessages ()))
326+ .collect (Collectors .toList ()));
327+ }
328+
329+ private List <ConfigValue > getSourceErrors (final Map <String , String > properties ) {
307330 Config config = new MongoSourceConnector ().validate (properties );
308- List <String > errorMessages =
309- getConfigValue (config , MongoSourceConfig .CONNECTION_URI_CONFIG ).errorMessages ();
310- assertTrue (errorMessages .isEmpty (), format ("ErrorMessages not empty: %s" , errorMessages ));
331+ return config .configValues ().stream ()
332+ .filter (cv -> !cv .errorMessages ().isEmpty ())
333+ .collect (Collectors .toList ());
334+ }
335+
336+ private void assertInvalidSink (final Map <String , String > properties , final String configName ) {
337+ Optional <ConfigValue > configValue =
338+ getSinkErrors (properties ).stream ().filter (cv -> cv .name ().equals (configName )).findFirst ();
339+ assertTrue (configValue .isPresent ());
340+ assertFalse (
341+ configValue .get ().errorMessages ().isEmpty (), format ("No error for '%s'" , configName ));
311342 }
312343
313344 private void assertInvalidSink (final Map <String , String > properties ) {
314- Config config = new MongoSinkConnector ().validate (properties );
315- List <String > errorMessages =
316- getConfigValue (config , MongoSourceConfig .CONNECTION_URI_CONFIG ).errorMessages ();
317- assertFalse (errorMessages .isEmpty (), "ErrorMessages shouldn't be empty" );
345+ assertFalse (getSinkErrors (properties ).isEmpty (), "Sink had valid configuration" );
346+ }
347+
348+ private void assertValidSink (final Map <String , String > properties , final String configName ) {
349+ Optional <ConfigValue > configValue =
350+ getSinkErrors (properties ).stream ().filter (cv -> cv .name ().equals (configName )).findFirst ();
351+ configValue .ifPresent (
352+ (cv ) ->
353+ assertTrue (
354+ cv .errorMessages ().isEmpty (),
355+ format ("%s invalid: %s" , cv .name (), cv .errorMessages ())));
318356 }
319357
320358 private void assertValidSink (final Map <String , String > properties ) {
359+ List <ConfigValue > sinkErrors = getSinkErrors (properties );
360+ assertTrue (
361+ sinkErrors .isEmpty (),
362+ "Sink had invalid configuration: "
363+ + sinkErrors .stream ()
364+ .map (cv -> format ("'%s': %s" , cv .name (), cv .errorMessages ()))
365+ .collect (Collectors .toList ()));
366+ }
367+
368+ private List <ConfigValue > getSinkErrors (final Map <String , String > properties ) {
321369 Config config = new MongoSinkConnector ().validate (properties );
322- List <String > errorMessages =
323- getConfigValue (config , MongoSourceConfig .CONNECTION_URI_CONFIG ).errorMessages ();
324- assertTrue (errorMessages .isEmpty (), format ("ErrorMessages not empty: %s" , errorMessages ));
370+ return config .configValues ().stream ()
371+ .filter (cv -> !cv .errorMessages ().isEmpty ())
372+ .collect (Collectors .toList ());
373+ }
374+
375+ private boolean collectionExists () {
376+ return collectionExists (DEFAULT_DATABASE_NAME , "test" );
377+ }
378+
379+ private boolean collectionExists (final String databaseName , final String collectionName ) {
380+ return getMongoClient ()
381+ .getDatabase (databaseName )
382+ .listCollectionNames ()
383+ .into (new ArrayList <>())
384+ .contains (collectionName );
325385 }
326386
327387 private void createUser (final String role ) {
@@ -394,6 +454,11 @@ private void dropUserAndRoles() {
394454 }
395455 }
396456
457+ private void dropDatabases () {
458+ tryAndIgnore (() -> getMongoClient ().getDatabase (DEFAULT_DATABASE_NAME ).drop ());
459+ tryAndIgnore (() -> getMongoClient ().getDatabase (CUSTOM_DATABASE ).drop ());
460+ }
461+
397462 public static void tryAndIgnore (final Runnable r ) {
398463 try {
399464 r .run ();
@@ -446,13 +511,6 @@ private boolean isReplicaSetOrSharded() {
446511 }
447512 }
448513
449- private ConfigValue getConfigValue (final Config config , final String configName ) {
450- return config .configValues ().stream ()
451- .filter (cv -> cv .name ().equals (configName ))
452- .collect (Collectors .toList ())
453- .get (0 );
454- }
455-
456514 private String getDatabaseName () {
457515 String databaseName = getConnectionString ().getDatabase ();
458516 return databaseName != null ? databaseName : DEFAULT_DATABASE_NAME ;
@@ -472,14 +530,19 @@ private Map<String, String> createSinkProperties() {
472530 private Map <String , String > createSinkProperties (final String connectionString ) {
473531 Map <String , String > properties = createProperties (connectionString );
474532 properties .put (MongoSinkConfig .TOPICS_CONFIG , "test" );
475- properties .put (MongoSinkTopicConfig .DATABASE_CONFIG , "test" );
533+ properties .put (MongoSinkTopicConfig .DATABASE_CONFIG , DEFAULT_DATABASE_NAME );
476534 properties .put (MongoSinkTopicConfig .COLLECTION_CONFIG , "test" );
477535 return properties ;
478536 }
479537
538+ private Map <String , String > createSinkRegexProperties () {
539+ return createSinkRegexProperties (getConnectionString ().toString ());
540+ }
541+
480542 private Map <String , String > createSinkRegexProperties (final String connectionString ) {
481543 Map <String , String > properties = createSinkProperties (connectionString );
482544 properties .remove (MongoSinkConfig .TOPICS_CONFIG );
545+ properties .remove (MongoSinkTopicConfig .COLLECTION_CONFIG , "test" );
483546 properties .put (MongoSinkConfig .TOPICS_REGEX_CONFIG , "topic-(.*)" );
484547 return properties ;
485548 }
0 commit comments