18
18
namespace MongoDB \Operation ;
19
19
20
20
use MongoDB \ChangeStream ;
21
+ use MongoDB \BSON \TimestampInterface ;
21
22
use MongoDB \Driver \Command ;
23
+ use MongoDB \Driver \Cursor ;
22
24
use MongoDB \Driver \Manager ;
23
25
use MongoDB \Driver \ReadConcern ;
24
26
use MongoDB \Driver \ReadPreference ;
25
27
use MongoDB \Driver \Server ;
26
28
use MongoDB \Driver \Session ;
27
29
use MongoDB \Driver \Exception \RuntimeException ;
30
+ use MongoDB \Driver \Monitoring \CommandFailedEvent ;
31
+ use MongoDB \Driver \Monitoring \CommandSubscriber ;
32
+ use MongoDB \Driver \Monitoring \CommandStartedEvent ;
33
+ use MongoDB \Driver \Monitoring \CommandSucceededEvent ;
28
34
use MongoDB \Exception \InvalidArgumentException ;
29
35
use MongoDB \Exception \UnexpectedValueException ;
30
36
use MongoDB \Exception \UnsupportedException ;
31
37
32
38
/**
33
39
* Operation for creating a change stream with the aggregate command.
34
40
*
41
+ * Note: the implementation of CommandSubscriber is an internal implementation
42
+ * detail and should not be considered part of the public API.
43
+ *
35
44
* @api
36
45
* @see \MongoDB\Collection::watch()
37
46
* @see https://docs.mongodb.com/manual/changeStreams/
38
47
*/
39
- class Watch implements Executable
48
+ class Watch implements Executable, /* @internal */ CommandSubscriber
40
49
{
50
+ private static $ wireVersionForOperationTime = 7 ;
51
+
41
52
const FULL_DOCUMENT_DEFAULT = 'default ' ;
42
53
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup ' ;
43
54
44
55
private $ aggregate ;
45
- private $ databaseName ;
56
+ private $ aggregateOptions ;
57
+ private $ changeStreamOptions ;
46
58
private $ collectionName ;
59
+ private $ databaseName ;
60
+ private $ operationTime ;
47
61
private $ pipeline ;
48
- private $ options ;
49
62
private $ resumeCallable ;
50
63
51
64
/**
@@ -79,22 +92,44 @@ class Watch implements Executable
79
92
* * resumeAfter (document): Specifies the logical starting point for the
80
93
* new change stream.
81
94
*
95
+ * Using this option in conjunction with "startAtOperationTime" will
96
+ * result in a server error. The options are mutually exclusive.
97
+ *
82
98
* * session (MongoDB\Driver\Session): Client session.
83
99
*
84
100
* Sessions are not supported for server versions < 3.6.
85
101
*
102
+ * * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
103
+ * the change stream will only provide changes that occurred at or after
104
+ * the specified timestamp. Any command run against the server will
105
+ * return an operation time that can be used here. Alternatively, an
106
+ * operation time may be obtained from MongoDB\Driver\Server::getInfo().
107
+ *
108
+ * Using this option in conjunction with "resumeAfter" will result in a
109
+ * server error. The options are mutually exclusive.
110
+ *
111
+ * This option is not supported for server versions < 4.0.
112
+ *
86
113
* * typeMap (array): Type map for BSON deserialization. This will be
87
114
* applied to the returned Cursor (it is not sent to the server).
88
115
*
89
- * @param string $databaseName Database name
90
- * @param string $collectionName Collection name
116
+ * Note: A database-level change stream may be created by specifying null
117
+ * for the collection name. A cluster-level change stream may be created by
118
+ * specifying null for both the database and collection name.
119
+ *
120
+ * @param Manager $manager Manager instance from the driver
121
+ * @param string|null $databaseName Database name
122
+ * @param string|null $collectionName Collection name
91
123
* @param array $pipeline List of pipeline operations
92
124
* @param array $options Command options
93
- * @param Manager $manager Manager instance from the driver
94
125
* @throws InvalidArgumentException for parameter/option parsing errors
95
126
*/
96
127
public function __construct (Manager $ manager , $ databaseName , $ collectionName , array $ pipeline , array $ options = [])
97
128
{
129
+ if (isset ($ collectionName ) && ! isset ($ databaseName )) {
130
+ throw new InvalidArgumentException ('$collectionName should also be null if $databaseName is null ' );
131
+ }
132
+
98
133
$ options += [
99
134
'fullDocument ' => self ::FULL_DOCUMENT_DEFAULT ,
100
135
'readPreference ' => new ReadPreference (ReadPreference::RP_PRIMARY ),
@@ -104,10 +139,12 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
104
139
throw InvalidArgumentException::invalidType ('"fullDocument" option ' , $ options ['fullDocument ' ], 'string ' );
105
140
}
106
141
107
- if (isset ($ options ['resumeAfter ' ])) {
108
- if ( ! is_array ($ options ['resumeAfter ' ]) && ! is_object ($ options ['resumeAfter ' ])) {
109
- throw InvalidArgumentException::invalidType ('"resumeAfter" option ' , $ options ['resumeAfter ' ], 'array or object ' );
110
- }
142
+ if (isset ($ options ['resumeAfter ' ]) && ! is_array ($ options ['resumeAfter ' ]) && ! is_object ($ options ['resumeAfter ' ])) {
143
+ throw InvalidArgumentException::invalidType ('"resumeAfter" option ' , $ options ['resumeAfter ' ], 'array or object ' );
144
+ }
145
+
146
+ if (isset ($ options ['startAtOperationTime ' ]) && ! $ options ['startAtOperationTime ' ] instanceof TimestampInterface) {
147
+ throw InvalidArgumentException::invalidType ('"startAtOperationTime" option ' , $ options ['startAtOperationTime ' ], TimestampInterface::class);
111
148
}
112
149
113
150
/* In the absence of an explicit session, create one to ensure that the
@@ -122,15 +159,47 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
122
159
}
123
160
}
124
161
162
+ $ this ->aggregateOptions = array_intersect_key ($ options , ['batchSize ' => 1 , 'collation ' => 1 , 'maxAwaitTimeMS ' => 1 , 'readConcern ' => 1 , 'readPreference ' => 1 , 'session ' => 1 , 'typeMap ' => 1 ]);
163
+ $ this ->changeStreamOptions = array_intersect_key ($ options , ['fullDocument ' => 1 , 'resumeAfter ' => 1 , 'startAtOperationTime ' => 1 ]);
164
+
165
+ // Null database name implies a cluster-wide change stream
166
+ if ($ databaseName === null ) {
167
+ $ databaseName = 'admin ' ;
168
+ $ this ->changeStreamOptions ['allChangesForCluster ' ] = true ;
169
+ }
170
+
125
171
$ this ->databaseName = (string ) $ databaseName ;
126
- $ this ->collectionName = ( string ) $ collectionName ;
172
+ $ this ->collectionName = isset ( $ collectionName ) ? ( string ) $ collectionName : null ;
127
173
$ this ->pipeline = $ pipeline ;
128
- $ this ->options = $ options ;
129
174
130
175
$ this ->aggregate = $ this ->createAggregate ();
131
176
$ this ->resumeCallable = $ this ->createResumeCallable ($ manager );
132
177
}
133
178
179
+ /** @internal */
180
+ final public function commandFailed (CommandFailedEvent $ event )
181
+ {
182
+ }
183
+
184
+ /** @internal */
185
+ final public function commandStarted (CommandStartedEvent $ event )
186
+ {
187
+ }
188
+
189
+ /** @internal */
190
+ final public function commandSucceeded (CommandSucceededEvent $ event )
191
+ {
192
+ if ($ event ->getCommandName () !== 'aggregate ' ) {
193
+ return ;
194
+ }
195
+
196
+ $ reply = $ event ->getReply ();
197
+
198
+ if (isset ($ reply ->operationTime ) && $ reply ->operationTime instanceof TimestampInterface) {
199
+ $ this ->operationTime = $ reply ->operationTime ;
200
+ }
201
+ }
202
+
134
203
/**
135
204
* Execute the operation.
136
205
*
@@ -142,47 +211,74 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
142
211
*/
143
212
public function execute (Server $ server )
144
213
{
145
- $ cursor = $ this ->aggregate ->execute ($ server );
146
-
147
- return new ChangeStream ($ cursor , $ this ->resumeCallable );
214
+ return new ChangeStream ($ this ->executeAggregate ($ server ), $ this ->resumeCallable );
148
215
}
149
216
150
217
/**
151
218
* Create the aggregate command for creating a change stream.
152
219
*
153
- * This method is also used to recreate the aggregate command if a new
154
- * resume token is provided while resuming.
220
+ * This method is also used to recreate the aggregate command when resuming.
155
221
*
156
222
* @return Aggregate
157
223
*/
158
224
private function createAggregate ()
159
225
{
160
- $ changeStreamOptions = array_intersect_key ($ this ->options , ['fullDocument ' => 1 , 'resumeAfter ' => 1 ]);
161
- $ changeStream = ['$changeStream ' => (object ) $ changeStreamOptions ];
162
-
163
226
$ pipeline = $ this ->pipeline ;
164
- array_unshift ($ pipeline , $ changeStream );
227
+ array_unshift ($ pipeline , [ ' $changeStream ' => ( object ) $ this -> changeStreamOptions ] );
165
228
166
- $ aggregateOptions = array_intersect_key ($ this ->options , ['batchSize ' => 1 , 'collation ' => 1 , 'maxAwaitTimeMS ' => 1 , 'readConcern ' => 1 , 'readPreference ' => 1 , 'session ' => 1 , 'typeMap ' => 1 ]);
167
-
168
- return new Aggregate ($ this ->databaseName , $ this ->collectionName , $ pipeline , $ aggregateOptions );
229
+ return new Aggregate ($ this ->databaseName , $ this ->collectionName , $ pipeline , $ this ->aggregateOptions );
169
230
}
170
231
171
232
private function createResumeCallable (Manager $ manager )
172
233
{
173
234
return function ($ resumeToken = null ) use ($ manager ) {
174
- /* If a resume token was provided, recreate the Aggregate operation
175
- * using the new resume token . */
235
+ /* If a resume token was provided, update the "resumeAfter" option
236
+ * and ensure that "startAtOperationTime" is no longer set . */
176
237
if ($ resumeToken !== null ) {
177
- $ this ->options ['resumeAfter ' ] = $ resumeToken ;
178
- $ this ->aggregate = $ this ->createAggregate ();
238
+ $ this ->changeStreamOptions ['resumeAfter ' ] = $ resumeToken ;
239
+ unset($ this ->changeStreamOptions ['startAtOperationTime ' ]);
240
+ }
241
+
242
+ /* If we captured an operation time from the first aggregate command
243
+ * and there is no "resumeAfter" option, set "startAtOperationTime"
244
+ * so that we can resume from the original aggregate's time. */
245
+ if ($ this ->operationTime !== null && ! isset ($ this ->changeStreamOptions ['resumeAfter ' ])) {
246
+ $ this ->changeStreamOptions ['startAtOperationTime ' ] = $ this ->operationTime ;
179
247
}
180
248
249
+ $ this ->aggregate = $ this ->createAggregate ();
250
+
181
251
/* Select a new server using the read preference, execute this
182
252
* operation on it, and return the new ChangeStream. */
183
- $ server = $ manager ->selectServer ($ this ->options ['readPreference ' ]);
253
+ $ server = $ manager ->selectServer ($ this ->aggregateOptions ['readPreference ' ]);
184
254
185
255
return $ this ->execute ($ server );
186
256
};
187
257
}
258
+
259
+ /**
260
+ * Execute the aggregate command and optionally capture its operation time.
261
+ *
262
+ * @param Server $server
263
+ * @return Cursor
264
+ */
265
+ private function executeAggregate (Server $ server )
266
+ {
267
+ /* If we've already captured an operation time or the server does not
268
+ * support returning an operation time (e.g. MongoDB 3.6), execute the
269
+ * aggregation directly and return its cursor. */
270
+ if ($ this ->operationTime !== null || ! \MongoDB \server_supports_feature ($ server , self ::$ wireVersionForOperationTime )) {
271
+ return $ this ->aggregate ->execute ($ server );
272
+ }
273
+
274
+ /* Otherwise, execute the aggregation using command monitoring so that
275
+ * we can capture its operation time with commandSucceeded(). */
276
+ \MongoDB \Driver \Monitoring \addSubscriber ($ this );
277
+
278
+ try {
279
+ return $ this ->aggregate ->execute ($ server );
280
+ } finally {
281
+ \MongoDB \Driver \Monitoring \removeSubscriber ($ this );
282
+ }
283
+ }
188
284
}
0 commit comments