Skip to content

Commit 84c365c

Browse files
committed
PHPLIB-451: ChangeStream::rewind() should never execute getMore
Tests were revised to no longer expect rewind() to encounter an error and resume. In most tests, rewind() is a NOP because the change stream has no events to return upon creation. Since Watch now always executes aggregate commands with APM, the check for startAtOperationTime support was moved to createResumeCallable(). We continue to only capture operationTime from the very first aggregate command. Various assertions for current() returning null were also changed to instead check valid(). Both forms are equivalent, but checking valid() is more consistent with our iteration examples.
1 parent b6c7f11 commit 84c365c

File tree

6 files changed

+186
-271
lines changed

6 files changed

+186
-271
lines changed

src/ChangeStream.php

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
use MongoDB\Driver\Exception\ServerException;
2525
use MongoDB\Exception\InvalidArgumentException;
2626
use MongoDB\Exception\ResumeTokenException;
27-
use IteratorIterator;
27+
use MongoDB\Model\TailableCursorIterator;
2828
use Iterator;
2929

3030
/**
@@ -61,13 +61,14 @@ class ChangeStream implements Iterator
6161
* Constructor.
6262
*
6363
* @internal
64-
* @param Cursor $cursor
64+
* @param Cursor $cursor
6565
* @param callable $resumeCallable
66+
* @param boolean $isFirstBatchEmpty
6667
*/
67-
public function __construct(Cursor $cursor, callable $resumeCallable)
68+
public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBatchEmpty)
6869
{
6970
$this->resumeCallable = $resumeCallable;
70-
$this->csIt = new IteratorIterator($cursor);
71+
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
7172
}
7273

7374
/**
@@ -242,17 +243,11 @@ private function onIteration($incrementKey)
242243
*/
243244
private function resume()
244245
{
245-
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
246-
$this->csIt = $newChangeStream->csIt;
246+
list($cursor, $isFirstBatchEmpty) = call_user_func($this->resumeCallable, $this->resumeToken);
247+
248+
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
247249
$this->csIt->rewind();
248-
/* Note: if we are resuming after a call to ChangeStream::rewind(),
249-
* $hasAdvanced will always be false. For it to be true, rewind() would
250-
* need to have thrown a RuntimeException with a resumable error, which
251-
* can only happen during the first call to IteratorIterator::rewind()
252-
* before onIteration() has a chance to set $hasAdvanced to true.
253-
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive
254-
* rewinds) or throw a LogicException (rewind after next), neither of
255-
* which would result in a call to resume(). */
250+
256251
$this->onIteration($this->hasAdvanced);
257252
}
258253

src/Model/TailableCursorIterator.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ class TailableCursorIterator extends IteratorIterator
3838
*
3939
* @internal
4040
* @param Cursor $cursor
41-
* @param boolean $isFirstBatchIsEmpty
41+
* @param boolean $isFirstBatchEmpty
4242
*/
43-
public function __construct(Cursor $cursor, $isFirstBatchIsEmpty)
43+
public function __construct(Cursor $cursor, $isFirstBatchEmpty)
4444
{
4545
parent::__construct($cursor);
46-
$this->isRewindNop = $isFirstBatchIsEmpty;
46+
$this->isRewindNop = $isFirstBatchEmpty;
4747
}
4848

4949
/**

src/Operation/Watch.php

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
5757
private $changeStreamOptions;
5858
private $collectionName;
5959
private $databaseName;
60+
private $isFirstBatchEmpty = false;
6061
private $operationTime;
6162
private $pipeline;
6263
private $resumeCallable;
@@ -200,6 +201,11 @@ final public function commandFailed(CommandFailedEvent $event)
200201
/** @internal */
201202
final public function commandStarted(CommandStartedEvent $event)
202203
{
204+
if ($event->getCommandName() !== 'aggregate') {
205+
return;
206+
}
207+
208+
$this->isFirstBatchEmpty = false;
203209
}
204210

205211
/** @internal */
@@ -211,9 +217,15 @@ final public function commandSucceeded(CommandSucceededEvent $event)
211217

212218
$reply = $event->getReply();
213219

214-
if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
220+
/* Note: the spec only refers to collecting an operation time from the
221+
* "original aggregation", so only capture it if we've not already. */
222+
if (!isset($this->operationTime) && isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
215223
$this->operationTime = $reply->operationTime;
216224
}
225+
226+
if (isset($reply->cursor->firstBatch) && is_array($reply->cursor->firstBatch)) {
227+
$this->isFirstBatchEmpty = empty($reply->cursor->firstBatch);
228+
}
217229
}
218230

219231
/**
@@ -227,7 +239,9 @@ final public function commandSucceeded(CommandSucceededEvent $event)
227239
*/
228240
public function execute(Server $server)
229241
{
230-
return new ChangeStream($this->executeAggregate($server), $this->resumeCallable);
242+
$cursor = $this->executeAggregate($server);
243+
244+
return new ChangeStream($cursor, $this->resumeCallable, $this->isFirstBatchEmpty);
231245
}
232246

233247
/**
@@ -255,40 +269,36 @@ private function createResumeCallable(Manager $manager)
255269
unset($this->changeStreamOptions['startAtOperationTime']);
256270
}
257271

272+
// Select a new server using the original read preference
273+
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
274+
258275
/* If we captured an operation time from the first aggregate command
259276
* and there is no "resumeAfter" option, set "startAtOperationTime"
260277
* so that we can resume from the original aggregate's time. */
261-
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter'])) {
278+
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter']) &&
279+
\MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
262280
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
263281
}
264282

283+
// Recreate the aggregate command and execute to obtain a new cursor
265284
$this->aggregate = $this->createAggregate();
285+
$cursor = $this->executeAggregate($server);
266286

267-
/* Select a new server using the read preference, execute this
268-
* operation on it, and return the new ChangeStream. */
269-
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
270-
271-
return $this->execute($server);
287+
return [$cursor, $this->isFirstBatchEmpty];
272288
};
273289
}
274290

275291
/**
276-
* Execute the aggregate command and optionally capture its operation time.
292+
* Execute the aggregate command.
293+
*
294+
* The command will be executed using APM so that we can capture its
295+
* operation time and/or firstBatch size.
277296
*
278297
* @param Server $server
279298
* @return Cursor
280299
*/
281300
private function executeAggregate(Server $server)
282301
{
283-
/* If we've already captured an operation time or the server does not
284-
* support resuming from an operation time (e.g. MongoDB 3.6), execute
285-
* the aggregation directly and return its cursor. */
286-
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
287-
return $this->aggregate->execute($server);
288-
}
289-
290-
/* Otherwise, execute the aggregation using command monitoring so that
291-
* we can capture its operation time with commandSucceeded(). */
292302
\MongoDB\Driver\Monitoring\addSubscriber($this);
293303

294304
try {

0 commit comments

Comments
 (0)