Skip to content

Commit 99453ef

Browse files
committed
Merge pull request #510
2 parents d42c3b4 + 428d217 commit 99453ef

File tree

2 files changed

+60
-5
lines changed

2 files changed

+60
-5
lines changed

src/ChangeStream.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class ChangeStream implements Iterator
3838
private $resumeToken;
3939
private $resumeCallable;
4040
private $csIt;
41-
private $key;
41+
private $key = 0;
42+
private $hasAdvanced = false;
4243

4344
const CURSOR_NOT_FOUND = 43;
4445

@@ -53,8 +54,6 @@ public function __construct(Cursor $cursor, callable $resumeCallable)
5354
{
5455
$this->resumeCallable = $resumeCallable;
5556
$this->csIt = new IteratorIterator($cursor);
56-
57-
$this->key = 0;
5857
}
5958

6059
/**
@@ -96,8 +95,11 @@ public function next()
9695
try {
9796
$this->csIt->next();
9897
if ($this->valid()) {
98+
if ($this->hasAdvanced) {
99+
$this->key++;
100+
}
101+
$this->hasAdvanced = true;
99102
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
100-
$this->key++;
101103
}
102104
} catch (RuntimeException $e) {
103105
if (strpos($e->getMessage(), "not master") !== false) {
@@ -125,6 +127,7 @@ public function rewind()
125127
try {
126128
$this->csIt->rewind();
127129
if ($this->valid()) {
130+
$this->hasAdvanced = true;
128131
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
129132
}
130133
} catch (RuntimeException $e) {

tests/Operation/WatchFunctionalTest.php

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace MongoDB\Tests\Operation;
44

55
use MongoDB\ChangeStream;
6-
use MongoDB\Client;
76
use MongoDB\Driver\Manager;
87
use MongoDB\Driver\ReadPreference;
98
use MongoDB\Driver\Server;
109
use MongoDB\Driver\Exception\ConnectionTimeoutException;
10+
use MongoDB\Exception\ResumeTokenException;
1111
use MongoDB\Operation\DatabaseCommand;
1212
use MongoDB\Operation\InsertOne;
1313
use MongoDB\Operation\Watch;
@@ -536,6 +536,58 @@ public function provideTypeMapOptionsAndExpectedChangeDocument()
536536
];
537537
}
538538

539+
public function testNextAdvancesKey()
540+
{
541+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
542+
$changeStream = $operation->execute($this->getPrimaryServer());
543+
544+
$this->insertDocument(['x' => 1]);
545+
$this->insertDocument(['x' => 2]);
546+
547+
$changeStream->next();
548+
549+
$this->assertSame(0, $changeStream->key());
550+
551+
$changeStream->next();
552+
553+
$this->assertSame(1, $changeStream->key());
554+
}
555+
556+
public function testResumeTokenNotFoundAdvancesKey()
557+
{
558+
$pipeline = [['$project' => ['_id' => 0 ]]];
559+
560+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
561+
$changeStream = $operation->execute($this->getPrimaryServer());
562+
563+
/* Note: we intentionally do not start iteration with rewind() to ensure
564+
* that we test extraction functionality within next(). */
565+
$this->insertDocument(['x' => 1]);
566+
$this->insertDocument(['x' => 2]);
567+
$this->insertDocument(['x' => 3]);
568+
569+
try {
570+
$changeStream->rewind();
571+
$this->fail('ResumeTokenException was not thrown');
572+
} catch (ResumeTokenException $e) {}
573+
574+
$this->assertSame(0, $changeStream->key());
575+
576+
try {
577+
$changeStream->next();
578+
$this->fail('ResumeTokenException was not thrown');
579+
} catch (ResumeTokenException $e) {}
580+
581+
$this->assertSame(1, $changeStream->key());
582+
583+
try {
584+
$changeStream->next();
585+
$this->fail('ResumeTokenException was not thrown');
586+
} catch (ResumeTokenException $e) {}
587+
588+
$this->assertSame(2, $changeStream->key());
589+
}
590+
539591
private function insertDocument($document)
540592
{
541593
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);

0 commit comments

Comments
 (0)