19
19
using MongoDB . Driver . Core . TestHelpers . XunitExtensions ;
20
20
using MongoDB . Driver . Tests ;
21
21
using System ;
22
+ using System . Linq ;
22
23
using System . Threading ;
23
24
using System . Threading . Tasks ;
24
25
using Xunit ;
@@ -44,10 +45,10 @@ public void ChangeStreamExample1()
44
45
. Start ( ) ;
45
46
46
47
// Start Changestream Example 1
47
- var enumerator = inventory . Watch ( ) . ToEnumerable ( ) . GetEnumerator ( ) ;
48
- enumerator . MoveNext ( ) ;
49
- var next = enumerator . Current ;
50
- enumerator . Dispose ( ) ;
48
+ var cursor = inventory . Watch ( ) ;
49
+ while ( cursor . MoveNext ( ) && cursor . Current . Count ( ) == 0 ) { } // keep calling MoveNext until we've read the first batch
50
+ var next = cursor . Current . First ( ) ;
51
+ cursor . Dispose ( ) ;
51
52
// End Changestream Example 1
52
53
53
54
next . FullDocument . Should ( ) . Be ( document ) ;
@@ -74,10 +75,10 @@ public void ChangeStreamExample2()
74
75
75
76
// Start Changestream Example 2
76
77
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption . UpdateLookup } ;
77
- var enumerator = inventory . Watch ( options ) . ToEnumerable ( ) . GetEnumerator ( ) ;
78
- enumerator . MoveNext ( ) ;
79
- var next = enumerator . Current ;
80
- enumerator . Dispose ( ) ;
78
+ var cursor = inventory . Watch ( options ) ;
79
+ while ( cursor . MoveNext ( ) && cursor . Current . Count ( ) == 0 ) { } // keep calling MoveNext until we've read the first batch
80
+ var next = cursor . Current . First ( ) ;
81
+ cursor . Dispose ( ) ;
81
82
// End Changestream Example 2
82
83
83
84
var expectedFullDocument = document . Set ( "x" , 2 ) ;
@@ -98,7 +99,7 @@ public void ChangeStreamExample3()
98
99
new BsonDocument ( "x" , 2 )
99
100
} ;
100
101
101
- ChangeStreamDocument < BsonDocument > lastChangeStreamDocument ;
102
+ IChangeStreamCursor < ChangeStreamDocument < BsonDocument > > previousCursor ;
102
103
{
103
104
new Thread ( ( ) =>
104
105
{
@@ -107,19 +108,18 @@ public void ChangeStreamExample3()
107
108
} )
108
109
. Start ( ) ;
109
110
110
- var enumerator = inventory . Watch ( ) . ToEnumerable ( ) . GetEnumerator ( ) ;
111
- enumerator . MoveNext ( ) ;
112
- lastChangeStreamDocument = enumerator . Current ;
111
+ previousCursor = inventory . Watch ( new ChangeStreamOptions { BatchSize = 1 } ) ;
112
+ while ( previousCursor . MoveNext ( ) && previousCursor . Current . Count ( ) == 0 ) { } // keep calling MoveNext until we've read the first batch
113
113
}
114
114
115
115
{
116
116
// Start Changestream Example 3
117
- var resumeToken = lastChangeStreamDocument . ResumeToken ;
117
+ var resumeToken = previousCursor . GetResumeToken ( ) ;
118
118
var options = new ChangeStreamOptions { ResumeAfter = resumeToken } ;
119
- var enumerator = inventory . Watch ( options ) . ToEnumerable ( ) . GetEnumerator ( ) ;
120
- enumerator . MoveNext ( ) ;
121
- var next = enumerator . Current ;
122
- enumerator . Dispose ( ) ;
119
+ var cursor = inventory . Watch ( options ) ;
120
+ cursor . MoveNext ( ) ;
121
+ var next = cursor . Current . First ( ) ;
122
+ cursor . Dispose ( ) ;
123
123
// End Changestream Example 3
124
124
125
125
next . FullDocument . Should ( ) . Be ( documents [ 1 ] ) ;
@@ -161,15 +161,10 @@ public void ChangestreamExample4()
161
161
"{ $addFields : { newField : 'this is an added field!' } }" ) ;
162
162
163
163
var collection = database . GetCollection < BsonDocument > ( "inventory" ) ;
164
- using ( var changeStream = collection . Watch ( pipeline ) )
164
+ using ( var cursor = collection . Watch ( pipeline ) )
165
165
{
166
- using ( var enumerator = changeStream . ToEnumerable ( ) . GetEnumerator ( ) )
167
- {
168
- if ( enumerator . MoveNext ( ) )
169
- {
170
- var next = enumerator . Current ;
171
- }
172
- }
166
+ while ( cursor . MoveNext ( ) && cursor . Current . Count ( ) == 0 ) { } // keep calling MoveNext until we've read the first batch
167
+ var next = cursor . Current . First ( ) ;
173
168
}
174
169
// End Changestream Example 4
175
170
}
0 commit comments