@@ -28,7 +28,7 @@ class changeStreamReader {
2828 this . waitingForAcknowledgement = false ;
2929 this . fallback = options . fallback ;
3030
31- if ( this . fallback && ! this . fallback . inteval ) {
31+ if ( this . fallback && ! this . fallback . interval ) {
3232 this . fallback . interval = 1000 ;
3333 }
3434
@@ -50,13 +50,16 @@ class changeStreamReader {
5050 */
5151 checkState ( ) {
5252 if ( ( ! this . stream || this . stream . closed ) && ! this . keep_closed ) {
53- console . log ( " Stream is closed. Setting up again" ) ;
53+ log . i ( `[ ${ this . name } ] Stream is closed. Setting up again` ) ;
5454 this . setUp ( this . onData ) ;
5555 }
5656 else if ( this . waitingForAcknowledgement && Date . now ( ) - this . waitingForAcknowledgement > 60000 ) {
57- console . log ( "Waiting for acknowledgement for more than 60 seconds. Closing stream and restarting" ) ;
57+ const waitTime = Date . now ( ) - this . waitingForAcknowledgement ;
58+ log . w ( `[${ this . name } ] Waiting for acknowledgement for ${ waitTime } ms (>60s). Closing stream and restarting` ) ;
5859 this . keep_closed = false ;
59- this . stream . close ( ) ;
60+ if ( this . stream && ! this . stream . closed ) {
61+ this . stream . close ( ) ;
62+ }
6063 }
6164 }
6265
@@ -100,33 +103,54 @@ class changeStreamReader {
100103 * @param {Object } tokenInfo - Token info object
101104 */
102105 async processBadRange ( options , tokenInfo ) {
103- console . log ( "Processing bad range" ) ;
104- console . log ( JSON . stringify ( { cd : { $gte : options . cd1 , $lt : options . cd2 } } ) ) ;
106+ log . i ( `[${ this . name } ] Processing bad range for timespan: ${ options . cd1 } to ${ options . cd2 } ` ) ;
107+ log . d ( `[${ this . name } ] Query filter: ${ JSON . stringify ( { cd : { $gte : options . cd1 , $lt : options . cd2 } } ) } ` ) ;
108+
105109 var gotTokenDoc = false ;
106110 var doc ;
107- var cursor = this . db . collection ( this . collection ) . find ( { cd : { $gte : new Date ( options . cd1 ) , $lt : new Date ( options . cd2 ) } } ) . sort ( { cd : 1 } ) ;
108- while ( await cursor . hasNext ( ) && ! gotTokenDoc ) {
109- doc = await cursor . next ( ) ;
110- if ( JSON . stringify ( doc . _id ) === JSON . stringify ( tokenInfo . _id ) || doc . cd > tokenInfo . cd ) {
111- gotTokenDoc = true ;
111+ var cursor ;
112+ try {
113+ cursor = this . db . collection ( this . collection ) . find ( { cd : { $gte : new Date ( options . cd1 ) , $lt : new Date ( options . cd2 ) } } ) . sort ( { cd : 1 } ) ;
114+
115+ while ( await cursor . hasNext ( ) && ! gotTokenDoc ) {
116+ doc = await cursor . next ( ) ;
117+ if ( JSON . stringify ( doc . _id ) === JSON . stringify ( tokenInfo . _id ) || doc . cd > tokenInfo . cd ) {
118+ gotTokenDoc = true ;
119+ }
120+ log . d ( `[${ this . name } ] Skipping document: ${ doc . _id } at ${ doc . cd } ` ) ;
112121 }
113- console . log ( "SKIP:" + JSON . stringify ( doc ) ) ;
122+
123+ if ( doc && doc . cd > tokenInfo . cd ) {
124+ tokenInfo . cd = doc . cd ;
125+ tokenInfo . _id = doc . _id ;
126+ log . d ( `[${ this . name } ] Processing recovered document: ${ doc . _id } at ${ doc . cd } ` ) ;
127+ this . onData ( tokenInfo , doc ) ;
128+ }
129+
130+ while ( await cursor . hasNext ( ) ) {
131+ doc = await cursor . next ( ) ;
132+ log . d ( `[${ this . name } ] Processing document: ${ doc . _id } at ${ doc . cd } ` ) ;
133+ tokenInfo . cd = doc . cd ;
134+ tokenInfo . _id = doc . _id ;
135+ this . onData ( tokenInfo , doc ) ;
136+ }
137+
138+ log . i ( `[${ this . name } ] Bad range processing completed successfully` ) ;
114139 }
115- if ( doc && doc . cd > tokenInfo . cd ) {
116- tokenInfo . cd = doc . cd ;
117- tokenInfo . _id = doc . _id ;
118- console . log ( this . name + " Process:" + JSON . stringify ( doc ) ) ;
119- this . onData ( tokenInfo , doc ) ;
140+ catch ( err ) {
141+ log . e ( `[${ this . name } ] Error during bad range processing` , err ) ;
142+ throw err ;
120143 }
121-
122- while ( await cursor . hasNext ( ) ) {
123- doc = await cursor . next ( ) ;
124- console . log ( this . name + " Process:" + JSON . stringify ( doc ) ) ;
125- tokenInfo . cd = doc . cd ;
126- tokenInfo . _id = doc . _id ;
127- this . onData ( tokenInfo , doc ) ;
144+ finally {
145+ if ( cursor ) {
146+ try {
147+ await cursor . close ( ) ;
148+ }
149+ catch ( err ) {
150+ log . w ( `[${ this . name } ] Error closing cursor during bad range processing` , err ) ;
151+ }
152+ }
128153 }
129- console . log ( "done" ) ;
130154 }
131155
132156 /**
@@ -137,7 +161,7 @@ class changeStreamReader {
137161 var token ;
138162 try {
139163 if ( this . stream && ! this . stream . closed ) {
140- console . log ( " Stream is already open. returning" ) ;
164+ log . d ( `[ ${ this . name } ] Stream is already open, skipping setup` ) ;
141165 return ;
142166 }
143167 var options = JSON . parse ( JSON . stringify ( this . options || { } ) ) ;
@@ -148,15 +172,15 @@ class changeStreamReader {
148172 options . startAfter = token . token ;
149173 }
150174 if ( this . failedToken && JSON . stringify ( this . failedToken . token ) === JSON . stringify ( token . token ) ) {
151- console . log ( "Do not use failed token" ) ;
175+ log . w ( `[ ${ this . name } ] Not using failed token, switching to time-based resume` ) ;
152176 tokenFailed = true ;
153177 delete options . startAfter ;
154178 var startTime = Date . now ( ) . valueOf ( ) / 1000 - 60 ;
155179 if ( startTime ) {
156180 options . startAtOperationTime = new Timestamp ( { t : startTime , i : 1 } ) ;
157181 }
158182 }
159- console . log ( " Stream options: " + JSON . stringify ( options ) ) ;
183+ log . d ( `[ ${ this . name } ] Stream options: ${ JSON . stringify ( options ) } ` ) ;
160184 if ( this . collection ) {
161185 this . stream = await this . db . collection ( this . collection ) . watch ( this . pipeline , options ) ;
162186 }
@@ -167,11 +191,11 @@ class changeStreamReader {
167191
168192 if ( tokenFailed ) {
169193 //fetch data while cd is less than failed token
170- console . log ( " Fetching data while cd is less or equal cd to failed token" ) ;
194+ log . i ( `[ ${ this . name } ] Fetching data while timestamp is less than or equal to failed token timestamp` ) ;
171195 var doc ;
172196 do {
173197 doc = await this . stream . next ( ) ;
174- console . log ( JSON . stringify ( doc ) ) ;
198+ log . d ( `[ ${ this . name } ] Processing document during token recovery: ${ doc ?. _id } at ${ doc ?. cd } ` ) ;
175199 }
176200 while ( doc && doc . cd && doc . cd <= token . cd ) ;
177201 this . keep_closed = true ;
@@ -180,7 +204,7 @@ class changeStreamReader {
180204 next_token . _id = doc . __id ;
181205 next_token . cd = doc . cd ;
182206 try {
183- this . processBadRange ( { name : this . name , cd1 : token . cd , cd2 : next_token . cd } , this . failedToken ) ;
207+ await this . processBadRange ( { name : this . name , cd1 : token . cd , cd2 : next_token . cd } , this . failedToken ) ;
184208 this . onData ( next_token , doc ) ;
185209 this . waitingForAcknowledgement = Date . now ( ) ;
186210 this . restartStream = true ;
@@ -214,7 +238,7 @@ class changeStreamReader {
214238 this . failedToken = token ;
215239 }
216240 else if ( err . code === 40573 ) { //change stream is not supported
217- console . log ( " Change stream is not supported. Keeping streams closed" ) ;
241+ log . w ( `[ ${ this . name } ] Change streams not supported by database, switching to polling mode` ) ;
218242 this . keep_closed = true ;
219243 var newCD = Date . now ( ) ;
220244 if ( token && token . cd ) {
@@ -239,12 +263,12 @@ class changeStreamReader {
239263 }
240264 catch ( err ) {
241265 if ( err . code === 286 || err . code === 50811 || err . code === 9 ) { //failed because of bad token
242- console . log ( "Set Failed token" , token ) ;
266+ log . w ( `[ ${ this . name } ] Invalid token detected, marking as failed` , { token, error : err . message } ) ;
243267 this . failedToken = token ;
244268 }
245269 //Failed because of db does not support change streams. Run in "query mode";
246270 else if ( err . code === 40573 ) { //change stream is not supported
247- console . log ( " Change stream is not supported. Keeping streams closed" ) ;
271+ log . w ( `[ ${ this . name } ] Change streams not supported by database, switching to polling mode` ) ;
248272 this . keep_closed = true ;
249273 var newCD = Date . now ( ) ;
250274 if ( token && token . cd ) {
@@ -256,7 +280,7 @@ class changeStreamReader {
256280 //Switch to query mode
257281 }
258282 else {
259- log . e ( "Error on change stream" , err ) ;
283+ log . e ( `[ ${ this . name } ] Unexpected error during change stream setup` , err ) ;
260284 }
261285 }
262286 }
@@ -272,9 +296,11 @@ class changeStreamReader {
272296 await common . db . collection ( "plugins" ) . updateOne ( { "_id" : "_changeStreams" } , { $set : { [ this . name ] : token } } , { "upsert" : true } ) ;
273297 if ( this . restartStream ) {
274298 this . waitingForAcknowledgement = false ;
275- this . keep_closed = false ;
276299 this . restartStream = false ;
277- this . stream . close ( ) ;
300+ this . keep_closed = false ;
301+ if ( this . stream && ! this . stream . closed ) {
302+ this . stream . close ( ) ;
303+ }
278304 }
279305 }
280306 catch ( err ) {
@@ -286,15 +312,17 @@ class changeStreamReader {
286312 * Closes stream permanently
287313 */
288314 close ( ) {
289- console . log ( " Closing permanently" ) ;
315+ log . i ( `[ ${ this . name } ] Closing change stream reader permanently` ) ;
290316 if ( this . intervalRunner ) {
291317 clearInterval ( this . intervalRunner ) ;
318+ this . intervalRunner = null ;
292319 }
293320 this . keep_closed = true ;
294- this . stream . close ( true ) ;
321+ if ( this . stream && ! this . stream . closed ) {
322+ this . stream . close ( true ) ;
323+ }
295324 }
296325
297-
298326}
299327
300328module . exports = { changeStreamReader} ;
0 commit comments