Commit f440809

Resilient source

The source can now handle non-existent databases and collections, and survive collection drops. KAFKA-57 KAFKA-59

1 parent 4bcf76f commit f440809
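
The behaviour this commit leans on comes from the MongoDB change streams API itself: a change stream may be opened against a database or collection that does not exist yet, and dropping the watched namespace emits a terminal invalidate event. A minimal driver-level sketch of both behaviours, assuming a local replica set at mongodb://localhost:27017 (the demo class and the database/collection names are illustrative, not part of the commit):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

public class ChangeStreamDropDemo {
    public static void main(final String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            // Watching a collection that does not exist yet is legal:
            // events start flowing once documents are inserted.
            MongoCollection<Document> coll = client.getDatabase("demo").getCollection("notYetCreated");
            try (MongoCursor<ChangeStreamDocument<Document>> cursor = coll.watch().iterator()) {
                coll.insertOne(new Document("_id", 1));
                System.out.println(cursor.next().getOperationType()); // INSERT

                // Dropping the watched collection closes the stream: a DROP
                // event arrives, followed by a terminal INVALIDATE event.
                coll.drop();
                System.out.println(cursor.next().getOperationType()); // DROP
                System.out.println(cursor.next().getOperationType()); // INVALIDATE
            }
        }
    }
}

Before this commit the source task treated that invalidate event as terminal and stopped publishing; the changes below recreate the cursor instead.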

2 files changed: +126 −39


src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorTest.java

Lines changed: 69 additions & 10 deletions
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;
 
@@ -42,8 +43,11 @@
 import com.mongodb.kafka.connect.mongodb.MongoKafkaTestCase;
 import com.mongodb.kafka.connect.source.MongoSourceConfig;
 
+
 public class MongoSourceConnectorTest extends MongoKafkaTestCase {
 
+    private static final AtomicInteger POSTFIX = new AtomicInteger();
+
     @BeforeEach
     void setUp() {
         assumeTrue(isReplicaSetOrSharded());
@@ -63,10 +67,13 @@ void tearDown() {
     void testSourceLoadsDataFromMongoClient() {
         addSourceConnector();
 
-        MongoCollection<Document> coll1 = getDatabase("1").getCollection("coll");
-        MongoCollection<Document> coll2 = getDatabase("2").getCollection("coll");
-        MongoCollection<Document> coll3 = getDatabase("3").getCollection("coll");
-        MongoCollection<Document> coll4 = getDatabase("1").getCollection("db1Coll2");
+        MongoDatabase db1 = getDatabaseWithPostfix();
+        MongoDatabase db2 = getDatabaseWithPostfix();
+        MongoDatabase db3 = getDatabaseWithPostfix();
+        MongoCollection<Document> coll1 = db1.getCollection("coll");
+        MongoCollection<Document> coll2 = db2.getCollection("coll");
+        MongoCollection<Document> coll3 = db3.getCollection("coll");
+        MongoCollection<Document> coll4 = db1.getCollection("db1Coll2");
 
         insertMany(rangeClosed(1, 50), coll1, coll2);
 
@@ -76,7 +83,7 @@ void testSourceLoadsDataFromMongoClient() {
                 () -> assertProduced(0, coll3));
 
 
-        getDatabase("1").drop();
+        db1.drop();
         insertMany(rangeClosed(51, 60), coll2, coll4);
         insertMany(rangeClosed(1, 70), coll3);
 
@@ -95,7 +102,7 @@ void testSourceLoadsDataFromDatabase() {
             Pattern pattern = Pattern.compile(format("^%s.*", getDatabaseName()));
             consumer.subscribe(pattern);
 
-            MongoDatabase db = getDatabase("4");
+            MongoDatabase db = getDatabaseWithPostfix();
 
             Properties sourceProperties = new Properties();
             sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, db.getName());
@@ -134,10 +141,39 @@ void testSourceLoadsDataFromDatabase() {
         }
     }
 
+    @Test
+    @DisplayName("Ensure source can handle non existent database and survive dropping")
+    void testSourceCanHandleNonExistentDatabaseAndSurviveDropping() throws InterruptedException {
+        try (KafkaConsumer<?, ?> consumer = createConsumer()) {
+            Pattern pattern = Pattern.compile(format("^%s.*", getDatabaseName()));
+            consumer.subscribe(pattern);
+
+            MongoDatabase db = getDatabaseWithPostfix();
+            MongoCollection<Document> coll = db.getCollection("coll");
+
+            Properties sourceProperties = new Properties();
+            sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, db.getName());
+            addSourceConnector(sourceProperties);
+
+            Thread.sleep(5000);
+            assertProduced(0, coll);
+
+            insertMany(rangeClosed(1, 100), coll);
+            assertProduced(100, coll);
+
+            db.drop();
+            assertProduced(101, coll);
+
+            Thread.sleep(5000);
+            insertMany(rangeClosed(1, 100), coll);
+            assertProduced(201, coll);
+        }
+    }
+
     @Test
     @DisplayName("Ensure source loads data from collection")
     void testSourceLoadsDataFromCollection() {
-        MongoCollection<Document> coll = getDatabase("5").getCollection("coll");
+        MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
 
         Properties sourceProperties = new Properties();
         sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
@@ -148,13 +184,36 @@ void testSourceLoadsDataFromCollection() {
         assertProduced(100, coll);
 
         coll.drop();
+        assertProduced(101, coll);
+    }
+
+    @Test
+    @DisplayName("Ensure source can handle non existent collection and survive dropping")
+    void testSourceCanHandleNonExistentCollectionAndSurviveDropping() throws InterruptedException {
+        MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
+
+        Properties sourceProperties = new Properties();
+        sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
+        sourceProperties.put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
+        addSourceConnector(sourceProperties);
+
+        Thread.sleep(5000);
+        assertProduced(0, coll);
+
+        insertMany(rangeClosed(1, 100), coll);
         assertProduced(100, coll);
+
+        coll.drop();
+        assertProduced(101, coll);
+
+        insertMany(rangeClosed(1, 100), coll);
+        assertProduced(201, coll);
     }
 
     @Test
     @DisplayName("Ensure source loads data from collection and outputs documents only")
     void testSourceLoadsDataFromCollectionDocumentOnly() {
-        MongoCollection<Document> coll = getDatabase("6").getCollection("coll");
+        MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
 
         Properties sourceProperties = new Properties();
         sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
@@ -169,8 +228,8 @@ void testSourceLoadsDataFromCollectionDocumentOnly() {
         assertProduced(docs, coll);
     }
 
-    private MongoDatabase getDatabase(final String postfix) {
-        return getMongoClient().getDatabase(format("%s%s", getDatabaseName(), postfix));
+    private MongoDatabase getDatabaseWithPostfix() {
+        return getMongoClient().getDatabase(format("%s%s", getDatabaseName(), POSTFIX.incrementAndGet()));
    }
 
     private List<Document> insertMany(final IntStream stream, final MongoCollection<?>... collections) {
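
The renamed helper is also a fix for cross-test interference: the fixed postfixes ("1" through "6") reused database names across test methods, which becomes fragile once tests start dropping the databases they watch, whereas the AtomicInteger counter hands every call a fresh namespace. The idiom in isolation, as a sketch (the real helper lives in the test class above and delegates to MongoKafkaTestCase's getDatabaseName()):

import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the unique-namespace idiom used by the tests: each call
// returns a database name never handed out before in this JVM, so a
// drop in one test cannot invalidate another test's change stream.
final class UniqueNames {
    private static final AtomicInteger POSTFIX = new AtomicInteger();

    private UniqueNames() {
    }

    static String nextDatabaseName(final String baseName) {
        return baseName + POSTFIX.incrementAndGet();
    }
}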

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 57 additions & 29 deletions
@@ -61,9 +61,9 @@ public class MongoSourceTask extends SourceTask {
     private static final String INVALIDATE = "invalidate";
 
     private final Time time;
+    private final AtomicBoolean isRunning = new AtomicBoolean();
     private MongoSourceConfig sourceConfig;
     private MongoClient mongoClient;
-    private AtomicBoolean isRunning = new AtomicBoolean();
     private MongoCursor<BsonDocument> cursor;
 
     public MongoSourceTask() {
@@ -106,7 +106,7 @@ public List<SourceRecord> poll() {
         Map<String, Object> partition = createPartitionMap(sourceConfig);
 
         while (isRunning.get()) {
-            Optional<BsonDocument> next = Optional.ofNullable(cursor.tryNext());
+            Optional<BsonDocument> next = getNextDocument();
             long untilNext = nextUpdate - time.milliseconds();
 
             if (!next.isPresent()) {
@@ -120,7 +120,7 @@ public List<SourceRecord> poll() {
             } else {
                 BsonDocument changeStreamDocument = next.get();
                 Map<String, String> sourceOffset = singletonMap("_id", changeStreamDocument.getDocument("_id").toJson());
-                String topicName = getTopicNameFromNamespace(prefix, changeStreamDocument.getDocument("ns"));
+                String topicName = getTopicNameFromNamespace(prefix, changeStreamDocument.getDocument("ns", new BsonDocument()));
 
                 Optional<String> jsonDocument = Optional.empty();
                 if (publishFullDocumentOnly) {
@@ -140,8 +140,8 @@ public List<SourceRecord> poll() {
 
                 // If the cursor has been invalidated, add the record and return
                 if (changeStreamDocument.getString("operationType", new BsonString("")).getValue().equalsIgnoreCase(INVALIDATE)) {
-                    LOGGER.info("Cursor has been invalidated, no further messages will be published");
-                    isRunning.set(false);
+                    LOGGER.info("Cursor has been invalidated.");
+                    cursor = null;
                     return sourceRecords;
                 } else if (sourceRecords.size() == maxBatchSize) {
                     LOGGER.debug("Reached max batch size: {}, returning records", maxBatchSize);
@@ -169,30 +169,7 @@ public synchronized void stop() {
 
     MongoCursor<BsonDocument> createCursor(final MongoSourceConfig cfg, final MongoClient mongoClient) {
         LOGGER.debug("Creating a MongoCursor");
-        String database = cfg.getString(DATABASE_CONFIG);
-        String collection = cfg.getString(COLLECTION_CONFIG);
-
-        Optional<List<Document>> pipeline = cfg.getPipeline();
-        ChangeStreamIterable<Document> changeStream;
-        if (database.isEmpty()) {
-            LOGGER.info("Watching all changes on the cluster");
-            changeStream = pipeline.map(mongoClient::watch).orElse(mongoClient.watch());
-        } else if (collection.isEmpty()) {
-            LOGGER.info("Watching for database changes on '{}'", database);
-            MongoDatabase db = mongoClient.getDatabase(database);
-            changeStream = pipeline.map(db::watch).orElse(db.watch());
-        } else {
-            LOGGER.info("Watching for collection changes on '{}.{}'", database, collection);
-            MongoCollection<Document> coll = mongoClient.getDatabase(database).getCollection(collection);
-            changeStream = pipeline.map(coll::watch).orElse(coll.watch());
-        }
-
-        int batchSize = cfg.getInt(BATCH_SIZE_CONFIG);
-        if (batchSize > 0) {
-            changeStream.batchSize(batchSize);
-        }
-        cfg.getFullDocument().ifPresent(changeStream::fullDocument);
-        cfg.getCollation().ifPresent(changeStream::collation);
+        ChangeStreamIterable<Document> changeStream = getChangeStreamIterable(cfg, mongoClient);
 
         Map<String, Object> offset = context != null ? context.offsetStorageReader().offset(createPartitionMap(cfg)) : null;
         if (offset != null) {
@@ -218,4 +195,55 @@ Map<String, Object> createPartitionMap(final MongoSourceConfig cfg) {
         return singletonMap("ns", format("%s/%s.%s", cfg.getString(CONNECTION_URI_CONFIG),
                 cfg.getString(DATABASE_CONFIG), cfg.getString(COLLECTION_CONFIG)));
     }
+
+    /**
+     * Returns the next document to be delivered to Kafka.
+     *
+     * <p>
+     * Returns the next result from the change stream cursor, creating the cursor if necessary.
+     * </p>
+     *
+     * @return the next document
+     */
+    private Optional<BsonDocument> getNextDocument() {
+        if (cursor == null) {
+            cursor = createCursor(sourceConfig, mongoClient);
+        }
+
+        try {
+            return Optional.ofNullable(cursor.tryNext());
+        } catch (Exception e) {
+            cursor = null;
+            return Optional.empty();
+        }
+    }
+
+    private ChangeStreamIterable<Document> getChangeStreamIterable(final MongoSourceConfig cfg,
+                                                                   final MongoClient mongoClient) {
+        String database = cfg.getString(DATABASE_CONFIG);
+        String collection = cfg.getString(COLLECTION_CONFIG);
+
+        Optional<List<Document>> pipeline = cfg.getPipeline();
+        ChangeStreamIterable<Document> changeStream;
+        if (database.isEmpty()) {
+            LOGGER.info("Watching all changes on the cluster");
+            changeStream = pipeline.map(mongoClient::watch).orElse(mongoClient.watch());
+        } else if (collection.isEmpty()) {
+            LOGGER.info("Watching for database changes on '{}'", database);
+            MongoDatabase db = mongoClient.getDatabase(database);
+            changeStream = pipeline.map(db::watch).orElse(db.watch());
+        } else {
+            LOGGER.info("Watching for collection changes on '{}.{}'", database, collection);
+            MongoCollection<Document> coll = mongoClient.getDatabase(database).getCollection(collection);
+            changeStream = pipeline.map(coll::watch).orElse(coll.watch());
+        }
+
+        int batchSize = cfg.getInt(BATCH_SIZE_CONFIG);
+        if (batchSize > 0) {
+            changeStream.batchSize(batchSize);
+        }
+        cfg.getFullDocument().ifPresent(changeStream::fullDocument);
+        cfg.getCollation().ifPresent(changeStream::collation);
+        return changeStream;
+    }
 }
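
Taken together, the diff replaces "stop on invalidate" with "recreate on demand": poll() no longer dereferences the cursor field directly, invalidation and read errors merely null it out, and getNextDocument() lazily rebuilds it on the next iteration. The defaulted getDocument("ns", new BsonDocument()) matters too, since invalidate events carry no ns field, so the lookup needs a fallback. A condensed, self-contained sketch of the read path (ResilientReader and its abstract createCursor() are illustrative stand-ins for MongoSourceTask and createCursor(sourceConfig, mongoClient)):

import java.util.Optional;

import org.bson.BsonDocument;

import com.mongodb.client.MongoCursor;

// Condensed sketch of the task's new read path: the cursor is a nullable
// field, (re)created on demand; any failure while reading simply discards
// it so the next call opens a fresh change stream instead of killing the task.
abstract class ResilientReader {
    private MongoCursor<BsonDocument> cursor;

    abstract MongoCursor<BsonDocument> createCursor();

    Optional<BsonDocument> nextDocument() {
        if (cursor == null) {
            cursor = createCursor();  // also how invalidation recovers: the field was nulled
        }
        try {
            return Optional.ofNullable(cursor.tryNext());  // non-blocking; null when no event is waiting
        } catch (Exception e) {
            cursor = null;  // e.g. the watched namespace was dropped mid-read
            return Optional.empty();
        }
    }
}

The Thread.sleep(5000) pauses in the new integration tests appear to exist to give this recreation a chance to complete before data is inserted again.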
