Skip to content

Commit a8b12f8

Browse files
committed
implement archive support for persist (not tested yet -- breaks tons of tests)
1 parent 20c2fdb commit a8b12f8

File tree

3 files changed

+107
-19
lines changed

3 files changed

+107
-19
lines changed

src/packages/backend/conat/persist.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@ import { initContext } from "@cocalc/conat/persist/context";
1414
import { compress, decompress } from "zstd-napi";
1515
import { syncFiles } from "@cocalc/backend/data";
1616
import ensureContainingDirectoryExists from "@cocalc/backend/misc/ensure-containing-directory-exists";
17+
import { statSync, copyFileSync } from "node:fs";
1718

1819
// Inject the backend (node) implementations into the shared persist context.
// This must run before any code in @cocalc/conat/persist/* touches the
// database, compression, or filesystem helpers.
initContext({
  betterSqlite3,
  compress,
  decompress,
  syncFiles,
  ensureContainingDirectoryExists,
  // node:fs implementations backing the tiered-storage archive support:
  // statSync is used to compare mtimes, copyFileSync to restore from archive.
  statSync,
  copyFileSync,
});
2528

2629
export { pstream } from "@cocalc/conat/persist/storage";

src/packages/conat/persist/context.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,30 @@ export let ensureContainingDirectoryExists: (path: string) => Promise<void> = (
2727
throw Error("must initialize persist context");
2828
};
2929

30+
// Stat a file synchronously; only mtimeMs is needed (callers compare mtimes
// to decide whether the archive copy of a sqlite file is newer than the local
// one). Placeholder that throws until initContext installs the real
// implementation (node's fs.statSync on the backend).
export let statSync = (_path: string): { mtimeMs: number } => {
  throw Error("must initialize persist context");
};
33+
34+
export let copyFileSync = (_src: string, _desc: string): void => {
35+
throw Error("must initialize persist context");
36+
};
37+
3038
export function initContext(opts: {
3139
betterSqlite3;
3240
compress: (Buffer) => Buffer;
3341
decompress: (Buffer) => Buffer;
3442
syncFiles: { local: string; archive: string };
3543
ensureContainingDirectoryExists: (path: string) => Promise<void>;
44+
statSync: (path: string) => { mtimeMs: number };
45+
copyFileSync: (src: string, desc: string) => void;
3646
}) {
3747
betterSqlite3 = opts.betterSqlite3;
3848
compress = opts.compress;
3949
decompress = opts.decompress;
4050
syncFiles = opts.syncFiles;
4151
ensureContainingDirectoryExists = opts.ensureContainingDirectoryExists;
52+
statSync = opts.statSync;
53+
copyFileSync = opts.copyFileSync;
4254
}
4355

4456
export function createDatabase(...args): Database {

src/packages/conat/persist/storage.ts

Lines changed: 92 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ This module is:
1717
1818
We care about memory efficiency here since it's likely we'll want to have
1919
possibly thousands of these in a single nodejs process at once, but with
20-
less than 1 read/write per second for each. Thus memory is critical, and
20+
less than 1 read/write per second for each. Thus memory is critical, and
2121
supporting at least 1000 writes/second is what we need.
22-
Fortunately, this implementation can do ~50,000+ writes per second and read
23-
over 500,000 per second. Yes, it blocks the main thread, but by using
24-
better-sqlite3 and zstd-napi, we get 10x speed increases over async code,
22+
Fortunately, this implementation can do ~50,000+ writes per second and read
23+
over 500,000 per second. Yes, it blocks the main thread, but by using
24+
better-sqlite3 and zstd-napi, we get 10x speed increases over async code,
2525
so this is worth it.
2626
2727
@@ -31,16 +31,27 @@ I implemented *sync* lz4-napi compression here and it's very fast,
3131
but it has to be run with async waits in a loop or it doesn't give back
3232
memory, and such throttling may significantly negatively impact performance
3333
and mean we don't get a 100% sync api (like we have now).
34-
The async functions in lz4-napi seem fine. Upstream report (by me):
34+
The async functions in lz4-napi seem fine. Upstream report (by me):
3535
https://github.com/antoniomuso/lz4-napi/issues/678
3636
I also tried the rust sync snappy and it had a similar memory leak. Finally,
3737
I tried zstd-napi and it has a very fast sync implementation that does *not*
38-
need async pauses to not leak memory. So zstd-napi it is.
38+
need async pauses to not leak memory. So zstd-napi it is.
3939
And I like zstandard anyways.
4040
41+
TIERED STORAGE:
42+
43+
You can provide a second path archive for the sqlite file. If provided, on creation,
44+
this will stat both the main path and the archive path. If the archive path is
45+
newer, then the file is first copied from the archive path to the normal path,
46+
then opened. Also, if the archive path is provided, then a backup of the database
47+
is made to the archive path periodically. We use this for tiered storage in
48+
CoCalc as follows. The archive path is on a Google Cloud Storage autoclass bucket
49+
that is mounted using gcsfuse. The normal primary path is on a small fast SSD
50+
persistent disk, which we view as a cache.
51+
4152
NOTE:
4253
43-
We use seconds instead of ms in sqlite since that is the standard
54+
We use seconds instead of ms in sqlite since that is the standard
4455
convention for times in sqlite.
4556
4657
DEVELOPMENT:
@@ -51,7 +62,14 @@ DEVELOPMENT:
5162
*/
5263

5364
import { refCacheSync } from "@cocalc/util/refcache";
54-
import { createDatabase, type Database, compress, decompress } from "./context";
65+
import {
66+
createDatabase,
67+
type Database,
68+
compress,
69+
decompress,
70+
statSync,
71+
copyFileSync,
72+
} from "./context";
5573
import type { JSONValue } from "@cocalc/util/types";
5674
import { EventEmitter } from "events";
5775
import {
@@ -61,6 +79,8 @@ import {
6179
} from "@cocalc/conat/core/client";
6280
import TTL from "@isaacs/ttlcache";
6381
import { getLogger } from "@cocalc/conat/client";
82+
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
83+
import { throttle } from "lodash";
6484

6585
const logger = getLogger("persist:storage");
6686

@@ -195,10 +215,22 @@ export interface DeleteOperation {
195215
seqs: number[];
196216
}
197217

218+
export const DEFAULT_ARCHIVE_INTERVAL = 30_000; // 30 seconds
219+
198220
export interface StorageOptions {
199221
// absolute path to sqlite database file. This needs to be a valid filename
200-
// path, and must also be kept under 1K so it can be stored in cloud storage.
222+
// path, and must also be kept under 1000 characters in length so it can be
223+
// stored in cloud storage.
201224
path: string;
225+
// another absolute path. If this is given, then (1)
226+
// it will be copied to path before opening path if it is newer, and (2) a
227+
// backup will be saved to archive (using sqlite's backup feature) every
228+
// archiveInterval ms. NOTE: we actually append ".db" to path and to archive.
229+
archive?: string;
230+
// the archive interval, if archive is given. defaults to DEFAULT_ARCHIVE_INTERVAL
231+
// Depending on your setup, this is likely your tolerance for data loss in the worst case scenario, e.g.,
232+
// "loss of the last 30 seconds of TimeTravel edit history".
233+
archiveInterval?: number;
202234
// if false (the default) do not require sync writes to disk on every set
203235
sync?: boolean;
204236
// if set, then data is never saved to disk at all. To avoid using a lot of server
@@ -216,23 +248,40 @@ export class PersistentStream extends EventEmitter {
216248
private readonly db: Database;
217249
private readonly msgIDs = new TTL({ ttl: 2 * 60 * 1000 });
218250
private conf: Configuration;
251+
private throttledBackup?;
219252

220253
// Create (or open) the persistent stream backed by sqlite at options.path,
// optionally restoring from / periodically backing up to options.archive.
constructor(options: StorageOptions) {
  super();
  logger.debug("constructor ", options.path);
  this.setMaxListeners(1000);
  // apply defaults without mutating the caller's object
  options = { compression: DEFAULT_COMPRESSION, ...options };
  this.options = options;
  const location = this.options.ephemeral
    ? ":memory:"
    : this.options.path + ".db";
  // initArchive must run *before* createDatabase: it may copy a newer
  // archived sqlite file over the local path that we are about to open.
  this.initArchive();
  this.db = createDatabase(location);
  this.initSchema();
}
234266

235-
init = () => {
267+
// Set up tiered storage: configure the throttled periodic backup and, if the
// archived copy of the sqlite file is newer than the local one (or the local
// one is missing), restore it from the archive before the db is opened.
private initArchive = () => {
  if (!this.options.archive) {
    // No archive configured. Still install a no-op so call sites
    // (set/delete/config/emitDelete) can invoke this.throttledBackup()
    // unconditionally without a TypeError.
    this.throttledBackup = () => {};
    return;
  }
  this.throttledBackup = throttle(
    this.backup,
    this.options.archiveInterval ?? DEFAULT_ARCHIVE_INTERVAL,
  );
  const archive = this.options.archive + ".db";
  const path = this.options.path + ".db";
  const archiveMtime = age(archive);
  // BUG FIX: was age(archive) here too, so the two mtimes were always equal
  // and the restore-from-archive branch below could never run.
  const pathMtime = age(path);
  if (archiveMtime > pathMtime) {
    copyFileSync(archive, path);
  }
};
283+
284+
private initSchema = () => {
236285
if (!this.options.sync && !this.options.ephemeral) {
237286
// Unless sync is set, we do not require that the filesystem has commited changes
238287
// to disk after every insert. This can easily make things 10x faster. sets are
@@ -245,7 +294,7 @@ export class PersistentStream extends EventEmitter {
245294
// ttl is in milliseconds.
246295
this.db
247296
.prepare(
248-
`CREATE TABLE IF NOT EXISTS messages (
297+
`CREATE TABLE IF NOT EXISTS messages (
249298
seq INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT UNIQUE, time INTEGER NOT NULL, headers TEXT, compress NUMBER NOT NULL, encoding NUMBER NOT NULL, raw BLOB NOT NULL, size NUMBER NOT NULL, ttl NUMBER
250299
)
251300
`,
@@ -269,13 +318,13 @@ export class PersistentStream extends EventEmitter {
269318
this.conf = this.config();
270319
};
271320

272-
close = () => {
321+
close = async () => {
273322
logger.debug("close ", this.options.path);
274323
if (this.db != null) {
275324
this.vacuum();
276325
this.db.prepare("PRAGMA wal_checkpoint(FULL)").run();
326+
await this.backup();
277327
this.db.close();
278-
// @ts-ignore
279328
}
280329
// @ts-ignore
281330
delete this.options;
@@ -284,6 +333,20 @@ export class PersistentStream extends EventEmitter {
284333
delete this.msgIDs;
285334
};
286335

336+
// Back up the sqlite database to the archive path using sqlite's online
// backup API. Wrapped in reuseInFlight since running a backup on top of an
// in-progress backup of the same file could corrupt the archive copy.
private backup = reuseInFlight(async (): Promise<void> => {
  if (!this.options.archive) {
    // No archive configured: nothing to do. Returning (rather than
    // throwing, as before) lets close() await backup() unconditionally
    // without rejecting for archive-less streams.
    return;
  }
  const path = this.options.archive + ".db";
  try {
    await this.db.backup(path);
  } catch (err) {
    // Best effort: the archive (e.g., a gcsfuse mount) may be temporarily
    // unavailable; we log and keep serving from the local copy.
    logger.debug("WARNING: error creating a backup", path, err);
  }
});
349+
287350
private compress = (
288351
raw: Buffer,
289352
): { raw: Buffer; compress: CompressionAlgorithm } => {
@@ -387,6 +450,7 @@ export class PersistentStream extends EventEmitter {
387450
headers,
388451
msgID,
389452
});
453+
this.throttledBackup();
390454
if (msgID !== undefined) {
391455
this.msgIDs.set(msgID, { time, seq });
392456
}
@@ -478,6 +542,7 @@ export class PersistentStream extends EventEmitter {
478542
this.db.prepare("DELETE FROM messages WHERE seq=?").run(seq);
479543
}
480544
this.emit("change", { op: "delete", seqs });
545+
this.throttledBackup();
481546
return { seqs };
482547
};
483548

@@ -596,13 +661,15 @@ export class PersistentStream extends EventEmitter {
596661
this.conf = full as Configuration;
597662
// ensure any new limits are enforced
598663
this.enforceLimits(0);
664+
this.throttledBackup();
599665
return full as Configuration;
600666
};
601667

602668
private emitDelete = (rows) => {
603669
if (rows.length > 0) {
604670
const seqs = rows.map((row: { seq: number }) => row.seq);
605671
this.emit("change", { op: "delete", seqs });
672+
this.throttledBackup();
606673
}
607674
};
608675

@@ -782,9 +849,7 @@ export const cache = refCacheSync<CreateOptions, PersistentStream>({
782849
name: "persistent-storage-stream",
783850
createKey: ({ path }: CreateOptions) => path,
784851
createObject: (options: CreateOptions) => {
785-
const pstream = new PersistentStream(options);
786-
pstream.init();
787-
return pstream;
852+
return new PersistentStream(options);
788853
},
789854
});
790855

@@ -793,3 +858,11 @@ export function pstream(
793858
): PersistentStream {
794859
return cache(options);
795860
}
861+
862+
function age(path: string) {
863+
try {
864+
return statSync(path).mtimeMs;
865+
} catch {
866+
return 0;
867+
}
868+
}

0 commit comments

Comments
 (0)