@@ -28,7 +28,7 @@ class changeStreamReader {
2828 this . waitingForAcknowledgement = false ;
2929 this . fallback = options . fallback ;
3030
31- if ( this . fallback && ! this . fallback . inteval ) {
31+ if ( this . fallback && ! this . fallback . interval ) {
3232 this . fallback . interval = 1000 ;
3333 }
3434
@@ -103,33 +103,54 @@ class changeStreamReader {
103103 * @param {Object } tokenInfo - Token info object
104104 */
105105 async processBadRange ( options , tokenInfo ) {
106- console . log ( "Processing bad range" ) ;
107- console . log ( JSON . stringify ( { cd : { $gte : options . cd1 , $lt : options . cd2 } } ) ) ;
106+ log . i ( `[${ this . name } ] Processing bad range for timespan: ${ options . cd1 } to ${ options . cd2 } ` ) ;
107+ log . d ( `[${ this . name } ] Query filter: ${ JSON . stringify ( { cd : { $gte : options . cd1 , $lt : options . cd2 } } ) } ` ) ;
108+
108109 var gotTokenDoc = false ;
109110 var doc ;
110- var cursor = this . db . collection ( this . collection ) . find ( { cd : { $gte : new Date ( options . cd1 ) , $lt : new Date ( options . cd2 ) } } ) . sort ( { cd : 1 } ) ;
111- while ( await cursor . hasNext ( ) && ! gotTokenDoc ) {
112- doc = await cursor . next ( ) ;
113- if ( JSON . stringify ( doc . _id ) === JSON . stringify ( tokenInfo . _id ) || doc . cd > tokenInfo . cd ) {
114- gotTokenDoc = true ;
111+ var cursor ;
112+ try {
113+ cursor = this . db . collection ( this . collection ) . find ( { cd : { $gte : new Date ( options . cd1 ) , $lt : new Date ( options . cd2 ) } } ) . sort ( { cd : 1 } ) ;
114+
115+ while ( await cursor . hasNext ( ) && ! gotTokenDoc ) {
116+ doc = await cursor . next ( ) ;
117+ if ( JSON . stringify ( doc . _id ) === JSON . stringify ( tokenInfo . _id ) || doc . cd > tokenInfo . cd ) {
118+ gotTokenDoc = true ;
119+ }
120+ log . d ( `[${ this . name } ] Skipping document: ${ doc . _id } at ${ doc . cd } ` ) ;
121+ }
122+
123+ if ( doc && doc . cd > tokenInfo . cd ) {
124+ tokenInfo . cd = doc . cd ;
125+ tokenInfo . _id = doc . _id ;
126+ log . d ( `[${ this . name } ] Processing recovered document: ${ doc . _id } at ${ doc . cd } ` ) ;
127+ this . onData ( tokenInfo , doc ) ;
128+ }
129+
130+ while ( await cursor . hasNext ( ) ) {
131+ doc = await cursor . next ( ) ;
132+ log . d ( `[${ this . name } ] Processing document: ${ doc . _id } at ${ doc . cd } ` ) ;
133+ tokenInfo . cd = doc . cd ;
134+ tokenInfo . _id = doc . _id ;
135+ this . onData ( tokenInfo , doc ) ;
115136 }
116- console . log ( "SKIP:" + JSON . stringify ( doc ) ) ;
137+
138+ log . i ( `[${ this . name } ] Bad range processing completed successfully` ) ;
117139 }
118- if ( doc && doc . cd > tokenInfo . cd ) {
119- tokenInfo . cd = doc . cd ;
120- tokenInfo . _id = doc . _id ;
121- console . log ( this . name + " Process:" + JSON . stringify ( doc ) ) ;
122- this . onData ( tokenInfo , doc ) ;
140+ catch ( err ) {
141+ log . e ( `[${ this . name } ] Error during bad range processing` , err ) ;
142+ throw err ;
123143 }
124-
125- while ( await cursor . hasNext ( ) ) {
126- doc = await cursor . next ( ) ;
127- console . log ( this . name + " Process:" + JSON . stringify ( doc ) ) ;
128- tokenInfo . cd = doc . cd ;
129- tokenInfo . _id = doc . _id ;
130- this . onData ( tokenInfo , doc ) ;
144+ finally {
145+ if ( cursor ) {
146+ try {
147+ await cursor . close ( ) ;
148+ }
149+ catch ( err ) {
150+ log . w ( `[${ this . name } ] Error closing cursor during bad range processing` , err ) ;
151+ }
152+ }
131153 }
132- console . log ( "done" ) ;
133154 }
134155
135156 /**
@@ -140,7 +161,7 @@ class changeStreamReader {
140161 var token ;
141162 try {
142163 if ( this . stream && ! this . stream . closed ) {
143- console . log ( " Stream is already open. returning" ) ;
164+ log . d ( `[ ${ this . name } ] Stream is already open, skipping setup` ) ;
144165 return ;
145166 }
146167 var options = JSON . parse ( JSON . stringify ( this . options || { } ) ) ;
@@ -151,15 +172,15 @@ class changeStreamReader {
151172 options . startAfter = token . token ;
152173 }
153174 if ( this . failedToken && JSON . stringify ( this . failedToken . token ) === JSON . stringify ( token . token ) ) {
154- console . log ( "Do not use failed token" ) ;
175+ log . w ( `[ ${ this . name } ] Not using failed token, switching to time-based resume` ) ;
155176 tokenFailed = true ;
156177 delete options . startAfter ;
157178 var startTime = Date . now ( ) . valueOf ( ) / 1000 - 60 ;
158179 if ( startTime ) {
159180 options . startAtOperationTime = new Timestamp ( { t : startTime , i : 1 } ) ;
160181 }
161182 }
162- console . log ( " Stream options: " + JSON . stringify ( options ) ) ;
183+ log . d ( `[ ${ this . name } ] Stream options: ${ JSON . stringify ( options ) } ` ) ;
163184 if ( this . collection ) {
164185 this . stream = await this . db . collection ( this . collection ) . watch ( this . pipeline , options ) ;
165186 }
@@ -170,11 +191,11 @@ class changeStreamReader {
170191
171192 if ( tokenFailed ) {
172193 //fetch data while cd is less than failed token
173- console . log ( " Fetching data while cd is less or equal cd to failed token" ) ;
194+ log . i ( `[ ${ this . name } ] Fetching data while timestamp is less than or equal to failed token timestamp` ) ;
174195 var doc ;
175196 do {
176197 doc = await this . stream . next ( ) ;
177- console . log ( JSON . stringify ( doc ) ) ;
198+ log . d ( `[ ${ this . name } ] Processing document during token recovery: ${ doc ?. _id } at ${ doc ?. cd } ` ) ;
178199 }
179200 while ( doc && doc . cd && doc . cd <= token . cd ) ;
180201 this . keep_closed = true ;
@@ -217,7 +238,7 @@ class changeStreamReader {
217238 this . failedToken = token ;
218239 }
219240 else if ( err . code === 40573 ) { //change stream is not supported
220- console . log ( " Change stream is not supported. Keeping streams closed" ) ;
241+ log . w ( `[ ${ this . name } ] Change streams not supported by database, switching to polling mode` ) ;
221242 this . keep_closed = true ;
222243 var newCD = Date . now ( ) ;
223244 if ( token && token . cd ) {
@@ -242,12 +263,12 @@ class changeStreamReader {
242263 }
243264 catch ( err ) {
244265 if ( err . code === 286 || err . code === 50811 || err . code === 9 ) { //failed because of bad token
245- console . log ( "Set Failed token" , token ) ;
266+ log . w ( `[ ${ this . name } ] Invalid token detected, marking as failed` , { token, error : err . message } ) ;
246267 this . failedToken = token ;
247268 }
248269 //Failed because of db does not support change streams. Run in "query mode";
249270 else if ( err . code === 40573 ) { //change stream is not supported
250- console . log ( " Change stream is not supported. Keeping streams closed" ) ;
271+ log . w ( `[ ${ this . name } ] Change streams not supported by database, switching to polling mode` ) ;
251272 this . keep_closed = true ;
252273 var newCD = Date . now ( ) ;
253274 if ( token && token . cd ) {
@@ -259,7 +280,7 @@ class changeStreamReader {
259280 //Switch to query mode
260281 }
261282 else {
262- log . e ( "Error on change stream" , err ) ;
283+ log . e ( `[ ${ this . name } ] Unexpected error during change stream setup` , err ) ;
263284 }
264285 }
265286 }
@@ -291,7 +312,7 @@ class changeStreamReader {
291312 * Closes stream permanently
292313 */
293314 close ( ) {
294- console . log ( " Closing permanently" ) ;
315+ log . i ( `[ ${ this . name } ] Closing change stream reader permanently` ) ;
295316 if ( this . intervalRunner ) {
296317 clearInterval ( this . intervalRunner ) ;
297318 this . intervalRunner = null ;
0 commit comments