19
19
20
20
use MongoDB \BSON \Serializable ;
21
21
use MongoDB \Driver \Cursor ;
22
- use MongoDB \Driver \Exception \ConnectionTimeoutException ;
22
+ use MongoDB \Driver \Exception \ConnectionException ;
23
23
use MongoDB \Driver \Exception \RuntimeException ;
24
+ use MongoDB \Driver \Exception \ServerException ;
24
25
use MongoDB \Exception \InvalidArgumentException ;
25
26
use MongoDB \Exception \ResumeTokenException ;
26
27
use IteratorIterator ;
35
36
*/
36
37
class ChangeStream implements Iterator
37
38
{
39
+ /**
40
+ * @deprecated 1.4
41
+ * @todo Remove this in 2.0 (see: PHPLIB-360)
42
+ */
43
+ const CURSOR_NOT_FOUND = 43 ;
44
+
45
+ private static $ errorCodeCappedPositionLost = 136 ;
46
+ private static $ errorCodeInterrupted = 11601 ;
47
+ private static $ errorCodeCursorKilled = 237 ;
48
+
38
49
private $ resumeToken ;
39
50
private $ resumeCallable ;
40
51
private $ csIt ;
41
52
private $ key = 0 ;
42
53
private $ hasAdvanced = false ;
43
54
44
- const CURSOR_NOT_FOUND = 43 ;
45
-
46
55
/**
47
56
* Constructor.
48
57
*
@@ -91,7 +100,6 @@ public function key()
91
100
*/
92
101
public function next ()
93
102
{
94
- $ resumable = false ;
95
103
try {
96
104
$ this ->csIt ->next ();
97
105
if ($ this ->valid ()) {
@@ -111,18 +119,9 @@ public function next()
111
119
$ this ->resumeCallable = null ;
112
120
}
113
121
} catch (RuntimeException $ e ) {
114
- if (strpos ( $ e -> getMessage (), " not master " ) !== false ) {
115
- $ resumable = true ;
122
+ if ($ this -> isResumableError ( $ e ) ) {
123
+ $ this -> resume () ;
116
124
}
117
- if ($ e ->getCode () === self ::CURSOR_NOT_FOUND ) {
118
- $ resumable = true ;
119
- }
120
- if ($ e instanceof ConnectionTimeoutException) {
121
- $ resumable = true ;
122
- }
123
- }
124
- if ($ resumable ) {
125
- $ this ->resume ();
126
125
}
127
126
}
128
127
@@ -132,7 +131,6 @@ public function next()
132
131
*/
133
132
public function rewind ()
134
133
{
135
- $ resumable = false ;
136
134
try {
137
135
$ this ->csIt ->rewind ();
138
136
if ($ this ->valid ()) {
@@ -144,18 +142,9 @@ public function rewind()
144
142
$ this ->resumeCallable = null ;
145
143
}
146
144
} catch (RuntimeException $ e ) {
147
- if (strpos ( $ e -> getMessage (), " not master " ) !== false ) {
148
- $ resumable = true ;
145
+ if ($ this -> isResumableError ( $ e ) ) {
146
+ $ this -> resume () ;
149
147
}
150
- if ($ e ->getCode () === self ::CURSOR_NOT_FOUND ) {
151
- $ resumable = true ;
152
- }
153
- if ($ e instanceof ConnectionTimeoutException) {
154
- $ resumable = true ;
155
- }
156
- }
157
- if ($ resumable ) {
158
- $ this ->resume ();
159
148
}
160
149
}
161
150
@@ -201,6 +190,30 @@ private function extractResumeToken($document)
201
190
return $ resumeToken ;
202
191
}
203
192
193
+ /**
194
+ * Determines if an exception is a resumable error.
195
+ *
196
+ * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
197
+ * @param RuntimeException $exception
198
+ * @return boolean
199
+ */
200
+ private function isResumableError (RuntimeException $ exception )
201
+ {
202
+ if ($ exception instanceof ConnectionException) {
203
+ return true ;
204
+ }
205
+
206
+ if ( ! $ exception instanceof ServerException) {
207
+ return false ;
208
+ }
209
+
210
+ if (in_array ($ exception ->getCode (), [self ::$ errorCodeCappedPositionLost , self ::$ errorCodeCursorKilled , self ::$ errorCodeInterrupted ])) {
211
+ return false ;
212
+ }
213
+
214
+ return true ;
215
+ }
216
+
204
217
/**
205
218
* Creates a new changeStream after a resumable server error.
206
219
*
0 commit comments