|
21 | 21 | import com.mongodb.client.FindIterable;
|
22 | 22 | import com.mongodb.client.MongoCollection;
|
23 | 23 | import com.mongodb.client.MongoDatabase;
|
| 24 | +// imports required for change streams |
| 25 | +import com.mongodb.client.MongoCursor; |
| 26 | +import com.mongodb.client.model.changestream.ChangeStreamDocument; |
| 27 | +import com.mongodb.client.model.changestream.FullDocument; |
| 28 | +import org.bson.BsonDocument; |
| 29 | +// end required change streams imports |
24 | 30 | import org.bson.BsonType;
|
25 | 31 | import org.bson.Document;
|
26 | 32 | import org.junit.After;
|
|
29 | 35 | import java.util.ArrayList;
|
30 | 36 | import java.util.HashSet;
|
31 | 37 | import java.util.List;
|
| 38 | +import java.util.concurrent.atomic.AtomicBoolean; |
32 | 39 |
|
| 40 | +import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; |
| 41 | +import static com.mongodb.ClusterFixture.serverVersionAtLeast; |
33 | 42 | import static com.mongodb.Fixture.getDefaultDatabaseName;
|
34 | 43 | import static com.mongodb.Fixture.getMongoClient;
|
35 | 44 |
|
|
61 | 70 | import static java.util.Collections.singletonList;
|
62 | 71 | import static org.junit.Assert.assertEquals;
|
63 | 72 | import static org.junit.Assert.assertTrue;
|
| 73 | +import static org.junit.Assume.assumeTrue; |
64 | 74 |
|
65 | 75 |
|
66 | 76 | public final class DocumentationSamples extends DatabaseTestCase {
|
@@ -603,6 +613,52 @@ public void testDeletions() {
|
603 | 613 | assertEquals(0, collection.count());
|
604 | 614 | }
|
605 | 615 |
|
| 616 | + @Test |
| 617 | + public void testWatch() { |
| 618 | + assumeTrue(isDiscoverableReplicaSet() && serverVersionAtLeast(3, 5)); |
| 619 | + |
| 620 | + final MongoCollection<Document> inventory = collection; |
| 621 | + final AtomicBoolean stop = new AtomicBoolean(false); |
| 622 | + |
| 623 | + new Thread(new Runnable() { |
| 624 | + @Override |
| 625 | + public void run() { |
| 626 | + while (!stop.get()) { |
| 627 | + collection.insertOne(new Document()); |
| 628 | + try { |
| 629 | + Thread.sleep(10); |
| 630 | + } catch (InterruptedException e) { |
| 631 | + // ignore |
| 632 | + } |
| 633 | + } |
| 634 | + } |
| 635 | + }).start(); |
| 636 | + |
| 637 | + |
| 638 | + // Start Changestream Example 1 |
| 639 | + MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); |
| 640 | + ChangeStreamDocument<Document> next = cursor.next(); |
| 641 | + // End Changestream Example 1 |
| 642 | + |
| 643 | + cursor.close(); |
| 644 | + |
| 645 | + // Start Changestream Example 2 |
| 646 | + cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); |
| 647 | + next = cursor.next(); |
| 648 | + // End Changestream Example 2 |
| 649 | + |
| 650 | + cursor.close(); |
| 651 | + |
| 652 | + // Start Changestream Example 3 |
| 653 | + BsonDocument resumeToken = next.getResumeToken(); |
| 654 | + cursor = inventory.watch().resumeAfter(resumeToken).iterator(); |
| 655 | + next = cursor.next(); |
| 656 | + // End Changestream Example 3 |
| 657 | + |
| 658 | + cursor.close(); |
| 659 | + |
| 660 | + stop.set(true); |
| 661 | + } |
606 | 662 |
|
607 | 663 | @After
|
608 | 664 | public void tearDown() {
|
|
0 commit comments