17
17
18
18
class WatchFunctionalTest extends FunctionalTestCase
19
19
{
20
+ private $ defaultOptions = ['maxAwaitTimeMS ' => 500 ];
21
+
20
22
public function setUp ()
21
23
{
22
24
parent ::setUp ();
@@ -34,7 +36,7 @@ public function testNextResumesAfterCursorNotFound()
34
36
{
35
37
$ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
36
38
37
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
39
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
38
40
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
39
41
40
42
$ changeStream ->rewind ();
@@ -81,7 +83,7 @@ public function testNextResumesAfterConnectionException()
81
83
$ manager = new Manager ($ this ->getUri (), ['socketTimeoutMS ' => 50 ]);
82
84
$ primaryServer = $ manager ->selectServer (new ReadPreference (ReadPreference::RP_PRIMARY ));
83
85
84
- $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
86
+ $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
85
87
$ changeStream = $ operation ->execute ($ primaryServer );
86
88
87
89
/* Note: we intentionally do not start iteration with rewind() to ensure
@@ -134,7 +136,7 @@ public function testRewindResumesAfterConnectionException()
134
136
$ manager = new Manager ($ this ->getUri (), ['socketTimeoutMS ' => 50 ]);
135
137
$ primaryServer = $ manager ->selectServer (new ReadPreference (ReadPreference::RP_PRIMARY ));
136
138
137
- $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
139
+ $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
138
140
$ changeStream = $ operation ->execute ($ primaryServer );
139
141
140
142
$ commands = [];
@@ -180,7 +182,7 @@ public function testNoChangeAfterResumeBeforeInsert()
180
182
{
181
183
$ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
182
184
183
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
185
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
184
186
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
185
187
186
188
$ changeStream ->rewind ();
@@ -225,7 +227,7 @@ public function testNoChangeAfterResumeBeforeInsert()
225
227
226
228
public function testKey ()
227
229
{
228
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
230
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
229
231
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
230
232
231
233
$ this ->assertFalse ($ changeStream ->valid ());
@@ -262,7 +264,7 @@ public function testNonEmptyPipeline()
262
264
{
263
265
$ pipeline = [['$project ' => ['foo ' => [0 ]]]];
264
266
265
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
267
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
266
268
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
267
269
268
270
$ this ->insertDocument (['_id ' => 1 ]);
@@ -313,7 +315,7 @@ public function testNextResumeTokenNotFound()
313
315
{
314
316
$ pipeline = [['$project ' => ['_id ' => 0 ]]];
315
317
316
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
318
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
317
319
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
318
320
319
321
/* Note: we intentionally do not start iteration with rewind() to ensure
@@ -331,7 +333,7 @@ public function testRewindResumeTokenNotFound()
331
333
{
332
334
$ pipeline = [['$project ' => ['_id ' => 0 ]]];
333
335
334
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
336
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
335
337
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
336
338
337
339
$ this ->insertDocument (['x ' => 1 ]);
@@ -347,7 +349,7 @@ public function testNextResumeTokenInvalidType()
347
349
{
348
350
$ pipeline = [['$project ' => ['_id ' => ['$literal ' => 'foo ' ]]]];
349
351
350
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
352
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
351
353
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
352
354
353
355
/* Note: we intentionally do not start iteration with rewind() to ensure
@@ -365,7 +367,7 @@ public function testRewindResumeTokenInvalidType()
365
367
{
366
368
$ pipeline = [['$project ' => ['_id ' => ['$literal ' => 'foo ' ]]]];
367
369
368
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , [ ' maxAwaitTimeMS ' => 100 ] );
370
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this -> defaultOptions );
369
371
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
370
372
371
373
$ this ->insertDocument (['x ' => 1 ]);
@@ -378,13 +380,18 @@ public function testMaxAwaitTimeMS()
378
380
/* On average, an acknowledged write takes about 20 ms to appear in a
379
381
* change stream on the server so we'll use a higher maxAwaitTimeMS to
380
382
* ensure we see the write. */
381
- $ maxAwaitTimeMS = 100 ;
383
+ $ maxAwaitTimeMS = 500 ;
382
384
383
385
/* Calculate an approximate pivot to use for time assertions. We will
384
386
* assert that the duration of blocking responses is greater than this
385
387
* value, and vice versa. */
386
388
$ pivot = ($ maxAwaitTimeMS * 0.001 ) * 0.9 ;
387
389
390
+ /* Calculate an approximate upper bound to use for time assertions. We
391
+ * will assert that the duration of blocking responses is less than this
392
+ * value. */
393
+ $ upperBound = ($ maxAwaitTimeMS * 0.001 ) * 1.5 ;
394
+
388
395
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['maxAwaitTimeMS ' => $ maxAwaitTimeMS ]);
389
396
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
390
397
@@ -397,7 +404,7 @@ public function testMaxAwaitTimeMS()
397
404
$ changeStream ->rewind ();
398
405
$ duration = microtime (true ) - $ startTime ;
399
406
$ this ->assertGreaterThan ($ pivot , $ duration );
400
- $ this ->assertLessThan (0.5 , $ duration );
407
+ $ this ->assertLessThan ($ upperBound , $ duration );
401
408
402
409
$ this ->assertFalse ($ changeStream ->valid ());
403
410
@@ -407,7 +414,7 @@ public function testMaxAwaitTimeMS()
407
414
$ changeStream ->next ();
408
415
$ duration = microtime (true ) - $ startTime ;
409
416
$ this ->assertGreaterThan ($ pivot , $ duration );
410
- $ this ->assertLessThan (0.5 , $ duration );
417
+ $ this ->assertLessThan ($ upperBound , $ duration );
411
418
412
419
$ this ->assertFalse ($ changeStream ->valid ());
413
420
@@ -424,7 +431,7 @@ public function testMaxAwaitTimeMS()
424
431
425
432
public function testRewindResumesAfterCursorNotFound ()
426
433
{
427
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
434
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
428
435
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
429
436
430
437
$ this ->killChangeStreamCursor ($ changeStream );
@@ -436,7 +443,7 @@ public function testRewindResumesAfterCursorNotFound()
436
443
437
444
public function testRewindExtractsResumeTokenAndNextResumes ()
438
445
{
439
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], [ ' maxAwaitTimeMS ' => 100 ] );
446
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this -> defaultOptions );
440
447
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
441
448
442
449
$ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
@@ -473,7 +480,7 @@ public function testRewindExtractsResumeTokenAndNextResumes()
473
480
*/
474
481
public function testTypeMapOption (array $ typeMap , $ expectedChangeDocument )
475
482
{
476
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['maxAwaitTimeMS ' => 100 , ' typeMap ' => $ typeMap ]);
483
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['typeMap ' => $ typeMap ] + $ this -> defaultOptions );
477
484
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
478
485
479
486
$ changeStream ->rewind ();
0 commit comments