Skip to content

Commit a09973f

Browse files
committed
Merge pull request #476
2 parents 0e1516b + 956f1e1 commit a09973f

File tree

8 files changed

+480
-353
lines changed

8 files changed

+480
-353
lines changed

docs/includes/apiargs-MongoDBCollection-method-watch-option.yaml

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
1-
source:
2-
file: apiargs-MongoDBCollection-method-aggregate-option.yaml
3-
ref: batchSize
1+
---
2+
arg_name: option
3+
name: batchSize
4+
type: integer
5+
description: |
6+
Specifies the maximum number of change events to return in each batch of the
7+
response from the MongoDB cluster.
8+
interface: phpmethod
9+
operation: ~
10+
optional: true
411
---
512
source:
613
file: apiargs-MongoDBCollection-common-option.yaml
@@ -18,6 +25,10 @@ description: |
1825
1926
- ``MongoDB\Operation\ChangeStream::FULL_DOCUMENT_DEFAULT`` (*default*)
2027
- ``MongoDB\Operation\ChangeStream::FULL_DOCUMENT_UPDATE_LOOKUP``
28+
29+
.. note::
30+
31+
This is an option of the `$changeStream` pipeline stage.
2132
interface: phpmethod
2233
operation: ~
2334
optional: true
@@ -45,7 +56,9 @@ type: array|object
4556
description: |
4657
Specifies the logical starting point for the new change stream.
4758
48-
Note this is an option of the '$changeStream' pipeline stage.
59+
.. note::
60+
61+
This is an option of the `$changeStream` pipeline stage.
4962
interface: phpmethod
5063
operation: ~
5164
optional: true

src/ChangeStream.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public function next()
100100
$resumable = true;
101101
}
102102
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
103-
$resumable = true;
103+
$resumable = true;
104104
}
105105
if ($e instanceof ConnectionTimeoutException) {
106106
$resumable = true;
@@ -147,7 +147,7 @@ private function extractResumeToken($document)
147147
if (isset($document->_id)) {
148148
$this->resumeToken = is_array($document) ? $document['_id'] : $document->_id;
149149
} else {
150-
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
150+
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
151151
}
152152
}
153153

src/Collection.php

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
use MongoDB\BSON\JavascriptInterface;
2121
use MongoDB\BSON\Serializable;
22-
use MongoDB\ChangeStream as ChangeStreamResult;
22+
use MongoDB\ChangeStream;
2323
use MongoDB\Driver\Cursor;
2424
use MongoDB\Driver\Manager;
2525
use MongoDB\Driver\ReadConcern;
@@ -32,7 +32,6 @@
3232
use MongoDB\Model\IndexInfoIterator;
3333
use MongoDB\Operation\Aggregate;
3434
use MongoDB\Operation\BulkWrite;
35-
use MongoDB\Operation\ChangeStream;
3635
use MongoDB\Operation\CreateIndexes;
3736
use MongoDB\Operation\Count;
3837
use MongoDB\Operation\DeleteMany;
@@ -52,6 +51,7 @@
5251
use MongoDB\Operation\ReplaceOne;
5352
use MongoDB\Operation\UpdateMany;
5453
use MongoDB\Operation\UpdateOne;
54+
use MongoDB\Operation\Watch;
5555
use Traversable;
5656

5757
class Collection
@@ -939,14 +939,14 @@ public function updateOne($filter, $update, array $options = [])
939939
return $operation->execute($server);
940940
}
941941

942-
/*
943-
* ChangeStream outline
942+
/**
943+
* Create a change stream for watching changes to the collection.
944944
*
945-
* @see ChangeStream::__construct() for supported options
946-
* @param array $pipeline List of pipeline operations
947-
* @param array $options Command options
945+
* @see Watch::__construct() for supported options
946+
* @param array $pipeline List of pipeline operations
947+
* @param array $options Command options
948+
* @return ChangeStream
948949
* @throws InvalidArgumentException for parameter/option parsing errors
949-
* @return ChangeStreamResult
950950
*/
951951
public function watch(array $pipeline = [], array $options = [])
952952
{
@@ -956,11 +956,18 @@ public function watch(array $pipeline = [], array $options = [])
956956

957957
$server = $this->manager->selectServer($options['readPreference']);
958958

959-
if ( ! isset($options['readConcern'])) {
959+
/* Although change streams require a newer version of the server than
960+
* read concerns, perform the usual wire version check before inheriting
961+
* the collection's read concern. In the event that the server is too
962+
* old, this makes it more likely that users will encounter an error
963+
* related to change streams being unsupported instead of an
964+
* UnsupportedException regarding use of the "readConcern" option from
965+
* the Aggregate operation class. */
966+
if ( ! isset($options['readConcern']) && \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern)) {
960967
$options['readConcern'] = $this->readConcern;
961968
}
962969

963-
$operation = new ChangeStream($this->databaseName, $this->collectionName, $pipeline, $options, $this->manager);
970+
$operation = new Watch($this->manager, $this->databaseName, $this->collectionName, $pipeline, $options);
964971

965972
return $operation->execute($server);
966973
}

src/Operation/ChangeStream.php renamed to src/Operation/Watch.php

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
namespace MongoDB\Operation;
1919

20-
use MongoDB\ChangeStream as ChangeStreamResult;
20+
use MongoDB\ChangeStream;
2121
use MongoDB\Driver\Command;
2222
use MongoDB\Driver\Manager;
2323
use MongoDB\Driver\ReadConcern;
@@ -32,10 +32,10 @@
3232
* Operation for creating a change stream with the aggregate command.
3333
*
3434
* @api
35-
* @see \MongoDB\Collection::changeStream()
36-
* @see http://docs.mongodb.org/manual/reference/command/changeStream/
35+
* @see \MongoDB\Collection::watch()
36+
* @see https://docs.mongodb.com/manual/changeStreams/
3737
*/
38-
class ChangeStream implements Executable
38+
class Watch implements Executable
3939
{
4040
const FULL_DOCUMENT_DEFAULT = 'default';
4141
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
@@ -47,18 +47,19 @@ class ChangeStream implements Executable
4747
private $manager;
4848

4949
/**
50-
* Constructs a changeStream command.
50+
* Constructs an aggregate command for creating a change stream.
5151
*
5252
* Supported options:
5353
*
54-
* * fullDocument (string): Allowed values: ‘default’, ‘updateLookup’.
55-
* Defaults to ‘default’. When set to ‘updateLookup’, the change
56-
* notification for partial updates will include both a delta describing
57-
* the changes to the document, as well as a copy of the entire document
58-
* that was changed from some time after the change occurred. For forward
59-
* compatibility, a driver MUST NOT raise an error when a user provides
60-
* an unknown value. The driver relies on the server to validate this
61-
* option.
54+
* * fullDocument (string): Determines whether the "fullDocument" field
55+
* will be populated for update operations. By default, change streams
56+
* only return the delta of fields during the update operation (via the
57+
* "updateDescription" field). To additionally return the most current
58+
* majority-committed version of the updated document, specify
59+
* "updateLookup" for this option. Defaults to "default".
60+
*
61+
* Insert and replace operations always include the "fullDocument" field
62+
* and delete operations omit the field as the document no longer exists.
6263
*
6364
* * resumeAfter (document): Specifies the logical starting point for the
6465
* new change stream.
@@ -69,7 +70,9 @@ class ChangeStream implements Executable
6970
* This is not supported for server versions < 3.2 and will result in an
7071
* exception at execution time if used.
7172
*
72-
* * readPreference (MongoDB\Driver\ReadPreference): Read preference.
73+
* * readPreference (MongoDB\Driver\ReadPreference): Read preference. This
74+
* will be used to select a new server when resuming. Defaults to a
75+
* "primary" read preference.
7376
*
7477
* * maxAwaitTimeMS (integer): The maximum amount of time for the server to
7578
* wait on new documents to satisfy a change stream query.
@@ -91,8 +94,13 @@ class ChangeStream implements Executable
9194
* @param Manager $manager Manager instance from the driver
9295
* @throws InvalidArgumentException for parameter/option parsing errors
9396
*/
94-
public function __construct($databaseName, $collectionName, array $pipeline, array $options = [], Manager $manager)
97+
public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = [])
9598
{
99+
$options += [
100+
'fullDocument' => self::FULL_DOCUMENT_DEFAULT,
101+
'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
102+
];
103+
96104
if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) {
97105
throw InvalidArgumentException::invalidType('"batchSize" option', $options['batchSize'], 'integer');
98106
}
@@ -119,19 +127,19 @@ public function __construct($databaseName, $collectionName, array $pipeline, arr
119127
}
120128
}
121129

130+
$this->manager = $manager;
122131
$this->databaseName = (string) $databaseName;
123132
$this->collectionName = (string) $collectionName;
124133
$this->pipeline = $pipeline;
125134
$this->options = $options;
126-
$this->manager = $manager;
127135
}
128136

129137
/**
130138
* Execute the operation.
131139
*
132140
* @see Executable::execute()
133141
* @param Server $server
134-
* @return ChangeStreamResult
142+
* @return ChangeStream
135143
* @throws UnexpectedValueException if the command response was malformed
136144
* @throws UnsupportedException if collation, read concern, or write concern is used and unsupported
137145
* @throws DriverRuntimeException for other driver errors (e.g. connection errors)
@@ -142,7 +150,7 @@ public function execute(Server $server)
142150

143151
$cursor = $command->execute($server);
144152

145-
return new ChangeStreamResult($cursor, $this->createResumeCallable());
153+
return new ChangeStream($cursor, $this->createResumeCallable());
146154
}
147155

148156
private function createAggregateOptions()

tests/CommandObserver.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use MongoDB\Driver\Monitoring\CommandStartedEvent;
77
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
88
use MongoDB\Driver\Monitoring\CommandSubscriber;
9+
use Exception;
910

1011
/**
1112
* Observes command documents using the driver's monitoring API.
@@ -20,13 +21,19 @@ public function observe(callable $execution, callable $commandCallback)
2021

2122
\MongoDB\Driver\Monitoring\addSubscriber($this);
2223

23-
call_user_func($execution);
24+
try {
25+
call_user_func($execution);
26+
} catch (Exception $executionException) {}
2427

2528
\MongoDB\Driver\Monitoring\removeSubscriber($this);
2629

2730
foreach ($this->commands as $command) {
2831
call_user_func($commandCallback, $command);
2932
}
33+
34+
if (isset($executionException)) {
35+
throw $executionException;
36+
}
3037
}
3138

3239
public function commandStarted(CommandStartedEvent $event)

0 commit comments

Comments
 (0)