Skip to content

Commit 84409c9

Browse files
committed
conat: refactor waitForInterest option so works for publish and requestMany as well (and cleaner systematic code); apply this to file transfer to fix bug when using clustering
1 parent fb454e1 commit 84409c9

File tree

4 files changed

+140
-43
lines changed

4 files changed

+140
-43
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Conat can wait for interest before publishing a message, in case there is none
3+
the first time it tries. There is thus only a penality on failure and never
4+
on immediate success. We test that here.
5+
*/
6+
7+
import { before, after, client, delay } from "../setup";
8+
9+
beforeAll(before);
10+
11+
describe("test waitForInterest when publishing", () => {
12+
it("publishes a message that gets dropped, illustrating that waitForInterest is NOT the default", async () => {
13+
const { count } = await client.publish("my.subject", null);
14+
expect(count).toBe(0);
15+
});
16+
17+
it("publishes a message with waitForInterest, then creates a subscription (after publishing) and sees it work", async () => {
18+
const promise = client.publish(
19+
"my.subject",
20+
{ co: "nat" },
21+
{
22+
waitForInterest: true,
23+
},
24+
);
25+
await delay(50);
26+
const sub = await client.subscribe("my.subject");
27+
const { count } = await promise;
28+
expect(count).toBe(1);
29+
const { value } = await sub.next();
30+
expect(value.data).toEqual({ co: "nat" });
31+
});
32+
});
33+
34+
describe("test waitForInterest with request", () => {
35+
it("request throws an error by default if there is no listener", async () => {
36+
expect(async () => {
37+
await client.request("eval.server.com", "2+3");
38+
}).rejects.toThrowError("no subscribers");
39+
});
40+
41+
it("requests with waitForInterest set and sees it work", async () => {
42+
const promise = client.request("eval.server.com", "2+3", {
43+
waitForInterest: true,
44+
});
45+
await delay(50);
46+
const sub = await client.subscribe("eval.server.com");
47+
const { value } = await sub.next();
48+
await value.respond(eval(value.data));
49+
expect((await promise).data).toEqual(5);
50+
});
51+
});
52+
53+
describe("test waitForInterest with requestMany", () => {
54+
it("request throws an error by default if there is no listener", async () => {
55+
expect(async () => {
56+
await client.requestMany("arith.server.com", [2, 3]);
57+
}).rejects.toThrowError("no subscribers");
58+
});
59+
60+
it("requestMany with waitForInterest set and sees it work", async () => {
61+
const promise = client.requestMany("arith.server.com", [2, 3], {
62+
waitForInterest: true,
63+
});
64+
await delay(50);
65+
const sub = await client.subscribe("arith.server.com");
66+
const { value } = await sub.next();
67+
await value.respond(value.data[0] + value.data[1]);
68+
await value.respond(value.data[0] * value.data[1]);
69+
70+
const responseSub = await promise;
71+
const { value: sum } = await responseSub.next();
72+
expect(sum.data).toEqual(5);
73+
const { value: prod } = await responseSub.next();
74+
expect(prod.data).toEqual(6);
75+
});
76+
});
77+
78+
afterAll(after);

src/packages/conat/core/client.ts

Lines changed: 58 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,14 @@ mysteriously terminate subscriptions for a variety of reasons, meaning that any
123123
code using subscriptions had to be wrapped in ugly complexity to be
124124
usable in production.
125125
126+
INTEREST AWARENESS: In Conat there is a cluster-aware event driving way to
127+
wait for interest in a subject. This is an extremely useful extension to
128+
NATS functionality, since it makes it much easier to dynamically setup
129+
a client and a server and exchange messages without having to poll and fail
130+
potentially a few times. This makes certain operations involving complicated
131+
steps behind the scenes -- upload a file, open a file to edit with sync, etc. --
132+
feel more responsive.
133+
126134
USAGE:
127135
128136
The following should mostly work to interactively play around with this
@@ -1172,7 +1180,7 @@ export class Client extends EventEmitter {
11721180
publish = async (
11731181
subject: string,
11741182
mesg,
1175-
opts?: PublishOptions,
1183+
opts: PublishOptions = {},
11761184
): Promise<{
11771185
// bytes encoded (doesn't count some extra wrapping)
11781186
bytes: number;
@@ -1187,12 +1195,45 @@ export class Client extends EventEmitter {
11871195
return { bytes: 0, count: 0 };
11881196
}
11891197
await this.waitUntilSignedIn();
1198+
const start = Date.now();
11901199
const { bytes, getCount, promise } = this._publish(subject, mesg, {
11911200
...opts,
11921201
confirm: true,
11931202
});
11941203
await promise;
1195-
return { bytes, count: getCount?.()! };
1204+
let count = getCount?.()!;
1205+
1206+
if (
1207+
opts.waitForInterest &&
1208+
count != null &&
1209+
count == 0 &&
1210+
!this.isClosed() &&
1211+
(opts.timeout == null || Date.now() - start <= opts.timeout)
1212+
) {
1213+
await this.waitForInterest(subject, {
1214+
timeout: opts.timeout ? opts.timeout - (Date.now() - start) : undefined,
1215+
});
1216+
if (this.isClosed()) {
1217+
return { bytes, count };
1218+
}
1219+
const elapsed = Date.now() - start;
1220+
const timeout = opts.timeout == null ? undefined : opts.timeout - elapsed;
1221+
// client and there is interest
1222+
if (timeout && timeout <= 500) {
1223+
// but... not enough time left to try again even if there is interest,
1224+
// i.e., will fail anyways due to network latency
1225+
return { bytes, count };
1226+
}
1227+
const { getCount, promise } = this._publish(subject, mesg, {
1228+
...opts,
1229+
timeout,
1230+
confirm: true,
1231+
});
1232+
await promise;
1233+
count = getCount?.()!;
1234+
}
1235+
1236+
return { bytes, count };
11961237
};
11971238

11981239
private _publish = (
@@ -1316,19 +1357,11 @@ export class Client extends EventEmitter {
13161357
request = async (
13171358
subject: string,
13181359
mesg: any,
1319-
{
1320-
timeout = DEFAULT_REQUEST_TIMEOUT,
1321-
// waitForInterest -- if publish fails due to no receivers and
1322-
// waitForInterest is true, will wait until there is a receiver
1323-
// and publish again:
1324-
waitForInterest = false,
1325-
...options
1326-
}: PublishOptions & { timeout?: number; waitForInterest?: boolean } = {},
1360+
{ timeout = DEFAULT_REQUEST_TIMEOUT, ...options }: PublishOptions = {},
13271361
): Promise<Message> => {
13281362
if (timeout <= 0) {
13291363
throw Error("timeout must be positive");
13301364
}
1331-
const start = Date.now();
13321365
const inbox = await this.getInbox();
13331366
const inboxSubject = this.temporaryInboxSubject();
13341367
const sub = new EventIterator<Message>(inbox, inboxSubject, {
@@ -1343,39 +1376,13 @@ export class Client extends EventEmitter {
13431376
headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
13441377
};
13451378
const { count } = await this.publish(subject, mesg, opts);
1346-
13471379
if (!count) {
1348-
const giveUp = () => {
1349-
sub.stop();
1350-
throw new ConatError(
1351-
`request -- no subscribers matching '${subject}'`,
1352-
{
1353-
code: 503,
1354-
},
1355-
);
1356-
};
1357-
if (waitForInterest) {
1358-
await this.waitForInterest(subject, { timeout });
1359-
if (this.state == "closed") {
1360-
throw Error("closed");
1361-
}
1362-
const remaining = timeout - (Date.now() - start);
1363-
if (remaining <= 1000) {
1364-
throw new ConatError("timeout", { code: 408 });
1365-
}
1366-
// no error so there is very likely now interest, so we publish again:
1367-
const { count } = await this.publish(subject, mesg, {
1368-
...opts,
1369-
timeout: remaining,
1370-
});
1371-
if (!count) {
1372-
giveUp();
1373-
}
1374-
} else {
1375-
giveUp();
1376-
}
1380+
sub.stop();
1381+
// if you hit this, consider using the option waitForInterest:true
1382+
throw new ConatError(`request -- no subscribers matching '${subject}'`, {
1383+
code: 503,
1384+
});
13771385
}
1378-
13791386
for await (const resp of sub) {
13801387
sub.stop();
13811388
return resp;
@@ -1588,8 +1595,17 @@ interface PublishOptions {
15881595
// encoded message (using encoding) and any mesg parameter
15891596
// is *IGNORED*.
15901597
raw?;
1598+
15911599
// timeout used when publishing a message and awaiting a response.
15921600
timeout?: number;
1601+
1602+
// waitForInterest -- if publishing async so its possible to tell whether or not
1603+
// there were any recipients, and there were NO recipients, it will wait until
1604+
// there is a recipient and send again. This does NOT use polling, but instead
1605+
// uses a cluster aware and fully event based primitive in the server.
1606+
// There is thus only a speed penality doing this on failure and never
1607+
// on success.
1608+
waitForInterest?: boolean;
15931609
}
15941610

15951611
interface RequestManyOptions extends PublishOptions {

src/packages/conat/files/read.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ export async function* readFile({
149149
subject,
150150
{ path },
151151
{
152+
// waitForInterest is extremely important because of the timing
153+
// of how readFile gets used by writeFile in write.ts.
154+
waitForInterest: true,
152155
maxWait,
153156
},
154157
)) {

src/packages/conat/files/write.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ export async function writeFile({
206206
compute_server_id,
207207
name,
208208
});
209-
// tell compute server to start reading our file.
209+
// tell compute server / project to start reading our file.
210210
const cn = await conat();
211211
const resp = await cn.request(
212212
getWriteSubject({ project_id, compute_server_id }),

0 commit comments

Comments
 (0)