Skip to content

Commit ebc867b

Browse files
authored
Added support for automatic collection creation for 5.0
Added support for automatic time-series collection creation for 5.0 KAFKA-228
1 parent fd30c01 commit ebc867b

File tree

13 files changed

+793
-16
lines changed

13 files changed

+793
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
## 1.6.0
66

77
### Improvements
8+
- [KAFKA-228](https://jira.mongodb.org/browse/KAFKA-228) Added support for automatic timeseries collection creation for 5.0
89
- [KAFKA-215](https://jira.mongodb.org/browse/KAFKA-215) Added mongo specific override options for error handling properties
910
- [KAFKA-222](https://jira.mongodb.org/browse/KAFKA-222) Added a new jar `mongo-kafka-connect-<version>-confluent.jar` which just contains
1011
the dependencies needed for running the connector with confluent. `mongo-kafka-connect-<version>-all.jar` now also includes `Avro`

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ repositories {
5454
}
5555

5656
extra.apply {
57-
set("mongodbDriverVersion", "[4.2,4.2.99)")
57+
set("mongodbDriverVersion", "[4.3,4.3.99)")
5858
set("kafkaVersion", "2.6.0")
5959
set("avroVersion", "1.9.2")
6060

src/integrationTest/java/com/mongodb/kafka/connect/ConnectorValidationIntegrationTest.java

Lines changed: 130 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,19 @@
1616

1717
package com.mongodb.kafka.connect;
1818

19+
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPIC_OVERRIDE_CONFIG;
20+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG;
21+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_GRANULARITY_CONFIG;
22+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_METAFIELD_CONFIG;
23+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG;
24+
import static com.mongodb.kafka.connect.util.MongoClientHelper.isAtleastFiveDotZero;
1925
import static java.lang.String.format;
2026
import static java.util.Arrays.asList;
2127
import static java.util.Collections.emptyList;
2228
import static java.util.Collections.singletonList;
2329
import static org.junit.jupiter.api.Assertions.assertFalse;
2430
import static org.junit.jupiter.api.Assertions.assertTrue;
31+
import static org.junit.jupiter.api.Assumptions.assumeFalse;
2532
import static org.junit.jupiter.api.Assumptions.assumeTrue;
2633

2734
import java.util.ArrayList;
@@ -42,6 +49,7 @@
4249
import org.bson.Document;
4350

4451
import com.mongodb.ConnectionString;
52+
import com.mongodb.MongoCredential;
4553
import com.mongodb.client.MongoClient;
4654
import com.mongodb.client.MongoClients;
4755
import com.mongodb.client.MongoDatabase;
@@ -199,6 +207,116 @@ void testSinkConfigValidationCollectionBasedPrivileges() {
199207
assertValidSink(properties, MongoSinkConfig.CONNECTION_URI_CONFIG);
200208
}
201209

210+
@Test
211+
@DisplayName("Ensure sink timeseries validation works as expected")
212+
void testSinkConfigValidationTimeseries() {
213+
assumeTrue(isAtleastFiveDotZero(getMongoClient()));
214+
215+
// Missing timefield
216+
Map<String, String> properties = createSinkProperties();
217+
properties.put(TIMESERIES_GRANULARITY_CONFIG, "hours");
218+
assertInvalidSink(properties, TIMESERIES_GRANULARITY_CONFIG);
219+
220+
properties.put(TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG, "1");
221+
assertInvalidSink(properties, TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG);
222+
223+
properties.put(TIMESERIES_METAFIELD_CONFIG, "meta");
224+
assertInvalidSink(properties, TIMESERIES_METAFIELD_CONFIG);
225+
226+
properties.put(TIMESERIES_TIMEFIELD_CONFIG, "ts");
227+
assertValidSink(properties);
228+
229+
// Confirm collection created
230+
assertTrue(collectionExists());
231+
232+
// Create normal collection confirm invalid.
233+
dropDatabases();
234+
getMongoClient().getDatabase(DEFAULT_DATABASE_NAME).createCollection("test");
235+
assertInvalidSink(properties, TIMESERIES_TIMEFIELD_CONFIG);
236+
}
237+
238+
@Test
239+
@DisplayName("Ensure sink timeseries validation works as expected when using regex config")
240+
void testSinkConfigValidationTimeseriesRegex() {
241+
assumeTrue(isAtleastFiveDotZero(getMongoClient()));
242+
243+
// Missing timefield
244+
Map<String, String> properties = createSinkRegexProperties();
245+
properties.put(TIMESERIES_GRANULARITY_CONFIG, "hours");
246+
assertInvalidSink(properties);
247+
assertInvalidSink(properties, TIMESERIES_GRANULARITY_CONFIG);
248+
249+
properties.put(TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG, "1");
250+
assertInvalidSink(properties, TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG);
251+
252+
properties.put(TIMESERIES_METAFIELD_CONFIG, "meta");
253+
assertInvalidSink(properties, TIMESERIES_METAFIELD_CONFIG);
254+
255+
properties.put(TIMESERIES_TIMEFIELD_CONFIG, "ts");
256+
assertValidSink(properties);
257+
258+
// Confirm no collection created
259+
assertFalse(collectionExists());
260+
}
261+
262+
@Test
263+
@DisplayName(
264+
"Ensure sink timeseries validation works as expected when using regex config with overrides")
265+
void testSinkConfigValidationTimeseriesRegexWithOverrides() {
266+
assumeTrue(isAtleastFiveDotZero(getMongoClient()));
267+
268+
Map<String, String> properties = createSinkRegexProperties();
269+
properties.put(MongoSinkTopicConfig.COLLECTION_CONFIG, "test");
270+
properties.put(
271+
format(TOPIC_OVERRIDE_CONFIG, "topic-test", TIMESERIES_GRANULARITY_CONFIG), "hours");
272+
assertInvalidSink(properties, TIMESERIES_GRANULARITY_CONFIG);
273+
274+
properties.put(
275+
format(TOPIC_OVERRIDE_CONFIG, "topic-test", TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG), "1");
276+
assertInvalidSink(properties, TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG);
277+
278+
properties.put(
279+
format(TOPIC_OVERRIDE_CONFIG, "topic-test", TIMESERIES_METAFIELD_CONFIG), "meta");
280+
assertInvalidSink(properties, TIMESERIES_METAFIELD_CONFIG);
281+
282+
properties.put(format(TOPIC_OVERRIDE_CONFIG, "topic-test", TIMESERIES_TIMEFIELD_CONFIG), "ts");
283+
assertValidSink(properties);
284+
285+
// Confirm collection created thanks to override name
286+
assertTrue(collectionExists());
287+
}
288+
289+
@Test
290+
@DisplayName("Ensure sink validation when timeseries not supported")
291+
void testSinkConfigValidationTimeseriesNotSupported() {
292+
assumeFalse(isAtleastFiveDotZero(getMongoClient()));
293+
294+
Map<String, String> properties = createSinkProperties();
295+
properties.put(TIMESERIES_TIMEFIELD_CONFIG, "ts");
296+
assertInvalidSink(properties, TIMESERIES_TIMEFIELD_CONFIG);
297+
}
298+
299+
@Test
300+
@DisplayName("Ensure sink validation timeseries auth permissions")
301+
void testSinkConfigAuthValidationTimeseries() {
302+
assumeTrue(isAuthEnabled());
303+
assumeTrue(isAtleastFiveDotZero(getMongoClient()));
304+
305+
Map<String, String> properties = createSinkProperties(getConnectionStringForCustomUser());
306+
properties.put(MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG, "ts");
307+
308+
// Missing permissions
309+
createUserFromDocument(format("{ role: 'read', db: '%s'}", getDatabaseName()));
310+
assertInvalidSink(properties);
311+
312+
// Add permissions
313+
dropUserAndRoles();
314+
createUserFromDocument(format("{ role: 'readWrite', db: '%s'}", getDatabaseName()));
315+
316+
assertValidSink(properties);
317+
assertTrue(collectionExists());
318+
}
319+
202320
@Test
203321
@DisplayName(
204322
"Ensure sink validation passes with specific collection based privileges with a different auth db")
@@ -385,7 +503,7 @@ private boolean collectionExists(final String databaseName, final String collect
385503
}
386504

387505
private void createUser(final String role) {
388-
createUser(getConnectionString().getCredential().getSource(), role);
506+
createUser(getAuthSource(), role);
389507
}
390508

391509
private void createUser(final String databaseName, final String role) {
@@ -409,7 +527,7 @@ private void createUserFromDocument(final String role) {
409527

410528
private void createUserFromDocument(final List<String> roles) {
411529
getMongoClient()
412-
.getDatabase(getConnectionString().getCredential().getSource())
530+
.getDatabase(getAuthSource())
413531
.runCommand(
414532
Document.parse(
415533
format(
@@ -422,7 +540,7 @@ private void createUserWithCustomRole(final List<String> privileges) {
422540
}
423541

424542
private void createUserWithCustomRole(final List<String> privileges, final List<String> roles) {
425-
createUserWithCustomRole(getConnectionString().getCredential().getSource(), privileges, roles);
543+
createUserWithCustomRole(getAuthSource(), privileges, roles);
426544
}
427545

428546
private void createUserWithCustomRole(
@@ -441,7 +559,7 @@ private void dropUserAndRoles() {
441559
if (isAuthEnabled()) {
442560
List<MongoDatabase> databases =
443561
asList(
444-
getMongoClient().getDatabase(getConnectionString().getCredential().getSource()),
562+
getMongoClient().getDatabase(getAuthSource()),
445563
getMongoClient().getDatabase(CUSTOM_DATABASE));
446564

447565
for (final MongoDatabase database : databases) {
@@ -475,7 +593,7 @@ private MongoClient getMongoClient() {
475593
}
476594

477595
private String getConnectionStringForCustomUser() {
478-
return getConnectionStringForCustomUser(getConnectionString().getCredential().getSource());
596+
return getConnectionStringForCustomUser(getAuthSource());
479597
}
480598

481599
private String getConnectionStringForCustomUser(final String authSource) {
@@ -486,8 +604,7 @@ private String getConnectionStringForCustomUser(final String authSource) {
486604
format("%s%s:%s@%s", scheme, CUSTOM_USER, CUSTOM_PASSWORD, hostsAndQuery);
487605
userConnectionString =
488606
userConnectionString.replace(
489-
format("authSource=%s", getConnectionString().getCredential().getSource()),
490-
format("authSource=%s", authSource));
607+
format("authSource=%s", getAuthSource()), format("authSource=%s", authSource));
491608

492609
if (!userConnectionString.contains("authSource")) {
493610
String separator = userConnectionString.contains("/?") ? "&" : "?";
@@ -501,6 +618,12 @@ private boolean isAuthEnabled() {
501618
return getConnectionString().getCredential() != null;
502619
}
503620

621+
private String getAuthSource() {
622+
return Optional.ofNullable(getConnectionString().getCredential())
623+
.map(MongoCredential::getSource)
624+
.orElseThrow(() -> new AssertionError("No auth credential"));
625+
}
626+
504627
private boolean isReplicaSetOrSharded() {
505628
try (MongoClient mongoClient = MongoClients.create(getConnectionString())) {
506629
Document isMaster =

src/integrationTest/java/com/mongodb/kafka/connect/sink/MongoSinkTaskIntegrationTest.java

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,22 @@
2020
import static java.lang.String.format;
2121
import static java.util.stream.Collectors.toList;
2222
import static java.util.stream.IntStream.rangeClosed;
23+
import static org.junit.jupiter.api.Assertions.assertEquals;
2324
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
2425
import static org.junit.jupiter.api.Assertions.assertThrows;
2526
import static org.junit.jupiter.api.Assertions.assertTrue;
27+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
2628

2729
import java.util.ArrayList;
2830
import java.util.Collection;
31+
import java.util.Date;
2932
import java.util.HashMap;
3033
import java.util.List;
3134
import java.util.Map;
3235
import java.util.function.Function;
3336
import java.util.stream.Stream;
3437

38+
import org.apache.kafka.common.config.ConfigException;
3539
import org.apache.kafka.connect.data.Schema;
3640
import org.apache.kafka.connect.errors.DataException;
3741
import org.apache.kafka.connect.sink.SinkRecord;
@@ -98,7 +102,7 @@ void testSinkCanHandleTombstoneNullEvents() {
98102
documents.stream(),
99103
Document::toJson,
100104
d -> format("{op: 'c', after: '%s'}", d.toJson()),
101-
d -> d.get("_id", 0L));
105+
d -> d.get("_id", 0));
102106

103107
sinkRecords.add(
104108
5,
@@ -202,8 +206,7 @@ void testSinkCanHandleInvalidValueWhenErrorToleranceIsAll() {
202206
.collect(toList());
203207

204208
List<SinkRecord> sinkRecords =
205-
createRecords(
206-
documents.stream(), Document::toJson, Document::toJson, d -> d.get("c", 0L));
209+
createRecords(documents.stream(), Document::toJson, Document::toJson, d -> d.get("c", 0));
207210
task.put(sinkRecords);
208211

209212
assertIterableEquals(
@@ -248,7 +251,7 @@ void testSinkCanHandleInvalidDocumentWhenErrorToleranceIsAll() {
248251
documents.stream(),
249252
Document::toJson,
250253
d -> d.get("_id", 0) != 4 ? d.toJson() : "a",
251-
d -> d.get("c", 0L));
254+
d -> d.get("c", 0));
252255
task.put(sinkRecords);
253256

254257
assertIterableEquals(
@@ -306,6 +309,93 @@ void testSinkCanHandleInvalidCDCWhenErrorToleranceIsAll() {
306309
}
307310
}
308311

312+
@Test
313+
@DisplayName("Ensure sink regex timeseries errors if cannot create")
314+
void testSinkRegexTimeseriesCannotCreate() {
315+
Map<String, String> cfg = createSettings();
316+
cfg.remove(MongoSinkConfig.TOPICS_CONFIG);
317+
cfg.put(MongoSinkConfig.TOPICS_REGEX_CONFIG, "topic-(.*)");
318+
cfg.put(MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG, "ts");
319+
cfg.put(MongoSinkTopicConfig.TIMESERIES_METAFIELD_CONFIG, "meta");
320+
321+
getCollection().insertOne(new Document());
322+
try (AutoCloseableSinkTask task = createSinkTask()) {
323+
task.start(cfg);
324+
325+
List<Document> documents =
326+
rangeClosed(1, 11)
327+
.mapToObj(
328+
i -> {
329+
Document doc = new Document("_id", i);
330+
doc.put("ts", new Date());
331+
doc.put("meta", "meta");
332+
return doc;
333+
})
334+
.collect(toList());
335+
List<SinkRecord> sinkRecords = createRecords(documents);
336+
assertThrows(ConfigException.class, () -> task.put(sinkRecords));
337+
}
338+
}
339+
340+
@Test
341+
@DisplayName("Ensure sink regex timeseries errors missing timefield create")
342+
void testSinkRegexTimeseriesMissingTimefield() {
343+
assumeTrue(isGreaterThanFourDotFour());
344+
Map<String, String> cfg = createSettings();
345+
cfg.remove(MongoSinkConfig.TOPICS_CONFIG);
346+
cfg.put(MongoSinkConfig.TOPICS_REGEX_CONFIG, "topic-(.*)");
347+
cfg.put(MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG, "ts");
348+
cfg.put(MongoSinkTopicConfig.TIMESERIES_METAFIELD_CONFIG, "meta");
349+
350+
try (AutoCloseableSinkTask task = createSinkTask()) {
351+
task.start(cfg);
352+
353+
List<Document> documents =
354+
rangeClosed(1, 11)
355+
.mapToObj(
356+
i -> {
357+
Document doc = new Document("_id", i);
358+
if (i != 4) {
359+
doc.put("ts", new Date());
360+
}
361+
doc.put("meta", "meta");
362+
return doc;
363+
})
364+
.collect(toList());
365+
List<SinkRecord> sinkRecords = createRecords(documents);
366+
assertThrows(DataException.class, () -> task.put(sinkRecords));
367+
}
368+
}
369+
370+
@Test
371+
@DisplayName("Ensure sink regex timeseries works as expected")
372+
void testSinkRegexTimeseriesWorks() {
373+
assumeTrue(isGreaterThanFourDotFour());
374+
Map<String, String> cfg = createSettings();
375+
cfg.remove(MongoSinkConfig.TOPICS_CONFIG);
376+
cfg.put(MongoSinkConfig.TOPICS_REGEX_CONFIG, "topic-(.*)");
377+
cfg.put(MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG, "ts");
378+
cfg.put(MongoSinkTopicConfig.TIMESERIES_METAFIELD_CONFIG, "meta");
379+
380+
try (AutoCloseableSinkTask task = createSinkTask()) {
381+
task.start(cfg);
382+
383+
List<Document> documents =
384+
rangeClosed(1, 11)
385+
.mapToObj(
386+
i -> {
387+
Document doc = new Document("_id", i);
388+
doc.put("ts", new Date());
389+
doc.put("meta", i);
390+
return doc;
391+
})
392+
.collect(toList());
393+
List<SinkRecord> sinkRecords = createRecords(documents);
394+
task.put(sinkRecords);
395+
assertEquals(getCollection().countDocuments(), 11);
396+
}
397+
}
398+
309399
public AutoCloseableSinkTask createSinkTask() {
310400
return new AutoCloseableSinkTask(new MongoSinkTask());
311401
}

0 commit comments

Comments
 (0)