Skip to content

Commit d848f58

Browse files
committed
Merge branch 'v1.4'
2 parents 457f8d6 + 234d565 commit d848f58

File tree

2 files changed

+45
-23
lines changed

2 files changed

+45
-23
lines changed

src/Operation/Watch.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*/
4848
class Watch implements Executable, /* @internal */ CommandSubscriber
4949
{
50-
private static $wireVersionForOperationTime = 7;
50+
private static $wireVersionForStartAtOperationTime = 7;
5151

5252
const FULL_DOCUMENT_DEFAULT = 'default';
5353
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
@@ -272,9 +272,9 @@ private function createResumeCallable(Manager $manager)
272272
private function executeAggregate(Server $server)
273273
{
274274
/* If we've already captured an operation time or the server does not
275-
* support returning an operation time (e.g. MongoDB 3.6), execute the
276-
* aggregation directly and return its cursor. */
277-
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForOperationTime)) {
275+
* support resuming from an operation time (e.g. MongoDB 3.6), execute
276+
* the aggregation directly and return its cursor. */
277+
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
278278
return $this->aggregate->execute($server);
279279
}
280280

tests/Operation/WatchFunctionalTest.php

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
class WatchFunctionalTest extends FunctionalTestCase
2222
{
23+
private static $wireVersionForStartAtOperationTime = 7;
24+
2325
private $defaultOptions = ['maxAwaitTimeMS' => 500];
2426

2527
public function setUp()
@@ -127,6 +129,8 @@ function(array $event) use (&$commands) {
127129

128130
public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
129131
{
132+
$this->skipIfStartAtOperationTimeNotSupported();
133+
130134
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
131135

132136
$operationTime = null;
@@ -143,7 +147,9 @@ function (array $event) use (&$events) {
143147

144148
$this->assertCount(1, $events);
145149
$this->assertSame('aggregate', $events[0]['started']->getCommandName());
146-
$operationTime = $events[0]['succeeded']->getReply()->operationTime;
150+
$reply = $events[0]['succeeded']->getReply();
151+
$this->assertObjectHasAttribute('operationTime', $reply);
152+
$operationTime = $reply->operationTime;
147153
$this->assertInstanceOf(TimestampInterface::class, $operationTime);
148154

149155
$this->assertNull($changeStream->current());
@@ -391,20 +397,29 @@ public function testResumeMultipleTimesInSuccession()
391397

392398
$this->insertDocument(['_id' => 1]);
393399

400+
/* Insert a document and advance the change stream to ensure we capture
401+
* a resume token. This is necessary when startAtOperationTime is not
402+
* supported (i.e. 3.6 server version). */
403+
$changeStream->next();
404+
$this->assertTrue($changeStream->valid());
405+
$this->assertSame(0, $changeStream->key());
406+
407+
$this->insertDocument(['_id' => 2]);
408+
394409
/* Killing the cursor and advancing when there is a result will test
395410
* that next()'s resume attempt picks up the latest change. */
396411
$this->killChangeStreamCursor($changeStream);
397412

398413
$changeStream->next();
399414
$this->assertTrue($changeStream->valid());
400-
$this->assertSame(0, $changeStream->key());
415+
$this->assertSame(1, $changeStream->key());
401416

402417
$expectedResult = [
403418
'_id' => $changeStream->current()->_id,
404419
'operationType' => 'insert',
405-
'fullDocument' => ['_id' => 1],
420+
'fullDocument' => ['_id' => 2],
406421
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
407-
'documentKey' => ['_id' => 1],
422+
'documentKey' => ['_id' => 2],
408423
];
409424

410425
$this->assertMatchesDocument($expectedResult, $changeStream->current());
@@ -417,48 +432,48 @@ public function testResumeMultipleTimesInSuccession()
417432

418433
$changeStream->rewind();
419434
$this->assertTrue($changeStream->valid());
420-
$this->assertSame(0, $changeStream->key());
435+
$this->assertSame(1, $changeStream->key());
421436

422437
$expectedResult = [
423438
'_id' => $changeStream->current()->_id,
424439
'operationType' => 'insert',
425-
'fullDocument' => ['_id' => 1],
440+
'fullDocument' => ['_id' => 2],
426441
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
427-
'documentKey' => ['_id' => 1],
442+
'documentKey' => ['_id' => 2],
428443
];
429444

430445
$this->assertMatchesDocument($expectedResult, $changeStream->current());
431446

432-
$this->insertDocument(['_id' => 2]);
447+
$this->insertDocument(['_id' => 3]);
433448

434449
$changeStream->next();
435450
$this->assertTrue($changeStream->valid());
436-
$this->assertSame(1, $changeStream->key());
451+
$this->assertSame(2, $changeStream->key());
437452

438453
$expectedResult = [
439454
'_id' => $changeStream->current()->_id,
440455
'operationType' => 'insert',
441-
'fullDocument' => ['_id' => 2],
456+
'fullDocument' => ['_id' => 3],
442457
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
443-
'documentKey' => ['_id' => 2],
458+
'documentKey' => ['_id' => 3],
444459
];
445460

446461
$this->assertMatchesDocument($expectedResult, $changeStream->current());
447462

448463
$this->killChangeStreamCursor($changeStream);
449464

450-
$this->insertDocument(['_id' => 3]);
465+
$this->insertDocument(['_id' => 4]);
451466

452467
$changeStream->next();
453468
$this->assertTrue($changeStream->valid());
454-
$this->assertSame(2, $changeStream->key());
469+
$this->assertSame(3, $changeStream->key());
455470

456471
$expectedResult = [
457472
'_id' => $changeStream->current()->_id,
458473
'operationType' => 'insert',
459-
'fullDocument' => ['_id' => 3],
474+
'fullDocument' => ['_id' => 4],
460475
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
461-
'documentKey' => ['_id' => 3],
476+
'documentKey' => ['_id' => 4],
462477
];
463478

464479
$this->assertMatchesDocument($expectedResult, $changeStream->current());
@@ -469,18 +484,18 @@ public function testResumeMultipleTimesInSuccession()
469484
* we'll see {_id: 3} returned again. */
470485
$this->killChangeStreamCursor($changeStream);
471486

472-
$this->insertDocument(['_id' => 4]);
487+
$this->insertDocument(['_id' => 5]);
473488

474489
$changeStream->next();
475490
$this->assertTrue($changeStream->valid());
476-
$this->assertSame(3, $changeStream->key());
491+
$this->assertSame(4, $changeStream->key());
477492

478493
$expectedResult = [
479494
'_id' => $changeStream->current()->_id,
480495
'operationType' => 'insert',
481-
'fullDocument' => ['_id' => 4],
496+
'fullDocument' => ['_id' => 5],
482497
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
483-
'documentKey' => ['_id' => 4],
498+
'documentKey' => ['_id' => 5],
484499
];
485500

486501
$this->assertMatchesDocument($expectedResult, $changeStream->current());
@@ -954,4 +969,11 @@ private function killChangeStreamCursor(ChangeStream $changeStream)
954969
$operation = new DatabaseCommand($this->getDatabaseName(), $command);
955970
$operation->execute($this->getPrimaryServer());
956971
}
972+
973+
private function skipIfStartAtOperationTimeNotSupported()
974+
{
975+
if (!\MongoDB\server_supports_feature($this->getPrimaryServer(), self::$wireVersionForStartAtOperationTime)) {
976+
$this->markTestSkipped('Operation time is not supported');
977+
}
978+
}
957979
}

0 commit comments

Comments
 (0)