Skip to content

Commit 4b56c97

Browse files
committed
hopefully fix issue #8424 -- hub conat api repeatedly throws an unhandled exception
- this code actually rewrites all of the callback use of socketio client to use their async api, which just makes the code easier to read. I don't think that necessarily fixed anything, though I did restructure the code so it is much clearer, which was enough to convince me I just missed an await / catch somewhere. - indeed, all mesg.respond must be await and catch'd, and there were some that were not! So that's almost certainly the issue. I add a noThrow option, for when there's just no need to catch, and audited the code and found several offenders.
1 parent 386cb68 commit 4b56c97

File tree

7 files changed

+157
-110
lines changed

7 files changed

+157
-110
lines changed

src/packages/conat/core/client.ts

Lines changed: 67 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ import * as msgpack from "@msgpack/msgpack";
209209
import { randomId } from "@cocalc/conat/names";
210210
import type { JSONValue } from "@cocalc/util/types";
211211
import { EventEmitter } from "events";
212-
import { callback } from "awaiting";
213212
import {
214213
isValidSubject,
215214
isValidSubjectWithoutWildcards,
@@ -531,10 +530,7 @@ export class Client extends EventEmitter {
531530
}
532531

533532
cluster = async () => {
534-
return await callback(
535-
this.conn.timeout(10000).emit.bind(this.conn),
536-
"cluster",
537-
);
533+
return await this.conn.timeout(10000).emitWithAck("cluster");
538534
};
539535

540536
disconnect = () => {
@@ -608,20 +604,14 @@ export class Client extends EventEmitter {
608604
);
609605
}
610606
timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
611-
const f = (cb) => {
612-
this.conn
613-
.timeout(timeout ? timeout : 10000)
614-
.emit("wait-for-interest", { subject, timeout }, (err, response) => {
615-
if (err) {
616-
cb(err);
617-
} else if (response.error) {
618-
cb(new ConatError(response.error, { code: response.code }));
619-
} else {
620-
cb(undefined, response);
621-
}
622-
});
623-
};
624-
return await callback(f);
607+
const response = await this.conn.timeout(timeout ? timeout : 10000).emitWithAck(
608+
"wait-for-interest",
609+
{ subject, timeout },
610+
);
611+
if (response.error) {
612+
throw new ConatError(response.error, { code: response.code });
613+
}
614+
return response;
625615
};
626616

627617
recvStats = (bytes: number) => {
@@ -827,11 +817,9 @@ export class Client extends EventEmitter {
827817
let stable = true;
828818
if (missing.length > 0) {
829819
stable = false;
830-
const resp = await callback(
831-
this.conn.timeout(timeout).emit.bind(this.conn),
832-
"subscribe",
833-
missing,
834-
);
820+
const resp = await this.conn
821+
.timeout(timeout)
822+
.emitWithAck("subscribe", missing);
835823
// some subscription could fail due to permissions changes, e.g., user got
836824
// removed from a project.
837825
for (let i = 0; i < missing.length; i++) {
@@ -851,11 +839,7 @@ export class Client extends EventEmitter {
851839
}
852840
}
853841
if (extra.length > 0) {
854-
await callback(
855-
this.conn.timeout(timeout).emit.bind(this.conn),
856-
"unsubscribe",
857-
extra,
858-
);
842+
await this.conn.timeout(timeout).emitWithAck("unsubscribe", extra);
859843
stable = false;
860844
}
861845
return stable;
@@ -866,11 +850,9 @@ export class Client extends EventEmitter {
866850
private getSubscriptions = async (
867851
timeout = DEFAULT_REQUEST_TIMEOUT,
868852
): Promise<Set<string>> => {
869-
const subs = await callback(
870-
this.conn.timeout(timeout).emit.bind(this.conn),
871-
"subscriptions",
872-
null,
873-
);
853+
const subs = await this.conn
854+
.timeout(timeout)
855+
.emitWithAck("subscriptions", null);
874856
return new Set(subs);
875857
};
876858

@@ -958,30 +940,29 @@ export class Client extends EventEmitter {
958940
this.stats.subs++;
959941
let promise;
960942
if (confirm) {
961-
const f = (cb) => {
962-
const handle = (response) => {
963-
if (response?.error) {
964-
cb(new ConatError(response.error, { code: response.code }));
943+
const f = async () => {
944+
let response;
945+
try {
946+
if (timeout) {
947+
response = await this.conn
948+
.timeout(timeout)
949+
.emitWithAck("subscribe", { subject, queue });
965950
} else {
966-
cb(response?.error, response);
967-
}
968-
};
969-
if (timeout) {
970-
this.conn
971-
.timeout(timeout)
972-
.emit("subscribe", { subject, queue }, (err, response) => {
973-
if (err) {
974-
handle({ error: `${err}`, code: 408 });
975-
} else {
976-
handle(response);
977-
}
951+
// this should never be used -- see above
952+
response = await this.conn.emitWithAck("subscribe", {
953+
subject,
954+
queue,
978955
});
979-
} else {
980-
// this should never be used -- see above
981-
this.conn.emit("subscribe", { subject, queue }, handle);
956+
}
957+
} catch (err) {
958+
throw new ConatError(`${err}`, { code: 408 });
982959
}
960+
if (response?.error) {
961+
throw new ConatError(response.error, { code: response.code });
962+
}
963+
return response;
983964
};
984-
promise = callback(f);
965+
promise = f();
985966
} else {
986967
this.conn.emit("subscribe", { subject, queue });
987968
promise = undefined;
@@ -1182,7 +1163,8 @@ export class Client extends EventEmitter {
11821163
// already closed
11831164
return { bytes: 0 };
11841165
}
1185-
return this._publish(subject, mesg, opts);
1166+
// must NOT confirm
1167+
return this._publish(subject, mesg, { ...opts, confirm: false });
11861168
};
11871169

11881170
publish = async (
@@ -1254,6 +1236,7 @@ export class Client extends EventEmitter {
12541236
encoding = DEFAULT_ENCODING,
12551237
confirm,
12561238
timeout = DEFAULT_PUBLISH_TIMEOUT,
1239+
noThrow,
12571240
}: PublishOptions & { confirm?: boolean } = {},
12581241
) => {
12591242
if (this.isClosed()) {
@@ -1301,49 +1284,35 @@ export class Client extends EventEmitter {
13011284
v.push(headers);
13021285
}
13031286
if (confirm) {
1304-
let done = false;
1305-
const f = (cb) => {
1306-
const handle = (response) => {
1307-
if (this.state == "closed" && response?.error) {
1308-
if (!process.env.COCALC_TEST_MODE) {
1309-
console.warn(
1310-
"conat client: ignoring outstanding error message since client closed",
1311-
);
1312-
}
1313-
cb(undefined, response);
1314-
return;
1315-
}
1316-
// console.log("_publish", { done, subject, mesg, headers, confirm });
1317-
if (response?.error) {
1318-
cb(new ConatError(response.error, { code: response.code }));
1319-
} else {
1320-
cb(response?.error, response);
1321-
}
1322-
};
1287+
const f = async () => {
13231288
if (timeout) {
1324-
const timer = setTimeout(() => {
1325-
done = true;
1326-
cb(new ConatError("timeout", { code: 408 }));
1327-
}, timeout);
1328-
1329-
this.conn.timeout(timeout).emit("publish", v, (err, response) => {
1330-
if (done) {
1331-
return;
1332-
}
1333-
clearTimeout(timer);
1334-
if (err) {
1335-
handle({ error: `${err}`, code: 408 });
1289+
try {
1290+
const response = await this.conn
1291+
.timeout(timeout)
1292+
.emitWithAck("publish", v);
1293+
if (response?.error) {
1294+
throw new ConatError(response.error, { code: response.code });
13361295
} else {
1337-
handle(response);
1296+
return response;
13381297
}
1339-
});
1298+
} catch (err) {
1299+
throw new ConatError(`timeout - ${subject} - ${err}`, {
1300+
code: 408,
1301+
});
1302+
}
13401303
} else {
1341-
this.conn.emit("publish", v, handle);
1304+
return await this.conn.emitWithAck("publish", v);
13421305
}
13431306
};
13441307
const promise = (async () => {
1345-
const response = await callback(f);
1346-
count = Math.max(count, response.count ?? 0);
1308+
try {
1309+
const response = await f();
1310+
count = Math.max(count, response.count ?? 0);
1311+
} catch (err) {
1312+
if (!noThrow) {
1313+
throw err;
1314+
}
1315+
}
13471316
})();
13481317
promises.push(promise);
13491318
} else {
@@ -1616,6 +1585,13 @@ interface PublishOptions {
16161585
// on success. Note that waitForInterest always has a timeout, defaulting
16171586
// to DEFAULT_WAIT_FOR_INTEREST_TIMEOUT if above timeout not given.
16181587
waitForInterest?: boolean;
1588+
1589+
// noThrow -- if set and publishing would throw an exception, it is
1590+
// instead silently dropped and undefined is returned instead.
1591+
// Use this where you might want to use publishSync, but still want
1592+
// to ensure there is interest; however, it's not important to know
1593+
// if there was an error sending.
1594+
noThrow?: boolean;
16191595
}
16201596

16211597
interface RequestManyOptions extends PublishOptions {

src/packages/conat/files/sync.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
File Sync Services
3+
4+
There's a bunch of these running with access to all project files.
5+
This is somewhat similar to the persist servers in architecture,
6+
except this is to support sync editing of files.
7+
8+
These listen on this sticky subject:
9+
10+
subject = sync.project-{project_id}.edit
11+
12+
User sends a message that contains the path to a file they want to edit.
13+
The fact user can even send the message means they have read/write
14+
privileges to the subject.
15+
16+
17+
the path to a file in a project
18+
19+
OUTPUT: starts (or keeps running) the filesystem aware side of an editing session
20+
21+
*/
22+
23+
// import { type Client } from "@cocalc/conat/core/client";
24+
25+
interface SyncDoc {
26+
close: () => void;
27+
}
28+
29+
export type SyncDocCreator = (opts: {
30+
project_id: string;
31+
path: string;
32+
doctype?: any;
33+
}) => SyncDoc;
34+
35+
/*
36+
interface Options {
37+
client: Client;
38+
39+
// projects = absolute path in filesystem to user projects, so join(projects, project_id)
40+
// is the path to project_id's files.
41+
projects: string;
42+
43+
createSyncDoc: SyncDocCreator;
44+
}
45+
46+
export function init(opts: Options) {
47+
return new SyncServer(opts.client, opts.projects, opts.createSyncDocs);
48+
}
49+
50+
interface Api {
51+
open: (opts: { path: string; doctype?: any }) => Promise<void>;
52+
}
53+
54+
class SyncServer {
55+
constructor(client: Client, projects: string, createSyncDoc: SyncDocCreator) {
56+
this.client.service = await client1.service<Api>("arith.*", {
57+
add: async (a, b) => a + b,
58+
mul: async (a, b) => a * b,
59+
// Here we do NOT use an arrow => function and this is
60+
// bound to the calling mesg, which lets us get the subject.
61+
// Because user identity and permissions are done via wildcard
62+
// subjects, having access to the calling message is critical
63+
async open({ path, doctype }) {
64+
const mesg: Message = this as any;
65+
console.log(mesg.subject);
66+
},
67+
});
68+
}
69+
}
70+
*/

src/packages/conat/socket/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ export class ConatSocketClient extends ConatSocketBase {
176176
return;
177177
} else if (cmd == "ping") {
178178
// logger.silly("responding to ping from server", this.id);
179-
mesg.respond(null);
179+
mesg.respondSync(null);
180180
} else if (mesg.isRequest()) {
181181
// logger.silly("client got request");
182182
this.emit("request", mesg);

src/packages/conat/socket/server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ export class ConatSocketServer extends ConatSocketBase {
191191
mesg.respondSync("closed");
192192
} else if (cmd == "connect") {
193193
// very important that connected is successfully delivered, so do not use respondSync.
194-
mesg.respond("connected");
194+
// Using respond waits for interest.
195+
mesg.respond("connected", { noThrow: true });
195196
} else {
196197
mesg.respondSync({ error: `unknown command - '${cmd}'` });
197198
}

src/packages/project/conat/api/index.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ How to do development (so in a dev project doing cc-in-cc dev).
2323
export DEBUG=cocalc:*
2424
export DEBUG_CONSOLE=yes
2525
26-
If API_KEY is a project-wide API key, then you can change
27-
COCALC_PROJECT_ID however you want and don't have to worry
28-
about whether the project is running or the project secret
26+
If API_KEY is a project-wide API key, then you can change
27+
COCALC_PROJECT_ID however you want and don't have to worry
28+
about whether the project is running or the project secret
2929
key changing when the project is restarted.
3030
3131
2. Then do this:
@@ -103,30 +103,30 @@ async function handleMessage(api, subject, mesg) {
103103
const { service } = request.args[0] ?? {};
104104
if (service == "open-files") {
105105
terminateOpenFiles();
106-
mesg.respond({ status: "terminated", service });
106+
await mesg.respond({ status: "terminated", service });
107107
return;
108108
} else if (service == "listings") {
109109
closeListings();
110-
mesg.respond({ status: "terminated", service });
110+
await mesg.respond({ status: "terminated", service });
111111
return;
112112
} else if (service == "files:read") {
113113
await closeFilesRead();
114-
mesg.respond({ status: "terminated", service });
114+
await mesg.respond({ status: "terminated", service });
115115
return;
116116
} else if (service == "files:write") {
117117
await closeFilesWrite();
118-
mesg.respond({ status: "terminated", service });
118+
await mesg.respond({ status: "terminated", service });
119119
return;
120120
} else if (service == "api") {
121121
// special hook so admin can terminate handling. This is useful for development.
122122
terminate = true;
123123
console.warn("TERMINATING listening on ", subject);
124124
logger.debug("TERMINATING listening on ", subject);
125-
mesg.respond({ status: "terminated", service });
125+
await mesg.respond({ status: "terminated", service });
126126
api.stop();
127127
return;
128128
} else {
129-
mesg.respond({ error: `Unknown service ${service}` });
129+
await mesg.respond({ error: `Unknown service ${service}` });
130130
}
131131
} else {
132132
handleApiRequest(request, mesg);
@@ -147,7 +147,7 @@ async function handleApiRequest(request, mesg) {
147147
resp = { error: `${err}` };
148148
}
149149
}
150-
mesg.respond(resp);
150+
await mesg.respond(resp);
151151
}
152152

153153
import * as system from "./system";

0 commit comments

Comments
 (0)