Skip to content

Commit 81c7ef5

Browse files
committed
fix: improve logging and stream handling in changeStreamReader
1 parent 0d10fca commit 81c7ef5

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

api/parts/data/changeStreamReader.js

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,16 @@ class changeStreamReader {
5050
*/
5151
checkState() {
5252
if ((!this.stream || this.stream.closed) && !this.keep_closed) {
53-
console.log("Stream is closed. Setting up again");
53+
log.i(`[${this.name}] Stream is closed. Setting up again`);
5454
this.setUp(this.onData);
5555
}
5656
else if (this.waitingForAcknowledgement && Date.now() - this.waitingForAcknowledgement > 60000) {
57-
console.log("Waiting for acknowledgement for more than 60 seconds. Closing stream and restarting");
57+
const waitTime = Date.now() - this.waitingForAcknowledgement;
58+
log.w(`[${this.name}] Waiting for acknowledgement for ${waitTime}ms (>60s). Closing stream and restarting`);
5859
this.keep_closed = false;
59-
this.stream.close();
60+
if (this.stream && !this.stream.closed) {
61+
this.stream.close();
62+
}
6063
}
6164
}
6265

@@ -180,7 +183,7 @@ class changeStreamReader {
180183
next_token._id = doc.__id;
181184
next_token.cd = doc.cd;
182185
try {
183-
this.processBadRange({name: this.name, cd1: token.cd, cd2: next_token.cd}, this.failedToken);
186+
await this.processBadRange({name: this.name, cd1: token.cd, cd2: next_token.cd}, this.failedToken);
184187
this.onData(next_token, doc);
185188
this.waitingForAcknowledgement = Date.now();
186189
this.restartStream = true;
@@ -272,9 +275,11 @@ class changeStreamReader {
272275
await common.db.collection("plugins").updateOne({"_id": "_changeStreams"}, {$set: {[this.name]: token}}, {"upsert": true});
273276
if (this.restartStream) {
274277
this.waitingForAcknowledgement = false;
275-
this.keep_closed = false;
276278
this.restartStream = false;
277-
this.stream.close();
279+
this.keep_closed = false;
280+
if (this.stream && !this.stream.closed) {
281+
this.stream.close();
282+
}
278283
}
279284
}
280285
catch (err) {
@@ -289,12 +294,14 @@ class changeStreamReader {
289294
console.log("Closing permanently");
290295
if (this.intervalRunner) {
291296
clearInterval(this.intervalRunner);
297+
this.intervalRunner = null;
292298
}
293299
this.keep_closed = true;
294-
this.stream.close(true);
300+
if (this.stream && !this.stream.closed) {
301+
this.stream.close(true);
302+
}
295303
}
296304

297-
298305
}
299306

300307
module.exports = {changeStreamReader};

0 commit comments

Comments
 (0)