
Commit 248b658

Added copy existing data support to the source connector
KAFKA-65
1 parent f440809 commit 248b658

10 files changed: +749 −73 lines changed

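At a high level, this commit teaches the source connector to copy the data that already exists in the configured namespaces into the topic before it starts streaming change events, controlled by the new MongoSourceConfig.COPY_EXISTING_CONFIG setting exercised in the integration tests below. The following is a minimal sketch of that flow, modelled on testSourceLoadsDataFromCollectionCopyExisting; the collection name is a placeholder, and insertMany(...), addSourceConnector(...) and assertProduced(...) are the MongoKafkaTestCase helpers used throughout these tests.

    // Minimal sketch (not part of the commit) of the copy-existing flow inside an integration test.
    MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("exampleColl"); // placeholder name
    insertMany(rangeClosed(1, 50), coll);                         // documents that exist before the connector starts

    Properties sourceProperties = new Properties();
    sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
    sourceProperties.put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
    sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");  // copy the 50 existing documents first
    addSourceConnector(sourceProperties);

    assertProduced(50, coll);                                     // the pre-existing documents reach the topic

    insertMany(rangeClosed(51, 100), coll);                       // later inserts still arrive via the change stream
    assertProduced(100, coll);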

config/checkstyle/checkstyle.xml

Lines changed: 3 additions & 2 deletions
@@ -119,8 +119,9 @@
     <module name="LineLength">
         <property name="max" value="140"/>
     </module>
-
-    <module name="MethodLength"/>
+    <module name="MethodLength">
+        <property name="max" value="170"/>
+    </module>
     <module name="ParameterNumber">
         <property name="max" value="11"/>
     </module>

config/checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@
 <suppressions>
     <suppress checks="LineLength" files=".*MongoSinkTopicConfig.java"/>
     <suppress checks="MethodLength" files=".*MongoSinkTopicConfig.java"/>
+    <suppress checks="MethodLength" files=".*MongoSourceConfig.java"/>
     <suppress checks="LineLength" files=".*RecordConverterTest.java"/>
     <suppress checks="MethodLength" files=".*RecordConverterTest.java"/>

src/integrationTest/java/com/mongodb/kafka/connect/MongoSinkConnectorTest.java

Lines changed: 11 additions & 9 deletions
@@ -41,10 +41,11 @@ class MongoSinkConnectorTest extends MongoKafkaTestCase {
     @Test
     @DisplayName("Ensure simple producer sends data")
     void testASimpleProducerSmokeTest() {
-        KAFKA.createTopic(getTopicName());
+        String topicName = getTopicName();
+        KAFKA.createTopic(topicName);

         Properties props = new Properties();
-        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, getTopicName());
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -56,11 +57,11 @@ void testASimpleProducerSmokeTest() {
         producer.beginTransaction();

         IntStream.range(0, 10).forEach(i -> {
-            producer.send(new ProducerRecord<>(getTopicName(), i, "Hello, World!"));
+            producer.send(new ProducerRecord<>(topicName, i, "Hello, World!"));
         });
         producer.commitTransaction();

-        assertProduced(10);
+        assertProduced(10, topicName);
     }

     @Test
@@ -73,11 +74,12 @@ void testSinkSavesAvroDataToMongoDB() {
                 .build()
         );

-        KAFKA.createTopic(getTopicName());
-        addSinkConnector();
+        String topicName = getTopicName();
+        KAFKA.createTopic(topicName);
+        addSinkConnector(topicName);

         Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, getTopicName());
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
@@ -86,10 +88,10 @@ void testSinkSavesAvroDataToMongoDB() {

         producer.initTransactions();
         producer.beginTransaction();
-        tweets.forEach(tweet -> producer.send(new ProducerRecord<>(getTopicName(), tweet)));
+        tweets.forEach(tweet -> producer.send(new ProducerRecord<>(topicName, tweet)));
         producer.commitTransaction();

-        assertProduced(100);
+        assertProduced(100, topicName);
         assertEquals(100, getCollection().countDocuments());
     }
 }
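These sink-test edits follow from the MongoKafkaTestCase change further down: getTopicName() now appends an incrementing postfix, so each call returns a new name and addSinkConnector(...) takes the topic explicitly. A minimal sketch of the resulting pattern, with the record-producing step elided:

    String topicName = getTopicName();   // capture once; repeated calls would yield different names
    KAFKA.createTopic(topicName);
    addSinkConnector(topicName);
    // ... produce records to topicName ...
    assertProduced(100, topicName);      // assert against the same captured name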

src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorTest.java

Lines changed: 112 additions & 8 deletions
@@ -24,7 +24,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;

@@ -43,11 +42,8 @@
 import com.mongodb.kafka.connect.mongodb.MongoKafkaTestCase;
 import com.mongodb.kafka.connect.source.MongoSourceConfig;

-
 public class MongoSourceConnectorTest extends MongoKafkaTestCase {

-    private static final AtomicInteger POSTFIX = new AtomicInteger();
-
     @BeforeEach
     void setUp() {
         assumeTrue(isReplicaSetOrSharded());
@@ -63,7 +59,7 @@ void tearDown() {
     }

     @Test
-    @DisplayName("Ensure source loads data from MongoDB MongoClient")
+    @DisplayName("Ensure source loads data from MongoClient")
     void testSourceLoadsDataFromMongoClient() {
         addSourceConnector();

@@ -96,7 +92,41 @@ void testSourceLoadsDataFromMongoClient() {
     }

     @Test
-    @DisplayName("Ensure source loads data from MongoDB database")
+    @DisplayName("Ensure source loads data from MongoClient with copy existing data")
+    void testSourceLoadsDataFromMongoClientWithCopyExisting() {
+        MongoDatabase db1 = getDatabaseWithPostfix();
+        MongoDatabase db2 = getDatabaseWithPostfix();
+        MongoDatabase db3 = getDatabaseWithPostfix();
+        MongoCollection<Document> coll1 = db1.getCollection("coll");
+        MongoCollection<Document> coll2 = db2.getCollection("coll");
+        MongoCollection<Document> coll3 = db3.getCollection("coll");
+        MongoCollection<Document> coll4 = db1.getCollection("db1Coll2");
+
+        insertMany(rangeClosed(1, 50), coll1, coll2);
+
+        Properties sourceProperties = new Properties();
+        sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
+        addSourceConnector(sourceProperties);
+
+        assertAll(
+                () -> assertProduced(50, coll1),
+                () -> assertProduced(50, coll2),
+                () -> assertProduced(0, coll3));
+
+        db1.drop();
+        insertMany(rangeClosed(51, 60), coll2, coll4);
+        insertMany(rangeClosed(1, 70), coll3);
+
+        assertAll(
+                () -> assertProduced(51, coll1),
+                () -> assertProduced(60, coll2),
+                () -> assertProduced(70, coll3),
+                () -> assertProduced(10, coll4)
+        );
+    }
+
+    @Test
+    @DisplayName("Ensure source loads data from database")
     void testSourceLoadsDataFromDatabase() {
         try (KafkaConsumer<?, ?> consumer = createConsumer()) {
             Pattern pattern = Pattern.compile(format("^%s.*", getDatabaseName()));
@@ -141,6 +171,53 @@ void testSourceLoadsDataFromDatabase() {
         }
     }

+    @Test
+    @DisplayName("Ensure source loads data from database with copy existing data")
+    void testSourceLoadsDataFromDatabaseCopyExisting() {
+        try (KafkaConsumer<?, ?> consumer = createConsumer()) {
+            Pattern pattern = Pattern.compile(format("^%s.*", getDatabaseName()));
+            consumer.subscribe(pattern);
+
+            MongoDatabase db = getDatabaseWithPostfix();
+
+            MongoCollection<Document> coll1 = db.getCollection("coll1");
+            MongoCollection<Document> coll2 = db.getCollection("coll2");
+            MongoCollection<Document> coll3 = db.getCollection("coll3");
+
+            insertMany(rangeClosed(1, 50), coll1, coll2);
+
+            Properties sourceProperties = new Properties();
+            sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, db.getName());
+            sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
+            addSourceConnector(sourceProperties);
+
+            assertAll(
+                    () -> assertProduced(50, coll1),
+                    () -> assertProduced(50, coll2),
+                    () -> assertProduced(0, coll3)
+            );
+
+            // Update some of the collections
+            coll1.drop();
+            coll2.drop();
+
+            insertMany(rangeClosed(1, 20), coll3);
+
+            String collName4 = "coll4";
+            coll3.renameCollection(new MongoNamespace(getDatabaseName(), collName4));
+            MongoCollection<Document> coll4 = db.getCollection(collName4);
+
+            insertMany(rangeClosed(21, 30), coll4);
+
+            assertAll(
+                    () -> assertProduced(51, coll1),
+                    () -> assertProduced(51, coll2),
+                    () -> assertProduced(21, coll3),
+                    () -> assertProduced(10, coll4)
+            );
+        }
+    }
+
     @Test
     @DisplayName("Ensure source can handle non existent database and survive dropping")
     void testSourceCanHandleNonExistentDatabaseAndSurviveDropping() throws InterruptedException {
@@ -187,6 +264,28 @@ void testSourceLoadsDataFromCollection() {
         assertProduced(101, coll);
     }

+    @Test
+    @DisplayName("Ensure source loads data from collection with copy existing data")
+    void testSourceLoadsDataFromCollectionCopyExisting() {
+        MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
+
+        insertMany(rangeClosed(1, 50), coll);
+
+        Properties sourceProperties = new Properties();
+        sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
+        sourceProperties.put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
+        sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
+        addSourceConnector(sourceProperties);
+
+        assertProduced(50, coll);
+
+        insertMany(rangeClosed(51, 100), coll);
+        assertProduced(100, coll);
+
+        coll.drop();
+        assertProduced(101, coll);
+    }
+
     @Test
     @DisplayName("Ensure source can handle non existent collection and survive dropping")
     void testSourceCanHandleNonExistentCollectionAndSurviveDropping() throws InterruptedException {
@@ -215,17 +314,22 @@ void testSourceCanHandleNonExistentCollectionAndSurviveDropping() throws Interru
     void testSourceLoadsDataFromCollectionDocumentOnly() {
         MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");

+        List<Document> docs = insertMany(rangeClosed(1, 50), coll);
+
         Properties sourceProperties = new Properties();
         sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
         sourceProperties.put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
         sourceProperties.put(MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG, "true");
+        sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
         addSourceConnector(sourceProperties);

-        List<Document> docs = insertMany(rangeClosed(1, 100), coll);
         assertProduced(docs, coll);

+        List<Document> allDocs = new ArrayList<>(docs);
+        allDocs.addAll(insertMany(rangeClosed(51, 100), coll));
+
         coll.drop();
-        assertProduced(docs, coll);
+        assertProduced(allDocs, coll);
     }

     private MongoDatabase getDatabaseWithPostfix() {
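One way to read the copy-existing assertions above (an interpretation, not something the diff states): each collection's topic ends up holding the copied documents plus whatever change-stream events follow, including drops and renames. For example:

    // After db1.drop() in testSourceLoadsDataFromMongoClientWithCopyExisting:
    assertProduced(51, coll1);   // 50 copied documents + 1 event from the drop
    // After coll3 is renamed to coll4 in testSourceLoadsDataFromDatabaseCopyExisting:
    assertProduced(21, coll3);   // 20 inserted documents + 1 event from the rename
    assertProduced(10, coll4);   // only the 10 documents inserted after the rename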

src/integrationTest/java/com/mongodb/kafka/connect/mongodb/MongoKafkaTestCase.java

Lines changed: 8 additions & 14 deletions
@@ -15,6 +15,7 @@
  */
 package com.mongodb.kafka.connect.mongodb;

+import static java.lang.String.format;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.sleep;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -23,6+24,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;

 import io.confluent.connect.avro.AvroConverter;
@@ -51,14 +53,15 @@

 public class MongoKafkaTestCase {
     protected static final Logger LOGGER = LoggerFactory.getLogger(MongoKafkaTestCase.class);
+    protected static final AtomicInteger POSTFIX = new AtomicInteger();

     @RegisterExtension
     public static final EmbeddedKafka KAFKA = new EmbeddedKafka();
     @RegisterExtension
     public static final MongoDBHelper MONGODB = new MongoDBHelper();

     public String getTopicName() {
-        return getCollection().getNamespace().getFullName();
+        return format("%s%s", getCollection().getNamespace().getFullName(), POSTFIX.incrementAndGet());
     }

     public MongoClient getMongoClient() {
@@ -91,10 +94,6 @@ public boolean isReplicaSetOrSharded() {
         return isMaster.containsKey("setName") || isMaster.get("msg", "").equals("isdbgrid");
     }

-    public void assertProduced(final int expectedCount) {
-        assertProduced(expectedCount, getCollection());
-    }
-
     public void assertProduced(final int expectedCount, final MongoCollection<?> coll) {
         assertProduced(expectedCount, coll.getNamespace().getFullName());
     }
@@ -114,8 +113,8 @@ public List<Bytes> getProduced(final int expectedCount, final String topicName)
         consumer.subscribe(singletonList(topicName));
         List<Bytes> data = new ArrayList<>();
         int retryCount = 0;
-        while (data.size() < expectedCount && retryCount < 5) {
-            consumer.poll(Duration.ofSeconds(10)).records(topicName).forEach((r) -> data.add((Bytes) r.value()));
+        while (data.size() < expectedCount && retryCount < 30) {
+            consumer.poll(Duration.ofSeconds(2)).records(topicName).forEach((r) -> data.add((Bytes) r.value()));
             retryCount++;
             LOGGER.info("Polling {} ({}) seen: #{}", topicName, retryCount, data.size());
         }
@@ -136,13 +135,9 @@ public List<Bytes> getProduced(final int expectedCount, final String topicName)
         return new KafkaConsumer<>(props);
     }

-    public void addSinkConnector() {
-        addSinkConnector(new Properties());
-    }
-
-    public void addSinkConnector(final Properties overrides) {
+    public void addSinkConnector(final String topicName) {
         Properties props = new Properties();
-        props.put("topics", getTopicName());
+        props.put("topics", topicName);
         props.put("connector.class", MongoSinkConnector.class.getName());
         props.put(MongoSinkConfig.CONNECTION_URI_CONFIG, MONGODB.getConnectionString().toString());
         props.put(MongoSinkTopicConfig.DATABASE_CONFIG, MONGODB.getDatabaseName());
@@ -153,7 +148,6 @@ public void addSinkConnector(final Properties overrides) {
         props.put("value.converter", AvroConverter.class.getName());
         props.put("value.converter.schema.registry.url", KAFKA.schemaRegistryUrl());

-        overrides.forEach(props::put);
         KAFKA.addSinkConnector(props);
     }
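A note on the getProduced change above (my arithmetic, not stated in the diff): the retry budget grows from at most 5 polls of 10 seconds (about 50 seconds) to 30 polls of 2 seconds (about 60 seconds), and progress is logged every 2 seconds instead of every 10. The extra headroom and finer-grained logging help the new copy-existing tests, which have to wait for the initial copy to finish before any change-stream events reach the topic.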