@@ -41,6 +41,7 @@ class Watch implements Executable
41
41
const FULL_DOCUMENT_DEFAULT = 'default ' ;
42
42
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup ' ;
43
43
44
+ private $ aggregate ;
44
45
private $ databaseName ;
45
46
private $ collectionName ;
46
47
private $ pipeline ;
@@ -96,24 +97,8 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
96
97
'readPreference ' => new ReadPreference (ReadPreference::RP_PRIMARY ),
97
98
];
98
99
99
- if (isset ($ options ['batchSize ' ]) && ! is_integer ($ options ['batchSize ' ])) {
100
- throw InvalidArgumentException::invalidType ('"batchSize" option ' , $ options ['batchSize ' ], 'integer ' );
101
- }
102
-
103
- if (isset ($ options ['collation ' ]) && ! is_array ($ options ['collation ' ]) && ! is_object ($ options ['collation ' ])) {
104
- throw InvalidArgumentException::invalidType ('"collation" option ' , $ options ['collation ' ], 'array or object ' );
105
- }
106
-
107
- if (isset ($ options ['maxAwaitTimeMS ' ]) && ! is_integer ($ options ['maxAwaitTimeMS ' ])) {
108
- throw InvalidArgumentException::invalidType ('"maxAwaitTimeMS" option ' , $ options ['maxAwaitTimeMS ' ], 'integer ' );
109
- }
110
-
111
- if (isset ($ options ['readConcern ' ]) && ! $ options ['readConcern ' ] instanceof ReadConcern) {
112
- throw InvalidArgumentException::invalidType ('"readConcern" option ' , $ options ['readConcern ' ], 'MongoDB\Driver\ReadConcern ' );
113
- }
114
-
115
- if (isset ($ options ['readPreference ' ]) && ! $ options ['readPreference ' ] instanceof ReadPreference) {
116
- throw InvalidArgumentException::invalidType ('"readPreference" option ' , $ options ['readPreference ' ], 'MongoDB\Driver\ReadPreference ' );
100
+ if (isset ($ options ['fullDocument ' ]) && ! is_string ($ options ['fullDocument ' ])) {
101
+ throw InvalidArgumentException::invalidType ('"fullDocument" option ' , $ options ['fullDocument ' ], 'string ' );
117
102
}
118
103
119
104
if (isset ($ options ['resumeAfter ' ])) {
@@ -127,6 +112,8 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
127
112
$ this ->collectionName = (string ) $ collectionName ;
128
113
$ this ->pipeline = $ pipeline ;
129
114
$ this ->options = $ options ;
115
+
116
+ $ this ->aggregate = $ this ->createAggregate ();
130
117
}
131
118
132
119
/**
@@ -141,57 +128,46 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
141
128
*/
142
129
public function execute (Server $ server )
143
130
{
144
- $ command = $ this ->createCommand ();
145
-
146
- $ cursor = $ command ->execute ($ server );
131
+ $ cursor = $ this ->aggregate ->execute ($ server );
147
132
148
133
return new ChangeStream ($ cursor , $ this ->createResumeCallable ());
149
134
}
150
135
151
- private function createAggregateOptions ()
152
- {
153
- $ aggOptions = array_intersect_key ($ this ->options , ['batchSize ' => 1 , 'collation ' => 1 , 'maxAwaitTimeMS ' => 1 ]);
154
- if ( ! $ aggOptions ) {
155
- return [];
156
- }
157
- return $ aggOptions ;
158
- }
159
-
160
- private function createChangeStreamOptions ()
161
- {
162
- $ csOptions = array_intersect_key ($ this ->options , ['fullDocument ' => 1 , 'resumeAfter ' => 1 ]);
163
- if ( ! $ csOptions ) {
164
- return [];
165
- }
166
- return $ csOptions ;
167
- }
168
-
169
136
/**
170
- * Create the aggregate pipeline with the changeStream command .
137
+ * Create the aggregate command for creating a change stream .
171
138
*
172
- * @return Command
139
+ * This method is also used to recreate the aggregate command if a new
140
+ * resume token is provided while resuming.
141
+ *
142
+ * @return Aggregate
173
143
*/
174
- private function createCommand ()
144
+ private function createAggregate ()
175
145
{
176
- $ changeStreamArray = ['$changeStream ' => $ this ->createChangeStreamOptions ()];
177
- array_unshift ($ this ->pipeline , $ changeStreamArray );
146
+ $ changeStreamOptions = array_intersect_key ($ this ->options , ['fullDocument ' => 1 , 'resumeAfter ' => 1 ]);
147
+ $ changeStream = ['$changeStream ' => (object ) $ changeStreamOptions ];
148
+
149
+ $ pipeline = $ this ->pipeline ;
150
+ array_unshift ($ pipeline , $ changeStream );
178
151
179
- $ cmd = new Aggregate ($ this ->databaseName , $ this -> collectionName , $ this -> pipeline , $ this -> createAggregateOptions () );
152
+ $ aggregateOptions = array_intersect_key ($ this ->options , [ ' batchSize ' => 1 , ' collation ' => 1 , ' maxAwaitTimeMS ' => 1 , ' readConcern ' => 1 , ' readPreference ' => 1 ] );
180
153
181
- return $ cmd ;
154
+ return new Aggregate ( $ this -> databaseName , $ this -> collectionName , $ pipeline , $ aggregateOptions ) ;
182
155
}
183
156
184
157
private function createResumeCallable ()
185
158
{
186
- array_shift ($ this ->pipeline );
187
159
return function ($ resumeToken = null ) {
188
- // Select a server from manager using read preference option
189
- $ server = $ this ->manager ->selectServer ($ this ->options ['readPreference ' ]);
190
- // Update $this->options['resumeAfter'] from $resumeToken arg
160
+ /* If a resume token was provided, recreate the Aggregate operation
161
+ * using the new resume token. */
191
162
if ($ resumeToken !== null ) {
192
163
$ this ->options ['resumeAfter ' ] = $ resumeToken ;
164
+ $ this ->aggregate = $ this ->createAggregate ();
193
165
}
194
- // Return $this->execute() with the newly selected server
166
+
167
+ /* Select a new server using the read preference, execute this
168
+ * operation on it, and return the new ChangeStream. */
169
+ $ server = $ this ->manager ->selectServer ($ this ->options ['readPreference ' ]);
170
+
195
171
return $ this ->execute ($ server );
196
172
};
197
173
}
0 commit comments