@@ -9,7 +9,7 @@ use crate::{
9
9
bson:: { doc, Bson } ,
10
10
error:: { ErrorKind , Result } ,
11
11
options:: { ClientOptions , FindOptions , ServerApi , ServerApiVersion } ,
12
- test:: { TestClient , DEFAULT_URI , LOCK } ,
12
+ test:: { log_uncaptured , TestClient , DEFAULT_URI , LOCK } ,
13
13
Client ,
14
14
Collection ,
15
15
} ;
@@ -1441,7 +1441,6 @@ async fn versioned_api_examples() -> GenericResult<()> {
1441
1441
1442
1442
use std:: { error:: Error , result:: Result } ;
1443
1443
1444
- use crate :: test:: log_uncaptured;
1445
1444
// Start Versioned API Example 5
1446
1445
// With the `bson-chrono-0_4` feature enabled, this function can be dropped in favor of using
1447
1446
// `chrono::DateTime` values directly.
@@ -1755,6 +1754,81 @@ async fn index_examples() -> Result<()> {
1755
1754
Ok ( ( ) )
1756
1755
}
1757
1756
1757
+ async fn change_streams_examples ( ) -> Result < ( ) > {
1758
+ use crate :: { change_stream:: options:: FullDocumentType , options:: ChangeStreamOptions , RUNTIME } ;
1759
+ use std:: time:: Duration ;
1760
+
1761
+ let client = TestClient :: new ( ) . await ;
1762
+ if !client. is_replica_set ( ) && !client. is_sharded ( ) {
1763
+ log_uncaptured ( "skipping change_streams_examples due to unsupported topology" ) ;
1764
+ return Ok ( ( ) ) ;
1765
+ }
1766
+ let db = client. database ( "change_streams_examples" ) ;
1767
+ db. drop ( None ) . await ?;
1768
+ let inventory = db. collection :: < Document > ( "inventory" ) ;
1769
+ // Populate an item so the collection exists for the change stream to watch.
1770
+ inventory. insert_one ( doc ! { } , None ) . await ?;
1771
+
1772
+ // Background writer thread so that the `stream.next()` calls return something.
1773
+ let ( tx, mut rx) = tokio:: sync:: oneshot:: channel ( ) ;
1774
+ let writer_inventory = inventory. clone ( ) ;
1775
+ let handle = RUNTIME
1776
+ . spawn ( async move {
1777
+ let mut interval = RUNTIME . interval ( Duration :: from_millis ( 100 ) ) ;
1778
+ loop {
1779
+ tokio:: select! {
1780
+ _ = interval. tick( ) => {
1781
+ writer_inventory. insert_one( doc! { } , None ) . await ?;
1782
+ }
1783
+ _ = & mut rx => break ,
1784
+ }
1785
+ }
1786
+ Result :: Ok ( ( ) )
1787
+ } )
1788
+ . unwrap ( ) ;
1789
+
1790
+ #[ allow( unused_variables, unused_imports) ]
1791
+ {
1792
+ {
1793
+ // Start Changestream Example 1
1794
+ use futures:: stream:: TryStreamExt ;
1795
+ let mut stream = inventory. watch ( None , None ) . await ?;
1796
+ let next = stream. try_next ( ) . await ?;
1797
+ // End Changestream Example 1
1798
+ }
1799
+
1800
+ {
1801
+ // Start Changestream Example 2
1802
+ use futures:: stream:: TryStreamExt ;
1803
+ let options = ChangeStreamOptions :: builder ( )
1804
+ . full_document ( Some ( FullDocumentType :: UpdateLookup ) )
1805
+ . build ( ) ;
1806
+ let mut stream = inventory. watch ( None , options) . await ?;
1807
+ let next = stream. try_next ( ) . await ?;
1808
+ // End Changestream Example 2
1809
+ }
1810
+
1811
+ {
1812
+ let stream = inventory. watch ( None , None ) . await ?;
1813
+ // Start Changestream Example 3
1814
+ use futures:: stream:: TryStreamExt ;
1815
+ let resume_token = stream. resume_token ( ) ;
1816
+ let options = ChangeStreamOptions :: builder ( )
1817
+ . resume_after ( resume_token)
1818
+ . build ( ) ;
1819
+ let mut stream = inventory. watch ( None , options) . await ?;
1820
+ stream. try_next ( ) . await ?;
1821
+ // End Changestream Example 3
1822
+ }
1823
+ }
1824
+
1825
+ // Shut down the writer thread.
1826
+ let _ = tx. send ( ( ) ) ;
1827
+ handle. await ?;
1828
+
1829
+ Ok ( ( ) )
1830
+ }
1831
+
1758
1832
#[ cfg_attr( feature = "tokio-runtime" , tokio:: test) ]
1759
1833
#[ cfg_attr( feature = "async-std-runtime" , async_std:: test) ]
1760
1834
async fn test ( ) {
@@ -1780,4 +1854,5 @@ async fn test() {
1780
1854
aggregation_examples ( ) . await . unwrap ( ) ;
1781
1855
run_command_examples ( ) . await . unwrap ( ) ;
1782
1856
index_examples ( ) . await . unwrap ( ) ;
1857
+ change_streams_examples ( ) . await . unwrap ( ) ;
1783
1858
}
0 commit comments