diff --git a/api/parts/data/changeStreamReader.js b/api/parts/data/changeStreamReader.js index 60395b8bd8a..888d5da4611 100644 --- a/api/parts/data/changeStreamReader.js +++ b/api/parts/data/changeStreamReader.js @@ -28,7 +28,7 @@ class changeStreamReader { this.waitingForAcknowledgement = false; this.fallback = options.fallback; - if (this.fallback && !this.fallback.inteval) { + if (this.fallback && !this.fallback.interval) { this.fallback.interval = 1000; } @@ -50,13 +50,16 @@ class changeStreamReader { */ checkState() { if ((!this.stream || this.stream.closed) && !this.keep_closed) { - console.log("Stream is closed. Setting up again"); + log.i(`[${this.name}] Stream is closed. Setting up again`); this.setUp(this.onData); } else if (this.waitingForAcknowledgement && Date.now() - this.waitingForAcknowledgement > 60000) { - console.log("Waiting for acknowledgement for more than 60 seconds. Closing stream and restarting"); + const waitTime = Date.now() - this.waitingForAcknowledgement; + log.w(`[${this.name}] Waiting for acknowledgement for ${waitTime}ms (>60s). Closing stream and restarting`); this.keep_closed = false; - this.stream.close(); + if (this.stream && !this.stream.closed) { + this.stream.close(); + } } } @@ -100,33 +103,54 @@ class changeStreamReader { * @param {Object} tokenInfo - Token info object */ async processBadRange(options, tokenInfo) { - console.log("Processing bad range"); - console.log(JSON.stringify({cd: {$gte: options.cd1, $lt: options.cd2}})); + log.i(`[${this.name}] Processing bad range for timespan: ${options.cd1} to ${options.cd2}`); + log.d(`[${this.name}] Query filter: ${JSON.stringify({cd: {$gte: options.cd1, $lt: options.cd2}})}`); + var gotTokenDoc = false; var doc; - var cursor = this.db.collection(this.collection).find({cd: {$gte: new Date(options.cd1), $lt: new Date(options.cd2)}}).sort({cd: 1}); - while (await cursor.hasNext() && !gotTokenDoc) { - doc = await cursor.next(); - if (JSON.stringify(doc._id) === JSON.stringify(tokenInfo._id) || doc.cd > tokenInfo.cd) { - gotTokenDoc = true; + var cursor; + try { + cursor = this.db.collection(this.collection).find({cd: {$gte: new Date(options.cd1), $lt: new Date(options.cd2)}}).sort({cd: 1}); + + while (await cursor.hasNext() && !gotTokenDoc) { + doc = await cursor.next(); + if (JSON.stringify(doc._id) === JSON.stringify(tokenInfo._id) || doc.cd > tokenInfo.cd) { + gotTokenDoc = true; + } + log.d(`[${this.name}] Skipping document: ${doc._id} at ${doc.cd}`); } - console.log("SKIP:" + JSON.stringify(doc)); + + if (doc && doc.cd > tokenInfo.cd) { + tokenInfo.cd = doc.cd; + tokenInfo._id = doc._id; + log.d(`[${this.name}] Processing recovered document: ${doc._id} at ${doc.cd}`); + this.onData(tokenInfo, doc); + } + + while (await cursor.hasNext()) { + doc = await cursor.next(); + log.d(`[${this.name}] Processing document: ${doc._id} at ${doc.cd}`); + tokenInfo.cd = doc.cd; + tokenInfo._id = doc._id; + this.onData(tokenInfo, doc); + } + + log.i(`[${this.name}] Bad range processing completed successfully`); } - if (doc && doc.cd > tokenInfo.cd) { - tokenInfo.cd = doc.cd; - tokenInfo._id = doc._id; - console.log(this.name + " Process:" + JSON.stringify(doc)); - this.onData(tokenInfo, doc); + catch (err) { + log.e(`[${this.name}] Error during bad range processing`, err); + throw err; } - - while (await cursor.hasNext()) { - doc = await cursor.next(); - console.log(this.name + " Process:" + JSON.stringify(doc)); - tokenInfo.cd = doc.cd; - tokenInfo._id = doc._id; - this.onData(tokenInfo, doc); + finally { + if (cursor) { + try { + await cursor.close(); + } + catch (err) { + log.w(`[${this.name}] Error closing cursor during bad range processing`, err); + } + } } - console.log("done"); } /** @@ -137,7 +161,7 @@ class changeStreamReader { var token; try { if (this.stream && !this.stream.closed) { - console.log("Stream is already open. returning"); + log.d(`[${this.name}] Stream is already open, skipping setup`); return; } var options = JSON.parse(JSON.stringify(this.options || {})); @@ -148,7 +172,7 @@ class changeStreamReader { options.startAfter = token.token; } if (this.failedToken && JSON.stringify(this.failedToken.token) === JSON.stringify(token.token)) { - console.log("Do not use failed token"); + log.w(`[${this.name}] Not using failed token, switching to time-based resume`); tokenFailed = true; delete options.startAfter; var startTime = Date.now().valueOf() / 1000 - 60; @@ -156,7 +180,7 @@ class changeStreamReader { options.startAtOperationTime = new Timestamp({t: startTime, i: 1}); } } - console.log("Stream options: " + JSON.stringify(options)); + log.d(`[${this.name}] Stream options: ${JSON.stringify(options)}`); if (this.collection) { this.stream = await this.db.collection(this.collection).watch(this.pipeline, options); } @@ -167,11 +191,11 @@ class changeStreamReader { if (tokenFailed) { //fetch data while cd is less than failed token - console.log("Fetching data while cd is less or equal cd to failed token"); + log.i(`[${this.name}] Fetching data while timestamp is less than or equal to failed token timestamp`); var doc; do { doc = await this.stream.next(); - console.log(JSON.stringify(doc)); + log.d(`[${this.name}] Processing document during token recovery: ${doc?._id} at ${doc?.cd}`); } while (doc && doc.cd && doc.cd <= token.cd); this.keep_closed = true; @@ -180,7 +204,7 @@ class changeStreamReader { next_token._id = doc.__id; next_token.cd = doc.cd; try { - this.processBadRange({name: this.name, cd1: token.cd, cd2: next_token.cd}, this.failedToken); + await this.processBadRange({name: this.name, cd1: token.cd, cd2: next_token.cd}, this.failedToken); this.onData(next_token, doc); this.waitingForAcknowledgement = Date.now(); this.restartStream = true; @@ -214,7 +238,7 @@ class changeStreamReader { this.failedToken = token; } else if (err.code === 40573) { //change stream is not supported - console.log("Change stream is not supported. Keeping streams closed"); + log.w(`[${this.name}] Change streams not supported by database, switching to polling mode`); this.keep_closed = true; var newCD = Date.now(); if (token && token.cd) { @@ -239,12 +263,12 @@ class changeStreamReader { } catch (err) { if (err.code === 286 || err.code === 50811 || err.code === 9) { //failed because of bad token - console.log("Set Failed token", token); + log.w(`[${this.name}] Invalid token detected, marking as failed`, {token, error: err.message}); this.failedToken = token; } //Failed because of db does not support change streams. Run in "query mode"; else if (err.code === 40573) { //change stream is not supported - console.log("Change stream is not supported. Keeping streams closed"); + log.w(`[${this.name}] Change streams not supported by database, switching to polling mode`); this.keep_closed = true; var newCD = Date.now(); if (token && token.cd) { @@ -256,7 +280,7 @@ class changeStreamReader { //Switch to query mode } else { - log.e("Error on change stream", err); + log.e(`[${this.name}] Unexpected error during change stream setup`, err); } } } @@ -272,9 +296,11 @@ class changeStreamReader { await common.db.collection("plugins").updateOne({"_id": "_changeStreams"}, {$set: {[this.name]: token}}, {"upsert": true}); if (this.restartStream) { this.waitingForAcknowledgement = false; - this.keep_closed = false; this.restartStream = false; - this.stream.close(); + this.keep_closed = false; + if (this.stream && !this.stream.closed) { + this.stream.close(); + } } } catch (err) { @@ -286,15 +312,17 @@ class changeStreamReader { * Closes stream permanently */ close() { - console.log("Closing permanently"); + log.i(`[${this.name}] Closing change stream reader permanently`); if (this.intervalRunner) { clearInterval(this.intervalRunner); + this.intervalRunner = null; } this.keep_closed = true; - this.stream.close(true); + if (this.stream && !this.stream.closed) { + this.stream.close(true); + } } - } module.exports = {changeStreamReader}; \ No newline at end of file