Skip to content

Commit c8ad2e8

Browse files
committed
conat -- fix a bug involving the noThrow option to publish in the core of conat, and add new unit test verifying this (which did fail). This was sometimes causing uncaught exceptions in prod.
1 parent 4f58818 commit c8ad2e8

File tree

2 files changed

+102
-41
lines changed

2 files changed

+102
-41
lines changed

src/packages/backend/conat/test/core/wait-for-interest.test.ts

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
Conat can wait for interest before publishing a message, in case there is none
33
the first time it tries. There is thus only a penality on failure and never
44
on immediate success. We test that here.
5+
6+
pnpm test `pwd`/wait-for-interest.test.ts
57
*/
68

7-
import { before, after, client, delay } from "../setup";
9+
import { before, after, client, connect, delay } from "../setup";
810

911
beforeAll(before);
1012

@@ -75,4 +77,55 @@ describe("test waitForInterest with requestMany", () => {
7577
});
7678
});
7779

80+
describe("async respond tests for interest by default", () => {
81+
it("test making a requested and sending a response with two clients, which works fine", async () => {
82+
const server = await client.subscribe("eval2");
83+
84+
const client2 = connect();
85+
const promise = client2.request("eval2", "2+3");
86+
87+
const { value: mesg } = await server.next();
88+
await mesg.respond(eval(mesg.data));
89+
90+
expect((await promise).data).toEqual(5);
91+
server.close();
92+
});
93+
94+
it("same as previous, but we close the requesting client, causing respond to throw", async () => {
95+
const server = await client.subscribe("eval3");
96+
97+
const client2 = connect();
98+
(async () => {
99+
try {
100+
await client2.request("eval3", "2+3", { timeout: 100 });
101+
} catch {}
102+
})();
103+
const { value: mesg } = await server.next();
104+
client2.close();
105+
try {
106+
await mesg.respond(eval(mesg.data), { timeout: 500 });
107+
throw Error("should time out");
108+
} catch (err) {
109+
// this is what should happen:
110+
expect(`${err}`).toContain("timed out");
111+
}
112+
});
113+
114+
it("same as previous, but we use noThrow to get a silent fail (since we don't care)", async () => {
115+
const server = await client.subscribe("eval4");
116+
117+
const client2 = connect();
118+
(async () => {
119+
try {
120+
await client2.request("eval4", "2+3", { timeout: 100 });
121+
} catch {}
122+
})();
123+
const { value: mesg } = await server.next();
124+
client2.close();
125+
await mesg.respond(eval(mesg.data), { timeout: 500, noThrow: true });
126+
});
127+
128+
129+
});
130+
78131
afterAll(after);

src/packages/conat/core/client.ts

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,51 +1204,58 @@ export class Client extends EventEmitter {
12041204
// **right now** or received these messages.
12051205
count: number;
12061206
}> => {
1207-
if (this.isClosed()) {
1208-
// already closed
1209-
return { bytes: 0, count: 0 };
1210-
}
1211-
await this.waitUntilSignedIn();
1212-
const start = Date.now();
1213-
const { bytes, getCount, promise } = this._publish(subject, mesg, {
1214-
...opts,
1215-
confirm: true,
1216-
});
1217-
await promise;
1218-
let count = getCount?.()!;
1219-
1220-
if (
1221-
opts.waitForInterest &&
1222-
count != null &&
1223-
count == 0 &&
1224-
!this.isClosed() &&
1225-
(opts.timeout == null || Date.now() - start <= opts.timeout)
1226-
) {
1227-
let timeout = opts.timeout ?? DEFAULT_WAIT_FOR_INTEREST_TIMEOUT;
1228-
await this.waitForInterest(subject, {
1229-
timeout: timeout ? timeout - (Date.now() - start) : undefined,
1230-
});
1207+
try {
12311208
if (this.isClosed()) {
1232-
return { bytes, count };
1233-
}
1234-
const elapsed = Date.now() - start;
1235-
timeout -= elapsed;
1236-
// client and there is interest
1237-
if (timeout <= 500) {
1238-
// but... not enough time left to try again even if there is interest,
1239-
// i.e., will fail anyways due to network latency
1240-
return { bytes, count };
1209+
// already closed
1210+
return { bytes: 0, count: 0 };
12411211
}
1242-
const { getCount, promise } = this._publish(subject, mesg, {
1212+
await this.waitUntilSignedIn();
1213+
const start = Date.now();
1214+
const { bytes, getCount, promise } = this._publish(subject, mesg, {
12431215
...opts,
1244-
timeout,
12451216
confirm: true,
12461217
});
12471218
await promise;
1248-
count = getCount?.()!;
1249-
}
1219+
let count = getCount?.()!;
12501220

1251-
return { bytes, count };
1221+
if (
1222+
opts.waitForInterest &&
1223+
count != null &&
1224+
count == 0 &&
1225+
!this.isClosed() &&
1226+
(opts.timeout == null || Date.now() - start <= opts.timeout)
1227+
) {
1228+
let timeout = opts.timeout ?? DEFAULT_WAIT_FOR_INTEREST_TIMEOUT;
1229+
await this.waitForInterest(subject, {
1230+
timeout: timeout ? timeout - (Date.now() - start) : undefined,
1231+
});
1232+
if (this.isClosed()) {
1233+
return { bytes, count };
1234+
}
1235+
const elapsed = Date.now() - start;
1236+
timeout -= elapsed;
1237+
// client and there is interest
1238+
if (timeout <= 500) {
1239+
// but... not enough time left to try again even if there is interest,
1240+
// i.e., will fail anyways due to network latency
1241+
return { bytes, count };
1242+
}
1243+
const { getCount, promise } = this._publish(subject, mesg, {
1244+
...opts,
1245+
timeout,
1246+
confirm: true,
1247+
});
1248+
await promise;
1249+
count = getCount?.()!;
1250+
}
1251+
return { bytes, count };
1252+
} catch (err) {
1253+
if (opts.noThrow) {
1254+
return { bytes: 0, count: 0 };
1255+
} else {
1256+
throw err;
1257+
}
1258+
}
12521259
};
12531260

12541261
private _publish = (
@@ -1609,9 +1616,10 @@ interface PublishOptions {
16091616

16101617
// noThrow -- if set and publishing would throw an exception, it is
16111618
// instead silently dropped and undefined is returned instead.
1619+
// Returned value of bytes and count will are not defined.
16121620
// Use this where you might want to use publishSync, but still want
16131621
// to ensure there is interest; however, it's not important to know
1614-
// if there was an error sending.
1622+
// if there was an error sending or that sending worked.
16151623
noThrow?: boolean;
16161624
}
16171625

@@ -1891,7 +1899,7 @@ export class Message<T = any> extends MessageData<T> {
18911899
return { bytes: 0, count: 0 };
18921900
}
18931901
return await this.client.publish(subject, mesg, {
1894-
// we *always* wait for interest for sync respond, since
1902+
// we *always* wait for interest for async respond, since
18951903
// it is by far the most likely situation where it wil be needed, due
18961904
// to inboxes when users first sign in.
18971905
waitForInterest: true,

0 commit comments

Comments
 (0)