Skip to content

Commit f331a8d

Browse files
committed
working unit tests of backup/archive of persist
1 parent 488226f commit f331a8d

File tree

7 files changed

+352
-20
lines changed

7 files changed

+352
-20
lines changed
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
Testing automatic tiered storage and backup persistence functionality.
3+
*/
4+
5+
import {
6+
before,
7+
after,
8+
connect,
9+
delay,
10+
client,
11+
wait,
12+
} from "@cocalc/backend/conat/test/setup";
13+
import { stream } from "@cocalc/conat/persist/client";
14+
import { syncFiles } from "@cocalc/conat/persist/context";
15+
import { pathExists } from "fs-extra";
16+
import { join } from "path";
17+
import * as fs from "fs/promises";
18+
import { messageData } from "@cocalc/conat/core/client";
19+
import { executeCode } from "@cocalc/backend/execute-code";
20+
import sqlite from "better-sqlite3";
21+
import { openPaths } from "@cocalc/conat/persist/storage";
22+
23+
beforeAll(async () => {
24+
await before({ archive: "archive", backup: "backup", archiveInterval: 250 });
25+
});
26+
27+
describe("create persist server that also saves data to an archive folder and a backup folder", () => {
28+
it("verify that archive, backup and archiveInterval are all configured", async () => {
29+
expect(syncFiles.archive).toContain("archive");
30+
expect(syncFiles.archiveInterval).toBeGreaterThan(0);
31+
expect(syncFiles.backup).toContain("backup");
32+
});
33+
34+
// Polls (via the test-setup `wait` helper) until the storage path for
// "hub/foo" is no longer present in openPaths.
async function waitUntilClosed() {
  const isClosed = () => {
    const storagePath = join(syncFiles.local, "hub/foo");
    return !openPaths.has(storagePath);
  };
  await wait({ until: isClosed });
}
41+
42+
let s1;
43+
it("create a new stream", async () => {
44+
s1 = stream({
45+
client,
46+
user: { hub_id: "x" },
47+
storage: { path: "hub/foo" },
48+
});
49+
await s1.set({
50+
key: "my-key-1",
51+
messageData: messageData("one"),
52+
});
53+
});
54+
55+
let local, archive, backup;
56+
it(`wait, then there is an updated archive file too`, async () => {
  // Resolve the three on-disk locations of the stream's sqlite file.
  // These are assigned to the shared describe-scope variables because the
  // subsequent tests reuse them.
  local = join(syncFiles.local, "hub/foo.db");
  archive = join(syncFiles.archive, "hub/foo.db");
  backup = join(syncFiles.backup, "hub/foo.db");

  expect(await pathExists(local)).toBe(true);
  // gets created initially
  expect(await pathExists(archive)).toBe(true);
  // backup should only exist when stream is closed
  expect(await pathExists(backup)).toBe(false);

  // timestamp before another write
  const statsBefore = await fs.stat(archive);

  await s1.set({
    key: "my-key-2",
    messageData: messageData("two"),
  });

  // now wait to ensure archive gets written
  await delay(syncFiles.archiveInterval + 100);
  expect(await pathExists(archive)).toBe(true);

  // the archive file must have been rewritten after the second set
  const statsAfter = await fs.stat(archive);
  expect(statsAfter.mtimeMs).not.toEqual(statsBefore.mtimeMs);
});
80+
81+
it("close the stream and see that the backup and archive are both written, even though we didn't wait the full archive interval", async () => {
82+
s1.close();
83+
const t = Date.now();
84+
await wait({
85+
until: async () => await pathExists(backup),
86+
});
87+
expect(Date.now() - t).toBeLessThan(syncFiles.archiveInterval);
88+
expect(await pathExists(backup)).toBe(true);
89+
// at this point the actual sqlite3 database should be closed
90+
});
91+
92+
const sha1 = async (path) => {
93+
const { stdout } = await executeCode({ command: "sha1sum", args: [path] });
94+
return stdout;
95+
};
96+
97+
it("the backup, archive, and local files should all be identical as sqlite database", async () => {
98+
// they are not the same as files though so we need some care to compare them.
99+
expect(await serialize(local)).toEqual(await serialize(backup));
100+
expect(await serialize(archive)).toEqual(await serialize(backup));
101+
});
102+
103+
it("delete the local copy and open stream, the data is still available", async () => {
104+
await fs.unlink(local);
105+
106+
s1 = stream({
107+
client,
108+
user: { hub_id: "x" },
109+
storage: { path: "hub/foo" },
110+
});
111+
const mesg = await s1.get({ key: "my-key-1" });
112+
expect(mesg.data).toBe("one");
113+
114+
await s1.set({
115+
key: "my-key-3",
116+
messageData: messageData("three"),
117+
});
118+
119+
s1.close();
120+
await waitUntilClosed();
121+
});
122+
123+
it("delete the archive copy and open stream, the data is still available because local is used", async () => {
124+
await fs.unlink(archive);
125+
126+
s1 = stream({
127+
client,
128+
user: { hub_id: "x" },
129+
storage: { path: "hub/foo" },
130+
});
131+
const mesg = await s1.get({ key: "my-key-3" });
132+
expect(mesg.data).toBe("three");
133+
134+
s1.close();
135+
await waitUntilClosed();
136+
});
137+
138+
it("all should identical again sqlite database", async () => {
139+
// they are not the same as files though so we need some care to compare them.
140+
expect(await serialize(local)).toEqual(await serialize(backup));
141+
expect(await serialize(archive)).toEqual(await serialize(backup));
142+
});
143+
144+
it("if both archive and local exist and local is newer, it is used", async () => {
  // grab copy of local (taken before my-key-4 is written)
  const copy = local + ".copy";
  await fs.copyFile(local, copy);

  s1 = stream({
    client,
    user: { hub_id: "x" },
    storage: { path: "hub/foo" },
  });
  await s1.set({
    key: "my-key-4",
    messageData: messageData("four"),
  });

  // Remove the existing backup and WAIT for the removal to finish before
  // closing; otherwise the wait below could observe the stale backup file
  // and return before the close has actually written a new one.
  await fs.unlink(backup);
  s1.close();
  await wait({
    until: async () => await pathExists(backup),
  });

  // ensure the old copy of local is the newer one by making archive old
  await fs.copyFile(copy, local);
  const longAgo = Date.now() / 1000 - 100_000; // utimes takes seconds
  await fs.utimes(archive, longAgo, longAgo);

  s1 = stream({
    client,
    user: { hub_id: "x" },
    storage: { path: "hub/foo" },
  });
  // local (the pre-"my-key-4" copy) wins, so the key must be absent
  expect((await s1.get({ key: "my-key-4" }))?.data).toEqual(undefined);

  s1.close();
  await waitUntilClosed();
});
181+
182+
it("if both archive and local exist and archive is newer, then archive is used", async () => {
183+
// grab copy of archive
184+
const copy = archive + ".copy";
185+
await fs.copyFile(archive, copy);
186+
187+
s1 = stream({
188+
client,
189+
user: { hub_id: "x" },
190+
storage: { path: "hub/foo" },
191+
});
192+
await s1.set({
193+
key: "my-key-5",
194+
messageData: messageData("five"),
195+
});
196+
s1.close();
197+
await waitUntilClosed();
198+
199+
// ensure the old copy of archive is the newer one by making local old
200+
await fs.copyFile(copy, archive);
201+
await fs.utimes(
202+
local,
203+
Date.now() / 1000 - 100_000,
204+
Date.now() / 1000 - 100_000,
205+
);
206+
s1 = stream({
207+
client,
208+
user: { hub_id: "x" },
209+
storage: { path: "hub/foo" },
210+
});
211+
expect((await s1.get({ key: "my-key-5" }))?.data).toEqual(undefined);
212+
213+
s1.close();
214+
await waitUntilClosed();
215+
});
216+
217+
it("another check all are equal now", async () => {
218+
//console.log("checking equality");
219+
expect(await serialize(local)).toEqual(await serialize(backup));
220+
expect(await serialize(archive)).toEqual(await serialize(backup));
221+
});
222+
223+
it("deletes local and archive but not backup -- data is NOT available", async () => {
224+
await fs.unlink(local);
225+
await fs.unlink(archive);
226+
s1 = stream({
227+
client,
228+
user: { hub_id: "x" },
229+
storage: { path: "hub/foo" },
230+
});
231+
expect((await s1.get({ key: "my-key-1" }))?.data).toEqual(undefined);
232+
});
233+
});
234+
235+
/**
 * Dump the `messages` and `config` tables of the sqlite database at `path`
 * as a JSON string, so two database files can be compared by content
 * instead of by raw bytes.
 *
 * If a query throws, the error is logged and the whole attempt is retried
 * after 50ms. The database handle is closed after every attempt, whether
 * it succeeded or failed, so retries do not accumulate open handles.
 *
 * @param path - filesystem path of the sqlite database file
 * @returns JSON string of the form `{ messages: [...], config: [...] }`
 */
async function serialize(path: string): Promise<string> {
  while (true) {
    const db = new sqlite(path);
    try {
      return JSON.stringify({
        messages: db.prepare("select * from messages").all(),
        config: db.prepare("select * from config").all(),
      });
    } catch (err) {
      console.log(err);
    } finally {
      // always release the handle, including on failed attempts
      db.close();
    }
    await delay(50);
  }
}
251+
252+
// Run the shared test-setup teardown and wait for it, so the hook does not
// resolve while cleanup may still be in flight.
afterAll(async () => {
  await after();
});

src/packages/backend/conat/test/setup.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,21 @@ export async function restartPersistServer() {
133133

134134
// one pre-made client
135135
export let client;
136-
export async function before() {
136+
export async function before(
137+
opts: { archive?: string; backup?: string; archiveInterval?: number } = {},
138+
) {
139+
// syncFiles and tempDir define where the persist server persists data.
137140
tempDir = await mkdtemp(join(tmpdir(), "conat-test"));
138141
syncFiles.local = join(tempDir, "local");
139-
syncFiles.archive = join(tempDir, "archive");
142+
if (opts.archive) {
143+
syncFiles.archive = join(tempDir, "archive");
144+
}
145+
if (opts.archiveInterval) {
146+
syncFiles.archiveInterval = opts.archiveInterval;
147+
}
148+
if (opts.backup) {
149+
syncFiles.backup = join(tempDir, "backup");
150+
}
140151

141152
server = await createServer();
142153
client = connect();

src/packages/backend/data.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,29 @@ export const projects: string =
181181
process.env.PROJECTS ?? join(data, "projects", "[project_id]");
182182
export const secrets: string = process.env.SECRETS ?? join(data, "secrets");
183183

184+
// Where the sqlite database files used for sync are stored.
185+
// The idea is there is one very fast *ephemeral* directory
186+
// which is used for actively open sqlite databases. Optionally,
187+
// data is copied to a file in archive, and on close it is
188+
// copied to backup file.
189+
// When opening the sqlite database, the newer of local and archive is
190+
// used. The backup is ignored in all cases (backup exists ONLY for
191+
// you to make an offline copy or bup archive from or something).
184192
export const syncFiles = {
185193
// Persistent local storage of streams and kv's as sqlite3 files
186194
local: process.env.COCALC_SYNC ?? join(data, "sync"),
187-
// Archived storage of streams and kv's as sqlite3 files, if set.
195+
196+
// OPTIONAL: Archived storage of streams and kv's as sqlite3 files.
188197
// This could be a gcsfuse mountpoint.
189198
archive: process.env.COCALC_SYNC_ARCHIVE ?? "",
199+
archiveInterval: parseInt(
200+
process.env.COCALC_SYNC_ARCHIVE_INTERVAL ?? "30000",
201+
),
202+
203+
// OPTIONAL: When storage is closed, a backup is written here:
204+
// This backup is *NOT* used in any way except as a backup; in particular,
205+
// it won't be used even if archive and local were both gone.
206+
backup: process.env.COCALC_SYNC_BACKUP ?? "",
190207
};
191208

192209
// if the directory secrets doesn't exist, create it (sync, during this load):

src/packages/conat/persist/context.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ export let decompress: (data: Buffer) => Buffer = () => {
1919
throw Error("must initialize persist context");
2020
};
2121

22-
export let syncFiles = { local: "", archive: "" };
22+
export let syncFiles = {
23+
local: "",
24+
archive: "",
25+
archiveInterval: 30_000,
26+
backup: "",
27+
};
2328

2429
export let ensureContainingDirectoryExists: (path: string) => Promise<void> = (
2530
_path,
@@ -39,7 +44,12 @@ export function initContext(opts: {
3944
betterSqlite3;
4045
compress: (Buffer) => Buffer;
4146
decompress: (Buffer) => Buffer;
42-
syncFiles: { local: string; archive: string };
47+
syncFiles: {
48+
local: string;
49+
archive: string;
50+
archiveInterval: number;
51+
backup: string;
52+
};
4353
ensureContainingDirectoryExists: (path: string) => Promise<void>;
4454
statSync: (path: string) => { mtimeMs: number };
4555
copyFileSync: (src: string, desc: string) => void;

0 commit comments

Comments
 (0)