Skip to content

Commit d89f0c0

Browse files
committed
Fix resumability of change streams
KAFKA-86
1 parent ddd5684 commit d89f0c0

File tree

7 files changed

+424
-125
lines changed

7 files changed

+424
-125
lines changed

.evergreen/config.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ axes:
343343
display_name: "4.0"
344344
variables:
345345
VERSION: "4.0"
346+
- id: "4.2"
347+
display_name: "4.2"
348+
variables:
349+
VERSION: "4.2"
346350
- id: "latest"
347351
display_name: "latest"
348352
variables:

src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorTest.java

Lines changed: 156 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,15 @@
1515
*/
1616
package com.mongodb.kafka.connect;
1717

18+
import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.concat;
19+
import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.createDropCollection;
20+
import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.createDropDatabase;
21+
import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.createInsert;
22+
import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.createInserts;
1823
import static java.lang.String.format;
24+
import static java.util.Arrays.asList;
25+
import static java.util.Collections.emptyList;
26+
import static java.util.Collections.singletonList;
1927
import static java.util.stream.Collectors.toList;
2028
import static java.util.stream.IntStream.rangeClosed;
2129
import static org.junit.jupiter.api.Assertions.assertAll;
@@ -74,26 +82,31 @@ void testSourceLoadsDataFromMongoClient() {
7482
insertMany(rangeClosed(1, 50), coll1, coll2);
7583

7684
assertAll(
77-
() -> assertProduced(50, coll1),
78-
() -> assertProduced(50, coll2),
79-
() -> assertProduced(0, coll3));
85+
() -> assertProduced(createInserts(1, 50), coll1),
86+
() -> assertProduced(createInserts(1, 50), coll2),
87+
() -> assertProduced(emptyList(), coll3)
88+
);
8089

8190

8291
db1.drop();
8392
insertMany(rangeClosed(51, 60), coll2, coll4);
8493
insertMany(rangeClosed(1, 70), coll3);
8594

8695
assertAll(
87-
() -> assertProduced(51, coll1),
88-
() -> assertProduced(60, coll2),
89-
() -> assertProduced(70, coll3),
90-
() -> assertProduced(10, coll4)
96+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll1),
97+
() -> assertProduced(createInserts(1, 60), coll2),
98+
() -> assertProduced(createInserts(1, 70), coll3),
99+
() -> assertProduced(createInserts(51, 60), coll4)
91100
);
92101
}
93102

94103
@Test
95104
@DisplayName("Ensure source loads data from MongoClient with copy existing data")
96105
void testSourceLoadsDataFromMongoClientWithCopyExisting() {
106+
Properties sourceProperties = new Properties();
107+
sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
108+
addSourceConnector(sourceProperties);
109+
97110
MongoDatabase db1 = getDatabaseWithPostfix();
98111
MongoDatabase db2 = getDatabaseWithPostfix();
99112
MongoDatabase db3 = getDatabaseWithPostfix();
@@ -104,24 +117,21 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
104117

105118
insertMany(rangeClosed(1, 50), coll1, coll2);
106119

107-
Properties sourceProperties = new Properties();
108-
sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
109-
addSourceConnector(sourceProperties);
110-
111120
assertAll(
112-
() -> assertProduced(50, coll1),
113-
() -> assertProduced(50, coll2),
114-
() -> assertProduced(0, coll3));
121+
() -> assertProduced(createInserts(1, 50), coll1),
122+
() -> assertProduced(createInserts(1, 50), coll2),
123+
() -> assertProduced(emptyList(), coll3)
124+
);
115125

116126
db1.drop();
117127
insertMany(rangeClosed(51, 60), coll2, coll4);
118128
insertMany(rangeClosed(1, 70), coll3);
119129

120130
assertAll(
121-
() -> assertProduced(51, coll1),
122-
() -> assertProduced(60, coll2),
123-
() -> assertProduced(70, coll3),
124-
() -> assertProduced(10, coll4)
131+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll1),
132+
() -> assertProduced(createInserts(1, 60), coll2),
133+
() -> assertProduced(createInserts(1, 70), coll3),
134+
() -> assertProduced(createInserts(51, 60), coll4)
125135
);
126136
}
127137

@@ -145,9 +155,9 @@ void testSourceLoadsDataFromDatabase() {
145155
insertMany(rangeClosed(1, 50), coll1, coll2);
146156

147157
assertAll(
148-
() -> assertProduced(50, coll1),
149-
() -> assertProduced(50, coll2),
150-
() -> assertProduced(0, coll3)
158+
() -> assertProduced(createInserts(1, 50), coll1),
159+
() -> assertProduced(createInserts(1, 50), coll2),
160+
() -> assertProduced(emptyList(), coll3)
151161
);
152162

153163
// Update some of the collections
@@ -163,10 +173,10 @@ void testSourceLoadsDataFromDatabase() {
163173
insertMany(rangeClosed(21, 30), coll4);
164174

165175
assertAll(
166-
() -> assertProduced(51, coll1),
167-
() -> assertProduced(51, coll2),
168-
() -> assertProduced(21, coll3),
169-
() -> assertProduced(10, coll4)
176+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll1),
177+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll2),
178+
() -> assertProduced(concat(createInserts(1, 20), singletonList(createDropCollection())), coll3),
179+
() -> assertProduced(createInserts(21, 30), coll4)
170180
);
171181
}
172182
}
@@ -192,9 +202,9 @@ void testSourceLoadsDataFromDatabaseCopyExisting() {
192202
addSourceConnector(sourceProperties);
193203

194204
assertAll(
195-
() -> assertProduced(50, coll1),
196-
() -> assertProduced(50, coll2),
197-
() -> assertProduced(0, coll3)
205+
() -> assertProduced(createInserts(1, 50), coll1),
206+
() -> assertProduced(createInserts(1, 50), coll2),
207+
() -> assertProduced(emptyList(), coll3)
198208
);
199209

200210
// Update some of the collections
@@ -210,10 +220,10 @@ void testSourceLoadsDataFromDatabaseCopyExisting() {
210220
insertMany(rangeClosed(21, 30), coll4);
211221

212222
assertAll(
213-
() -> assertProduced(51, coll1),
214-
() -> assertProduced(51, coll2),
215-
() -> assertProduced(21, coll3),
216-
() -> assertProduced(10, coll4)
223+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll1),
224+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll2),
225+
() -> assertProduced(concat(createInserts(1, 20), singletonList(createDropCollection())), coll3),
226+
() -> assertProduced(createInserts(21, 30), coll4)
217227
);
218228
}
219229
}
@@ -226,24 +236,49 @@ void testSourceCanHandleNonExistentDatabaseAndSurviveDropping() throws Interrupt
226236
consumer.subscribe(pattern);
227237

228238
MongoDatabase db = getDatabaseWithPostfix();
229-
MongoCollection<Document> coll = db.getCollection("coll");
239+
MongoCollection<Document> coll1 = db.getCollection("coll1");
240+
MongoCollection<Document> coll2 = db.getCollection("coll2");
241+
MongoCollection<Document> coll3 = db.getCollection("coll3");
242+
db.drop();
230243

231244
Properties sourceProperties = new Properties();
232245
sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, db.getName());
233246
addSourceConnector(sourceProperties);
234247

235248
Thread.sleep(5000);
236-
assertProduced(0, coll);
249+
assertAll(
250+
() -> assertProduced(emptyList(), coll1),
251+
() -> assertProduced(emptyList(), coll2),
252+
() -> assertProduced(emptyList(), coll3),
253+
() -> assertProduced(emptyList(), db.getName())
254+
);
255+
256+
insertMany(rangeClosed(1, 50), coll1, coll2);
257+
insertMany(rangeClosed(1, 1), coll3);
237258

238-
insertMany(rangeClosed(1, 100), coll);
239-
assertProduced(100, coll);
259+
assertAll(
260+
() -> assertProduced(createInserts(1, 50), coll1),
261+
() -> assertProduced(createInserts(1, 50), coll2),
262+
() -> assertProduced(singletonList(createInsert(1)), coll3),
263+
() -> assertProduced(emptyList(), db.getName())
264+
);
240265

241266
db.drop();
242-
assertProduced(101, coll);
267+
assertAll(
268+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll1),
269+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll2),
270+
() -> assertProduced(asList(createInsert(1), createDropCollection()), coll3),
271+
() -> assertProduced(singletonList(createDropDatabase()), db.getName())
272+
);
243273

244-
Thread.sleep(5000);
245-
insertMany(rangeClosed(1, 100), coll);
246-
assertProduced(201, coll);
274+
insertMany(rangeClosed(51, 100), coll1, coll2, coll3);
275+
276+
assertAll(
277+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection()), createInserts(51, 100)), coll1),
278+
() -> assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection()), createInserts(51, 100)), coll2),
279+
() -> assertProduced(concat(asList(createInsert(1), createDropCollection()), createInserts(51, 100)), coll3),
280+
() -> assertProduced(singletonList(createDropDatabase()), db.getName())
281+
);
247282
}
248283
}
249284

@@ -258,10 +293,10 @@ void testSourceLoadsDataFromCollection() {
258293
addSourceConnector(sourceProperties);
259294

260295
insertMany(rangeClosed(1, 100), coll);
261-
assertProduced(100, coll);
296+
assertProduced(createInserts(1, 100), coll);
262297

263298
coll.drop();
264-
assertProduced(101, coll);
299+
assertProduced(concat(createInserts(1, 100), singletonList(createDropCollection())), coll);
265300
}
266301

267302
@Test
@@ -277,13 +312,13 @@ void testSourceLoadsDataFromCollectionCopyExisting() {
277312
sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
278313
addSourceConnector(sourceProperties);
279314

280-
assertProduced(50, coll);
315+
assertProduced(createInserts(1, 50), coll);
281316

282317
insertMany(rangeClosed(51, 100), coll);
283-
assertProduced(100, coll);
318+
assertProduced(createInserts(1, 100), coll);
284319

285320
coll.drop();
286-
assertProduced(101, coll);
321+
assertProduced(concat(createInserts(1, 100), singletonList(createDropCollection())), coll);
287322
}
288323

289324
@Test
@@ -297,20 +332,20 @@ void testSourceCanHandleNonExistentCollectionAndSurviveDropping() throws Interru
297332
addSourceConnector(sourceProperties);
298333

299334
Thread.sleep(5000);
300-
assertProduced(0, coll);
335+
assertProduced(emptyList(), coll);
301336

302337
insertMany(rangeClosed(1, 100), coll);
303-
assertProduced(100, coll);
338+
assertProduced(createInserts(1, 100), coll);
304339

305340
coll.drop();
306-
assertProduced(101, coll);
341+
assertProduced(concat(createInserts(1, 100), singletonList(createDropCollection())), coll);
307342

308-
insertMany(rangeClosed(1, 100), coll);
309-
assertProduced(201, coll);
343+
insertMany(rangeClosed(101, 200), coll);
344+
assertProduced(concat(createInserts(1, 100), singletonList(createDropCollection()), createInserts(101, 200)), coll);
310345
}
311346

312347
@Test
313-
@DisplayName("Ensure source can handle non existent collection and survive dropping")
348+
@DisplayName("Ensure source can handle a pipeline watching inserts on a non existent collection and survive dropping")
314349
void testSourceCanSurviveDroppingWithPipelineWatchingInsertsOnly() throws InterruptedException {
315350
MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
316351

@@ -321,16 +356,16 @@ void testSourceCanSurviveDroppingWithPipelineWatchingInsertsOnly() throws Interr
321356
addSourceConnector(sourceProperties);
322357

323358
Thread.sleep(5000);
324-
assertProduced(0, coll);
359+
assertProduced(emptyList(), coll);
325360

326361
insertMany(rangeClosed(1, 50), coll);
327-
assertProduced(50, coll);
362+
assertProduced(createInserts(1, 50), coll);
328363

329364
coll.drop();
330365
Thread.sleep(5000);
331366

332-
insertMany(rangeClosed(1, 50), coll);
333-
assertProduced(100, coll);
367+
insertMany(rangeClosed(51, 100), coll);
368+
assertProduced(createInserts(1, 100), coll);
334369
}
335370

336371
@Test
@@ -347,13 +382,77 @@ void testSourceLoadsDataFromCollectionDocumentOnly() {
347382
sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
348383
addSourceConnector(sourceProperties);
349384

350-
assertProduced(docs, coll);
385+
assertProducedDocs(docs, coll);
351386

352387
List<Document> allDocs = new ArrayList<>(docs);
353388
allDocs.addAll(insertMany(rangeClosed(51, 100), coll));
354389

355390
coll.drop();
356-
assertProduced(allDocs, coll);
391+
assertProducedDocs(allDocs, coll);
392+
}
393+
394+
@Test
395+
@DisplayName("Ensure source can survive a restart")
396+
void testSourceSurvivesARestart() {
397+
MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
398+
399+
Properties sourceProperties = new Properties();
400+
sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
401+
sourceProperties.put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
402+
addSourceConnector(sourceProperties);
403+
404+
insertMany(rangeClosed(1, 50), coll);
405+
assertProduced(createInserts(1, 50), coll);
406+
407+
insertMany(rangeClosed(51, 100), coll);
408+
restartConnector(sourceProperties);
409+
410+
assertProduced(concat(createInserts(1, 100)), coll);
411+
}
412+
413+
@Test
414+
@DisplayName("Ensure source can survive a restart with a drop")
415+
void testSourceSurvivesARestartWithDrop() {
416+
MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
417+
418+
Properties sourceProperties = new Properties();
419+
sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
420+
sourceProperties.put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
421+
addSourceConnector(sourceProperties);
422+
423+
insertMany(rangeClosed(1, 50), coll);
424+
assertProduced(createInserts(1, 50), coll);
425+
426+
coll.drop();
427+
assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection())), coll);
428+
429+
restartConnector(sourceProperties);
430+
insertMany(rangeClosed(51, 100), coll);
431+
432+
assertProduced(concat(createInserts(1, 50), singletonList(createDropCollection()), createInserts(51, 100)), coll);
433+
}
434+
435+
@Test
436+
@DisplayName("Ensure source can survive a restart with a drop when watching just inserts")
437+
void testSourceSurvivesARestartWithDropIncludingPipeline() {
438+
MongoCollection<Document> coll = getDatabaseWithPostfix().getCollection("coll");
439+
440+
Properties sourceProperties = new Properties();
441+
sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
442+
sourceProperties.put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
443+
sourceProperties.put(MongoSourceConfig.PIPELINE_CONFIG, "[{\"$match\": {\"operationType\": \"insert\"}}]");
444+
addSourceConnector(sourceProperties);
445+
446+
insertMany(rangeClosed(1, 50), coll);
447+
assertProduced(createInserts(1, 50), coll);
448+
449+
coll.drop();
450+
assertProduced(createInserts(1, 50), coll);
451+
452+
restartConnector(sourceProperties);
453+
insertMany(rangeClosed(51, 100), coll);
454+
455+
assertProduced(createInserts(1, 100), coll);
357456
}
358457

359458
private MongoDatabase getDatabaseWithPostfix() {

src/integrationTest/java/com/mongodb/kafka/connect/embedded/EmbeddedKafka.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ private Properties connectWorkerConfig() {
225225
workerProps.put("key.converter.schemas.enable", "false");
226226
workerProps.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
227227
workerProps.put("value.converter.schemas.enable", "false");
228-
workerProps.put(DistributedConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "30000");
228+
workerProps.put(DistributedConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "100");
229229
workerProps.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, createTempDirectory().getAbsolutePath() + "connect");
230230

231231
return workerProps;

0 commit comments

Comments
 (0)