Skip to content

Commit 428d217

Browse files
committed
PHPLIB-335: ChangeStream::next() should increment key even if ResumeTokenException is thrown
1 parent e847b06 commit 428d217

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

src/ChangeStream.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,11 @@ public function next()
9595
try {
9696
$this->csIt->next();
9797
if ($this->valid()) {
98-
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
9998
if ($this->hasAdvanced) {
10099
$this->key++;
101100
}
102101
$this->hasAdvanced = true;
102+
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
103103
}
104104
} catch (RuntimeException $e) {
105105
if (strpos($e->getMessage(), "not master") !== false) {
@@ -127,8 +127,8 @@ public function rewind()
127127
try {
128128
$this->csIt->rewind();
129129
if ($this->valid()) {
130-
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
131130
$this->hasAdvanced = true;
131+
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
132132
}
133133
} catch (RuntimeException $e) {
134134
if (strpos($e->getMessage(), "not master") !== false) {

tests/Operation/WatchFunctionalTest.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use MongoDB\Driver\ReadPreference;
88
use MongoDB\Driver\Server;
99
use MongoDB\Driver\Exception\ConnectionTimeoutException;
10+
use MongoDB\Exception\ResumeTokenException;
1011
use MongoDB\Operation\DatabaseCommand;
1112
use MongoDB\Operation\InsertOne;
1213
use MongoDB\Operation\Watch;
@@ -552,6 +553,41 @@ public function testNextAdvancesKey()
552553
$this->assertSame(1, $changeStream->key());
553554
}
554555

556+
public function testResumeTokenNotFoundAdvancesKey()
557+
{
558+
$pipeline = [['$project' => ['_id' => 0 ]]];
559+
560+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
561+
$changeStream = $operation->execute($this->getPrimaryServer());
562+
563+
/* Note: we intentionally do not start iteration with rewind() to ensure
564+
* that we test extraction functionality within next(). */
565+
$this->insertDocument(['x' => 1]);
566+
$this->insertDocument(['x' => 2]);
567+
$this->insertDocument(['x' => 3]);
568+
569+
try {
570+
$changeStream->rewind();
571+
$this->fail('ResumeTokenException was not thrown');
572+
} catch (ResumeTokenException $e) {}
573+
574+
$this->assertSame(0, $changeStream->key());
575+
576+
try {
577+
$changeStream->next();
578+
$this->fail('ResumeTokenException was not thrown');
579+
} catch (ResumeTokenException $e) {}
580+
581+
$this->assertSame(1, $changeStream->key());
582+
583+
try {
584+
$changeStream->next();
585+
$this->fail('ResumeTokenException was not thrown');
586+
} catch (ResumeTokenException $e) {}
587+
588+
$this->assertSame(2, $changeStream->key());
589+
}
590+
555591
private function insertDocument($document)
556592
{
557593
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);

0 commit comments

Comments
 (0)