Skip to content

Commit ddd5684

Browse files
committed
Ensure filtered change streams survive invalidate events
KAFKA-83
1 parent 7e6bf97 commit ddd5684

File tree

2 files changed

+31
-9
lines changed

2 files changed

+31
-9
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,30 @@ void testSourceCanHandleNonExistentCollectionAndSurviveDropping() throws Interru
309309
assertProduced(201, coll);
310310
}
311311

312+
@Test
313+
@DisplayName("Ensure source can handle non existent collection and survive dropping")
314+
void testSourceCanSurviveDroppingWithPipelineWatchingInsertsOnly() throws InterruptedException {
315+
MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
316+
317+
Properties sourceProperties = new Properties();
318+
sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
319+
sourceProperties.put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
320+
sourceProperties.put(MongoSourceConfig.PIPELINE_CONFIG, "[{\"$match\": {\"operationType\": \"insert\"}}]");
321+
addSourceConnector(sourceProperties);
322+
323+
Thread.sleep(5000);
324+
assertProduced(0, coll);
325+
326+
insertMany(rangeClosed(1, 50), coll);
327+
assertProduced(50, coll);
328+
329+
coll.drop();
330+
Thread.sleep(5000);
331+
332+
insertMany(rangeClosed(1, 50), coll);
333+
assertProduced(100, coll);
334+
}
335+
312336
@Test
313337
@DisplayName("Ensure source loads data from collection and outputs documents only")
314338
void testSourceLoadsDataFromCollectionDocumentOnly() {

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646

4747
import org.bson.BsonDocument;
4848
import org.bson.BsonDocumentWrapper;
49-
import org.bson.BsonString;
5049
import org.bson.Document;
5150

5251
import com.mongodb.MongoClientSettings;
@@ -90,7 +89,6 @@
9089
*/
9190
public class MongoSourceTask extends SourceTask {
9291
private static final Logger LOGGER = LoggerFactory.getLogger(MongoSourceTask.class);
93-
private static final String INVALIDATE = "invalidate";
9492

9593
private final Time time;
9694
private final AtomicBoolean isRunning = new AtomicBoolean();
@@ -189,12 +187,7 @@ public List<SourceRecord> poll() {
189187
Schema.STRING_SCHEMA, json));
190188
});
191189

192-
// If the cursor is invalidated add the record and return calls
193-
if (changeStreamDocument.getString("operationType", new BsonString("")).getValue().equalsIgnoreCase(INVALIDATE)) {
194-
LOGGER.info("Cursor has been invalidated.");
195-
cursor = null;
196-
return sourceRecords;
197-
} else if (sourceRecords.size() == maxBatchSize) {
190+
if (sourceRecords.size() == maxBatchSize) {
198191
LOGGER.debug("Reached max batch size: {}, returning records", maxBatchSize);
199192
return sourceRecords;
200193
}
@@ -317,7 +310,12 @@ private Optional<BsonDocument> getNextDocument() {
317310
}
318311

319312
try {
320-
return Optional.ofNullable(cursor.tryNext());
313+
BsonDocument next = cursor.tryNext();
314+
if (next == null && cursor.getServerCursor() == null) {
315+
cursor.close();
316+
cursor = null;
317+
}
318+
return Optional.ofNullable(next);
321319
} catch (Exception e) {
322320
cursor.close();
323321
cursor = null;

0 commit comments

Comments
 (0)