Skip to content

Commit 041c0c4

Browse files
committed
Merge pull request #480
2 parents 488010b + 99f54dd commit 041c0c4

File tree

6 files changed

+156
-16
lines changed

6 files changed

+156
-16
lines changed

docs/includes/apiargs-MongoDBCollection-method-watch-option.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,8 @@ optional: true
7070
source:
7171
file: apiargs-common-option.yaml
7272
ref: session
73+
---
74+
source:
75+
file: apiargs-MongoDBCollection-common-option.yaml
76+
ref: typeMap
7377
...

src/ChangeStream.php

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use MongoDB\Driver\Cursor;
2222
use MongoDB\Driver\Exception\ConnectionTimeoutException;
2323
use MongoDB\Driver\Exception\RuntimeException;
24+
use MongoDB\Exception\InvalidArgumentException;
2425
use MongoDB\Exception\ResumeTokenException;
2526
use IteratorIterator;
2627
use Iterator;
@@ -92,7 +93,7 @@ public function next()
9293
try {
9394
$this->csIt->next();
9495
if ($this->valid()) {
95-
$this->extractResumeToken($this->csIt->current());
96+
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
9697
$this->key++;
9798
}
9899
} catch (RuntimeException $e) {
@@ -121,7 +122,7 @@ public function rewind()
121122
try {
122123
$this->csIt->rewind();
123124
if ($this->valid()) {
124-
$this->extractResumeToken($this->csIt->current());
125+
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
125126
}
126127
} catch (RuntimeException $e) {
127128
if (strpos($e->getMessage(), "not master") !== false) {
@@ -149,25 +150,36 @@ public function valid()
149150
}
150151

151152
/**
152-
* Extracts the resumeToken (_id) of the input document.
153+
* Extracts the resume token (i.e. "_id" field) from the change document.
153154
*
154-
* @return void
155-
* @throws ResumeTokenException if the document has no _id.
155+
* @param array|document $document Change document
156+
* @return mixed
157+
* @throws InvalidArgumentException
158+
* @throws ResumeTokenException if the resume token is not found or invalid
156159
*/
157160
private function extractResumeToken($document)
158161
{
159-
if ($document === null) {
160-
throw new ResumeTokenException("Cannot extract a resumeToken from an empty document");
162+
if ( ! is_array($document) && ! is_object($document)) {
163+
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
161164
}
165+
162166
if ($document instanceof Serializable) {
163-
$this->extractResumeToken($document->bsonSerialize());
164-
return;
167+
return $this->extractResumeToken($document->bsonSerialize());
168+
}
169+
170+
$resumeToken = is_array($document)
171+
? (isset($document['_id']) ? $document['_id'] : null)
172+
: (isset($document->_id) ? $document->_id : null);
173+
174+
if ( ! isset($resumeToken)) {
175+
throw ResumeTokenException::notFound();
165176
}
166-
if (isset($document->_id)) {
167-
$this->resumeToken = is_array($document) ? $document['_id'] : $document->_id;
168-
} else {
169-
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
177+
178+
if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
179+
throw ResumeTokenException::invalidType($resumeToken);
170180
}
181+
182+
return $resumeToken;
171183
}
172184

173185
/**

src/Exception/ResumeTokenException.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,24 @@
1919

2020
class ResumeTokenException extends \Exception
2121
{
22+
/**
23+
* Thrown when a resume token is not found in a change document.
24+
*
25+
* @return self
26+
*/
27+
public static function notFound()
28+
{
29+
return new static('Resume token not found in change document');
30+
}
31+
32+
/**
33+
* Thrown when a resume token has an invalid type.
34+
*
35+
* @param mixed $value Actual value (used to derive the type)
36+
* @return self
37+
*/
38+
public static function invalidType($value)
39+
{
40+
return new static(sprintf('Expected resume token to have type "array or object" but found "%s"', gettype($value)));
41+
}
2242
}

src/Operation/Watch.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ class Watch implements Executable
8383
*
8484
* Sessions are not supported for server versions < 3.6.
8585
*
86+
* * typeMap (array): Type map for BSON deserialization. This will be
87+
* applied to the returned Cursor (it is not sent to the server).
88+
*
8689
* @param string $databaseName Database name
8790
* @param string $collectionName Collection name
8891
* @param array $pipeline List of pipeline operations
@@ -148,7 +151,7 @@ private function createAggregate()
148151
$pipeline = $this->pipeline;
149152
array_unshift($pipeline, $changeStream);
150153

151-
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1]);
154+
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
152155

153156
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $aggregateOptions);
154157
}

tests/Operation/WatchFunctionalTest.php

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,9 @@ public function testInitialCursorIsNotClosed()
307307

308308
/**
309309
* @expectedException MongoDB\Exception\ResumeTokenException
310+
* @expectedExceptionMessage Resume token not found in change document
310311
*/
311-
public function testNextCannotExtractResumeToken()
312+
public function testNextResumeTokenNotFound()
312313
{
313314
$pipeline = [['$project' => ['_id' => 0 ]]];
314315

@@ -324,8 +325,9 @@ public function testNextCannotExtractResumeToken()
324325

325326
/**
326327
* @expectedException MongoDB\Exception\ResumeTokenException
328+
* @expectedExceptionMessage Resume token not found in change document
327329
*/
328-
public function testRewindCannotExtractResumeToken()
330+
public function testRewindResumeTokenNotFound()
329331
{
330332
$pipeline = [['$project' => ['_id' => 0 ]]];
331333

@@ -337,6 +339,40 @@ public function testRewindCannotExtractResumeToken()
337339
$changeStream->rewind();
338340
}
339341

342+
/**
343+
* @expectedException MongoDB\Exception\ResumeTokenException
344+
* @expectedExceptionMessage Expected resume token to have type "array or object" but found "string"
345+
*/
346+
public function testNextResumeTokenInvalidType()
347+
{
348+
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
349+
350+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
351+
$changeStream = $operation->execute($this->getPrimaryServer());
352+
353+
/* Note: we intentionally do not start iteration with rewind() to ensure
354+
* that we test extraction functionality within next(). */
355+
$this->insertDocument(['x' => 1]);
356+
357+
$changeStream->next();
358+
}
359+
360+
/**
361+
* @expectedException MongoDB\Exception\ResumeTokenException
362+
* @expectedExceptionMessage Expected resume token to have type "array or object" but found "string"
363+
*/
364+
public function testRewindResumeTokenInvalidType()
365+
{
366+
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
367+
368+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
369+
$changeStream = $operation->execute($this->getPrimaryServer());
370+
371+
$this->insertDocument(['x' => 1]);
372+
373+
$changeStream->rewind();
374+
}
375+
340376
public function testMaxAwaitTimeMS()
341377
{
342378
/* On average, an acknowledged write takes about 20 ms to appear in a
@@ -427,6 +463,67 @@ public function testRewindExtractsResumeTokenAndNextResumes()
427463
$this->assertSameDocument($expectedResult, $changeStream->current());
428464
}
429465

466+
/**
467+
* @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
468+
*/
469+
public function testTypeMapOption(array $typeMap, $expectedChangeDocument)
470+
{
471+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100, 'typeMap' => $typeMap]);
472+
$changeStream = $operation->execute($this->getPrimaryServer());
473+
474+
$changeStream->rewind();
475+
$this->assertNull($changeStream->current());
476+
477+
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
478+
479+
$changeStream->next();
480+
$this->assertTrue($changeStream->valid());
481+
$changeDocument = $changeStream->current();
482+
483+
// Unset the resume token and namespace, which are intentionally omitted
484+
if (is_array($changeDocument)) {
485+
unset($changeDocument['_id'], $changeDocument['ns']);
486+
} else {
487+
unset($changeDocument->_id, $changeDocument->ns);
488+
}
489+
490+
$this->assertEquals($expectedChangeDocument, $changeDocument);
491+
}
492+
493+
public function provideTypeMapOptionsAndExpectedChangeDocument()
494+
{
495+
/* Note: the "_id" and "ns" fields are purposefully omitted because the
496+
* resume token's value cannot be anticipated and the collection name,
497+
* which is generated from the test name, is not available in the data
498+
* provider, respectively. */
499+
return [
500+
[
501+
['root' => 'array', 'document' => 'array'],
502+
[
503+
'operationType' => 'insert',
504+
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
505+
'documentKey' => ['_id' => 1],
506+
],
507+
],
508+
[
509+
['root' => 'object', 'document' => 'array'],
510+
(object) [
511+
'operationType' => 'insert',
512+
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
513+
'documentKey' => ['_id' => 1],
514+
],
515+
],
516+
[
517+
['root' => 'array', 'document' => 'stdClass'],
518+
[
519+
'operationType' => 'insert',
520+
'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
521+
'documentKey' => (object) ['_id' => 1],
522+
],
523+
],
524+
];
525+
}
526+
430527
private function insertDocument($document)
431528
{
432529
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);

tests/Operation/WatchTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ public function provideInvalidConstructorOptions()
6767
$options[][] = ['session' => $value];
6868
}
6969

70+
foreach ($this->getInvalidArrayValues() as $value) {
71+
$options[][] = ['typeMap' => $value];
72+
}
73+
7074
return $options;
7175
}
7276
}

0 commit comments

Comments
 (0)