Skip to content

Commit af874cc

Browse files
committed
core stream/persistent -- be a little more careful with closed state and make warning clearer
- this is an expected warning to happen sometimes - these code changes probably don't actually fix or break anything; just fitting our general style better.
1 parent 2771b4b commit af874cc

File tree

2 files changed

+29
-9
lines changed

2 files changed

+29
-9
lines changed

src/packages/conat/persist/client.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class PersistStreamClient extends EventEmitter {
6565
this.close();
6666
return;
6767
}
68-
if (this.state == "closed") {
68+
if (this.isClosed()) {
6969
return;
7070
}
7171
this.socket?.close();
@@ -173,6 +173,8 @@ class PersistStreamClient extends EventEmitter {
173173
this.emit("changefeed", updates);
174174
};
175175

176+
private isClosed = () => this.state == "closed";
177+
176178
close = () => {
177179
logger.debug("close", this.storage);
178180
// paths.delete(this.storage.path);
@@ -341,6 +343,10 @@ class PersistStreamClient extends EventEmitter {
341343
timeout,
342344
maxWait,
343345
}: GetAllOpts = {}): AsyncGenerator<StoredMessage[], void, unknown> {
346+
if (this.isClosed()) {
347+
// done
348+
return;
349+
}
344350
const sub = await this.socket.requestMany(null, {
345351
headers: {
346352
cmd: "getAll",
@@ -351,6 +357,10 @@ class PersistStreamClient extends EventEmitter {
351357
timeout,
352358
maxWait,
353359
});
360+
if (this.isClosed()) {
361+
// done with this
362+
return;
363+
}
354364
let seq = 0; // next expected seq number for the sub (not the data)
355365
for await (const { data, headers } of sub) {
356366
if (headers?.error) {
@@ -384,9 +394,15 @@ class PersistStreamClient extends EventEmitter {
384394
// This throws with code=503 if something goes wrong due to sequence numbers.
385395
let messages: StoredMessage[] = [];
386396
const sub = await this.getAllIter(opts);
397+
if (this.isClosed()) {
398+
throw Error("closed");
399+
}
387400
for await (const value of sub) {
388401
messages = messages.concat(value);
389402
}
403+
if (this.isClosed()) {
404+
throw Error("closed");
405+
}
390406
return messages;
391407
};
392408

src/packages/conat/sync/core-stream.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ export class CoreStream<T = any> extends EventEmitter {
261261
return await this.persistClient.config({ config });
262262
};
263263

264+
private isClosed = () => {
265+
return this.client === undefined;
266+
};
267+
264268
close = () => {
265269
logger.debug("close", this.name);
266270
delete this.client;
@@ -296,23 +300,20 @@ export class CoreStream<T = any> extends EventEmitter {
296300
}
297301
await until(
298302
async () => {
299-
if (this.client == null) {
303+
if (this.isClosed()) {
300304
return true;
301305
}
302306
let messages: StoredMessage[] = [];
303307
let changes: (SetOperation | DeleteOperation | StoredMessage)[] = [];
304308
let changefeed: any = undefined;
305309
try {
306310
changefeed = await this.persistClient.changefeed();
311+
if (this.isClosed()) {
312+
return true;
313+
}
307314
(async () => {
308315
try {
309316
for await (const updates of changefeed) {
310-
// console.log(
311-
// "getAllFromPersist-changefeed-update",
312-
// this.client.id,
313-
// this.storage.path,
314-
// JSON.stringify(updates.map(({ seq }) => seq)),
315-
// );
316317
changes = changes.concat(updates);
317318
}
318319
} catch {}
@@ -322,9 +323,12 @@ export class CoreStream<T = any> extends EventEmitter {
322323
start_seq,
323324
});
324325
} catch (err) {
326+
if (this.isClosed()) {
327+
return true;
328+
}
325329
if (!process.env.COCALC_TEST_MODE) {
326330
console.log(
327-
`WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}, service=${this.service}, storage=${JSON.stringify(this.storage)}`,
331+
`WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}, service=${this.service}, storage=${JSON.stringify(this.storage)} -- will retry`,
328332
);
329333
}
330334
if (err.code == 503 || err.code == 408) {

0 commit comments

Comments
 (0)