Skip to content

Commit d38c095

Browse files
committed
Merge pull request #639
2 parents bfcda32 + b3da293 commit d38c095

11 files changed

+987
-445
lines changed

src/ChangeStream.php

Lines changed: 37 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717

1818
namespace MongoDB;
1919

20-
use MongoDB\BSON\Serializable;
21-
use MongoDB\Driver\Cursor;
20+
use MongoDB\Driver\CursorId;
2221
use MongoDB\Driver\Exception\ConnectionException;
2322
use MongoDB\Driver\Exception\RuntimeException;
2423
use MongoDB\Driver\Exception\ServerException;
25-
use MongoDB\Exception\InvalidArgumentException;
2624
use MongoDB\Exception\ResumeTokenException;
27-
use MongoDB\Model\TailableCursorIterator;
25+
use MongoDB\Model\ChangeStreamIterator;
2826
use Iterator;
2927

3028
/**
@@ -42,13 +40,14 @@ class ChangeStream implements Iterator
4240
*/
4341
const CURSOR_NOT_FOUND = 43;
4442

45-
private static $errorCodeCappedPositionLost = 136;
46-
private static $errorCodeInterrupted = 11601;
47-
private static $errorCodeCursorKilled = 237;
43+
private static $nonResumableErrorCodes = [
44+
136, // CappedPositionLost
45+
237, // CursorKilled
46+
11601, // Interrupted
47+
];
4848

49-
private $resumeToken;
5049
private $resumeCallable;
51-
private $csIt;
50+
private $iterator;
5251
private $key = 0;
5352

5453
/**
@@ -61,14 +60,13 @@ class ChangeStream implements Iterator
6160
* Constructor.
6261
*
6362
* @internal
64-
* @param Cursor $cursor
65-
* @param callable $resumeCallable
66-
* @param boolean $isFirstBatchEmpty
63+
* @param ChangeStreamIterator $iterator
64+
* @param callable $resumeCallable
6765
*/
68-
public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBatchEmpty)
66+
public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
6967
{
68+
$this->iterator = $iterator;
7069
$this->resumeCallable = $resumeCallable;
71-
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
7270
}
7371

7472
/**
@@ -77,15 +75,29 @@ public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBa
7775
*/
7876
public function current()
7977
{
80-
return $this->csIt->current();
78+
return $this->iterator->current();
8179
}
8280

8381
/**
84-
* @return \MongoDB\Driver\CursorId
82+
* @return CursorId
8583
*/
8684
public function getCursorId()
8785
{
88-
return $this->csIt->getInnerIterator()->getId();
86+
return $this->iterator->getInnerIterator()->getId();
87+
}
88+
89+
/**
90+
* Returns the resume token for the iterator's current position.
91+
*
92+
* Null may be returned if no change documents have been iterated and the
93+
* server did not include a postBatchResumeToken in its aggregate or getMore
94+
* command response.
95+
*
96+
* @return array|object|null
97+
*/
98+
public function getResumeToken()
99+
{
100+
return $this->iterator->getResumeToken();
89101
}
90102

91103
/**
@@ -108,7 +120,7 @@ public function key()
108120
public function next()
109121
{
110122
try {
111-
$this->csIt->next();
123+
$this->iterator->next();
112124
$this->onIteration($this->hasAdvanced);
113125
} catch (RuntimeException $e) {
114126
$this->resumeOrThrow($e);
@@ -123,7 +135,7 @@ public function next()
123135
public function rewind()
124136
{
125137
try {
126-
$this->csIt->rewind();
138+
$this->iterator->rewind();
127139
/* Unlike next() and resume(), the decision to increment the key
128140
* does not depend on whether the change stream has advanced. This
129141
* ensures that multiple calls to rewind() do not alter state. */
@@ -139,40 +151,7 @@ public function rewind()
139151
*/
140152
public function valid()
141153
{
142-
return $this->csIt->valid();
143-
}
144-
145-
/**
146-
* Extracts the resume token (i.e. "_id" field) from the change document.
147-
*
148-
* @param array|object $document Change document
149-
* @return mixed
150-
* @throws InvalidArgumentException
151-
* @throws ResumeTokenException if the resume token is not found or invalid
152-
*/
153-
private function extractResumeToken($document)
154-
{
155-
if ( ! is_array($document) && ! is_object($document)) {
156-
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
157-
}
158-
159-
if ($document instanceof Serializable) {
160-
return $this->extractResumeToken($document->bsonSerialize());
161-
}
162-
163-
$resumeToken = is_array($document)
164-
? (isset($document['_id']) ? $document['_id'] : null)
165-
: (isset($document->_id) ? $document->_id : null);
166-
167-
if ( ! isset($resumeToken)) {
168-
throw ResumeTokenException::notFound();
169-
}
170-
171-
if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
172-
throw ResumeTokenException::invalidType($resumeToken);
173-
}
174-
175-
return $resumeToken;
154+
return $this->iterator->valid();
176155
}
177156

178157
/**
@@ -196,7 +175,7 @@ private function isResumableError(RuntimeException $exception)
196175
return false;
197176
}
198177

199-
if (in_array($exception->getCode(), [self::$errorCodeCappedPositionLost, self::$errorCodeCursorKilled, self::$errorCodeInterrupted])) {
178+
if (in_array($exception->getCode(), self::$nonResumableErrorCodes)) {
200179
return false;
201180
}
202181

@@ -222,13 +201,11 @@ private function onIteration($incrementKey)
222201
}
223202

224203
/* Return early if there is not a current result. Avoid any attempt to
225-
* increment the iterator's key or extract a resume token */
204+
* increment the iterator's key. */
226205
if (!$this->valid()) {
227206
return;
228207
}
229208

230-
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
231-
232209
if ($incrementKey) {
233210
$this->key++;
234211
}
@@ -237,16 +214,14 @@ private function onIteration($incrementKey)
237214
}
238215

239216
/**
240-
* Creates a new changeStream after a resumable server error.
217+
* Recreates the ChangeStreamIterator after a resumable server error.
241218
*
242219
* @return void
243220
*/
244221
private function resume()
245222
{
246-
list($cursor, $isFirstBatchEmpty) = call_user_func($this->resumeCallable, $this->resumeToken);
247-
248-
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
249-
$this->csIt->rewind();
223+
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken());
224+
$this->iterator->rewind();
250225

251226
$this->onIteration($this->hasAdvanced);
252227
}

0 commit comments

Comments
 (0)