3
3
namespace MongoDB \Tests \Operation ;
4
4
5
5
use MongoDB \Client ;
6
- use MongoDB \Collection ;
7
6
use MongoDB \Operation \DatabaseCommand ;
7
+ use MongoDB \Operation \InsertOne ;
8
+ use MongoDB \Operation \Watch ;
8
9
9
10
class WatchFunctionalTest extends FunctionalTestCase
10
11
{
11
12
public function setUp ()
12
13
{
13
14
parent ::setUp ();
15
+
14
16
if (version_compare ($ this ->getFeatureCompatibilityVersion (), '3.6 ' , '< ' )) {
15
17
$ this ->markTestSkipped ('$changeStream is only supported on FCV 3.6 or higher ' );
16
18
}
17
19
}
18
20
19
21
public function testResume ()
20
22
{
21
- $ this ->collection = new Collection ( $ this -> manager , $ this -> getDatabaseName (), $ this -> getCollectionName () );
23
+ $ this ->insertDocument ([ ' _id ' => 1 , ' x ' => ' foo ' ] );
22
24
23
- $ result = $ this ->collection ->insertOne (['x ' => 1 ]);
24
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
25
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
25
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
26
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
26
27
27
- $ changeStream = $ this ->collection ->watch ();
28
28
$ changeStream ->rewind ();
29
29
$ this ->assertNull ($ changeStream ->current ());
30
30
31
- $ result = $ this ->collection ->insertOne (['x ' => 2 ]);
32
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
33
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
31
+ $ this ->insertDocument (['_id ' => 2 , 'x ' => 'bar ' ]);
34
32
35
33
$ changeStream ->next ();
36
34
$ expectedResult = (object ) ([
37
35
'_id ' => $ changeStream ->current ()->_id ,
38
36
'operationType ' => 'insert ' ,
39
- 'fullDocument ' => (object ) ['_id ' => $ result -> getInsertedId () , 'x ' => 2 ],
37
+ 'fullDocument ' => (object ) ['_id ' => 2 , 'x ' => ' bar ' ],
40
38
'ns ' => (object ) ['db ' => 'phplib_test ' , 'coll ' => 'WatchFunctionalTest.e68b9f01 ' ],
41
- 'documentKey ' => (object ) ['_id ' => $ result -> getInsertedId () ]
39
+ 'documentKey ' => (object ) ['_id ' => 2 ]
42
40
]);
43
41
$ this ->assertEquals ($ changeStream ->current (), $ expectedResult );
44
42
45
43
$ operation = new DatabaseCommand ($ this ->getDatabaseName (), ["killCursors " => $ this ->getCollectionName (), "cursors " => [$ changeStream ->getCursorId ()]]);
46
44
$ operation ->execute ($ this ->getPrimaryServer ());
47
45
48
- $ result = $ this ->collection ->insertOne (['x ' => 3 ]);
49
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
50
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
46
+ $ this ->insertDocument (['_id ' => 3 , 'x ' => 'baz ' ]);
51
47
52
48
$ changeStream ->next ();
53
49
$ expectedResult = (object ) ([
54
50
'_id ' => $ changeStream ->current ()->_id ,
55
51
'operationType ' => 'insert ' ,
56
- 'fullDocument ' => (object ) ['_id ' => $ result -> getInsertedId () , 'x ' => 3 ],
52
+ 'fullDocument ' => (object ) ['_id ' => 3 , 'x ' => ' baz ' ],
57
53
'ns ' => (object ) ['db ' => 'phplib_test ' , 'coll ' => 'WatchFunctionalTest.e68b9f01 ' ],
58
- 'documentKey ' => (object ) ['_id ' => $ result -> getInsertedId () ]
54
+ 'documentKey ' => (object ) ['_id ' => 3 ]
59
55
]);
60
56
$ this ->assertEquals ($ changeStream ->current (), $ expectedResult );
61
57
}
62
58
63
59
public function testNoChangeAfterResumeBeforeInsert ()
64
60
{
65
- $ this ->collection = new Collection ( $ this -> manager , $ this -> getDatabaseName (), $ this -> getCollectionName () );
61
+ $ this ->insertDocument ([ ' _id ' => 1 , ' x ' => ' foo ' ] );
66
62
67
- $ result = $ this ->collection ->insertOne (['x ' => 1 ]);
68
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
69
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
63
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
64
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
70
65
71
- $ changeStream = $ this ->collection ->watch ();
72
66
$ changeStream ->rewind ();
73
67
$ this ->assertNull ($ changeStream ->current ());
74
68
75
- $ result = $ this ->collection ->insertOne (['x ' => 2 ]);
76
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
77
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
69
+ $ this ->insertDocument (['_id ' => 2 , 'x ' => 'bar ' ]);
78
70
79
71
$ changeStream ->next ();
80
72
$ expectedResult = (object ) ([
81
73
'_id ' => $ changeStream ->current ()->_id ,
82
74
'operationType ' => 'insert ' ,
83
- 'fullDocument ' => (object ) ['_id ' => $ result -> getInsertedId () , 'x ' => 2 ],
75
+ 'fullDocument ' => (object ) ['_id ' => 2 , 'x ' => ' bar ' ],
84
76
'ns ' => (object ) ['db ' => 'phplib_test ' , 'coll ' => 'WatchFunctionalTest.4a554985 ' ],
85
- 'documentKey ' => (object ) ['_id ' => $ result -> getInsertedId () ]
77
+ 'documentKey ' => (object ) ['_id ' => 2 ]
86
78
]);
87
79
$ this ->assertEquals ($ changeStream ->current (), $ expectedResult );
88
80
@@ -92,26 +84,23 @@ public function testNoChangeAfterResumeBeforeInsert()
92
84
$ changeStream ->next ();
93
85
$ this ->assertNull ($ changeStream ->current ());
94
86
95
- $ result = $ this ->collection ->insertOne (['x ' => 3 ]);
96
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
97
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
87
+ $ this ->insertDocument (['_id ' => 3 , 'x ' => 'baz ' ]);
98
88
99
89
$ changeStream ->next ();
100
90
$ expectedResult = (object ) ([
101
91
'_id ' => $ changeStream ->current ()->_id ,
102
92
'operationType ' => 'insert ' ,
103
- 'fullDocument ' => (object ) ['_id ' => $ result -> getInsertedId () , 'x ' => 3 ],
93
+ 'fullDocument ' => (object ) ['_id ' => 3 , 'x ' => ' baz ' ],
104
94
'ns ' => (object ) ['db ' => 'phplib_test ' , 'coll ' => 'WatchFunctionalTest.4a554985 ' ],
105
- 'documentKey ' => (object ) ['_id ' => $ result -> getInsertedId () ]
95
+ 'documentKey ' => (object ) ['_id ' => 3 ]
106
96
]);
107
97
$ this ->assertEquals ($ changeStream ->current (), $ expectedResult );
108
98
}
109
99
110
100
public function testResumeAfterKillThenNoOperations ()
111
101
{
112
- $ this ->collection = new Collection ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName ());
113
-
114
- $ changeStream = $ this ->collection ->watch ();
102
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
103
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
115
104
116
105
$ operation = new DatabaseCommand ($ this ->getDatabaseName (), ["killCursors " => $ this ->getCollectionName (), "cursors " => [$ changeStream ->getCursorId ()]]);
117
106
$ operation ->execute ($ this ->getPrimaryServer ());
@@ -122,32 +111,26 @@ public function testResumeAfterKillThenNoOperations()
122
111
123
112
public function testResumeAfterKillThenOperation ()
124
113
{
125
- $ this ->collection = new Collection ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName ());
126
-
127
- $ changeStream = $ this ->collection ->watch ();
114
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
115
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
128
116
129
117
$ operation = new DatabaseCommand ($ this ->getDatabaseName (), ["killCursors " => $ this ->getCollectionName (), "cursors " => [$ changeStream ->getCursorId ()]]);
130
118
$ operation ->execute ($ this ->getPrimaryServer ());
131
119
132
- $ result = $ this ->collection ->insertOne (['x ' => 3 ]);
133
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
134
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
120
+ $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
135
121
136
122
$ changeStream ->next ();
137
123
$ this ->assertNull ($ changeStream ->current ());
138
124
}
139
125
140
126
public function testKey ()
141
127
{
142
- $ this ->collection = new Collection ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName ());
143
-
144
- $ changeStream = $ this ->collection ->watch ();
128
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
129
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
145
130
146
131
$ this ->assertNull ($ changeStream ->key ());
147
132
148
- $ result = $ this ->collection ->insertOne (['x ' => 1 ]);
149
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
150
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
133
+ $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
151
134
152
135
$ changeStream ->next ();
153
136
$ this ->assertSame (1 , $ changeStream ->key ());
@@ -163,24 +146,20 @@ public function testKey()
163
146
$ changeStream ->next ();
164
147
$ this ->assertNull ($ changeStream ->key ());
165
148
166
- $ result = $ this ->collection ->insertOne (['x ' => 2 ]);
167
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
168
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
149
+ $ this ->insertDocument (['_id ' => 2 , 'x ' => 'bar ' ]);
169
150
170
151
$ changeStream ->next ();
171
152
$ this ->assertSame (2 , $ changeStream ->key ());
172
153
}
173
154
174
155
public function testNonEmptyPipeline ()
175
156
{
176
- $ this ->collection = new Collection ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName ());
177
-
178
157
$ pipeline = [['$project ' => ['foo ' => [0 ]]]];
179
- $ changeStream = $ this ->collection ->watch ($ pipeline , []);
180
158
181
- $ result = $ this ->collection ->insertOne (['x ' => 1 ]);
182
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
183
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
159
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline );
160
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
161
+
162
+ $ this ->insertDocument (['_id ' => 1 ]);
184
163
185
164
$ changeStream ->next ();
186
165
$ expectedResult = (object ) ([
@@ -192,9 +171,9 @@ public function testNonEmptyPipeline()
192
171
193
172
public function testCursorWithEmptyBatchNotClosed ()
194
173
{
195
- $ this ->collection = new Collection ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName ());
174
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
175
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
196
176
197
- $ changeStream = $ this ->collection ->watch ();
198
177
$ this ->assertNotNull ($ changeStream );
199
178
}
200
179
@@ -203,14 +182,12 @@ public function testCursorWithEmptyBatchNotClosed()
203
182
*/
204
183
public function testFailureAfterResumeTokenRemoved ()
205
184
{
206
- $ this ->collection = new Collection ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName ());
207
-
208
185
$ pipeline = [['$project ' => ['_id ' => 0 ]]];
209
- $ changeStream = $ this ->collection ->watch ($ pipeline , []);
210
186
211
- $ result = $ this ->collection ->insertOne (['x ' => 1 ]);
212
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
213
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
187
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline );
188
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
189
+
190
+ $ this ->insertDocument (['x ' => 1 ]);
214
191
215
192
$ changeStream ->next ();
216
193
}
@@ -223,29 +200,28 @@ public function testConnectionException()
223
200
$ changeStream = $ collection ->watch ();
224
201
$ changeStream ->next ();
225
202
226
- $ result = $ collection ->insertOne (['x ' => 1 ]);
227
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
228
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
203
+ $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
229
204
230
205
$ changeStream ->next ();
231
206
$ expectedResult = (object ) ([
232
207
'_id ' => $ changeStream ->current ()->_id ,
233
208
'operationType ' => 'insert ' ,
234
- 'fullDocument ' => (object ) ['_id ' => $ result -> getInsertedId () , 'x ' => 1 ],
209
+ 'fullDocument ' => (object ) ['_id ' => 1 , 'x ' => ' foo ' ],
235
210
'ns ' => (object ) ['db ' => 'phplib_test ' , 'coll ' => 'WatchFunctionalTest.226d95f1 ' ],
236
- 'documentKey ' => (object ) ['_id ' => $ result -> getInsertedId () ]
211
+ 'documentKey ' => (object ) ['_id ' => 1 ]
237
212
]);
238
213
$ this ->assertEquals ($ changeStream ->current (), $ expectedResult );
239
214
}
240
215
241
216
public function testMaxAwaitTimeMS ()
242
217
{
243
- $ this ->collection = new Collection ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName ());
244
218
/* On average, an acknowledged write takes about 20 ms to appear in a
245
219
* change stream on the server so we'll use a higher maxAwaitTimeMS to
246
220
* ensure we see the write. */
247
221
$ maxAwaitTimeMS = 100 ;
248
- $ changeStream = $ this ->collection ->watch ([], ['maxAwaitTimeMS ' => $ maxAwaitTimeMS ]);
222
+
223
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['maxAwaitTimeMS ' => $ maxAwaitTimeMS ]);
224
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
249
225
250
226
/* The initial change stream is empty so we should expect a delay when
251
227
* we call rewind, since it issues a getMore. Expect to wait at least
@@ -272,14 +248,19 @@ public function testMaxAwaitTimeMS()
272
248
273
249
/* After inserting a document, the change stream will not issue a
274
250
* getMore so we should not expect a delay. */
275
- $ result = $ this ->collection ->insertOne (['_id ' => 1 ]);
276
- $ this ->assertInstanceOf ('MongoDB\InsertOneResult ' , $ result );
277
- $ this ->assertSame (1 , $ result ->getInsertedCount ());
251
+ $ this ->insertDocument (['_id ' => 1 ]);
278
252
279
253
$ startTime = microtime (true );
280
254
$ changeStream ->next ();
281
255
$ duration = microtime (true ) - $ startTime ;
282
256
$ this ->assertLessThan ($ maxAwaitTimeMS * 0.001 , $ duration );
283
257
$ this ->assertTrue ($ changeStream ->valid ());
284
258
}
259
+
260
+ private function insertDocument ($ document )
261
+ {
262
+ $ insertOne = new InsertOne ($ this ->getDatabaseName (), $ this ->getCollectionName (), $ document );
263
+ $ writeResult = $ insertOne ->execute ($ this ->getPrimaryServer ());
264
+ $ this ->assertEquals (1 , $ writeResult ->getInsertedCount ());
265
+ }
285
266
}
0 commit comments