Skip to content

Commit 45c0cb4

Browse files
committed
work in progress fixing a very subtle stream bug
1 parent e12b8de commit 45c0cb4

File tree

8 files changed

+180
-89
lines changed

8 files changed

+180
-89
lines changed

src/packages/backend/conat/test/persist/cluster.test.ts

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
persistServer as persistServer0,
1616
wait,
1717
setDefaultTimeouts,
18+
setDefaultSocketTimeouts,
1819
waitForConsistentState,
1920
} from "../setup";
2021
import { uuid } from "@cocalc/util/misc";
@@ -25,9 +26,14 @@ beforeAll(async () => {
2526
await before();
2627
// this speeds up the automatic failover tests a lot.
2728
setDefaultTimeouts({ request: 1000, publish: 1000 });
29+
setDefaultSocketTimeouts({
30+
command: 1000,
31+
keepAlive: 2000,
32+
keepAliveTimeout: 1000,
33+
});
2834
});
2935

30-
jest.setTimeout(15000);
36+
jest.setTimeout(10000);
3137
describe("test using multiple persist servers in a cluster", () => {
3238
let client0, server1, client1;
3339
it("add another node", async () => {
@@ -142,25 +148,24 @@ describe("test using multiple persist servers in a cluster", () => {
142148
});
143149

144150
it("remove one persist server", async () => {
151+
openStreams1.map((x) => (x.stream.log = true));
152+
console.log("CLOSING PERSIST SERVER");
145153
persistServer1.close();
146-
// [ ] TODO: removing this delay leads to very consistent failures
147-
// involving data loss, but things should just be slower, never broken,
148-
// on automatic failover.
149-
await delay(3000);
150154
});
151155

152-
it("creating / opening streams we made above still work with no data lost", async () => {
153-
for (const project_id of project_ids) {
154-
const s = await client0.sync.dstream({
155-
project_id,
156-
name: "foo",
157-
sync: true,
158-
});
159-
expect(await s.getAll()).toEqual([project_id]);
160-
s.close();
161-
}
162-
expect(Object.keys(persistServer1.sockets).length).toEqual(0);
163-
});
156+
// it.skip("creating / opening streams we made above still work with no data lost", async () => {
157+
// for (const project_id of project_ids) {
158+
// const s = await client0.sync.dstream({
159+
// project_id,
160+
// name: "foo",
161+
// noCache: true,
162+
// sync: true,
163+
// });
164+
// expect(await s.getAll()).toEqual([project_id]);
165+
// s.close();
166+
// }
167+
// expect(Object.keys(persistServer1.sockets).length).toEqual(0);
168+
// });
164169

165170
// this can definitely take a long time (e.g., ~10s), as it involves automatic failover.
166171
it("Checks automatic failover works: the streams connected to both servers we created above must keep working, despite at least one of them having its persist server get closed.", async () => {
@@ -169,9 +174,29 @@ describe("test using multiple persist servers in a cluster", () => {
169174
stream0.publish("y");
170175
await stream0.save();
171176
expect(stream0.hasUnsavedChanges()).toBe(false);
177+
178+
172179
const stream1 = openStreams1[i];
173180
expect(stream0.opts.project_id).toEqual(stream1.opts.project_id);
174-
await wait({ until: () => stream1.length >= 2, timeout: 10000 });
181+
console.log(i, stream1.stream.storage);
182+
await wait({
183+
until: async () => {
184+
console.log(
185+
i,
186+
stream1.stream.client.id,
187+
stream1.stream.id,
188+
stream1.getAll(),
189+
stream1.messages,
190+
(await stream1.stream.persistClient.getAll({ start_seq: 0 }))
191+
.length,
192+
stream1.stream.messages,
193+
stream1.stream.raw,
194+
);
195+
return stream1.length >= 2;
196+
},
197+
timeout: 10000,
198+
start: 1000,
199+
});
175200
expect(stream1.length).toBe(2);
176201
}
177202
});

src/packages/backend/conat/test/setup.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { join } from "path";
1515
import { wait } from "@cocalc/backend/conat/test/util";
1616
import { delay } from "awaiting";
1717
export { setDefaultTimeouts } from "@cocalc/conat/core/client";
18+
export { setDefaultSocketTimeouts } from "@cocalc/conat/socket/util";
1819
import { once } from "@cocalc/util/async-utils";
1920
import { until } from "@cocalc/util/async-utils";
2021
import { randomId } from "@cocalc/conat/names";

src/packages/conat/core/server.ts

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1600,53 +1600,42 @@ export class ConatServer extends EventEmitter {
16001600
links: ClusterLink[],
16011601
): Promise<boolean> => {
16021602
const v: any[] = [];
1603-
let done = false;
1604-
try {
1605-
// we use AbortController etc below so we can cancel waiting once
1606-
// we get any interest.
1607-
const nothrow = async (f) => {
1608-
try {
1609-
return await f;
1610-
} catch (err) {
1611-
if (!done) {
1612-
logger.debug(
1613-
`WARNING: waitForInterestInLinks ${subject} -- ${err}`,
1614-
);
1615-
}
1616-
}
1617-
return false;
1618-
};
1619-
const controller = new AbortController();
1620-
const signal2 = controller.signal;
1621-
v.push(
1622-
nothrow(
1623-
this.waitForInterestOnThisNode(subject, timeout, socketId, signal2),
1624-
),
1625-
);
1626-
for (const link of links) {
1627-
v.push(nothrow(link.waitForInterest(subject, timeout, signal2)));
1628-
}
1629-
if (!timeout) {
1630-
// with timeout=0 they all immediately answer (so no need to worry about abort/promise)
1631-
const w = await Promise.all(v);
1632-
for (const x of w) {
1633-
if (x) {
1634-
return true;
1635-
}
1603+
// we use AbortController etc below so we can cancel waiting once
1604+
// we get any interest.
1605+
const nothrow = async (f) => {
1606+
try {
1607+
return await f;
1608+
} catch {}
1609+
return false;
1610+
};
1611+
const controller = new AbortController();
1612+
const signal2 = controller.signal;
1613+
v.push(
1614+
nothrow(
1615+
this.waitForInterestOnThisNode(subject, timeout, socketId, signal2),
1616+
),
1617+
);
1618+
for (const link of links) {
1619+
v.push(nothrow(link.waitForInterest(subject, timeout, signal2)));
1620+
}
1621+
if (!timeout) {
1622+
// with timeout=0 they all immediately answer (so no need to worry about abort/promise)
1623+
const w = await Promise.all(v);
1624+
for (const x of w) {
1625+
if (x) {
1626+
return true;
16361627
}
1637-
return false;
16381628
}
1629+
return false;
1630+
}
16391631

1640-
signal?.addEventListener("abort", () => {
1641-
controller.abort();
1642-
});
1643-
const w = await Promise.race(v);
1644-
// cancel all the others.
1632+
signal?.addEventListener("abort", () => {
16451633
controller.abort();
1646-
return w;
1647-
} finally {
1648-
done = true;
1649-
}
1634+
});
1635+
const w = await Promise.race(v);
1636+
// cancel all the others.
1637+
controller.abort();
1638+
return w;
16501639
};
16511640

16521641
private waitForInterestOnThisNode = async (

src/packages/conat/persist/client.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import { EventEmitter } from "events";
2222
import { getLogger } from "@cocalc/conat/client";
2323
import { until } from "@cocalc/util/async-utils";
2424

25+
const RECONNECT_DELAY = 1000;
26+
2527
interface GetAllOpts {
2628
start_seq?: number;
2729
end_seq?: number;
@@ -46,6 +48,7 @@ class PersistStreamClient extends EventEmitter {
4648
private reconnecting = false;
4749
private gettingMissed = false;
4850
private changesWhenGettingMissed: ChangefeedEvent[] = [];
51+
id = Math.random();
4952

5053
constructor(
5154
private client: Client,
@@ -61,6 +64,14 @@ class PersistStreamClient extends EventEmitter {
6164
}
6265

6366
private init = () => {
67+
if (this.reconnecting) {
68+
console.log(
69+
this.id,
70+
"persist client reconnecting",
71+
this.client.id,
72+
this.storage.path,
73+
);
74+
}
6475
if (this.client.state == "closed") {
6576
this.close();
6677
return;
@@ -91,15 +102,17 @@ class PersistStreamClient extends EventEmitter {
91102
// console.log("persist client was disconnected", this.storage.path);
92103
this.reconnecting = true;
93104
this.socket.removeAllListeners();
94-
setTimeout(this.init, 1000);
105+
setTimeout(this.init, RECONNECT_DELAY);
95106
});
96107
this.socket.once("closed", () => {
97108
this.reconnecting = true;
98109
this.socket.removeAllListeners();
99-
setTimeout(this.init, 1000);
110+
setTimeout(this.init, RECONNECT_DELAY);
100111
});
101112

102113
this.socket.on("data", (updates, headers) => {
114+
if (this.storage.path.endsWith("foo"))
115+
console.log(this.id, "data", updates, headers);
103116
if (updates == null && headers != null) {
104117
// has to be an error
105118
this.emit(
@@ -131,6 +144,22 @@ class PersistStreamClient extends EventEmitter {
131144
}
132145
try {
133146
await this.socket.waitUntilReady(15000);
147+
if (this.changefeeds.length == 0 || this.state != "ready") {
148+
return true;
149+
}
150+
const resp = await this.socket.request(null, {
151+
headers: {
152+
cmd: "changefeed",
153+
},
154+
});
155+
if (resp.headers?.error) {
156+
throw new ConatError(`${resp.headers?.error}`, {
157+
code: resp.headers?.code,
158+
});
159+
}
160+
if (this.changefeeds.length == 0 || this.state != "ready") {
161+
return true;
162+
}
134163
const updates = await this.getAll({
135164
start_seq: this.lastSeq,
136165
timeout: 15000,

src/packages/conat/socket/util.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,23 @@ export const PING_PONG_INTERVAL = 60000;
2323
// any size.
2424
export const DEFAULT_MAX_QUEUE_SIZE = 1000;
2525

26-
export const DEFAULT_COMMAND_TIMEOUT = 10_000;
26+
export let DEFAULT_COMMAND_TIMEOUT = 10_000;
27+
export let DEFAULT_KEEP_ALIVE = 90_000;
28+
export let DEFAULT_KEEP_ALIVE_TIMEOUT = 15_000;
2729

28-
export const DEFAULT_KEEP_ALIVE = 90_000;
29-
export const DEFAULT_KEEP_ALIVE_TIMEOUT = 15_000;
30+
/**
 * Overwrite the module-level default socket timeouts
 * (DEFAULT_COMMAND_TIMEOUT, DEFAULT_KEEP_ALIVE, DEFAULT_KEEP_ALIVE_TIMEOUT).
 *
 * Each option defaults to the *current* value of the variable it controls, so
 * omitting an option leaves that default unchanged, and repeated calls build
 * on whatever an earlier call set rather than on the original values.
 *
 * NOTE(review): values are presumably milliseconds -- confirm against the
 * socket code that reads these defaults.
 * NOTE(review): the cluster test calls this from beforeAll with much smaller
 * values; nothing in view restores the originals afterwards -- confirm
 * whether that matters for other tests sharing the module.
 *
 * @param command - new value for DEFAULT_COMMAND_TIMEOUT
 * @param keepAlive - new value for DEFAULT_KEEP_ALIVE
 * @param keepAliveTimeout - new value for DEFAULT_KEEP_ALIVE_TIMEOUT
 */
export function setDefaultSocketTimeouts({
31+
command = DEFAULT_COMMAND_TIMEOUT,
32+
keepAlive = DEFAULT_KEEP_ALIVE,
33+
keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT,
34+
}: {
35+
command?: number;
36+
keepAlive?: number;
37+
keepAliveTimeout?: number;
38+
}) {
39+
DEFAULT_COMMAND_TIMEOUT = command;
40+
DEFAULT_KEEP_ALIVE = keepAlive;
41+
DEFAULT_KEEP_ALIVE_TIMEOUT = keepAliveTimeout;
42+
}
3043

3144
export type Command = "connect" | "close" | "ping" | "socket";
3245

0 commit comments

Comments
 (0)