@@ -21,7 +21,7 @@ less than 1 read/write per second for each. Thus memory is critical, and
2121supporting at least 1000 writes/second is what we need.
2222Fortunately, this implementation can do ~50,000+ writes per second and read
2323over 500,000 per second. Yes, it blocks the main thread, but by using
24- better-sqlite3 , we get speed increases over async code, so this is worth it.
24+ sync node:sqlite , we get speed increases over async code, so this is worth it.
2525
2626
2727COMPRESSION:
@@ -69,6 +69,7 @@ import {
6969 decompress ,
7070 statSync ,
7171 copyFileSync ,
72+ ensureContainingDirectoryExists ,
7273} from "./context" ;
7374import type { JSONValue } from "@cocalc/util/types" ;
7475import { EventEmitter } from "events" ;
@@ -252,6 +253,7 @@ export interface StorageOptions {
252253export class PersistentStream extends EventEmitter {
253254 private readonly options : StorageOptions ;
254255 private readonly db : Database ;
256+ private readonly dbPath ?: string ;
255257 private readonly msgIDs = new TTL ( { ttl : 2 * 60 * 1000 } ) ;
256258 private conf : Configuration ;
257259 private throttledBackup ?;
@@ -266,6 +268,9 @@ export class PersistentStream extends EventEmitter {
266268 const location = this . options . ephemeral
267269 ? ":memory:"
268270 : this . options . path + ".db" ;
271+ if ( location !== ":memory:" ) {
272+ this . dbPath = location ;
273+ }
269274 this . initArchive ( ) ;
270275 this . db = createDatabase ( location ) ;
271276 this . initSchema ( ) ;
@@ -362,18 +367,36 @@ export class PersistentStream extends EventEmitter {
362367 if ( path === undefined && ! this . options . archive ) {
363368 return ;
364369 }
365- path = ( path ?? this . options . archive ) + ".db" ;
370+ if ( ! this . dbPath ) {
371+ return ;
372+ }
373+ const dest = ( path ?? this . options . archive ) + ".db" ;
366374 //console.log("backup", { path });
367375 try {
368- await this . db . backup ( path ) ;
376+ await ensureContainingDirectoryExists ( dest ) ;
377+ copyFileSync ( this . dbPath , dest ) ;
369378 } catch ( err ) {
370379 if ( ! process . env . COCALC_TEST_MODE ) {
371380 console . log ( err ) ;
372381 }
373- logger . debug ( "WARNING: error creating a backup" , path , err ) ;
382+ logger . debug ( "WARNING: error creating a backup" , dest , err ) ;
374383 }
375384 } ) ;
376385
386+ private runTransaction = < T > ( fn : ( ) => T ) : T => {
387+ this . db . exec ( "BEGIN" ) ;
388+ try {
389+ const result = fn ( ) ;
390+ this . db . exec ( "COMMIT" ) ;
391+ return result ;
392+ } catch ( err ) {
393+ try {
394+ this . db . exec ( "ROLLBACK" ) ;
395+ } catch { }
396+ throw err ;
397+ }
398+ } ;
399+
377400 private compress = (
378401 raw : Buffer ,
379402 ) : { raw : Buffer ; compress : CompressionAlgorithm } => {
@@ -442,30 +465,25 @@ export class PersistentStream extends EventEmitter {
442465
443466 this . enforceLimits ( size ) ;
444467
445- const tx = this . db . transaction (
446- ( time , compress , encoding , raw , headers , key , size , ttl ) => {
447- if ( key !== undefined ) {
448- // insert with key -- delete all previous messages, as they will
449- // never be needed again and waste space.
450- this . db . prepare ( "DELETE FROM messages WHERE key = ?" ) . run ( key ) ;
451- }
452- return this . db
453- . prepare (
454- "INSERT INTO messages(time, compress, encoding, raw, headers, key, size, ttl) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING seq" ,
455- )
456- . get ( time / 1000 , compress , encoding , raw , headers , key , size , ttl ) ;
457- } ,
458- ) ;
459- const row = tx (
460- time ,
461- compressedRaw . compress ,
462- encoding ,
463- compressedRaw . raw ,
464- serializedHeaders ,
465- key ,
466- size ,
467- ttl ,
468- ) ;
468+ const row = this . runTransaction ( ( ) => {
469+ if ( key !== undefined ) {
470+ this . db . prepare ( "DELETE FROM messages WHERE key = ?" ) . run ( key ) ;
471+ }
472+ return this . db
473+ . prepare (
474+ "INSERT INTO messages(time, compress, encoding, raw, headers, key, size, ttl) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING seq" ,
475+ )
476+ . get (
477+ time / 1000 ,
478+ compressedRaw . compress ,
479+ encoding ,
480+ compressedRaw . raw ,
481+ serializedHeaders ,
482+ key ?? null ,
483+ size ,
484+ ttl ?? null ,
485+ ) ;
486+ } ) ;
469487 const seq = Number ( ( row as any ) . seq ) ;
470488 // lastInsertRowid - is a bigint from sqlite, but we won't hit that limit
471489 this . emit ( "change" , {
@@ -572,12 +590,11 @@ export class PersistentStream extends EventEmitter {
572590 this . db . prepare ( "DELETE FROM messages WHERE seq=?" ) . run ( seq ) ;
573591 } else if ( seqs0 ) {
574592 const statement = this . db . prepare ( "DELETE FROM messages WHERE seq=?" ) ;
575- const transaction = this . db . transaction ( ( seqs ) => {
576- for ( const s of seqs ) {
593+ this . runTransaction ( ( ) => {
594+ for ( const s of seqs0 ) {
577595 statement . run ( s ) ;
578596 }
579597 } ) ;
580- transaction ( seqs0 ) ;
581598 seqs = seqs0 ;
582599 }
583600 this . emit ( "change" , { op : "delete" , seqs } ) ;
0 commit comments