Skip to content

Commit 9f41780

Browse files
committed
PHPLIB-323: Support typeMap option for change streams
1 parent 488010b commit 9f41780

File tree

5 files changed

+90
-9
lines changed

5 files changed

+90
-9
lines changed

docs/includes/apiargs-MongoDBCollection-method-watch-option.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,8 @@ optional: true
7070
source:
7171
file: apiargs-common-option.yaml
7272
ref: session
73+
---
74+
source:
75+
file: apiargs-MongoDBCollection-common-option.yaml
76+
ref: typeMap
7377
...

src/ChangeStream.php

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use MongoDB\Driver\Cursor;
2222
use MongoDB\Driver\Exception\ConnectionTimeoutException;
2323
use MongoDB\Driver\Exception\RuntimeException;
24+
use MongoDB\Exception\InvalidArgumentException;
2425
use MongoDB\Exception\ResumeTokenException;
2526
use IteratorIterator;
2627
use Iterator;
@@ -149,25 +150,33 @@ public function valid()
149150
}
150151

151152
/**
152-
* Extracts the resumeToken (_id) of the input document.
153+
* Extracts the resume token (i.e. "_id" field) from the change document.
153154
*
154-
* @return void
155-
* @throws ResumeTokenException if the document has no _id.
155+
* @throws InvalidArgumentException
156+
* @throws ResumeTokenException if the resume token cannot be found (i.e. no _id field)
156157
*/
157158
private function extractResumeToken($document)
158159
{
159-
if ($document === null) {
160-
throw new ResumeTokenException("Cannot extract a resumeToken from an empty document");
160+
if ( ! is_array($document) && ! is_object($document)) {
161+
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
162+
}
163+
164+
if (is_array($document) && isset($document['_id'])) {
165+
$this->resumeToken = $document['_id'];
166+
return;
161167
}
168+
162169
if ($document instanceof Serializable) {
163170
$this->extractResumeToken($document->bsonSerialize());
164171
return;
165172
}
173+
166174
if (isset($document->_id)) {
167-
$this->resumeToken = is_array($document) ? $document['_id'] : $document->_id;
168-
} else {
169-
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
175+
$this->resumeToken = $document->_id;
176+
return;
170177
}
178+
179+
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
171180
}
172181

173182
/**

src/Operation/Watch.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ class Watch implements Executable
8383
*
8484
* Sessions are not supported for server versions < 3.6.
8585
*
86+
* * typeMap (array): Type map for BSON deserialization. This will be
87+
* applied to the returned Cursor (it is not sent to the server).
88+
*
8689
* @param string $databaseName Database name
8790
* @param string $collectionName Collection name
8891
* @param array $pipeline List of pipeline operations
@@ -148,7 +151,7 @@ private function createAggregate()
148151
$pipeline = $this->pipeline;
149152
array_unshift($pipeline, $changeStream);
150153

151-
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1]);
154+
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
152155

153156
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $aggregateOptions);
154157
}

tests/Operation/WatchFunctionalTest.php

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,67 @@ public function testRewindExtractsResumeTokenAndNextResumes()
427427
$this->assertSameDocument($expectedResult, $changeStream->current());
428428
}
429429

430+
/**
431+
* @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
432+
*/
433+
public function testTypeMapOption(array $typeMap, $expectedChangeDocument)
434+
{
435+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100, 'typeMap' => $typeMap]);
436+
$changeStream = $operation->execute($this->getPrimaryServer());
437+
438+
$changeStream->rewind();
439+
$this->assertNull($changeStream->current());
440+
441+
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
442+
443+
$changeStream->next();
444+
$this->assertTrue($changeStream->valid());
445+
$changeDocument = $changeStream->current();
446+
447+
// Unset the resume token and namespace, which are intentionally omitted
448+
if (is_array($changeDocument)) {
449+
unset($changeDocument['_id'], $changeDocument['ns']);
450+
} else {
451+
unset($changeDocument->_id, $changeDocument->ns);
452+
}
453+
454+
$this->assertEquals($expectedChangeDocument, $changeDocument);
455+
}
456+
457+
public function provideTypeMapOptionsAndExpectedChangeDocument()
458+
{
459+
/* Note: the "_id" and "ns" fields are purposefully omitted because the
460+
* resume token's value cannot be anticipated and the collection name,
461+
* which is generated from the test name, is not available in the data
462+
* provider, respectively. */
463+
return [
464+
[
465+
['root' => 'array', 'document' => 'array'],
466+
[
467+
'operationType' => 'insert',
468+
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
469+
'documentKey' => ['_id' => 1],
470+
],
471+
],
472+
[
473+
['root' => 'object', 'document' => 'array'],
474+
(object) [
475+
'operationType' => 'insert',
476+
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
477+
'documentKey' => ['_id' => 1],
478+
],
479+
],
480+
[
481+
['root' => 'array', 'document' => 'stdClass'],
482+
[
483+
'operationType' => 'insert',
484+
'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
485+
'documentKey' => (object) ['_id' => 1],
486+
],
487+
],
488+
];
489+
}
490+
430491
private function insertDocument($document)
431492
{
432493
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);

tests/Operation/WatchTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ public function provideInvalidConstructorOptions()
6767
$options[][] = ['session' => $value];
6868
}
6969

70+
foreach ($this->getInvalidArrayValues() as $value) {
71+
$options[][] = ['typeMap' => $value];
72+
}
73+
7074
return $options;
7175
}
7276
}

0 commit comments

Comments
 (0)