2
2
3
3
namespace MongoDB \Tests \Operation ;
4
4
5
+ use MongoDB \ChangeStream ;
5
6
use MongoDB \Client ;
6
7
use MongoDB \Driver \Manager ;
7
8
use MongoDB \Driver \ReadPreference ;
@@ -52,8 +53,7 @@ public function testResume()
52
53
53
54
$ this ->assertSameDocument ($ expectedResult , $ changeStream ->current ());
54
55
55
- $ operation = new DatabaseCommand ($ this ->getDatabaseName (), ["killCursors " => $ this ->getCollectionName (), "cursors " => [$ changeStream ->getCursorId ()]]);
56
- $ operation ->execute ($ this ->getPrimaryServer ());
56
+ $ this ->killChangeStreamCursor ($ changeStream );
57
57
58
58
$ this ->insertDocument (['_id ' => 3 , 'x ' => 'baz ' ]);
59
59
@@ -150,8 +150,7 @@ public function testNoChangeAfterResumeBeforeInsert()
150
150
151
151
$ this ->assertSameDocument ($ expectedResult , $ changeStream ->current ());
152
152
153
- $ operation = new DatabaseCommand ($ this ->getDatabaseName (), ["killCursors " => $ this ->getCollectionName (), "cursors " => [$ changeStream ->getCursorId ()]]);
154
- $ operation ->execute ($ this ->getPrimaryServer ());
153
+ $ this ->killChangeStreamCursor ($ changeStream );
155
154
156
155
$ changeStream ->next ();
157
156
$ this ->assertNull ($ changeStream ->current ());
@@ -176,8 +175,7 @@ public function testResumeAfterKillThenNoOperations()
176
175
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
177
176
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
178
177
179
- $ operation = new DatabaseCommand ($ this ->getDatabaseName (), ["killCursors " => $ this ->getCollectionName (), "cursors " => [$ changeStream ->getCursorId ()]]);
180
- $ operation ->execute ($ this ->getPrimaryServer ());
178
+ $ this ->killChangeStreamCursor ($ changeStream );
181
179
182
180
$ changeStream ->next ();
183
181
$ this ->assertNull ($ changeStream ->current ());
@@ -188,8 +186,7 @@ public function testResumeAfterKillThenOperation()
188
186
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
189
187
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
190
188
191
- $ operation = new DatabaseCommand ($ this ->getDatabaseName (), ["killCursors " => $ this ->getCollectionName (), "cursors " => [$ changeStream ->getCursorId ()]]);
192
- $ operation ->execute ($ this ->getPrimaryServer ());
189
+ $ this ->killChangeStreamCursor ($ changeStream );
193
190
194
191
$ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
195
192
@@ -214,8 +211,7 @@ public function testKey()
214
211
$ changeStream ->next ();
215
212
$ this ->assertNull ($ changeStream ->key ());
216
213
217
- $ operation = new DatabaseCommand ($ this ->getDatabaseName (), ["killCursors " => $ this ->getCollectionName (), "cursors " => [$ changeStream ->getCursorId ()]]);
218
- $ operation ->execute ($ this ->getPrimaryServer ());
214
+ $ this ->killChangeStreamCursor ($ changeStream );
219
215
220
216
$ changeStream ->next ();
221
217
$ this ->assertNull ($ changeStream ->key ());
@@ -318,4 +314,15 @@ private function insertDocument($document)
318
314
$ writeResult = $ insertOne ->execute ($ this ->getPrimaryServer ());
319
315
$ this ->assertEquals (1 , $ writeResult ->getInsertedCount ());
320
316
}
317
+
318
+ private function killChangeStreamCursor (ChangeStream $ changeStream )
319
+ {
320
+ $ command = [
321
+ 'killCursors ' => $ this ->getCollectionName (),
322
+ 'cursors ' => [ $ changeStream ->getCursorId () ],
323
+ ];
324
+
325
+ $ operation = new DatabaseCommand ($ this ->getDatabaseName (), $ command );
326
+ $ operation ->execute ($ this ->getPrimaryServer ());
327
+ }
321
328
}
0 commit comments