Skip to content

Commit 99f54dd

Browse files
committed
Refactor resume token extraction and invalid type exception
1 parent 9f41780 commit 99f54dd

File tree

3 files changed

+74
-15
lines changed

3 files changed

+74
-15
lines changed

src/ChangeStream.php

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public function next()
9393
try {
9494
$this->csIt->next();
9595
if ($this->valid()) {
96-
$this->extractResumeToken($this->csIt->current());
96+
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
9797
$this->key++;
9898
}
9999
} catch (RuntimeException $e) {
@@ -122,7 +122,7 @@ public function rewind()
122122
try {
123123
$this->csIt->rewind();
124124
if ($this->valid()) {
125-
$this->extractResumeToken($this->csIt->current());
125+
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
126126
}
127127
} catch (RuntimeException $e) {
128128
if (strpos($e->getMessage(), "not master") !== false) {
@@ -152,31 +152,34 @@ public function valid()
152152
/**
153153
* Extracts the resume token (i.e. "_id" field) from the change document.
154154
*
155+
* @param array|document $document Change document
156+
* @return mixed
155157
* @throws InvalidArgumentException
156-
* @throws ResumeTokenException if the resume token cannot be found (i.e. no _id field)
158+
* @throws ResumeTokenException if the resume token is not found or invalid
157159
*/
158160
private function extractResumeToken($document)
159161
{
160162
if ( ! is_array($document) && ! is_object($document)) {
161163
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
162164
}
163165

164-
if (is_array($document) && isset($document['_id'])) {
165-
$this->resumeToken = $document['_id'];
166-
return;
166+
if ($document instanceof Serializable) {
167+
return $this->extractResumeToken($document->bsonSerialize());
167168
}
168169

169-
if ($document instanceof Serializable) {
170-
$this->extractResumeToken($document->bsonSerialize());
171-
return;
170+
$resumeToken = is_array($document)
171+
? (isset($document['_id']) ? $document['_id'] : null)
172+
: (isset($document->_id) ? $document->_id : null);
173+
174+
if ( ! isset($resumeToken)) {
175+
throw ResumeTokenException::notFound();
172176
}
173177

174-
if (isset($document->_id)) {
175-
$this->resumeToken = $document->_id;
176-
return;
178+
if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
179+
throw ResumeTokenException::invalidType($resumeToken);
177180
}
178181

179-
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
182+
return $resumeToken;
180183
}
181184

182185
/**

src/Exception/ResumeTokenException.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,24 @@
1919

2020
class ResumeTokenException extends \Exception
2121
{
22+
/**
23+
* Thrown when a resume token is not found in a change document.
24+
*
25+
* @return self
26+
*/
27+
public static function notFound()
28+
{
29+
return new static('Resume token not found in change document');
30+
}
31+
32+
/**
33+
* Thrown when a resume token has an invalid type.
34+
*
35+
* @param mixed $value Actual value (used to derive the type)
36+
* @return self
37+
*/
38+
public static function invalidType($value)
39+
{
40+
return new static(sprintf('Expected resume token to have type "array or object" but found "%s"', gettype($value)));
41+
}
2242
}

tests/Operation/WatchFunctionalTest.php

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,9 @@ public function testInitialCursorIsNotClosed()
307307

308308
/**
309309
* @expectedException MongoDB\Exception\ResumeTokenException
310+
* @expectedExceptionMessage Resume token not found in change document
310311
*/
311-
public function testNextCannotExtractResumeToken()
312+
public function testNextResumeTokenNotFound()
312313
{
313314
$pipeline = [['$project' => ['_id' => 0 ]]];
314315

@@ -324,8 +325,9 @@ public function testNextCannotExtractResumeToken()
324325

325326
/**
326327
* @expectedException MongoDB\Exception\ResumeTokenException
328+
* @expectedExceptionMessage Resume token not found in change document
327329
*/
328-
public function testRewindCannotExtractResumeToken()
330+
public function testRewindResumeTokenNotFound()
329331
{
330332
$pipeline = [['$project' => ['_id' => 0 ]]];
331333

@@ -337,6 +339,40 @@ public function testRewindCannotExtractResumeToken()
337339
$changeStream->rewind();
338340
}
339341

342+
/**
343+
* @expectedException MongoDB\Exception\ResumeTokenException
344+
* @expectedExceptionMessage Expected resume token to have type "array or object" but found "string"
345+
*/
346+
public function testNextResumeTokenInvalidType()
347+
{
348+
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
349+
350+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
351+
$changeStream = $operation->execute($this->getPrimaryServer());
352+
353+
/* Note: we intentionally do not start iteration with rewind() to ensure
354+
* that we test extraction functionality within next(). */
355+
$this->insertDocument(['x' => 1]);
356+
357+
$changeStream->next();
358+
}
359+
360+
/**
361+
* @expectedException MongoDB\Exception\ResumeTokenException
362+
* @expectedExceptionMessage Expected resume token to have type "array or object" but found "string"
363+
*/
364+
public function testRewindResumeTokenInvalidType()
365+
{
366+
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
367+
368+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
369+
$changeStream = $operation->execute($this->getPrimaryServer());
370+
371+
$this->insertDocument(['x' => 1]);
372+
373+
$changeStream->rewind();
374+
}
375+
340376
public function testMaxAwaitTimeMS()
341377
{
342378
/* On average, an acknowledged write takes about 20 ms to appear in a

0 commit comments

Comments
 (0)