@@ -46,7 +46,7 @@ class Watch implements Executable
46
46
private $ collectionName ;
47
47
private $ pipeline ;
48
48
private $ options ;
49
- private $ manager ;
49
+ private $ resumeCallable ;
50
50
51
51
/**
52
52
* Constructs an aggregate command for creating a change stream.
@@ -107,13 +107,13 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
107
107
}
108
108
}
109
109
110
- $ this ->manager = $ manager ;
111
110
$ this ->databaseName = (string ) $ databaseName ;
112
111
$ this ->collectionName = (string ) $ collectionName ;
113
112
$ this ->pipeline = $ pipeline ;
114
113
$ this ->options = $ options ;
115
114
116
115
$ this ->aggregate = $ this ->createAggregate ();
116
+ $ this ->resumeCallable = $ this ->createResumeCallable ($ manager );
117
117
}
118
118
119
119
/**
@@ -129,7 +129,7 @@ public function execute(Server $server)
129
129
{
130
130
$ cursor = $ this ->aggregate ->execute ($ server );
131
131
132
- return new ChangeStream ($ cursor , $ this ->createResumeCallable () );
132
+ return new ChangeStream ($ cursor , $ this ->resumeCallable );
133
133
}
134
134
135
135
/**
@@ -153,9 +153,9 @@ private function createAggregate()
153
153
return new Aggregate ($ this ->databaseName , $ this ->collectionName , $ pipeline , $ aggregateOptions );
154
154
}
155
155
156
- private function createResumeCallable ()
156
+ private function createResumeCallable (Manager $ manager )
157
157
{
158
- return function ($ resumeToken = null ) {
158
+ return function ($ resumeToken = null ) use ( $ manager ) {
159
159
/* If a resume token was provided, recreate the Aggregate operation
160
160
* using the new resume token. */
161
161
if ($ resumeToken !== null ) {
@@ -165,7 +165,7 @@ private function createResumeCallable()
165
165
166
166
/* Select a new server using the read preference, execute this
167
167
* operation on it, and return the new ChangeStream. */
168
- $ server = $ this -> manager ->selectServer ($ this ->options ['readPreference ' ]);
168
+ $ server = $ manager ->selectServer ($ this ->options ['readPreference ' ]);
169
169
170
170
return $ this ->execute ($ server );
171
171
};
0 commit comments