Skip to content

Commit fa02be7

Browse files
committed
Fix hlc timestamp race condition when using bulkLoad
1 parent 69ffa93 commit fa02be7

File tree

5 files changed

+105
-27
lines changed

5 files changed

+105
-27
lines changed

packages/core/package-lock.json

Lines changed: 14 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "declarative-sqlite",
3-
"version": "2.1.0",
3+
"version": "2.2.0",
44
"description": "TypeScript port of declarative_sqlite for PWA and Capacitor applications - Zero code generation, type-safe SQLite with automatic migration",
55
"type": "module",
66
"main": "./dist/index.cjs",

packages/core/pnpm-workspace.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
onlyBuiltDependencies:
2+
- esbuild

packages/core/src/database/declarative-database.ts

Lines changed: 84 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { SchemaMigrator } from '../migration/schema-migrator';
44
import { StreamingQuery, QueryOptions as StreamQueryOptions } from '../streaming/streaming-query';
55
import { QueryStreamManager } from '../streaming/query-stream-manager';
66
import { FileSet } from '../files/fileset';
7-
import { Hlc, HlcTimestamp } from '../sync/hlc';
7+
import { Hlc } from '../sync/hlc';
88
import { DirtyRowStore, SqliteDirtyRowStore } from '../sync/dirty-row-store';
99

1010
export interface DatabaseConfig {
@@ -54,6 +54,7 @@ export class DeclarativeDatabase {
5454
private streamManager: QueryStreamManager;
5555
public hlc: Hlc;
5656
public dirtyRowStore: DirtyRowStore;
57+
private mutex: Promise<void> = Promise.resolve();
5758

5859
constructor(config: DatabaseConfig) {
5960
this.adapter = config.adapter;
@@ -71,6 +72,15 @@ export class DeclarativeDatabase {
7172
this.dirtyRowStore = config.dirtyRowStore || new SqliteDirtyRowStore(this.adapter);
7273
}
7374

75+
/**
76+
* Enqueue a task to run sequentially
77+
*/
78+
private enqueue<T>(task: () => Promise<T>): Promise<T> {
79+
const result = this.mutex.then(() => task());
80+
this.mutex = result.then(() => {}, () => {});
81+
return result;
82+
}
83+
7484
/**
7585
* Initialize the database and optionally run migrations
7686
*/
@@ -95,6 +105,10 @@ export class DeclarativeDatabase {
95105
* Insert a record into a table
96106
*/
97107
async insert(table: string, values: Record<string, any>, options?: InsertOptions): Promise<string> {
108+
return this.enqueue(() => this._insert(table, values, options));
109+
}
110+
111+
private async _insert(table: string, values: Record<string, any>, options?: InsertOptions): Promise<string> {
98112
this.ensureInitialized();
99113

100114
const now = this.hlc.now();
@@ -157,11 +171,15 @@ export class DeclarativeDatabase {
157171
* Insert multiple records in a single transaction
158172
*/
159173
async insertMany(table: string, records: Record<string, any>[], options?: InsertOptions): Promise<void> {
174+
return this.enqueue(() => this._insertMany(table, records, options));
175+
}
176+
177+
private async _insertMany(table: string, records: Record<string, any>[], options?: InsertOptions): Promise<void> {
160178
this.ensureInitialized();
161179

162180
await this.transaction(async () => {
163181
for (const record of records) {
164-
await this.insert(table, record, options);
182+
await this._insert(table, record, options);
165183
}
166184
});
167185
}
@@ -170,6 +188,10 @@ export class DeclarativeDatabase {
170188
* Update records in a table
171189
*/
172190
async update(table: string, values: Record<string, any>, options?: UpdateOptions): Promise<number> {
191+
return this.enqueue(() => this._update(table, values, options));
192+
}
193+
194+
private async _update(table: string, values: Record<string, any>, options?: UpdateOptions): Promise<number> {
173195
this.ensureInitialized();
174196

175197
// 1. Find rows to update to get their system_ids
@@ -241,6 +263,10 @@ export class DeclarativeDatabase {
241263
* Delete records from a table
242264
*/
243265
async delete(table: string, options?: DeleteOptions): Promise<number> {
266+
return this.enqueue(() => this._delete(table, options));
267+
}
268+
269+
private async _delete(table: string, options?: DeleteOptions): Promise<number> {
244270
this.ensureInitialized();
245271

246272
// 1. Find rows to delete to get their system_ids
@@ -442,6 +468,10 @@ export class DeclarativeDatabase {
442468
* Compares against xRec to determine changed fields for UPDATE
443469
*/
444470
async save<T extends Record<string, any>>(record: T): Promise<string> {
471+
return this.enqueue(() => this._save(record));
472+
}
473+
474+
private async _save<T extends Record<string, any>>(record: T): Promise<string> {
445475
this.ensureInitialized();
446476

447477
const tableName = (record as any).__tableName;
@@ -455,7 +485,7 @@ export class DeclarativeDatabase {
455485
if (isNew) {
456486
// INSERT: Create new record
457487
const values = this._extractRecordData(record);
458-
const systemId = await this.insert(tableName, values);
488+
const systemId = await this._insert(tableName, values);
459489

460490
// Re-fetch to get all system columns and update record
461491
const table = this.schema.tables.find(t => t.name === tableName);
@@ -491,7 +521,7 @@ export class DeclarativeDatabase {
491521
const pkColumn = primaryKey?.columns[0];
492522

493523
if (pkColumn && record[pkColumn]) {
494-
await this.update(tableName, changes, {
524+
await this._update(tableName, changes, {
495525
where: `"${pkColumn}" = ?`,
496526
whereArgs: [record[pkColumn]]
497527
});
@@ -523,6 +553,10 @@ export class DeclarativeDatabase {
523553
* Delete a record from the database
524554
*/
525555
async deleteRecord<T extends Record<string, any>>(record: T): Promise<void> {
556+
return this.enqueue(() => this._deleteRecord(record));
557+
}
558+
559+
private async _deleteRecord<T extends Record<string, any>>(record: T): Promise<void> {
526560
this.ensureInitialized();
527561

528562
const tableName = (record as any).__tableName;
@@ -535,7 +569,7 @@ export class DeclarativeDatabase {
535569
const pkColumn = primaryKey?.columns[0];
536570

537571
if (pkColumn && record[pkColumn]) {
538-
await this.delete(tableName, {
572+
await this._delete(tableName, {
539573
where: `"${pkColumn}" = ?`,
540574
whereArgs: [record[pkColumn]]
541575
});
@@ -610,6 +644,14 @@ export class DeclarativeDatabase {
610644
tableName: string,
611645
rows: Record<string, any>[],
612646
onConstraintViolation: ConstraintViolationStrategy = ConstraintViolationStrategy.ThrowException
647+
): Promise<void> {
648+
return this.enqueue(() => this._bulkLoad(tableName, rows, onConstraintViolation));
649+
}
650+
651+
private async _bulkLoad(
652+
tableName: string,
653+
rows: Record<string, any>[],
654+
onConstraintViolation: ConstraintViolationStrategy = ConstraintViolationStrategy.ThrowException
613655
): Promise<void> {
614656
this.ensureInitialized();
615657

@@ -626,6 +668,31 @@ export class DeclarativeDatabase {
626668
.filter(c => c.lww)
627669
.map(c => c.name);
628670

671+
// Extract the greatest HLC timestamp found in the rows and observe it
672+
// This ensures our local clock catches up to the latest data we're importing
673+
let maxHlcString: string | null = null;
674+
675+
for (const row of rows) {
676+
if (row['system_version']) {
677+
if (!maxHlcString || row['system_version'] > maxHlcString) {
678+
maxHlcString = row['system_version'];
679+
}
680+
}
681+
682+
for (const colName of lwwColumns) {
683+
const hlcCol = `${colName}__hlc`;
684+
if (row[hlcCol]) {
685+
if (!maxHlcString || row[hlcCol] > maxHlcString) {
686+
maxHlcString = row[hlcCol];
687+
}
688+
}
689+
}
690+
}
691+
692+
if (maxHlcString) {
693+
this.hlc.update(Hlc.parse(maxHlcString));
694+
}
695+
629696
for (const row of rows) {
630697
const systemId = row['system_id'];
631698
if (!systemId) continue;
@@ -638,7 +705,6 @@ export class DeclarativeDatabase {
638705
if (existing) {
639706
// UPDATE logic
640707
const valuesToUpdate: Record<string, any> = {};
641-
const now = this.hlc.now();
642708

643709
for (const [colName, value] of Object.entries(row)) {
644710
// Skip PKs, HLC columns, and system_is_local_origin
@@ -652,20 +718,11 @@ export class DeclarativeDatabase {
652718

653719
if (remoteHlcString) {
654720
const localHlcValue = (existing as any)[hlcColName];
655-
let localHlc: HlcTimestamp | null = null;
656-
657-
if (localHlcValue) {
658-
if (typeof localHlcValue === 'string') {
659-
localHlc = Hlc.parse(localHlcValue);
660-
} else {
661-
localHlc = localHlcValue as HlcTimestamp;
662-
}
663-
}
664721

665-
const remoteHlc = Hlc.parse(remoteHlcString);
666-
667-
// Per-column LWW comparison
668-
if (!localHlc || Hlc.compare(remoteHlc, localHlc) > 0) {
722+
// Per-column LWW comparison using string comparison
723+
// HLC strings are designed to be lexicographically comparable
724+
// We compare directly to avoid parsing overhead
725+
if (!localHlcValue || remoteHlcString > localHlcValue) {
669726
// Server is newer for this column
670727
valuesToUpdate[colName] = value;
671728
valuesToUpdate[hlcColName] = remoteHlcString;
@@ -683,7 +740,14 @@ export class DeclarativeDatabase {
683740
if (Object.keys(valuesToUpdate).length > 0) {
684741
try {
685742
// We need to update system_version as well
686-
valuesToUpdate['system_version'] = Hlc.toString(now);
743+
const serverVersion = row['system_version'];
744+
const localVersion = (existing as any)['system_version'];
745+
746+
if (serverVersion && localVersion) {
747+
valuesToUpdate['system_version'] = serverVersion > localVersion ? serverVersion : localVersion;
748+
} else {
749+
valuesToUpdate['system_version'] = serverVersion || Hlc.toString(this.hlc.now());
750+
}
687751

688752
// Use internal update to avoid marking as dirty
689753
await this._updateFromServer(tableName, valuesToUpdate, systemId);

packages/core/src/streaming/streaming.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -430,9 +430,9 @@ describe('Streaming Queries', () => {
430430

431431
// Bulk load data - should auto-notify stream
432432
await db.bulkLoad('users', [
433-
{ system_id: 'u1', id: 'u1', name: 'Alice', age: 30, system_version: '1-0', system_created_at: '1-0', system_is_local_origin: 0 },
434-
{ system_id: 'u2', id: 'u2', name: 'Bob', age: 25, system_version: '2-0', system_created_at: '2-0', system_is_local_origin: 0 },
435-
{ system_id: 'u3', id: 'u3', name: 'Charlie', age: 35, system_version: '3-0', system_created_at: '3-0', system_is_local_origin: 0 }
433+
{ system_id: 'u1', id: 'u1', name: 'Alice', age: 30, system_version: '0000000000000000001:000000000:node1', system_created_at: '0000000000000000001:000000000:node1', system_is_local_origin: 0 },
434+
{ system_id: 'u2', id: 'u2', name: 'Bob', age: 25, system_version: '0000000000000000002:000000000:node1', system_created_at: '0000000000000000002:000000000:node1', system_is_local_origin: 0 },
435+
{ system_id: 'u3', id: 'u3', name: 'Charlie', age: 35, system_version: '0000000000000000003:000000000:node1', system_created_at: '0000000000000000003:000000000:node1', system_is_local_origin: 0 }
436436
]);
437437

438438
// Wait for automatic refresh
@@ -529,7 +529,7 @@ describe('Streaming Queries', () => {
529529

530530
// BulkLoad with lowercase - should notify UPPERCASE stream
531531
await db.bulkLoad('users', [
532-
{ system_id: 'u1', id: 'u1', name: 'Bob', age: 25, system_version: '1-0', system_created_at: '1-0', system_is_local_origin: 0 }
532+
{ system_id: 'u1', id: 'u1', name: 'Bob', age: 25, system_version: '0000000000000000001:000000000:node1', system_created_at: '0000000000000000001:000000000:node1', system_is_local_origin: 0 }
533533
]);
534534

535535
// Wait for automatic refresh

0 commit comments

Comments
 (0)